Flask数据库

2023年 8月 12日 62.2k 0

一 数据库的设置

WEB应用中普遍使用的是关系模型的数据库,关系型数据库把所有的数据都存储在表中,表用来给应用的实体建模,表的列数是固定的,行数是可变的。它使用结构化的查询语言。关系型数据库的列定义了表中表示的实体的数据属性。比如:商品表里有name、price、number等。 flask本身不限定数据库的选择,你可以选择sql或NoSQL的任何一种。也可以选择更方便的SQLALchemy,类似于Django的ORM。SQLALchemy实际上是对数据库的抽象,让开发者不用直接和SQL语句打交道,而是通过python对象来操作数据库,在舍弃一些性能开销的同时,换来的是开发效率的较大提升。

SQLAlchemy是一个关系型数据库框架,它提供了高层的ORM和底层的原生数据库的操作。flask-sqlalchemy是一个简化了SQLAlchemy操作的flask扩展。

在Flask中使用mysql数据库,需要安装一个flask-sqlalchemy的扩展。

pip3 install flask-sqlalchemy

要连接Mysql数据库,仍需要安装flask-mysqldb

pip3 install flask-mysqldb

 

使用Flask-SQLAlchemy管理数据库

使用Flask-SQLAlchemy扩展操作数据库,首先需要建立数据库连接。数据库连接通过URL指定,而且程序使用的数据库必须保存到Flask配置对象的SQLALCHEMY_DATABASE_URI键中。

Flask的数据库设置:

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'

 

常用的SQLAlchemy字段类型

类型名 Python中类型 说明
Integer int 普通整数,一般是32位
SmallInteger int 取值范围小的整数,一般是16位
BigInteger int或long 不限制精度的整数
Float float 浮点数
Numeric decimal.Decimal 普通整数,一般是32位
String str 变长字符串
Text str 变长字符串,对较长或不限长度的字符串做了优化
Unicode unicode 变长Unicode字符串
UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串做了优化
Boolean bool 布尔值
Date datetime.date 时间
Time datetime.datetime 日期和时间
LargeBinary str 二进制文件

常用的SQLAlchemy列选项

选项名 说明
primary_key 如果为True,代表表的主键
unique 如果为True,代表这列不允许出现重复的值
index 如果为True,为这列创建索引,提高查询效率
nullable 如果为True,允许有空值,如果为False,不允许有空值
default 为这列定义默认值

 

常用的SQLAlchemy关系选项

选项名 说明
backref 在关系的另一模型中添加反向引用
primary join 明确指定两个模型之间使用的联结条件
uselist 如果为False,不使用列表,而使用标量值
order_by 指定关系中记录的排序方式
secondary 指定多对多中记录的排序方式
secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件

 

二 自定义模型类

模型表示程序使用的数据实体,在Flask-SQLAlchemy中,模型一般是Python类,继承自db.Model,db是SQLAlchemy类的实例,代表程序使用的数据库。

类中的属性对应数据库表中的列。id为主键,是由Flask-SQLAlchemy管理。db.Column类构造函数的第一个参数是数据库列和模型属性类型。

如下示例:定义了两个模型类,用户和角色。

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

class Config(object):
"""配置参数"""
# sqlalchemy的配置参数
SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask"
# 设置成 True,SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。
SQLALCHEMY_TRACK_MODIFICATioNS = True

# 实例化SQLAlchemy对象
app.config.from_object(Config)

# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)

# 表名常见规范
# ihome --> ih_user 数据库缩写_表名
# tbl_user --> tbl_表名

# 创建数据库模型类

class Role(db.Model):
"""用户表"""
__tablename__ = "tbl_roles"

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
users = db.relationship("User", backref="role")

class User(db.Model):
"""用户表"""
__tablename__ = "tbl_users" # 指明数据库的表名

id = db.Column(db.Integer, primary_key=True) # 整型的主键, 会默认设置为自增主键
name = db.Column(db.String(64), unique=True)
email = db.Column(db.String(128))
passWord = db.Column(db.String(128))
role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))

View Code

 

三 数据库基本操作

在Flask-SQLAlchemy中,插入、修改、删除操作,均由数据库会话管理。会话用db.session表示。在准备把数据写入数据库前,要先将数据添加到会话中然后调用commit()方法提交会话。

数据库会话是为了保证数据的一致性,避免因部分更新导致数据不一致。提交操作把会话对象全部写入数据库,如果写入过程发生错误,整个会话都会失效。

数据库会话也可以回滚,通过db.session.rollback()方法,实现会话提交数据前的状态。

在Flask-SQLAlchemy中,查询操作是通过query对象操作数据。最基本的查询是返回表中所有数据,可以通过过滤器进行更精确的数据库查询。

创建表:

db.create_all()

删除表:

db.drop_all()

插入一条数据:

# 创建对象
ro1 = Role(name='admin')
# session记录对象任务
db.session.add(ro1)
# 提交任务到数据库
db.session.commit()

ro2 = Role(name='user')
db.session.add(ro2)
db.session.commit()

View Code

一次插入多条数据

us1 = User(name='wang', email='wang@163.com', password='123456', role_id=ro1.id)
us2 = User(name='zhang', email='zhang@163.com', password='201512', role_id=ro2.id)
us3 = User(name='chen', email='chen@126.com', password='987654', role_id=ro2.id)
us4 = User(name='zhou', email='zhou@163.com', password='456789', role_id=ro1.id)

# 一次保存多条数据
db.session.add_all([us1, us2, us3, us4])
db.session.commit()

View Code

 

3.1 查询:

常用的SQLAlchemy查询过滤器

过滤器 说明
filter() 把过滤器添加到原查询上,返回一个新查询
filter_by() 把等值过滤器添加到原查询上,返回一个新查询
limit 使用指定的值限定原查询返回的结果
offset() 偏移原查询返回的结果,返回一个新查询
order_by() 根据指定条件对原查询结果进行排序,返回一个新查询
group_by() 根据指定条件对原查询结果进行分组,返回一个新查询

常用的SQLAlchemy查询执行器

方法 说明
all() 以列表形式返回查询的所有结果
first() 返回查询的第一个结果,如果未查到,返回None
first_or_404() 返回查询的第一个结果,如果未查到,返回404
get() 返回指定主键对应的行,如不存在,返回None
get_or_404() 返回指定主键对应的行,如不存在,返回404
count() 返回查询结果的数量
paginate() 返回一个Paginate对象,它包含指定范围内的结果

filter_by精确查询:

>>> User.query.filter_by(name="wang").all()
[]
>>> user = User.query.filter_by(name="wang").all()
>>> user[0].name
'wang'

View Code

first()返回查询到的第一个对象

>>> user = User.query.first()
>>> user

>>> user.name
'wang'

View Code

all()返回查询到的所有对象

>>> User.query.all()
[, , , ]

View Code

filter模糊查询,返回名字结尾字符为g的所有数据。

>>> users = User.query.filter(User.name.endswith("g")).all()
>>> users
[, ]
>>> users[0].name
'wang'

View Code

get(),参数为主键,如果主键不存在没有返回内容

>>> User.query.get(1)

>>> user = User.query.get(1)
>>> user.name
'wang'

View Code

逻辑非,返回名字不等于wang的所有数据。

>>> user.query.filter(User.name!="wang").all()
[, , ]

View Code

逻辑与,需要导入and,返回and()条件满足的所有数据。

>>> from sqlalchemy import and_
>>> user = User.query.filter(and_(User.name!="wang",User.email.endswith("163.com"))).all()
>>> user
[, ]

View Code

逻辑或,需要导入or_

>>> from sqlalchemy import or_
>>> User.query.filter(or_(User.name!="wang", User.email.endswith("163.com"))).all()
[, , , ]

View Code

not_ 相当于取反

>>> from sqlalchemy import not_
>>> User.query.filter(not_(User.name=="chen")).all()
[, , ]

View Code

使用db.session查询

>>> db.session.query(Role).all()
[, ]
>>> db.session.query(Role).get(2)

>>> db.session.query(Role).first()

View Code

取不到数据返回None

>>> user= User.query.get(5)
>>> user
>>> type(user)

View Code

offset偏移

>>> users = User.query.offset(2).all()
>>> users[0].name
'chen'

View Code

limit

>>> users = User.query.offset(1).limit(2).all()
>>> users[0].name
'zhang'
>>> users[1].name
'chen'

View Code

order_by

>>> users = User.query.order_by(User.id.desc()).all()
>>> users
[, , , ]
>>> users[0].name
'zhou'

View Code

group_by

>>> from sqlalchemy import func
>>> db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id)

>>> db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).all()
[(1, 2), (2, 2)]

View Code

在每个模型类中添加__repr__:

class Role(db.Model):
"""用户表"""
__tablename__ = "tbl_roles"

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
users = db.relationship("User", backref="role")

def __repr__(self):
return "Role object:%s" % self.name

class User(db.Model):
"""用户表"""
__tablename__ = "tbl_users" # 指明数据库的表名

id = db.Column(db.Integer, primary_key=True) # 整型的主键, 会默认设置为自增主键
name = db.Column(db.String(64), unique=True)
email = db.Column(db.String(128))
password = db.Column(db.String(128))
role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))

def __repr__(self):
return "User object:%s" % self.name

View Code

>>> User.query.get(1)
User object:wang

View Code

关联查询

>>> ro = Role.query.get(1)
>>> type(ro)

>>> ro.users
[User object:wang, User object:zhou]

>>> user = User.query.get(1)
>>> user
User object:wang
>>> user.role_id
1
>>> Role.query.get(user.role_id)
Role object:admin
>>> user.role
Role object:admin

View Code

 

3.2 更新数据

第一种方法:

>>> user = User.query.get(4)
>>> user
User object:zhou
>>> user.name = "test"
>>> db.session.add(user)
>>> db.session.commit()
>>> User.query.get(4)
User object:test

View Code

第二种方法:

>>> user = User.query.filter_by(id=1).update({"name":"test1"})
>>> db.session.commit()
>>> User.query.get(1)
User object:test1

View Code

 

3.3 删除

>>> user =User.query.get(1)
>>> db.session.delete(user)
>>> db.session.commit()

View Code

 

四 图书案例

视图

from flask import Flask, render_template, request, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired

app = Flask(__name__)

class Config(object):
SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask"
SQLALCHEMY_TRACK_MODIFICATIONS = True
SECRET_KEY = "sahq28y1qhihsd0-121ewq"

# 实例化SQLAlchemy对象
app.config.from_object(Config)

# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)

# 定义数据库的模型
class Author(db.Model):
"""作者"""
__tablename__ = "tbl_authors"

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
books = db.relationship("Book", backref="author")

class Book(db.Model):
"""书籍"""
__tablename__ = "tbl_books"

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
author_id = db.Column(db.Integer, db.ForeignKey("tbl_authors.id"))

# 创建表单模型类
class AuthorBookForm(FlaskForm):
"""作者数据表单模型类"""
author_name = StringField(label="作者", validators=[DataRequired("作者必填")])
book_name = StringField(label="书籍", validators=[DataRequired("书籍必填")])
submit = SubmitField(label="保存")

@app.route("/index", methods=["POST", "GET"])
def index():
form = AuthorBookForm()
if form.validate_on_submit():
author_name = form.author_name.data
book_name = form.book_name.data

author = Author(name=author_name)
db.session.add(author)
db.session.commit()

book = Book(name=book_name, author_id=author.id)
# book = Book(name=book_name, author=author)
db.session.add(book)
db.session.commit()

authors = Author.query.all()
return render_template("author_book.html", form=form, authors=authors)

@app.route("/delete_book")
def delete_book():
book_id = request.args.get("book_id")
book = Book.query.get(book_id)
author_id = book.id
author = Author.query.get(author_id)
db.session.delete(book)
db.session.commit()

db.session.delete(author)
db.session.commit()
return redirect(url_for("index"))

if __name__ == '__main__':
# db.drop_all()
# db.create_all()
# au_xi = Author(name='我吃西红柿')
# au_qian = Author(name='萧潜')
# au_san = Author(name='唐家三少')
# db.session.add_all([au_xi, au_qian, au_san])
# db.session.commit()
#
# bk_xi = Book(name='吞噬星空', author_id=au_xi.id)
# bk_xi2 = Book(name='寸芒', author_id=au_qian.id)
# bk_qian = Book(name='飘渺之旅', author_id=au_qian.id)
# bk_san = Book(name='冰火魔厨', author_id=au_san.id)
# db.session.add_all([bk_xi, bk_xi2, bk_qian, bk_san])
db.session.commit()

app.run(debug=True)

View Code

模板

Title

添加作者和书籍

{{ form.csrf_token }}
{{ form.author_name.label }}
{{ form.author_name }}
{{ form.author_name.errors.0 }}

{{ form.book_name.label }}
{{ form.book_name }}
{{ form.book_name.errors.0 }}

{{ form.submit }}

{% for author in authors %}
{{ author.name }}

    {% for book in author.books %} 删除

  • {{ book.name }}
  • {% endfor %}

{% endfor %}

View Code

 

五 数据库迁移

在Flask中可以使用Flask-Migrate扩展,来实现数据迁移。并且集成到Flask-Script中,所有操作通过命令就能完成。

为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上。

pip3 install flask-migrate

文件: _migration.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

app = Flask(__name__)

class Config(object):
SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask"
SQLALCHEMY_TRACK_MODIFICATIONS = True

app.config.from_object(Config)

# 创建sqlalchemy的数据库连接对象
db = SQLAlchemy(app)

# 创建flask脚本管理工具对象
manager = Manager(app)

# 创建数据库迁移工具对象
Migrate(app, db)

# 向manager对象中添加数据库的操作命令
manager.add_command("db", MigrateCommand)

# 定义模型Role
class Role(db.Model):
# 定义表名
__tablename__ = 'roles'
# 定义列对象
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)

def __repr__(self):
return 'Role:'.format(self.name)

# 定义用户
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)

def __repr__(self):
return 'User:'.format(self.username)

if __name__ == '__main__':
manager.run()

View Code

 

创建迁移仓库

#这个命令会创建migrations文件夹,所有迁移文件都放在里面。
python _migrate.py db init

 

创建迁移脚本

自动创建迁移脚本有两个函数,upgrade()函数把迁移中的改动应用到数据库中。downgrade()函数则将改动删除。自动创建的迁移脚本会根据模型定义和数据库当前状态的差异,生成upgrade()和downgrade()函数的内容。对比不一定完全正确,有可能会遗漏一些细节,需要进行检查

#创建自动迁移脚本
python _migrate.py db migrate -m 'initial migration' # -m 表示备注

 

更新数据库

python _migrate.py db upgrade

此时数据库里已经存在数据表了,如果需要回到之前的迁移版本,使用回退命令

 

回退数据库

回退数据库时,需要指定回退版本号,由于版本号是随机字符串,为避免出错,建议先使用python _migrate.py db history命令查看历史版本的具体版本号,然后复制具体版本号执行回退。

python _migrate.py db downgrade 版本号

 

六 发送邮件

在开发过程中,很多应用程序都需要通过邮件提醒用户,Flask的扩展包Flask-Mail通过包装了Python内置的smtplib包,可以用在Flask程序中发送邮件。

Flask-Mail连接到简单邮件协议(Simple Mail Transfer Protocol,SMTP)服务器,并把邮件交给服务器发送。

如下示例,通过开启QQ邮箱SMTP服务设置,发送邮件。

from flask import Flask
from flask_mail import Mail, Message

app = Flask(__name__)
#配置邮件:服务器/端口/传输层安全协议/邮箱名/密码
app.config.update(
DEBUG = True,
MAIL_SERVER='smtp.qq.com',
MAIL_PROT=465,
MAIL_USE_TLS = True,
MAIL_USERNAME = 'xxxxxxxx@qq.com',
MAIL_PASSWORD = 'xxxxxx',
)

mail = Mail(app)

@app.route('/')
def index():
# sender 发送方,recipients 接收方列表
msg = Message("This is a test ",sender='11111@qq.com', recipients=['aaaaaa@163.com','11111@qq.com'])
#邮件内容
msg.body = "Flask test mail"
#发送邮件
mail.send(msg)
print "Mail sent"
return "Sent Succeed"

if __name__ == "__main__":
app.run()

View Code

 

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论