安装:

 pip3 install sqlalchemy

用SQLAlchemy创建表:

#导入这些模块
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,create_engine

Base = declarative_base()

#定义这个类,和下面的表名不是一个属性
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer,primary_key=True)
    name=Column(String(32),index=True,nullable=False)

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/liuyandb?charset=utf8",  
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )


if __name__ =="__main__":
    Base.metadata.create_all(engine)

一、新增

#创建连接
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

# 根据Users类对users表进行增删改查
session = SessionFactory()

1、单个数据新增

obj=User1(name="小叮当")  #单个数据创建,创建个对象提交就行了
session.add(obj)
session.commit()

2、多个对象新增

session.add_all([
    User1(name="大头儿子"),
    User1(name="小头爸爸"),
])
session.commit()

二、查询

1、全部查询

result=session.query(User1).all()
for i in result:
    print(i.name)

2、筛选

result=session.query(User1).filter(User1.id>1)
for i in result:
    print(i.id,i.name)

3、取对象

result=session.query(User1).filter(User1.id>1).first()
print(result.name)

三、删除

session.query(User1).filter(User1.id ==6).delete()
session.commit()

四、改

session.query(User1).filter(User1.id==8).update({User1.name:"二师兄"})
session.commit()

在对字符串进行拼接的时候,会被默认为数字,操作不会执行,所以要指定是字符串拼接:synchronize_session=False

session.query(User1).filter(User1.id==8).update({User1.name:"二师兄"+"过河"},synchronize_session=False)
session.commit()

五、关闭数据库

def drop_all():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)

六、SQLAlchemy的其他常用操作

# ############################## 其他常用 ###############################
# 1. 指定列 select id,name as cname from users;
result = session.query(Users.id,Users.name.label('cname')).all()

for item in result:
print(item[0],item.id,item.cname)
# 2. 默认条件and
session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
# 3. between
session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() # 4. in
session.query(Users).filter(Users.id.in_([1,3,4])).all()

session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# 5. 子查询
session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='eric'))).all()
# 6. and 和 or 默认是and
from sqlalchemy import and_, or_

session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()

session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()

session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()

session.query(Users).filter(
or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all() # 7. filter_by #它是根据字段去查询的,正常的其他是根据条件查询到
session.query(Users).filter_by(name='alex').all()
# 8. 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()

ret = session.query(Users).filter(~Users.name.like('e%')).all()
# 9. 切片
result = session.query(Users)[1:2]
# 10.排序
ret = session.query(Users).order_by(Users.name.desc()).all()

ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() #先根据第一个条件Users.name排序,第一个条件相同再根据第二个条件排序
# 11. group by from sqlalchemy.sql import func ret = session.query(
Users.depart_id,

func.count(Users.id),
).group_by(Users.depart_id).all()
for item in ret:

print(item)


from sqlalchemy.sql import func

ret = session.query(

Users.depart_id,

func.count(Users.id),
).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all() #用group_by排序之后,后面的条件只能用having不能用where

for item in ret:

print(item)
12.union 和 union all (前提是两个表的字段要一致)union是把两个表上下联系起来组合成一张表,去重,union_all和它相同,不去重。 """ select id,name from users UNION select id,name from users; """ q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)

ret = q1.union(q2).all()


q1 = session.query(Users.name).filter(Users.id > 2)

q2 = session.query(Favor.caption).filter(Favor.nid < 2)

ret = q1.union_all(q2).all()
session.close()

7、更多查询

关于 一对多,多对多(两个一对多),先要创建这些关系

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime,ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship


Base = declarative_base()

class Depart(Base):
    __tablename__ = 'depart'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)

class User1(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    depart_id = Column(Integer,ForeignKey("depart.id"))
    dp=relationship("Depart",backref="pers")

def create_all():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/liuyandb?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)

def drop_all():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/liuyandb?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    # drop_all()
    create_all()

使用:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users,Depart

engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)

# 根据Users类对users表进行增删改查
session = SessionFactory()

1. 查询所有用户
ret = session.query(Users).all()
for row in ret:
    print(row.id,row.name,row.depart_id)

ret=session.query(Depart)
for i in ret:
    print(i.title)

2. 查询所有用户+所属部门名称
ret = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id).all()
for row in ret:
    print(row.id,row.name,row.title)

query = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id == Depart.id,isouter=True)
print(query)

ret=session.query(User1.id,User1.name,Depart.title).join(Depart,User1.depart_id == Depart.id).all()
for i in ret:
    print(i.id,i.name,i.title)

3. relation字段:查询所有用户+所属部门名称
ret = session.query(Users).all()
for row in ret:
    print(row.id,row.name,row.depart_id,row.dp.title)

ret=session.query(User1).all()
for i in ret:
    print(i.id,i.name,i.dp.title)

4. relation字段:查询销售部所有的人员
obj = session.query(Depart).filter(Depart.title == '销售').first()
for row in obj.pers:
    print(row.id,row.name,obj.title)



5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:田硕
方式一:
d1 = Depart(title='IT')
session.add(d1)
session.commit()

u1 = Users(name='田硕',depart_id=d1.id)
session.add(u1)
session.commit()

方式二:
u1 = Users(name='田硕',dp=Depart(title='IT'))
session.add(u1)
session.commit()

ret=session.query(Depart).filter(Depart.title=="销售").first()
for i in ret.pers:
    print(i.id,i.name,ret.title)

6. 创建一个名称叫:王者荣耀,再在该部门中添加一个员工:龚林峰/长好梦/王爷们
d1 = Depart(title='王者荣耀')
d1.pers = [Users(name='龚林峰'),Users(name='长好梦'),Users(name='王爷们'),]
session.add(d1)
session.commit()

u=Depart(title="剑客")
u.pers=[
    User1(name="三毛"),
    User1(name="葫芦娃"),
    User1(name="金角"),
]

session.add(u)
session.commit()

session.close()

8、连接的两种方式

        - 连接两种方式:
            方式一:
                from sqlalchemy.orm import sessionmaker
                from sqlalchemy import create_engine
                from models import Student,Course,Student2Course

                engine = create_engine(
                        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
                        max_overflow=0,  # 超过连接池大小外最多创建的连接
                        pool_size=5,  # 连接池大小
                        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                    )
                SessionFactory = sessionmaker(bind=engine)

                def task():
                    # 去连接池中获取一个连接
                    session = SessionFactory()

                    ret = session.query(Student).all()

                    # 将连接交还给连接池
                    session.close()


                from threading import Thread

                for i in range(20):
                    t = Thread(target=task)
                    t.start()

            方式二:(推荐,基于Threading.local实现)
                from sqlalchemy.orm import sessionmaker
                from sqlalchemy import create_engine
                from sqlalchemy.orm import scoped_session
                from models import Student,Course,Student2Course

                engine = create_engine(
                        "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
                        max_overflow=0,  # 超过连接池大小外最多创建的连接
                        pool_size=5,  # 连接池大小
                        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                    )
                SessionFactory = sessionmaker(bind=engine)
                session = scoped_session(SessionFactory)


                def task():
                    ret = session.query(Student).all()
                    # 将连接交还给连接池
                    session.remove()


                from threading import Thread

                for i in range(20):
                    t = Thread(target=task)
                    t.start()

    
        - 执行原生SQL
            from sqlalchemy.orm import sessionmaker
            from sqlalchemy import create_engine
            from sqlalchemy.orm import scoped_session
            from models import Student,Course,Student2Course

            engine = create_engine(
                    "mysql+pymysql://root:123456@127.0.0.1:3306/s9day120?charset=utf8",
                    max_overflow=0,  # 超过连接池大小外最多创建的连接
                    pool_size=5,  # 连接池大小
                    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                )
            SessionFactory = sessionmaker(bind=engine)
            session = scoped_session(SessionFactory)


            def task():
                """"""
                # 方式一:
                """
                # 查询
                # cursor = session.execute('select * from users')
                # result = cursor.fetchall()

                # 添加
                cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
                session.commit()
                print(cursor.lastrowid)
                """
                # 方式二:
                """
                # conn = engine.raw_connection()
                # cursor = conn.cursor()
                # cursor.execute(
                #     "select * from t1"
                # )
                # result = cursor.fetchall()
                # cursor.close()
                # conn.close()
                """

                # 将连接交还给连接池
                session.remove()


            from threading import Thread

            for i in range(20):
                t = Thread(target=task)
                t.start()

优质内容筛选与推荐>>
1、关于css3--ie9以下不兼容的那些事
2、移动端处理2倍图片和3倍图片
3、2018年国庆长假微信数据报告
4、POJ 1451 T9 <Trie> 找出现频率最高的字符串
5、重温JS


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号





    联系我们

    欢迎来到TinyMind。

    关于TinyMind的内容或商务合作、网站建议,举报不良信息等均可联系我们。

    TinyMind客服邮箱:support@tinymind.net.cn