# 创建连接相关
from sqlalchemy import create_engine
# 和 sqlapi 交互,执行转换后的 sql 语句,用于创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建表中的字段(列)
from sqlalchemy import Column
# 表中字段的属性
from sqlalchemy import Column, String, Integer, BigInteger, Boolean, Text, SmallInteger, Float, DateTime, Date
from sqlalchemy import UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
conn_str = '{mysql_conn}://{username}:{password}@{host}/{dbname}?charset=utf8&autocommit=true'.format(
mysql_conn=settings.DB_CONN if settings.DB_CONN else 'mysql',
username=settings.DB_USERNAME,
password=settings.DB_PASSWORD,
host=settings.DB_HOST,
dbname=settings.DB_NAME)
engine = create_engine(conn_str, pool_size=50, max_overflow=50,
pool_recycle=3600, echo_pool=False, echo=settings.DB_ECHO)
# 创建基类
Base = declarative_base()
# 创建单表
class Person(Base):
__tablename__ = 'person' # 表名 - 必有属性
id = Column(Integer, primary_key=True) # 每个表都要设定一个主键 - 必有属性
# 设置单表字段
name = Column(String(32))
age = Column(Integer, default=0, nullable=True) # 设置字段属性
__table_args__ = (
# 设置联合唯一
UniqueConstraint('id', 'name', name='uix_id_name'),
# 建立索引
Index('uix_id_name', 'name'),
)
def int_db():
"""创建所有定义的表到数据库中"""
Base.metadata.create_all(engine)
def drop_db():
"""从数据库中删除所有定义的表"""
Base.metadata.drop_all(engine)
if __name__ == "__main__":
# 执行创建表
#init_db()
# 创建会话实例对象
Session = sessionmaker(bind=engine)
session = Session()
session.query(类名/表名)
: 返回的是对象
session.query(类名.字段名)
: 返回的是含有该字段的元组对象
# 所有数据,且结果集中是一个一个的对象
ret = session.query(Person).all()
# 结果 [obj1, obj2, obj3]
# 指定字段查询,返回所有的数据,是一个列表,列表内是一个一个的元组
ret = session.query(Person.name, Person.age).all()
# 结果 [('augustrush', '18'), ('taylorswift', '19'), ('Jaychou', '23')]
for name, age in session.query(Person.name, Person.age).all():
print(name, age)
'''
# 输出结果
yangge 18
qiangge 19
shark 23
'''
可以使用label()
给每个列名起别名
for row in session.query(Person.name.label('p_name')).all():
print(row.p_name)
filter_by()
接收的是关键字参数, filter_by本质上最后还是要调用filter。
filter()
允许使用Python的比较或关系运算符,实现更灵活的查询
# filter_by()
ret = session.query(Person).filter_by(name='yangge').first()
# 结果 Person(id=2, name=yangge, age=18, city=BeiJing)
# filter()
ret = session.query(Person).filter(Person.name=='yangge').first()
# 结果 Person(id=2, name=yangge, age=18, city=BeiJing)
以下适用于filter()
query = session.query(Person)
以下查询都是以这个查询对象为基础的过滤
results = query.filter(Person.name == 'august rush').all()
results = query.filter(Person.name != 'august rush').all()
在某些数据库中,这个可能会不区分大小写,也有可能区分大小写。
results = query.filter(Person.name.like('%august%')).all()
确保忽略大小写, 大部分数据库不支持 ilike
results = query.filter(Person.name.ilike('%AUGUST%')).all()
results = query.filter(Person.id.in_([1, 2])).all()
使用波浪号~ 表示非
results = query.filter(~Person.id.in_([2, 3])).all()
使用 between 表示范围
results = query.filter(Person.id.between(1, 3)).all() # 在这个范围内
results = query.filter(~Person.id.between(1, 3)).all() # 不在这个范围内
数据库中的空字符串不是 NUll , python 中的 None 存到数据库中是 NULL。
results = query.filter(Person.name == None).all()
# 或者
results = query.filter(Person.name.is_(None)).all()
results = query.filter(Person.name != None).all()
# 或者
results = query.filter(~Person.name.is_(None)).all()
# 或者
results = query.filter(Person.name.isnot(None)).all()
# 使用 and_()
from sqlalchemy import and_
results = query.filter(and_(Person.name == 'august rush', Person.age == 23)).all()
# 或者使用逗号
query.filter(Person.name == 'august rush', Person.age == 23).all()
from sqlalchemy import or_
results = query.filter(or_(Person.name == 'august rush', Person.age == 33)).all()
results = query.filter(or_(
Person.id > 1,
and_(Person.name == 'august rush', Person.id < 10)
)).all()
all() 返回的是所有的结果集,是列表
first() 返回的是所有结果集中的第一个数据
ret = session.query(Person).all()
# 结果 [obj1,obj2,obj3]
ret = session.query(Person).first()
# 结果 obj1
提取结果集中的所有数据,假如没有或者数据多于一条则会报错
找到后返回的是一个元组
try:
results = query.filter(Person.id == 3).one()
print(results)
except sqlalchemy.exc.MultipleResultsFound as e:
print(e)
except sqlalchemy.exc.NoResultFound as e:
print(e)
和 one() 一样,但是没找到返回 None
try:
results = query.filter(Person.id == 3).one_or_none()
print(results)
except sqlalchemy.exc.MultipleResultsFound as e:
print(e)
scalar() 调用 one() 方法,找不到,返回 None
找到后返回的是赤裸裸的数据
try:
results = query.filter(Person.id > 0).scalar()
print(results)
except sqlalchemy.exc.MultipleResultsFound as e:
print(e)
使用 python 的切片控制输出多少行
results = query.all()[0:1]
print(results)
# 正序
results = query.order_by(Person.id).all()
# 倒序
results = query.order_by(Person.id.desc()).all()
# 先按名字排序,假如有相同的再按照 id 排序
results = query.order_by(Person.name, Person.id).all()
count = query.count()
session.query(Person).filter(Person.age=='18').count()
results = query.filter(Person.id.in_(
session.query(Person.id).filter(Person.id > 0)
)).all()
from sqlalchemy.sql import func
# 统计表中所有的数据
results = session.query(func.count('*')).select_from(Person).first()
# 以年龄分组,并统计每组的数据数量
results = session.query(func.count(Person.age)).group_by(Person.age).all()
# 以年龄为分组,并统计每组的最大/最小 id 号,年龄总和/平均值,
results = session.query(func.max(Person.id), func.min(Person.id), func.sum(Person.age), func.avg(Person.age))\
.group_by(Person.age).all()
# 从分组的数据中再查找需要的数据
results = session.query(func.max(Person.id), func.min(Person.id), func.sum(Person.age), func.avg(Person.age))\
.group_by(Person.age).having(func.min(Person.id) > 1).all()
# 再创建一个表
class Student(Base):
__tablename__ = 'student'
id = Column(Integer,primary_key=True)
name = Column(String(12))
age = Column(String(2))
city = Column(String(16))
# 组合 用一条数据将两个表中的要查询的数据组合在一张表里展示出来
q1 = session.query(Teacher.name).filter(Teacher.id > 2)
q2 = session.query(Student.name).filter(Student.id < 2)
## 去重
ret = q1.union(q2).all()
## 不去重
q1 = session.query(Teacher.name).filter(Teacher.id > 2)
q2 = session.query(Student.name).filter(Student.id < 2)
ret = q1.union_all(q2).all()
关于union联合查询有一个说法很形象:join查询就像是横向扩展,将多张表的数据横向组合在一起,而union像是纵向扩展,将多张表数据纵向排列起来
# 多表联查
results = session.query(Person, Job).filter(Person.job_id == Job.id).all()
''' 结果
[(Person(id=1, name=august rush, age=23), Job(id=1, name=python developer, salary=10000.0, location=zhengzhou)), (Person(id=2, name=taylor swift, age=33), Job(id=5, name=singer, salary=100000.0, location=new york))]
'''
# 使用join() inner join
results = session.query(Person, Job).join(Job, Person.job_id == Job.id).all()
''' sql
SELECT person.id AS person_id, person.job_id AS person_job_id, person.name AS person_name, person.age AS person_age, job.id AS job_id, job.name AS job_name, job.salary AS job_salary, job.location AS job_location
FROM person INNER JOIN job ON person.job_id = job.id
'''
''' 结果
[(Person(id=1, name=august rush, age=23), Job(id=1, name=python developer, salary=10000.0, location=zhengzhou)), (Person(id=2, name=taylor swift, age=33), Job(id=5, name=singer, salary=100000.0, location=new york))]
'''
# 使用join() left join
results = session.query(Person, Job).join(Job, Person.job_id == Job.id, isouter=True).all()
results = session.query(Person, Job).outerjoin(Job, Person.job_id == Job.id).all()
'''sql
SELECT person.id AS person_id, person.job_id AS person_job_id, person.name AS person_name, person.age AS person_age, job.id AS job_id, job.name AS job_name, job.salary AS job_salary, job.location AS job_location
FROM person LEFT OUTER JOIN job ON person.job_id = job.id
'''
'''结果
[(Person(id=1, name=august rush, age=23), Job(id=1, name=python developer, salary=10000.0, location=zhengzhou)), (Person(id=2, name=taylor swift, age=33), Job(id=5, name=singer, salary=100000.0, location=new york))]
'''
基于Nginx+Supervisord+uWSGI+Django1.11.1+Python3.6.5构建