SQLAlchemy
安装:
pip3 install sqlalchemy
用SQLAlchemy创建表:
#导入这些模块 from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column,Integer,String,create_engine Base = declarative_base() #定义这个类,和下面的表名不是一个属性 class User(Base): __tablename__ = 'users' id = Column(Integer,primary_key=True) name=Column(String(32),index=True,nullable=False) engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/liuyandb?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) if __name__ =="__main__": Base.metadata.create_all(engine)
一、新增
#创建连接 from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory()
1、单个数据新增
obj=User1(name="小叮当") #单个数据创建,创建个对象提交就行了 session.add(obj) session.commit()
2、多个对象新增
session.add_all([ User1(name="大头儿子"), User1(name="小头爸爸"), ]) session.commit()
二、查询
1、全部查询
result=session.query(User1).all() for i in result: print(i.name)
2、筛选
result=session.query(User1).filter(User1.id>1)
for i in result:
print(i.id,i.name)
3、取对象
result=session.query(User1).filter(User1.id>1).first()
print(result.name)
三、删除
session.query(User1).filter(User1.id ==6).delete()
session.commit()
四、改
session.query(User1).filter(User1.id==8).update({User1.name:"二师兄"}) session.commit()
在对字符串进行拼接的时候,会被默认为数字,操作不会执行,所以要指定是字符串拼接:synchronize_session=False
session.query(User1).filter(User1.id==8).update({User1.name:"二师兄"+"过河"},synchronize_session=False) session.commit()
五、关闭数据库
def drop_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine)
六、SQLAlchemy的其他常用操作
# ############################## 其他常用 ############################### # 1. 指定列 select id,name as cname from users;
result = session.query(Users.id,Users.name.label('cname')).all()
for item in result:
print(item[0],item.id,item.cname) # 2. 默认条件and
session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() # 3. between
session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() # 4. in
session.query(Users).filter(Users.id.in_([1,3,4])).all()
session.query(Users).filter(~Users.id.in_([1,3,4])).all() # 5. 子查询
session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all() # 6. and 和 or 默认是and
from sqlalchemy import and_, or_
session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()
session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all() # 7. filter_by #它是根据字段去查询的,正常的其他是根据条件查询到
session.query(Users).filter_by(name='alex').all() # 8. 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all() # 9. 切片
result = session.query(Users)[1:2] # 10.排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() #先根据第一个条件Users.name排序,第一个条件相同再根据第二个条件排序 # 11. group by from sqlalchemy.sql import func ret = session.query(
Users.depart_id,
func.count(Users.id), ).group_by(Users.depart_id).all()
for item in ret:
print(item)
from sqlalchemy.sql import func
ret = session.query(
Users.depart_id,
func.count(Users.id), ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all() #用group_by排序之后,后面的条件只能用having不能用where
for item in ret:
print(item) 12.union 和 union all (前提是两个表的字段要一致)union是把两个表上下联系起来组合成一张表,去重,union_all和它相同,不去重。 """ select id,name from users UNION select id,name from users; """ q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all() session.close()
7、更多查询
关于 一对多,多对多(两个一对多),先要创建这些关系
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey from sqlalchemy import create_engine from sqlalchemy.orm import relationship Base = declarative_base() class Depart(Base): __tablename__ = 'depart' id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class User1(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) depart_id = Column(Integer,ForeignKey("depart.id")) dp=relationship("Depart",backref="pers") def create_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/liuyandb?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) def drop_all(): engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/liuyandb?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # drop_all() create_all()
使用:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users,Depart engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) # 根据Users类对users表进行增删改查 session = SessionFactory() 1. 查询所有用户 ret = session.query(Users).all() for row in ret: print(row.id,row.name,row.depart_id) ret=session.query(Depart) for i in ret: print(i.title) 2. 查询所有用户+所属部门名称 ret = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id).all() for row in ret: print(row.id,row.name,row.title) query = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id,isouter=True) print(query) ret=session.query(User1.id,User1.name,Depart.title).join(Depart,User1.depart_id == Depart.id).all() for i in ret: print(i.id,i.name,i.title) 3. relation字段:查询所有用户+所属部门名称 ret = session.query(Users).all() for row in ret: print(row.id,row.name,row.depart_id,row.dp.title) ret=session.query(User1).all() for i in ret: print(i.id,i.name,i.dp.title) 4. relation字段:查询销售部所有的人员 obj = session.query(Depart).filter(Depart.title == '销售').first() for row in obj.pers: print(row.id,row.name,obj.title) 5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:田硕 方式一: d1 = Depart(title='IT') session.add(d1) session.commit() u1 = Users(name='田硕',depart_id=d1.id) session.add(u1) session.commit() 方式二: u1 = Users(name='田硕',dp=Depart(title='IT')) session.add(u1) session.commit() ret=session.query(Depart).filter(Depart.title=="销售").first() for i in ret.pers: print(i.id,i.name,ret.title) 6. 创建一个名称叫:王者荣耀,再在该部门中添加一个员工:龚林峰/长好梦/王爷们 d1 = Depart(title='王者荣耀') d1.pers = [Users(name='龚林峰'),Users(name='长好梦'),Users(name='王爷们'),] session.add(d1) session.commit() u=Depart(title="剑客") u.pers=[ User1(name="三毛"), User1(name="葫芦娃"), User1(name="金角"), ] session.add(u) session.commit() session.close()
8、连接的两种方式
- 连接两种方式: 方式一: from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) def task(): # 去连接池中获取一个连接 session = SessionFactory() ret = session.query(Student).all() # 将连接交还给连接池 session.close() from threading import Thread for i in range(20): t = Thread(target=task) t.start() 方式二:(推荐,基于Threading.local实现) from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): ret = session.query(Student).all() # 将连接交还给连接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start() - 执行原生SQL from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Student,Course,Student2Course engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) SessionFactory = sessionmaker(bind=engine) session = scoped_session(SessionFactory) def task(): """""" # 方式一: """ # 查询 # cursor = session.execute('select * from users') # result = cursor.fetchall() # 添加 cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'}) session.commit() print(cursor.lastrowid) """ # 方式二: """ # conn = engine.raw_connection() # cursor = conn.cursor() # cursor.execute( # "select * from t1" # ) # result = cursor.fetchall() # cursor.close() # conn.close() """ # 将连接交还给连接池 session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()优质内容筛选与推荐>>