Flask学习笔记 - 数据库

Flask 数据库操作

Flask 提供了多种方式来与数据库进行交互,包括直接使用 SQL 和利用 ORM(对象关系映射)工具,如 SQLAlchemy。

  1. 使用SQLAlchemy
  2. 创建和管理数据库:使用 db.create_all() 创建表。
  3. CRUD 操作:添加、读取、更新和删除记录。
  4. 查询操作:执行基本和复杂查询,包括排序和分页。
  5. Flask-Migrate:使用 Flask-Migrate 管理数据库迁移。
  6. 执行原始 SQL:使用原始 SQL 语句进行数据库操作。

使用SQLAlchemy

SQLAlchemy 是一个强大的 ORM 库,可以简化数据库操作,通过 Python 对象与数据库表进行交互。

Flask-SQLAlchemy 是 Flask 的一个扩展,用于集成 SQLAlchemy。

1
pip install flask-sqlalchemy

创建和管理数据库

模型是数据库表的 Python 类,每个模型类代表数据库中的一张表。

app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import sys

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///mydatabase.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# OMR
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)

def __repr__(self):
return f'<User {self.username}>'

@app.route('/')
def index():
return 'Hello, World!'

@app.route('/table')
def table():
# 检查是否数据库是否已经有对应的数据表
result = ""
connection = db.engine.connect()
if not db.engine.dialect.has_table(connection, 'user'):
with app.app_context():
db.create_all()
result = "Table 'user' created successfully."
else:
result = "Table 'user' already exists."
connection.close()
return result

if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 5001
app.run(port=int(port),debug=True)

启动服务 python Flask/06/app.py

发现../..会创建一个instance目录,用浏览器访问table路径,第一次会显示

01.png

02.png

Table 'user' created successfully.

后面刷新会显示

Table 'user' already exists.

03.png

然后在instance目录下会生成mydatabase.db的文件

04.png

用终端打开该文件,.tables确认数据库中有user表,.schema users显示表结构与代码中的设置能匹配

05.png

基本的CURD操作

创建记录

1
2
3
4
5
6
7
@app.route('/add_user')
def add_user():
new_user = User(username='john_doe', email='john@example.com')
db.session.add(new_user)
db.session.commit()
return 'User added!'

会报错,修改app.py

1
2
3
new_user = User()
new_user.username = username
new_user.email = email

06.png

读取记录

1
2
3
4
@app.route('/get_users')
def get_users():
users = User.query.all() # 获取所有用户
return '<br>'.join([f'{user.username} ({user.email})' for user in users])

User.query.all():查询所有用户记录。

07.png

更新记录

1
2
3
4
5
6
7
8
@app.route('/update_user/<int:user_id>')
def update_user(user_id):
user = User.query.get(user_id)
if user:
user.username = '新名字'
db.session.commit()
return 'User updated!'
return 'User not found!'

User.query.get(user_id):通过主键查询单个用户记录。

更新字段值并提交事务。

08.png

删除记录

1
2
3
4
5
6
7
8
@app.route('/delete_user/<int:user_id>')
def delete_user(user_id):
user = User.query.get(user_id)
if user:
db.session.delete(user)
db.session.commit()
return 'User deleted!'
return 'User not found!'

先输入个错误的2
09.png

输入正确的id,1

10.png

删除成功, 检查数据库表也删除了

恢复数据多添加几个User数据,报错了

1
2
3
4
5
IntegrityError
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: user.email
[SQL: INSERT INTO user (username, email) VALUES (?, ?)]
[parameters: ('john_doe', 'john@example.com')]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

因为原来添加时都加了 unique 唯一性的键

1
username = db.Column(db.String(80), unique=True, nullable=False)    email = db.Column(db.String(120), unique=True, nullable=False)

app.py 加个时间戳

1
2
3
4
5
6
7
8
9
10
11
12
...
import time
...
@app.route('/add_user')
def add_user():
timestamp = time.time()
username = f'john_doe_{timestamp}'
email = f'john_{timestamp}@example.com'
new_user = User(username=username, email=email)
db.session.add(new_user)
db.session.commit()
return 'User added!'

查询操作

SQLAlchemy 提供了丰富的查询功能,可以通过查询对象来执行各种查询操作。

app.py 查询操作

1
2
3
4
5
6
7
8
@app.route('/query/<username>')
def query(username):
print(f"username: {username}")
user = User.query.filter_by(username=username).first()
if user:
return f"User:{user.username} ({user.email})"
return 'User not found!'

app.py 复杂查询

1
2
3
4
5
6
7
8
9
10
11
12
...
from sqlalchemy import or_
...

@app.route('/query2/<username>/<email>')
def query2(username,email):
print(f"username: {username}")
print(f"email: {email}")
user = User.query.filter(or_(User.username == username, User.email == email)).first()
if user:
return f"User:{user.username} ({user.email})"
return 'User not found!'

11.png

12.png

app.py 排序与分页

1
2
3
4
5
6
@app.route('/query3')
def query3():
users = User.query.order_by(User.username).paginate(page=1, per_page=10)
if users:
return '<br>'.join([f'{user.username} ({user.email})' for user in users.items])
return 'Users not found!'

13.png

使用 Flask-Migrate 进行迁移

Flask-Migrate 是一个用于数据库迁移的扩展,基于 Alembic,可以帮助你管理数据库的版本控制。

1
2
pip install flask-migrate
# 或pip3 install flask-migrate

配置 Flask-Migrate

app.py

1
2
3
from flask_migrate import Migrate

migrate = Migrate(app, db)

初始化迁移

1
flask db init

app.py同级的终端目录下执行该命令

会生成migrations目录

截屏2025-04-05 17.01.39.png

创建迁移脚本

1
flask db migrate -m "Initial migration."

应用完成后,生成的DB文件

截屏2025-04-05 17.15.07.png

表结构有的,但是数据是没有的,要重新导数据?

执行原始SQL

1
2
3
4
5
6
7
8
...
from sqlalchemy import text
...

@app.route('/raw_sql')
def raw_sql():
result = db.session.execute(text('SELECT * FROM user'))
return '<br>'.join([str(row) for row in result])

db.session.execute():执行原始 SQL 查询。里面的SQL语句要加text 包装下

参考

  1. Flask数据库操作
Vue学习笔记 - 安装与环境搭建 Flask学习笔记 - 表单
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×