第17章 Flask Web开发

学习目标

完成本章学习后,读者应能够:

  1. 理解Web框架的本质:掌握WSGI/ASGI协议原理,理解HTTP请求-响应生命周期在框架中的实现机制
  2. 掌握Flask架构设计:深入理解Flask的微框架设计哲学、应用上下文与请求上下文机制
  3. 熟练运用路由系统:实现RESTful路由设计、动态路由、蓝图模块化与URL构建策略
  4. 精通模板引擎:掌握Jinja2模板继承、自定义过滤器、宏定义与模板性能优化
  5. 构建数据持久层:运用Flask-SQLAlchemy实现ORM映射、关系建模、迁移管理与查询优化
  6. 实现认证授权体系:基于Flask-Login构建完整的用户认证、会话管理与权限控制系统
  7. 掌握工程化实践:运用应用工厂模式、蓝图架构、配置管理实现可维护的大型应用
  8. 实施安全防护:理解并防御XSS、CSRF、SQL注入、点击劫持等Web安全威胁
  9. 完成生产部署:掌握Gunicorn/uWSGI部署、容器化、性能调优与监控体系

17.1 Web框架基础理论

17.1.1 HTTP协议与Web应用架构

Web应用的本质是遵循HTTP协议进行请求-响应交互的程序。理解HTTP协议的工作机制是掌握任何Web框架的前提。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
客户端 (Browser)                          服务端 (Web Server)
| |
| 1. DNS解析 → TCP三次握手 → TLS协商 |
| |
| 2. HTTP请求 ──────────────────────────> |
| GET /api/users HTTP/1.1 |
| Host: example.com |
| Accept: application/json |
| |
| 3. 服务器处理请求 |
| WSGI Server → Flask App → View Func |
| |
| 4. HTTP响应 <────────────────────────── |
| HTTP/1.1 200 OK |
| Content-Type: application/json |
| {"users": [...]} |
| |
| 5. TCP四次挥手 |

HTTP请求方法与语义对照:

方法语义幂等性安全性典型用途
GET获取资源查询数据
POST创建资源提交表单、创建记录
PUT全量更新替换整个资源
PATCH增量更新修改部分字段
DELETE删除资源删除记录
HEAD获取元信息检查资源是否存在
OPTIONS获取支持方法CORS预检请求

17.1.2 WSGI协议深度解析

WSGI(Web Server Gateway Interface,PEP 3333)是Python Web应用与服务器之间的标准接口协议。Flask基于WSGI构建,理解WSGI对掌握Flask的运行机制至关重要。

1
2
3
4
5
def simple_wsgi_app(environ, start_response):
status = "200 OK"
headers = [("Content-Type", "text/plain; charset=utf-8")]
start_response(status, headers)
return [b"Hello, WSGI!"]

WSGI接口的两个核心组件:

  • environ:包含请求信息的字典,由WSGI服务器填充
  • start_response:可调用对象,用于设置响应状态码和头部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def detailed_wsgi_app(environ, start_response):
method = environ.get("REQUEST_METHOD", "GET")
path = environ.get("PATH_INFO", "/")
query_string = environ.get("QUERY_STRING", "")
content_type = environ.get("CONTENT_TYPE", "")
content_length = int(environ.get("CONTENT_LENGTH", 0) or 0)
server_name = environ.get("SERVER_NAME", "localhost")
server_port = environ.get("SERVER_PORT", "80")

request_body = environ["wsgi.input"].read(content_length) if content_length > 0 else b""

response_body = (
f"Method: {method}\n"
f"Path: {path}\n"
f"Query: {query_string}\n"
f"Content-Type: {content_type}\n"
f"Body: {request_body.decode('utf-8')}\n"
f"Server: {server_name}:{server_port}\n"
).encode("utf-8")

status = "200 OK"
headers = [
("Content-Type", "text/plain; charset=utf-8"),
("Content-Length", str(len(response_body))),
]
start_response(status, headers)
return [response_body]

WSGI中间件模式——在应用与服务器之间插入处理层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class TimingMiddleware:
def __init__(self, app):
self.app = app

def __call__(self, environ, start_response):
import time
start = time.perf_counter()

def custom_start_response(status, headers, exc_info=None):
elapsed = time.perf_counter() - start
headers.append(("X-Response-Time", f"{elapsed:.4f}s"))
return start_response(status, headers, exc_info)

return self.app(environ, custom_start_response)


class RequestLoggingMiddleware:
def __init__(self, app, logger=None):
self.app = app
self.logger = logger

def __call__(self, environ, start_response):
method = environ.get("REQUEST_METHOD", "GET")
path = environ.get("PATH_INFO", "/")
query = environ.get("QUERY_STRING", "")
remote = environ.get("REMOTE_ADDR", "-")

if self.logger:
self.logger.info(f"{remote} {method} {path}?{query}")

return self.app(environ, start_response)

17.1.3 从WSGI到ASGI:异步Web的演进

ASGI(Asynchronous Server Gateway Interface)是WSGI的异步继承者,支持WebSocket、HTTP/2和长轮询等场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def asgi_app(scope, receive, send):
if scope["type"] == "http":
await send({
"type": "http.response.start",
"status": 200,
"headers": [[b"content-type", b"text/plain"]],
})
await send({
"type": "http.response.body",
"body": b"Hello, ASGI!",
})
elif scope["type"] == "websocket":
while True:
message = await receive()
if message["type"] == "websocket.connect":
await send({"type": "websocket.accept"})
elif message["type"] == "websocket.receive":
await send({
"type": "websocket.send",
"text": f"Echo: {message.get('text', '')}",
})
elif message["type"] == "websocket.disconnect":
break

Flask 3.0+已开始支持ASGI模式,可通过异步视图函数处理并发请求:

1
2
3
4
5
6
7
8
9
from flask import Flask

app = Flask(__name__)

@app.route("/async-endpoint")
async def async_handler():
import asyncio
await asyncio.sleep(0.1)
return {"message": "Async response"}

17.1.4 Flask的设计哲学

Flask遵循”微框架”(Microframework)设计理念,其核心原则包括:

  1. 显式优于隐式:不强制项目结构,开发者拥有完全控制权
  2. 最小核心:核心仅提供路由、模板、请求上下文,其余通过扩展实现
  3. 可组合性:通过蓝图和扩展机制构建复杂应用
  4. 开发者友好:提供优秀的调试工具和详细的错误信息

Flask核心依赖仅两个库:

依赖功能说明
WerkzeugWSGI工具库提供路由、请求/响应对象、开发服务器、调试器
Jinja2模板引擎提供模板渲染、继承、过滤器、宏等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Flask应用架构层次:

┌─────────────────────────────────────────┐
│ Flask Application │
│ ┌───────────┐ ┌──────────┐ ┌──────┐ │
│ │ Routing │ │ Template │ │ Ctx │ │
│ │ System │ │ Engine │ │ Mgmt │ │
│ └─────┬─────┘ └────┬─────┘ └──┬───┘ │
│ │ │ │ │
│ ┌─────┴─────┐ ┌────┴─────┐ ┌─┴───┐ │
│ │ Werkzeug │ │ Jinja2 │ │ Ctx │ │
│ │ Routing │ │ Template │ │ Loc │ │
│ └───────────┘ └──────────┘ └─────┘ │
│ │
│ ┌─────────────────────────────────────┐│
│ │ Extension System ││
│ │ SQLAlchemy │ Login │ WTForms │ ... ││
│ └─────────────────────────────────────┘│
└─────────────────────────────────────────┘

17.2 Flask应用核心机制

17.2.1 应用初始化与配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from flask import Flask
import os


class Config:
SECRET_KEY = os.environ.get("SECRET_KEY", "dev-key-change-in-production")
SQLALCHEMY_DATABASE_URI = os.environ.get(
"DATABASE_URL", "sqlite:///app.db"
)
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_size": 10,
"pool_recycle": 3600,
"pool_pre_ping": True,
}
JSON_SORT_KEYS = False
MAX_CONTENT_LENGTH = 16 * 1024 * 1024


class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_ECHO = True


class TestingConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
WTF_CSRF_ENABLED = False


class ProductionConfig(Config):
DEBUG = False
SQLALCHEMY_ECHO = False


config_map = {
"development": DevelopmentConfig,
"testing": TestingConfig,
"production": ProductionConfig,
"default": DevelopmentConfig,
}

17.2.2 应用工厂模式

应用工厂模式(Application Factory Pattern)是Flask推荐的项目组织方式,它将应用创建过程封装为函数,支持:

  • 多配置环境切换
  • 避免循环导入
  • 便于测试时创建独立应用实例
  • 支持多应用实例运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from flask_cors import CORS

db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
cors = CORS()


def create_app(config_name="default"):
app = Flask(__name__)
app.config.from_object(config_map[config_name])

db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
cors.init_app(app)

login_manager.login_view = "auth.login"
login_manager.login_message_category = "info"

from .main import main_bp
from .auth import auth_bp
from .api import api_bp

app.register_blueprint(main_bp)
app.register_blueprint(auth_bp, url_prefix="/auth")
app.register_blueprint(api_bp, url_prefix="/api/v1")

register_error_handlers(app)
register_template_filters(app)
register_cli_commands(app)
register_hooks(app)

return app


def register_error_handlers(app):
from flask import render_template, jsonify

def wants_json_response():
from flask import request
return (
request.accept_mimetypes.best_match(["application/json", "text/html"])
== "application/json"
)

@app.errorhandler(404)
def not_found(error):
if wants_json_response():
return jsonify({"error": "Not found", "status": 404}), 404
return render_template("errors/404.html"), 404

@app.errorhandler(500)
def internal_error(error):
db.session.rollback()
if wants_json_response():
return jsonify({"error": "Internal server error", "status": 500}), 500
return render_template("errors/500.html"), 500

@app.errorhandler(403)
def forbidden(error):
if wants_json_response():
return jsonify({"error": "Forbidden", "status": 403}), 403
return render_template("errors/403.html"), 403


def register_template_filters(app):
@app.template_filter("datetime_format")
def datetime_format(value, format="%Y-%m-%d %H:%M"):
if value is None:
return ""
return value.strftime(format)

@app.template_filter("truncate_chars")
def truncate_chars(value, length=100):
if len(value) <= length:
return value
return value[:length].rsplit(" ", 1)[0] + "..."


def register_cli_commands(app):
import click

@app.cli.command("init-db")
def init_db():
db.create_all()
click.echo("数据库初始化完成。")

@app.cli.command("create-admin")
@click.argument("username")
@click.argument("email")
@click.argument("password")
def create_admin(username, email, password):
from .models import User
user = User(username=username, email=email, is_admin=True)
user.set_password(password)
db.session.add(user)
db.session.commit()
click.echo(f"管理员用户 {username} 创建成功。")


def register_hooks(app):
from flask import g, request
import time

@app.before_request
def before_request():
g.start_time = time.perf_counter()

@app.after_request
def after_request(response):
if hasattr(g, "start_time"):
elapsed = time.perf_counter() - g.start_time
response.headers["X-Response-Time"] = f"{elapsed:.4f}s"
return response

17.2.3 上下文机制深度解析

Flask的上下文机制是其架构的核心,理解上下文对排查”Working outside of application context”等错误至关重要。

Flask存在两种上下文:

上下文类型作用域包含对象生命周期
应用上下文应用级别current_appg请求期间或手动推送
请求上下文请求级别requestsession单个HTTP请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from flask import Flask, current_app, g, request

app = Flask(__name__)
app.config["APP_NAME"] = "MyApp"


@app.route("/context-demo")
def context_demo():
app_ctx_info = {
"app_name": current_app.name,
"config_app_name": current_app.config["APP_NAME"],
"debug_mode": current_app.debug,
}

request_ctx_info = {
"method": request.method,
"url": request.url,
"endpoint": request.endpoint,
"blueprint": request.blueprint,
"view_args": request.view_args,
}

g.request_id = "req-12345"
g.user_ip = request.remote_addr

return {
"app_context": app_ctx_info,
"request_context": request_ctx_info,
"g_data": {"request_id": g.request_id, "user_ip": g.user_ip},
}

手动推送上下文(用于CLI脚本、测试等场景):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask import Flask

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///test.db"


with app.app_context():
from flask import current_app
print(current_app.name)
print(current_app.config["SQLALCHEMY_DATABASE_URI"])


with app.test_request_context("/api/test?foo=bar", method="POST"):
from flask import request
print(request.method)
print(request.url)
print(request.args.get("foo"))

上下文栈的实现原理(简化版):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class AppContext:
def __init__(self, app):
self.app = app
self.g = app.app_ctx_globals_class()
self._refcnt = 0

def push(self):
self._refcnt += 1
_app_ctx_stack.push(self)

def pop(self, exc=None):
self._refcnt -= 1
if self._refcnt <= 0:
_app_ctx_stack.pop()

def __enter__(self):
self.push()
return self

def __exit__(self, *args):
self.pop()


class RequestContext:
def __init__(self, app, environ, request=None, session=None):
self.app = app
self.request = request or app.request_class(environ)
self.session = session

def push(self):
app_ctx = _app_ctx_stack.top
if app_ctx is None or app_ctx.app != self.app:
app_ctx = self.app.app_context()
app_ctx.push()
self._got_app_context = True
_request_ctx_stack.push(self)

def pop(self, exc=None):
_request_ctx_stack.pop()
if getattr(self, "_got_app_context", False):
app_ctx = _app_ctx_stack.pop()
app_ctx.pop(exc)

17.3 路由系统

17.3.1 路由注册与匹配机制

Flask使用Werkzeug的路由系统,基于MapRule实现URL匹配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from flask import Flask

app = Flask(__name__)


@app.route("/")
def index():
return "Home"


@app.route("/hello")
def hello():
return "Hello"


@app.route("/user/<username>")
def user_profile(username):
return f"User: {username}"


@app.route("/post/<int:post_id>")
def post_detail(post_id):
return f"Post: {post_id}"


@app.route("/post/<int:post_id>/comment/<int:comment_id>")
def post_comment(post_id, comment_id):
return f"Post {post_id}, Comment {comment_id}"


@app.route("/path/<path:filepath>")
def serve_path(filepath):
return f"Path: {filepath}"


@app.route("/download/<uuid:file_id>")
def download(file_id):
return f"File ID: {file_id}"

路由参数转换器:

转换器匹配规则示例URLPython类型
string任意文本(不含//user/<name>str
int正整数/post/<int:id>int
float浮点数/price/<float:p>float
path任意文本(含//file/<path:p>str
uuidUUID字符串/item/<uuid:id>uuid.UUID
any指定选项之一/<any(a,b):page>str

自定义转换器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from werkzeug.routing import BaseConverter


class ListConverter(BaseConverter):
def to_python(self, value):
return value.split(",")

def to_url(self, values):
return ",".join(str(v) for v in values)


class RegexConverter(BaseConverter):
def __init__(self, url_map, regex_pattern):
super().__init__(url_map)
self.regex = regex_pattern


app.url_map.converters["list"] = ListConverter
app.url_map.converters["regex"] = RegexConverter


@app.route("/items/<list:items>")
def items_view(items):
return {"items": items}


@app.route("/version/<regex(r'\d+\.\d+\.\d+'):version>")
def version_info(version):
return {"version": version}

17.3.2 HTTP方法与RESTful路由设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from flask import Flask, request, jsonify

app = Flask(__name__)


@app.route("/api/articles", methods=["GET", "POST"])
def articles_collection():
if request.method == "GET":
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 10, type=int)
return jsonify({
"articles": [],
"page": page,
"per_page": per_page,
})
elif request.method == "POST":
data = request.get_json()
if not data or "title" not in data:
return jsonify({"error": "标题不能为空"}), 400
return jsonify({"id": 1, "title": data["title"]}), 201


@app.route("/api/articles/<int:article_id>", methods=["GET", "PUT", "DELETE"])
def article_resource(article_id):
if request.method == "GET":
return jsonify({"id": article_id, "title": "示例文章"})
elif request.method == "PUT":
data = request.get_json()
return jsonify({"id": article_id, "title": data.get("title", "")})
elif request.method == "DELETE":
return "", 204


@app.route("/api/articles/<int:article_id>/comments", methods=["GET", "POST"])
def article_comments(article_id):
if request.method == "GET":
return jsonify({"article_id": article_id, "comments": []})
elif request.method == "POST":
data = request.get_json()
return jsonify({
"id": 1,
"article_id": article_id,
"content": data.get("content", ""),
}), 201

使用MethodView实现类视图,更优雅地组织RESTful API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from flask import Flask, request, jsonify
from flask.views import MethodView

app = Flask(__name__)


class ArticleAPI(MethodView):
def get(self, article_id):
if article_id is None:
page = request.args.get("page", 1, type=int)
return jsonify({"articles": [], "page": page})
return jsonify({"id": article_id, "title": "示例文章"})

def post(self):
data = request.get_json()
if not data or "title" not in data:
return jsonify({"error": "标题不能为空"}), 400
return jsonify({"id": 1, "title": data["title"]}), 201

def put(self, article_id):
data = request.get_json()
return jsonify({"id": article_id, "title": data.get("title", "")})

def delete(self, article_id):
return "", 204


article_view = ArticleAPI.as_view("article_api")
app.add_url_rule(
"/api/articles",
defaults={"article_id": None},
view_func=article_view,
methods=["GET"],
)
app.add_url_rule("/api/articles", view_func=article_view, methods=["POST"])
app.add_url_rule(
"/api/articles/<int:article_id>",
view_func=article_view,
methods=["GET", "PUT", "DELETE"],
)

17.3.3 蓝图(Blueprint)模块化架构

蓝图是Flask实现模块化的核心机制,它允许将应用拆分为独立的功能模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from flask import Blueprint

auth_bp = Blueprint(
"auth",
__name__,
template_folder="templates",
static_folder="static",
)


@auth_bp.route("/login", methods=["GET", "POST"])
def login():
return "Login Page"


@auth_bp.route("/register", methods=["GET", "POST"])
def register():
return "Register Page"


@auth_bp.route("/logout")
def logout():
return "Logged out"
1
2
3
4
5
6
7
8
9
10
11
12
13
from flask import Blueprint, jsonify

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")


@api_bp.route("/status")
def status():
return jsonify({"status": "ok", "version": "1.0"})


@api_bp.route("/users", methods=["GET"])
def list_users():
return jsonify({"users": []})

蓝图嵌套——构建多层模块结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from flask import Blueprint

api_v1_bp = Blueprint("api_v1", __name__, url_prefix="/api/v1")
api_v2_bp = Blueprint("api_v2", __name__, url_prefix="/api/v2")

users_v1_bp = Blueprint("users_v1", __name__)
articles_v1_bp = Blueprint("articles_v1", __name__)


@users_v1_bp.route("/users")
def list_users():
return {"version": "v1", "users": []}


@articles_v1_bp.route("/articles")
def list_articles():
return {"version": "v1", "articles": []}


api_v1_bp.register_blueprint(users_v1_bp)
api_v1_bp.register_blueprint(articles_v1_bp)

17.4 请求与响应

17.4.1 请求对象详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from flask import Flask, request

app = Flask(__name__)


@app.route("/request-info", methods=["GET", "POST", "PUT"])
def request_info():
info = {
"method": request.method,
"url": request.url,
"base_url": request.base_url,
"url_root": request.url_root,
"path": request.path,
"full_path": request.full_path,
"scheme": request.scheme,
"is_secure": request.is_secure,
"host": request.host,
"remote_addr": request.remote_addr,
}
return info


@app.route("/query-demo")
def query_demo():
search = request.args.get("q", "")
page = request.args.get("page", 1, type=int)
sort = request.args.get("sort", "created_at")
tags = request.args.getlist("tag")
return {
"search": search,
"page": page,
"sort": sort,
"tags": tags,
}


@app.route("/form-demo", methods=["POST"])
def form_demo():
username = request.form.get("username")
password = request.form.get("password")
hobbies = request.form.getlist("hobby")
return {
"username": username,
"hobbies": hobbies,
}


@app.route("/json-demo", methods=["POST"])
def json_demo():
data = request.get_json(silent=True)
if data is None:
return {"error": "Invalid JSON"}, 400
return {"received": data, "content_type": request.content_type}


@app.route("/headers-demo")
def headers_demo():
return {
"user_agent": request.user_agent.string,
"user_agent_browser": request.user_agent.browser,
"user_agent_platform": request.user_agent.platform,
"accept_languages": [str(l) for l in request.accept_languages],
"authorization": request.headers.get("Authorization"),
}


@app.route("/cookie-demo")
def cookie_demo():
all_cookies = {k: v for k, v in request.cookies.items()}
session_id = request.cookies.get("session_id")
return {"all_cookies": all_cookies, "session_id": session_id}

17.4.2 响应构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from flask import (
Flask,
make_response,
jsonify,
Response,
stream_with_context,
redirect,
url_for,
send_file,
)
import json
import time
import io
import csv

app = Flask(__name__)


@app.route("/response-text")
def text_response():
return "Plain text response"


@app.route("/response-json")
def json_response():
return jsonify(
message="Success",
data={"id": 1, "name": "Example"},
status="ok",
)


@app.route("/response-custom")
def custom_response():
resp = make_response(jsonify({"message": "Created"}), 201)
resp.headers["X-Custom-Header"] = "CustomValue"
resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
return resp


@app.route("/response-cookie")
def cookie_response():
resp = make_response("Cookie已设置")
resp.set_cookie(
"session_token",
value="abc123def456",
max_age=3600,
httponly=True,
secure=True,
samesite="Lax",
)
return resp


@app.route("/response-redirect")
def redirect_response():
return redirect(url_for("text_response"))


@app.route("/response-stream")
def stream_response():
def generate():
for i in range(10):
json_data = json.dumps({"count": i, "timestamp": time.time()})
yield f"data: {json_data}\n\n"
time.sleep(0.5)

return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)


@app.route("/response-download")
def download_response():
buffer = io.BytesIO(b"Hello, this is a downloadable file content.")
return send_file(
buffer,
as_attachment=True,
download_name="example.txt",
mimetype="text/plain",
)


@app.route("/response-csv")
def csv_response():
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["ID", "Name", "Email"])
writer.writerow([1, "Alice", "alice@example.com"])
writer.writerow([2, "Bob", "bob@example.com"])

resp = make_response(output.getvalue())
resp.headers["Content-Type"] = "text/csv; charset=utf-8"
resp.headers["Content-Disposition"] = "attachment; filename=users.csv"
return resp

17.4.3 请求钩子(Hooks)

请求钩子允许在请求处理的不同阶段插入通用逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from flask import Flask, g, request
import time
import uuid

app = Flask(__name__)


@app.before_request
def before_request_hook():
g.request_id = str(uuid.uuid4())[:8]
g.start_time = time.perf_counter()
g.user = None

auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header[7:]
g.user = verify_token(token)


@app.after_request
def after_request_hook(response):
if hasattr(g, "request_id"):
response.headers["X-Request-ID"] = g.request_id
if hasattr(g, "start_time"):
elapsed = time.perf_counter() - g.start_time
response.headers["X-Response-Time"] = f"{elapsed:.4f}s"
return response


@app.teardown_request
def teardown_request_hook(exception=None):
if exception:
app.logger.error(
f"请求异常: {exception}, "
f"Request-ID: {getattr(g, 'request_id', 'N/A')}"
)


@app.teardown_appcontext
def teardown_appcontext_hook(exception=None):
from . import db
if exception:
db.session.rollback()
db.session.remove()


def verify_token(token):
if token == "valid-token":
return {"id": 1, "username": "admin"}
return None

钩子执行顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
请求进入


before_first_request (仅首次请求)


before_request (按注册顺序执行)


视图函数处理


after_request (按注册逆序执行)


teardown_request (无论是否异常都执行)


teardown_appcontext (清理应用上下文)

17.5 Jinja2模板引擎

17.5.1 模板渲染与继承

Jinja2是Flask的默认模板引擎,提供模板继承、宏、过滤器等强大功能。

基础模板 templates/base.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}MyApp{% endblock %}</title>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
{% block extra_css %}{% endblock %}
</head>
<body>
<nav class="navbar">
<div class="container">
<a class="brand" href="{{ url_for('main.index') }}">MyApp</a>
<ul class="nav-links">
<li><a href="{{ url_for('main.index') }}">首页</a></li>
<li><a href="{{ url_for('main.about') }}">关于</a></li>
{% if current_user.is_authenticated %}
<li><a href="{{ url_for('auth.profile') }}">{{ current_user.username }}</a></li>
<li><a href="{{ url_for('auth.logout') }}">退出</a></li>
{% else %}
<li><a href="{{ url_for('auth.login') }}">登录</a></li>
<li><a href="{{ url_for('auth.register') }}">注册</a></li>
{% endif %}
</ul>
</div>
</nav>

<main class="container">
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
<div class="flash-messages">
{% for category, message in messages %}
<div class="alert alert-{{ category }}">{{ message }}</div>
{% endfor %}
</div>
{% endif %}
{% endwith %}

{% block content %}{% endblock %}
</main>

<footer class="footer">
<div class="container">
<p>&copy; 2026 MyApp. All rights reserved.</p>
</div>
</footer>

<script src="{{ url_for('static', filename='js/main.js') }}"></script>
{% block extra_js %}{% endblock %}
</body>
</html>

子模板 templates/main/index.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{% extends "base.html" %}

{% block title %}首页 - {{ super() }}{% endblock %}

{% block content %}
<section class="hero">
<h1>欢迎来到 MyApp</h1>
<p>一个基于Flask构建的Web应用</p>
</section>

<section class="features">
<h2>核心功能</h2>
<div class="grid">
{% for feature in features %}
<div class="card">
<h3>{{ feature.title }}</h3>
<p>{{ feature.description }}</p>
<a href="{{ feature.url }}">了解更多 &rarr;</a>
</div>
{% endfor %}
</div>
</section>
{% endblock %}

列表模板 templates/main/articles.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
{% extends "base.html" %}

{% block title %}文章列表 - {{ super() }}{% endblock %}

{% block content %}
<h1>文章列表</h1>

{% if articles %}
<table class="table">
<thead>
<tr>
<th>标题</th>
<th>作者</th>
<th>发布时间</th>
<th>操作</th>
</tr>
</thead>
<tbody>
{% for article in articles %}
<tr>
<td><a href="{{ url_for('main.article_detail', article_id=article.id) }}">{{ article.title }}</a></td>
<td>{{ article.author.username }}</td>
<td>{{ article.created_at | datetime_format }}</td>
<td>
<a href="{{ url_for('main.edit_article', article_id=article.id) }}">编辑</a>
<form method="post" action="{{ url_for('main.delete_article', article_id=article.id) }}" style="display:inline">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" onclick="return confirm('确认删除?')">删除</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>

{% if pagination.pages > 1 %}
<nav class="pagination">
{% if pagination.has_prev %}
<a href="{{ url_for('main.articles', page=pagination.prev_num) }}">&laquo; 上一页</a>
{% endif %}

{% for page_num in pagination.iter_pages() %}
{% if page_num %}
<a href="{{ url_for('main.articles', page=page_num) }}"
class="{{ 'active' if page_num == pagination.page else '' }}">{{ page_num }}</a>
{% else %}
<span class="ellipsis">...</span>
{% endif %}
{% endfor %}

{% if pagination.has_next %}
<a href="{{ url_for('main.articles', page=pagination.next_num) }}">下一页 &raquo;</a>
{% endif %}
</nav>
{% endif %}

{% else %}
<p class="empty-state">暂无文章。</p>
{% endif %}
{% endblock %}

17.5.2 自定义过滤器与宏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from flask import Flask

app = Flask(__name__)


@app.template_filter("datetime_format")
def datetime_format(value, format="%Y-%m-%d %H:%M"):
if value is None:
return ""
return value.strftime(format)


@app.template_filter("currency")
def currency_format(value, symbol="¥"):
if value is None:
return ""
return f"{symbol}{value:,.2f}"


@app.template_filter("pluralize")
def pluralize(count, singular="", plural="s"):
if count == 1:
return singular
return plural


@app.template_filter("truncate_html")
def truncate_html(value, length=200, end="..."):
import re
text = re.sub(r"<[^>]+>", "", value)
if len(text) <= length:
return value
return text[:length].rsplit(" ", 1)[0] + end


@app.template_filter("time_ago")
def time_ago(value):
from datetime import datetime
now = datetime.now()
diff = now - value
seconds = int(diff.total_seconds())

intervals = [
(31536000, "年"),
(2592000, "个月"),
(604800, "周"),
(86400, "天"),
(3600, "小时"),
(60, "分钟"),
(1, "秒"),
]

for interval, label in intervals:
count = seconds // interval
if count > 0:
return f"{count}{label}前"
return "刚刚"

模板宏——可复用的模板组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
{# templates/macros/forms.html #}

{% macro render_field(field, label_visible=true) -%}
<div class="form-group {% if field.errors %}has-error{% endif %}">
{% if label_visible %}
<label for="{{ field.id }}">{{ field.label.text }}</label>
{% endif %}
{{ field(class="form-control" + (" is-invalid" if field.errors else ""), **kwargs) }}
{% if field.errors %}
<div class="invalid-feedback">
{% for error in field.errors %}
<span>{{ error }}</span>
{% endfor %}
</div>
{% elif field.description %}
<small class="form-text text-muted">{{ field.description }}</small>
{% endif %}
</div>
{%- endmacro %}


{% macro render_pagination(pagination, endpoint) -%}
{% if pagination.pages > 1 %}
<nav aria-label="分页导航">
<ul class="pagination">
<li class="page-item {{ 'disabled' if not pagination.has_prev else '' }}">
<a class="page-link" href="{{ url_for(endpoint, page=pagination.prev_num) if pagination.has_prev else '#' }}">&laquo;</a>
</li>
{% for page_num in pagination.iter_pages(left_edge=1, right_edge=1, left_current=2, right_current=2) %}
{% if page_num %}
<li class="page-item {{ 'active' if page_num == pagination.page else '' }}">
<a class="page-link" href="{{ url_for(endpoint, page=page_num) }}">{{ page_num }}</a>
</li>
{% else %}
<li class="page-item disabled"><span class="page-link">...</span></li>
{% endif %}
{% endfor %}
<li class="page-item {{ 'disabled' if not pagination.has_next else '' }}">
<a class="page-link" href="{{ url_for(endpoint, page=pagination.next_num) if pagination.has_next else '#' }}">&raquo;</a>
</li>
</ul>
</nav>
{% endif %}
{%- endmacro %}

17.6 表单处理与验证

17.6.1 Flask-WTF集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from flask_wtf import FlaskForm
from wtforms import (
StringField,
PasswordField,
TextAreaField,
SelectField,
BooleanField,
FileField,
)
from wtforms.validators import (
DataRequired,
Email,
Length,
EqualTo,
Optional,
Regexp,
ValidationError,
)


class RegistrationForm(FlaskForm):
username = StringField(
"用户名",
validators=[
DataRequired(message="用户名不能为空"),
Length(min=3, max=20, message="用户名长度需在3-20个字符之间"),
Regexp(
r"^[a-zA-Z][a-zA-Z0-9_]*$",
message="用户名只能包含字母、数字和下划线,且以字母开头",
),
],
)
email = StringField(
"邮箱",
validators=[
DataRequired(message="邮箱不能为空"),
Email(message="请输入有效的邮箱地址"),
],
)
password = PasswordField(
"密码",
validators=[
DataRequired(message="密码不能为空"),
Length(min=8, message="密码长度不能少于8个字符"),
],
)
confirm_password = PasswordField(
"确认密码",
validators=[
DataRequired(message="请确认密码"),
EqualTo("password", message="两次输入的密码不一致"),
],
)
agree_terms = BooleanField(
"同意服务条款",
validators=[DataRequired(message="必须同意服务条款才能注册")],
)

def validate_username(self, field):
from .models import User
if User.query.filter_by(username=field.data).first():
raise ValidationError("该用户名已被注册")

def validate_email(self, field):
from .models import User
if User.query.filter_by(email=field.data).first():
raise ValidationError("该邮箱已被注册")


class ArticleForm(FlaskForm):
title = StringField(
"标题",
validators=[
DataRequired(message="标题不能为空"),
Length(max=200, message="标题不能超过200个字符"),
],
)
content = TextAreaField(
"内容",
validators=[DataRequired(message="内容不能为空")],
)
category_id = SelectField(
"分类",
coerce=int,
validators=[DataRequired(message="请选择分类")],
)
tags = StringField("标签(用逗号分隔)", validators=[Optional()])
is_published = BooleanField("发布", default=True)


class ChangePasswordForm(FlaskForm):
old_password = PasswordField(
"当前密码",
validators=[DataRequired(message="请输入当前密码")],
)
new_password = PasswordField(
"新密码",
validators=[
DataRequired(message="请输入新密码"),
Length(min=8, message="密码长度不能少于8个字符"),
],
)
confirm_password = PasswordField(
"确认新密码",
validators=[
DataRequired(message="请确认新密码"),
EqualTo("new_password", message="两次输入的密码不一致"),
],
)

def validate_new_password(self, field):
if field.data == self.old_password.data:
raise ValidationError("新密码不能与当前密码相同")

17.6.2 表单视图与模板集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from flask import (
Blueprint,
render_template,
redirect,
url_for,
flash,
request,
)
from flask_login import login_user, logout_user, login_required, current_user

auth_bp = Blueprint("auth", __name__)


@auth_bp.route("/register", methods=["GET", "POST"])
def register():
from .forms import RegistrationForm
from .models import User, db

if current_user.is_authenticated:
return redirect(url_for("main.index"))

form = RegistrationForm()
if form.validate_on_submit():
user = User(
username=form.username.data,
email=form.email.data,
)
user.set_password(form.password.data)
db.session.add(user)
db.session.commit()

flash("注册成功!请登录。", "success")
return redirect(url_for("auth.login"))

return render_template("auth/register.html", form=form)


@auth_bp.route("/login", methods=["GET", "POST"])
def login():
from .forms import LoginForm
from .models import User

if current_user.is_authenticated:
return redirect(url_for("main.index"))

form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user and user.check_password(form.password.data):
login_user(user, remember=form.remember.data)
next_page = request.args.get("next")
flash("登录成功!", "success")
return redirect(next_page or url_for("main.index"))
flash("用户名或密码错误", "danger")

return render_template("auth/login.html", form=form)


@auth_bp.route("/logout")
@login_required
def logout():
logout_user()
flash("已退出登录。", "info")
return redirect(url_for("main.index"))

表单模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{% extends "base.html" %}
{% from "macros/forms.html" import render_field %}

{% block title %}注册 - {{ super() }}{% endblock %}

{% block content %}
<div class="auth-container">
<h1>创建账户</h1>

<form method="post" novalidate>
{{ form.hidden_tag() }}

{{ render_field(form.username, placeholder="请输入用户名") }}
{{ render_field(form.email, placeholder="请输入邮箱", type="email") }}
{{ render_field(form.password, placeholder="请输入密码") }}
{{ render_field(form.confirm_password, placeholder="请再次输入密码") }}
{{ render_field(form.agree_terms) }}

<button type="submit" class="btn btn-primary">注册</button>
</form>

<p>已有账户?<a href="{{ url_for('auth.login') }}">立即登录</a></p>
</div>
{% endblock %}

17.6.3 文件上传处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import os
import uuid
from pathlib import Path
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename

upload_bp = Blueprint("upload", __name__)

ALLOWED_IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
ALLOWED_DOCUMENT_EXTENSIONS = {"pdf", "doc", "docx", "xls", "xlsx", "txt"}
MAX_IMAGE_SIZE = 5 * 1024 * 1024
MAX_DOCUMENT_SIZE = 20 * 1024 * 1024


def allowed_file(filename, allowed_extensions):
return (
"." in filename
and filename.rsplit(".", 1)[1].lower() in allowed_extensions
)


def generate_safe_filename(filename):
name = secure_filename(filename)
return f"{uuid.uuid4().hex[:8]}_{name}"


def get_upload_path(category="images"):
upload_root = Path(current_app.config.get("UPLOAD_FOLDER", "uploads"))
target_dir = upload_root / category
target_dir.mkdir(parents=True, exist_ok=True)
return target_dir


@upload_bp.route("/upload/image", methods=["POST"])
def upload_image():
if "file" not in request.files:
return jsonify({"error": "未找到文件"}), 400

file = request.files["file"]
if file.filename == "":
return jsonify({"error": "未选择文件"}), 400

if not allowed_file(file.filename, ALLOWED_IMAGE_EXTENSIONS):
return jsonify({"error": f"不支持的文件类型"}), 400

file.seek(0, 2)
file_size = file.tell()
file.seek(0)

if file_size > MAX_IMAGE_SIZE:
return jsonify({"error": "文件大小超过5MB限制"}), 400

filename = generate_safe_filename(file.filename)
save_path = get_upload_path("images") / filename
file.save(str(save_path))

return jsonify({
"message": "上传成功",
"filename": filename,
"url": f"/uploads/images/{filename}",
"size": file_size,
}), 201

17.7 数据库集成

17.7.1 SQLAlchemy ORM模型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from datetime import datetime, timezone
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash

db = SQLAlchemy()


class User(UserMixin, db.Model):
__tablename__ = "users"

id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(256), nullable=False)
nickname = db.Column(db.String(50))
bio = db.Column(db.Text)
avatar_url = db.Column(db.String(256))
is_active = db.Column(db.Boolean, default=True, nullable=False)
is_admin = db.Column(db.Boolean, default=False, nullable=False)
created_at = db.Column(
db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False
)
updated_at = db.Column(
db.DateTime,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)

posts = db.relationship("Post", backref="author", lazy="dynamic")
comments = db.relationship("Comment", backref="author", lazy="dynamic")

def set_password(self, password):
self.password_hash = generate_password_hash(password)

def check_password(self, password):
return check_password_hash(self.password_hash, password)

def __repr__(self):
return f"<User {self.username}>"


post_tags = db.Table(
"post_tags",
db.Column("post_id", db.Integer, db.ForeignKey("posts.id"), primary_key=True),
db.Column("tag_id", db.Integer, db.ForeignKey("tags.id"), primary_key=True),
)


class Post(db.Model):
__tablename__ = "posts"

id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
slug = db.Column(db.String(200), unique=True, nullable=False, index=True)
content = db.Column(db.Text, nullable=False)
summary = db.Column(db.String(500))
is_published = db.Column(db.Boolean, default=False, index=True)
view_count = db.Column(db.Integer, default=0)
author_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
category_id = db.Column(db.Integer, db.ForeignKey("categories.id"))
created_at = db.Column(
db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False, index=True
)
updated_at = db.Column(
db.DateTime,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)

comments = db.relationship("Comment", backref="post", lazy="dynamic")
tags = db.relationship(
"Tag", secondary=post_tags, backref=db.backref("posts", lazy="dynamic")
)

def __repr__(self):
return f"<Post {self.title}>"


class Category(db.Model):
__tablename__ = "categories"

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, nullable=False)
slug = db.Column(db.String(50), unique=True, nullable=False)
description = db.Column(db.String(200))

posts = db.relationship("Post", backref="category", lazy="dynamic")

def __repr__(self):
return f"<Category {self.name}>"


class Tag(db.Model):
__tablename__ = "tags"

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, nullable=False)
slug = db.Column(db.String(50), unique=True, nullable=False)

def __repr__(self):
return f"<Tag {self.name}>"


class Comment(db.Model):
__tablename__ = "comments"

id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
is_approved = db.Column(db.Boolean, default=False)
post_id = db.Column(db.Integer, db.ForeignKey("posts.id"), nullable=False, index=True)
author_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
parent_id = db.Column(db.Integer, db.ForeignKey("comments.id"))
created_at = db.Column(
db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False
)

replies = db.relationship(
"Comment", backref=db.backref("parent", remote_side=[id]), lazy="dynamic"
)

def __repr__(self):
return f"<Comment {self.id}>"

17.7.2 数据库迁移管理

1
2
3
from flask_migrate import Migrate

migrate = Migrate()

迁移命令:

1
2
3
4
5
6
7
flask db init                  # 初始化迁移仓库
flask db migrate -m "描述" # 生成迁移脚本
flask db upgrade # 执行迁移
flask db downgrade # 回滚迁移
flask db history # 查看迁移历史
flask db current # 查看当前版本
flask db stamp head # 标记当前数据库为最新版本

自定义迁移脚本示例:

1
2
3
4
5
6
7
8
9
10
11
def upgrade():
op.add_column("users", sa.Column("last_login_at", sa.DateTime(), nullable=True))
op.create_index("ix_users_last_login", "users", ["last_login_at"])
op.execute(
"UPDATE users SET last_login_at = created_at WHERE last_login_at IS NULL"
)


def downgrade():
op.drop_index("ix_users_last_login", table_name="users")
op.drop_column("users", "last_login_at")

17.7.3 查询优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from sqlalchemy import func, or_, desc
from sqlalchemy.orm import joinedload, subqueryload, contains_eager


class PostQuery:
@staticmethod
def get_published(page=1, per_page=10):
return (
Post.query.filter_by(is_published=True)
.options(joinedload(Post.author), joinedload(Post.category))
.order_by(Post.created_at.desc())
.paginate(page=page, per_page=per_page, error_out=False)
)

@staticmethod
def get_by_slug(slug):
return (
Post.query.filter_by(slug=slug, is_published=True)
.options(joinedload(Post.author), subqueryload(Post.tags))
.first_or_404()
)

@staticmethod
def search(keyword, page=1, per_page=10):
search_filter = or_(
Post.title.ilike(f"%{keyword}%"),
Post.content.ilike(f"%{keyword}%"),
Post.summary.ilike(f"%{keyword}%"),
)
return (
Post.query.filter(search_filter, Post.is_published == True)
.options(joinedload(Post.author))
.order_by(Post.created_at.desc())
.paginate(page=page, per_page=per_page, error_out=False)
)

@staticmethod
def get_by_category(category_slug, page=1, per_page=10):
return (
Post.query.join(Post.category)
.filter(Category.slug == category_slug, Post.is_published == True)
.options(contains_eager(Post.category), joinedload(Post.author))
.order_by(Post.created_at.desc())
.paginate(page=page, per_page=per_page, error_out=False)
)

@staticmethod
def get_stats():
return db.session.query(
func.date_trunc("month", Post.created_at).label("month"),
func.count(Post.id).label("count"),
).filter(
Post.is_published == True
).group_by(
func.date_trunc("month", Post.created_at)
).order_by(
desc("month")
).all()

@staticmethod
def get_popular(limit=10):
return (
Post.query.filter_by(is_published=True)
.order_by(Post.view_count.desc())
.limit(limit)
.all()
)

N+1查询问题与解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# N+1查询问题(低效)
posts = Post.query.all()
for post in posts:
print(post.author.username) # 每次循环都执行一次查询

# 解决方案1: joinedload (JOIN查询)
posts = Post.query.options(joinedload(Post.author)).all()

# 解决方案2: subqueryload (子查询)
posts = Post.query.options(subqueryload(Post.tags)).all()

# 解决方案3: contains_eager (手动控制JOIN)
posts = (
Post.query.join(Post.author)
.filter(User.is_active == True)
.options(contains_eager(Post.author))
.all()
)

17.8 用户认证与授权

17.8.1 Flask-Login集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from flask_login import LoginManager
from .models import User

login_manager = LoginManager()
login_manager.login_view = "auth.login"
login_manager.login_message = "请登录后访问此页面"
login_manager.login_message_category = "info"
login_manager.session_protection = "strong"


@login_manager.user_loader
def load_user(user_id):
return db.session.get(User, int(user_id))


@login_manager.unauthorized_handler
def unauthorized():
from flask import request, jsonify

if (
request.accept_mimetypes.best_match(["application/json", "text/html"])
== "application/json"
):
return jsonify({"error": "未授权访问", "status": 401}), 401
return redirect(url_for("auth.login"))

17.8.2 JWT认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import jwt
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import request, jsonify, current_app


class TokenService:
@staticmethod
def generate_access_token(user_id, expires_in=3600):
payload = {
"sub": user_id,
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(seconds=expires_in),
"type": "access",
}
return jwt.encode(
payload,
current_app.config["SECRET_KEY"],
algorithm="HS256",
)

@staticmethod
def generate_refresh_token(user_id, expires_in=2592000):
payload = {
"sub": user_id,
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(seconds=expires_in),
"type": "refresh",
}
return jwt.encode(
payload,
current_app.config["SECRET_KEY"],
algorithm="HS256",
)

@staticmethod
def decode_token(token):
try:
payload = jwt.decode(
token,
current_app.config["SECRET_KEY"],
algorithms=["HS256"],
)
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None


def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"error": "缺少有效的认证令牌"}), 401

token = auth_header[7:]
payload = TokenService.decode_token(token)

if payload is None:
return jsonify({"error": "令牌无效或已过期"}), 401

if payload.get("type") != "access":
return jsonify({"error": "令牌类型错误"}), 401

from .models import User
user = db.session.get(User, payload["sub"])
if user is None or not user.is_active:
return jsonify({"error": "用户不存在或已禁用"}), 401

request.current_user = user
return f(*args, **kwargs)

return decorated


def admin_required(f):
@wraps(f)
@token_required
def decorated(*args, **kwargs):
if not request.current_user.is_admin:
return jsonify({"error": "需要管理员权限"}), 403
return f(*args, **kwargs)

return decorated

17.8.3 权限控制系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from functools import wraps
from flask import abort
from flask_login import current_user


class Permission:
READ = 0x01
CREATE = 0x02
UPDATE = 0x04
DELETE = 0x08
ADMIN = 0x80


class Role:
GUEST = 0x01
AUTHOR = 0x01 | 0x02 | 0x04
EDITOR = 0x01 | 0x02 | 0x04 | 0x08
ADMIN = 0x01 | 0x02 | 0x04 | 0x08 | 0x80


def permission_required(permission):
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
if not current_user.is_authenticated:
abort(401)
if not has_permission(current_user, permission):
abort(403)
return f(*args, **kwargs)
return decorated
return decorator


def has_permission(user, permission):
if user.is_admin:
return True
role_permissions = getattr(user, "role_permissions", 0)
return (role_permissions & permission) == permission


def ownership_required(model_class, id_param="id", owner_attr="author_id"):
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
resource_id = kwargs.get(id_param)
resource = model_class.query.get_or_404(resource_id)

if current_user.is_admin:
return f(*args, **kwargs)

owner_id = getattr(resource, owner_attr)
if owner_id != current_user.id:
abort(403)

return f(*args, **kwargs, resource=resource)
return decorated
return decorator

17.9 安全防护

17.9.1 CSRF防护

1
2
3
from flask_wtf.csrf import CSRFProtect

csrf = CSRFProtect()

AJAX请求中传递CSRF令牌:

1
2
3
4
5
6
7
8
9
10
11
// 在所有AJAX请求中自动添加CSRF令牌
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;

fetch("/api/endpoint", {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-CSRFToken": csrfToken,
},
body: JSON.stringify(data),
});
1
2
<!-- 在基础模板中添加CSRF元标签 -->
<meta name="csrf-token" content="{{ csrf_token() }}">

17.9.2 安全头部配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import Flask
from flask_talisman import Talisman

app = Flask(__name__)

talisman = Talisman(
app,
force_https=True,
strict_transport_security=True,
strict_transport_security_max_age=31536000,
strict_transport_security_include_subdomains=True,
content_security_policy={
"default-src": "'self'",
"script-src": ["'self'", "https://cdn.jsdelivr.net"],
"style-src": ["'self'", "'unsafe-inline'", "https://cdn.jsdelivr.net"],
"img-src": ["'self'", "data:", "https:"],
"font-src": ["'self'", "https://cdn.jsdelivr.net"],
"connect-src": ["'self'", "https://api.example.com"],
"frame-ancestors": "'none'",
},
referrer_policy="strict-origin-when-cross-origin",
)

17.9.3 输入验证与SQL注入防护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from markupsafe import escape, Markup
import bleach


def sanitize_html(content, allowed_tags=None, allowed_attributes=None):
default_tags = [
"p", "br", "strong", "em", "u", "h1", "h2", "h3",
"h4", "h5", "h6", "ul", "ol", "li", "a", "blockquote",
"code", "pre", "img",
]
default_attributes = {
"a": ["href", "title"],
"img": ["src", "alt", "width", "height"],
"code": ["class"],
}

tags = allowed_tags or default_tags
attrs = allowed_attributes or default_attributes

cleaned = bleach.clean(
content,
tags=tags,
attributes=attrs,
strip=True,
)
return Markup(cleaned)

SQL注入防护的核心原则——始终使用参数化查询:

1
2
3
4
5
6
7
8
9
10
11
# 危险!SQL注入漏洞
query = f"SELECT * FROM users WHERE username = '{username}'"

# 安全:ORM方式
user = User.query.filter_by(username=username).first()

# 安全:原生SQL参数化
result = db.session.execute(
text("SELECT * FROM users WHERE username = :username"),
{"username": username},
)

17.9.4 密码安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from werkzeug.security import generate_password_hash, check_password_hash
import secrets


class PasswordPolicy:
MIN_LENGTH = 8
MAX_LENGTH = 128
REQUIRE_UPPERCASE = True
REQUIRE_LOWERCASE = True
REQUIRE_DIGIT = True
REQUIRE_SPECIAL = True

@classmethod
def validate(cls, password):
errors = []
if len(password) < cls.MIN_LENGTH:
errors.append(f"密码长度不能少于{cls.MIN_LENGTH}个字符")
if len(password) > cls.MAX_LENGTH:
errors.append(f"密码长度不能超过{cls.MAX_LENGTH}个字符")
if cls.REQUIRE_UPPERCASE and not any(c.isupper() for c in password):
errors.append("密码必须包含大写字母")
if cls.REQUIRE_LOWERCASE and not any(c.islower() for c in password):
errors.append("密码必须包含小写字母")
if cls.REQUIRE_DIGIT and not any(c.isdigit() for c in password):
errors.append("密码必须包含数字")
if cls.REQUIRE_SPECIAL and not any(
c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password
):
errors.append("密码必须包含特殊字符")
return errors

@staticmethod
def hash_password(password):
return generate_password_hash(password, method="pbkdf2:sha256", salt_length=16)

@staticmethod
def verify_password(password, password_hash):
return check_password_hash(password_hash, password)

@staticmethod
def generate_token(nbytes=32):
return secrets.token_urlsafe(nbytes)

17.10 RESTful API开发

17.10.1 API蓝图与序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from flask import Blueprint, request, jsonify
from marshmallow import Schema, fields, validate, post_load, EXCLUDE

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")


class UserSchema(Schema):
id = fields.Integer(dump_only=True)
username = fields.String(required=True, validate=validate.Length(min=3, max=20))
email = fields.Email(required=True)
nickname = fields.String(load_default="")
bio = fields.String(load_default="")
created_at = fields.DateTime(dump_only=True)

class Meta:
unknown = EXCLUDE


class PostSchema(Schema):
id = fields.Integer(dump_only=True)
title = fields.String(required=True, validate=validate.Length(min=1, max=200))
content = fields.String(required=True)
summary = fields.String(load_default="")
is_published = fields.Boolean(load_default=False)
slug = fields.String(dump_only=True)
author = fields.Nested(UserSchema, dump_only=True)
tags = fields.List(fields.String(), load_default=[])
created_at = fields.DateTime(dump_only=True)
updated_at = fields.DateTime(dump_only=True)


class PaginatedSchema(Schema):
items = fields.List(fields.Dict())
page = fields.Integer()
per_page = fields.Integer()
total = fields.Integer()
pages = fields.Integer()
has_next = fields.Boolean()
has_prev = fields.Boolean()

17.10.2 API视图实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from flask import Blueprint, request, jsonify
from .models import User, Post, db
from .schemas import UserSchema, PostSchema
from .auth import token_required, admin_required

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")

user_schema = UserSchema()
users_schema = UserSchema(many=True)
post_schema = PostSchema()
posts_schema = PostSchema(many=True)


@api_bp.route("/posts", methods=["GET"])
def get_posts():
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 10, type=int)
per_page = min(per_page, 100)

pagination = (
Post.query.filter_by(is_published=True)
.order_by(Post.created_at.desc())
.paginate(page=page, per_page=per_page, error_out=False)
)

return jsonify({
"items": posts_schema.dump(pagination.items),
"page": pagination.page,
"per_page": pagination.per_page,
"total": pagination.total,
"pages": pagination.pages,
})


@api_bp.route("/posts/<int:post_id>", methods=["GET"])
def get_post(post_id):
post = Post.query.get_or_404(post_id)
return jsonify(post_schema.dump(post))


@api_bp.route("/posts", methods=["POST"])
@token_required
def create_post():
json_data = request.get_json()
if not json_data:
return jsonify({"error": "未提供输入数据"}), 400

errors = post_schema.validate(json_data)
if errors:
return jsonify({"errors": errors}), 422

data = post_schema.load(json_data)
post = Post(
title=data["title"],
content=data["content"],
summary=data.get("summary", ""),
is_published=data.get("is_published", False),
author_id=request.current_user.id,
)
db.session.add(post)
db.session.commit()

return jsonify(post_schema.dump(post)), 201


@api_bp.route("/posts/<int:post_id>", methods=["PUT"])
@token_required
def update_post(post_id):
post = Post.query.get_or_404(post_id)

if post.author_id != request.current_user.id and not request.current_user.is_admin:
return jsonify({"error": "无权限修改此文章"}), 403

json_data = request.get_json()
if not json_data:
return jsonify({"error": "未提供输入数据"}), 400

errors = post_schema.validate(json_data, partial=True)
if errors:
return jsonify({"errors": errors}), 422

data = post_schema.load(json_data, partial=True)
for key, value in data.items():
setattr(post, key, value)

db.session.commit()
return jsonify(post_schema.dump(post))


@api_bp.route("/posts/<int:post_id>", methods=["DELETE"])
@token_required
def delete_post(post_id):
post = Post.query.get_or_404(post_id)

if post.author_id != request.current_user.id and not request.current_user.is_admin:
return jsonify({"error": "无权限删除此文章"}), 403

db.session.delete(post)
db.session.commit()
return "", 204

17.10.3 API错误处理与分页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from flask import jsonify
from werkzeug.exceptions import HTTPException


class APIError(Exception):
def __init__(self, message, status_code=400, payload=None):
super().__init__()
self.message = message
self.status_code = status_code
self.payload = payload

def to_dict(self):
rv = dict(self.payload or {})
rv["error"] = self.message
rv["status"] = self.status_code
return rv


@api_bp.errorhandler(APIError)
def handle_api_error(error):
return jsonify(error.to_dict()), error.status_code


@api_bp.errorhandler(HTTPException)
def handle_http_error(error):
return jsonify({"error": error.description, "status": error.code}), error.code

17.11 测试

17.11.1 测试配置与固件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import pytest
from app import create_app, db as _db
from app.models import User


@pytest.fixture(scope="session")
def app():
app = create_app("testing")
yield app


@pytest.fixture(scope="function")
def db(app):
with app.app_context():
_db.create_all()
yield _db
_db.session.remove()
_db.drop_all()


@pytest.fixture(scope="function")
def client(app, db):
return app.test_client()


@pytest.fixture(scope="function")
def sample_user(db):
user = User(username="testuser", email="test@example.com")
user.set_password("TestPass123!")
db.session.add(user)
db.session.commit()
return user


@pytest.fixture(scope="function")
def auth_headers(app, sample_user):
from app.auth import TokenService
with app.app_context():
token = TokenService.generate_access_token(sample_user.id)
return {"Authorization": f"Bearer {token}"}

17.11.2 视图测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class TestAuthViews:
def test_register_page(self, client):
resp = client.get("/auth/register")
assert resp.status_code == 200
assert b"register" in resp.data.lower()

def test_register_success(self, client, db):
resp = client.post("/auth/register", data={
"username": "newuser",
"email": "new@example.com",
"password": "NewPass123!",
"confirm_password": "NewPass123!",
"agree_terms": True,
}, follow_redirects=True)
assert resp.status_code == 200
user = User.query.filter_by(username="newuser").first()
assert user is not None

def test_register_duplicate_username(self, client, sample_user):
resp = client.post("/auth/register", data={
"username": sample_user.username,
"email": "other@example.com",
"password": "NewPass123!",
"confirm_password": "NewPass123!",
"agree_terms": True,
})
assert resp.status_code == 200
assert b"already" in resp.data or b"已被" in resp.data

def test_login_success(self, client, sample_user):
resp = client.post("/auth/login", data={
"username": "testuser",
"password": "TestPass123!",
}, follow_redirects=True)
assert resp.status_code == 200

def test_login_invalid_password(self, client, sample_user):
resp = client.post("/auth/login", data={
"username": "testuser",
"password": "wrongpassword",
})
assert resp.status_code == 200
assert b"error" in resp.data.lower() or b"错误" in resp.data

def test_logout(self, client, sample_user):
client.post("/auth/login", data={
"username": "testuser",
"password": "TestPass123!",
})
resp = client.get("/auth/logout", follow_redirects=True)
assert resp.status_code == 200

17.11.3 API测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import json


class TestPostAPI:
def test_get_posts(self, client):
resp = client.get("/api/v1/posts")
assert resp.status_code == 200
data = resp.get_json()
assert "items" in data
assert "page" in data

def test_create_post_unauthorized(self, client):
resp = client.post("/api/v1/posts", json={"title": "Test"})
assert resp.status_code == 401

def test_create_post_success(self, client, auth_headers):
resp = client.post(
"/api/v1/posts",
json={"title": "Test Post", "content": "Content here"},
headers=auth_headers,
)
assert resp.status_code == 201
data = resp.get_json()
assert data["title"] == "Test Post"

def test_create_post_validation_error(self, client, auth_headers):
resp = client.post(
"/api/v1/posts",
json={"title": ""},
headers=auth_headers,
)
assert resp.status_code == 422

def test_update_post(self, client, auth_headers, db):
from app.models import Post
post = Post(title="Old", content="Old content", author_id=1)

## 17.12 本章小结

本章系统介绍了Flask Web开发的完整体系:

1. **Web框架基础**:HTTP协议、WSGI/ASGI协议、Flask设计哲学
2. **应用核心机制**:应用工厂模式、上下文机制、配置管理
3. **路由系统**:动态路由、蓝图模块化、URL构建
4. **模板引擎**:Jinja2语法、模板继承、自定义过滤器
5. **数据持久层**:SQLAlchemy ORM、关系建模、查询优化
6. **认证授权**:Flask-Login、JWT认证、权限控制
7. **安全防护**:CSRF防护、安全头部、输入验证、密码安全
8. **RESTful API**:API设计、序列化、错误处理
9. **测试**:测试配置、视图测试、API测试

## 17.13 延伸阅读

### 17.13.1 Flask官方资源

- **Flask官方文档** (https://flask.palletsprojects.com/) — Flask权威指南
- **Werkzeug文档** (https://werkzeug.palletsprojects.com/) — WSGI工具库
- **Jinja2文档** (https://jinja.palletsprojects.com/) — 模板引擎

### 17.13.2 扩展与生态

- **Flask-SQLAlchemy** (https://flask-sqlalchemy.palletsprojects.com/) — ORM集成
- **Flask-Migrate** (https://flask-migrate.readthedocs.io/) — 数据库迁移
- **Flask-RESTful** (https://flask-restful.readthedocs.io/) — REST API扩展
- **Flask-Login** (https://flask-login.readthedocs.io/) — 用户认证

### 17.13.3 进阶书籍

- **《Flask Web Development》** (Miguel Grinberg) — Flask开发经典
- **《Python Web Development with Flask》** — 现代Flask实践
- **《Architecture Patterns with Python》** — Python架构模式

### 17.13.4 部署与运维

- **Gunicorn** (https://gunicorn.org/) — WSGI服务器
- **uWSGI** (https://uwsgi-docs.readthedocs.io/) — 应用服务器
- **Docker部署** (https://docs.docker.com/) — 容器化部署
- **Nginx配置** (https://nginx.org/en/docs/) — 反向代理

---

db.session.add(post)
db.session.commit()

resp = client.put(
f"/api/v1/posts/{post.id}",
json={"title": "New Title"},
headers=auth_headers,
)
assert resp.status_code == 200
assert resp.get_json()["title"] == "New Title"

def test_delete_post(self, client, auth_headers, db):
from app.models import Post
post = Post(title="Delete Me", content="Content", author_id=1)
db.session.add(post)
db.session.commit()

resp = client.delete(
f"/api/v1/posts/{post.id}",
headers=auth_headers,
)
assert resp.status_code == 204

17.12 生产部署

17.12.1 Gunicorn部署

1
2
3
4
pip install gunicorn
gunicorn -w 4 -b 0.0.0.0:8000 "app:create_app()"
gunicorn -w 4 -b 0.0.0.0:8000 --worker-class gthread --threads 2 "app:create_app()"
gunicorn -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000 "app:create_app()"

Gunicorn配置文件 gunicorn.conf.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "gthread"
threads = 2
timeout = 120
keepalive = 5
accesslog = "-"
errorlog = "-"
loglevel = "info"
preload_app = True
max_requests = 5000
max_requests_jitter = 500

17.12.2 Docker容器化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

RUN useradd --create-home appuser
USER appuser

EXPOSE 8000

CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:create_app()"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
version: "3.8"
services:
web:
build: .
ports:
- "8000:8000"
environment:
- FLASK_ENV=production
- SECRET_KEY=${SECRET_KEY}
- DATABASE_URL=postgresql://user:pass@db:5432/app
depends_on:
- db
- redis
restart: unless-stopped

db:
image: postgres:16-alpine
environment:
POSTGRES_DB: app
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
volumes:
- pgdata:/var/lib/postgresql/data

redis:
image: redis:7-alpine
restart: unless-stopped

nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
depends_on:
- web

volumes:
pgdata:

17.12.3 Nginx反向代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
upstream flask_app {
server web:8000;
}

server {
listen 80;
server_name example.com;
return 301 https://$server_name$request_uri;
}

server {
listen 443 ssl http2;
server_name example.com;

ssl_certificate /etc/nginx/certs/cert.pem;
ssl_certificate_key /etc/nginx/certs/key.pem;

client_max_body_size 16M;

location / {
proxy_pass http://flask_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}

location /static/ {
alias /app/static/;
expires 30d;
add_header Cache-Control "public, immutable";
}

location /uploads/ {
alias /app/uploads/;
expires 7d;
}
}

17.12.4 性能优化策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from flask import Flask
from flask_caching import Cache

app = Flask(__name__)
app.config["CACHE_TYPE"] = "RedisCache"
app.config["CACHE_REDIS_URL"] = "redis://localhost:6379/0"
app.config["CACHE_DEFAULT_TIMEOUT"] = 300

cache = Cache(app)


@app.route("/api/posts")
@cache.cached(timeout=60, query_string=True)
def get_posts():
from .models import Post
posts = Post.query.filter_by(is_published=True).all()
return jsonify([p.to_dict() for p in posts])


@app.route("/api/posts/<int:post_id>")
@cache.cached(timeout=300)
def get_post(post_id):
from .models import Post
post = Post.query.get_or_404(post_id)
return jsonify(post.to_dict())

数据库连接池优化:

1
2
3
4
5
6
7
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
"pool_size": 20,
"max_overflow": 10,
"pool_recycle": 3600,
"pool_pre_ping": True,
"echo_pool": False,
}

17.13 前沿技术动态

17.13.1 Flask 3.0+新特性

Flask 3.0于2023年底发布,带来了多项重要改进:

  • 简化扩展初始化:扩展现在支持延迟初始化,不再需要传递app实例
  • 改进的异步支持:原生支持async def视图函数
  • 更严格的路由匹配:修复了部分边缘情况的路由冲突
  • 改进的JSON支持:默认使用Python内置的json模块,支持自定义JSON编码器
1
2
3
4
5
6
7
8
9
10
11
12
13
from flask import Flask

app = Flask(__name__)

app.json.encoder = lambda x: str(x)


@app.route("/async-data")
async def async_data():
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get("https://api.example.com/data")
return resp.json()

17.13.2 Flask与前端框架集成

现代Flask应用常与Vue.js、React等前端框架配合使用:

1
2
3
4
5
6
7
8
9
10
11
from flask import Flask, send_from_directory

app = Flask(__name__, static_folder="dist")


@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_spa(path):
if path and os.path.exists(os.path.join("dist", path)):
return send_from_directory("dist", path)
return send_from_directory("dist", "index.html")

17.13.3 微服务架构中的Flask

Flask因其轻量级特性,在微服务架构中广泛使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from flask import Flask, jsonify
from flask_consulate import Consul

app = Flask(__name__)

consul = Consul(app, host="consul-server", port=8500)
consul.register_service(
name="user-service",
port=8000,
tags=["flask", "user", "api"],
)


@app.route("/health")
def health():
return jsonify({"status": "healthy"})


@app.route("/api/users/<int:user_id>")
def get_user(user_id):
return jsonify({"id": user_id, "username": "example"})

17.14 本章小结

本章系统阐述了Flask Web开发的核心知识体系:

  1. Web框架基础:深入理解WSGI/ASGI协议原理,掌握HTTP请求-响应生命周期
  2. Flask核心机制:应用工厂模式、上下文机制(应用上下文与请求上下文)、配置管理
  3. 路由系统:动态路由、自定义转换器、RESTful设计、蓝图模块化
  4. 请求与响应:请求对象属性、响应构建、流式响应、请求钩子
  5. 模板引擎:Jinja2继承体系、自定义过滤器、模板宏
  6. 表单处理:Flask-WTF验证、文件上传安全处理
  7. 数据库集成:SQLAlchemy ORM、关系建模、迁移管理、N+1查询优化
  8. 认证授权:Flask-Login会话认证、JWT令牌认证、位运算权限控制
  9. 安全防护:CSRF、XSS、SQL注入防御、安全头部、密码安全
  10. RESTful API:Marshmallow序列化、API错误处理、分页
  11. 测试:pytest固件、视图测试、API测试
  12. 生产部署:Gunicorn、Docker容器化、Nginx反向代理、缓存优化

17.15 习题与项目练习

基础题

  1. 使用应用工厂模式创建一个Flask应用,包含首页、关于页面和联系页面三个蓝图。

  2. 实现一个支持用户注册、登录、退出的完整认证系统,要求包含密码强度验证和CSRF防护。

  3. 使用Flask-SQLAlchemy设计一个博客系统的数据模型,包含用户、文章、分类、标签和评论五个模型及其关联关系。

进阶题

  1. 实现一个完整的RESTful API,支持文章的CRUD操作,包含JWT认证、Marshmallow序列化验证和分页功能。

  2. 设计并实现一个基于位运算的权限控制系统,支持角色(访客、作者、编辑、管理员)和权限(读、写、删、管理)的灵活组合。

  3. 实现文件上传功能,要求支持图片和文档两种类型,包含文件类型验证、大小限制、安全文件名生成和存储路径管理。

综合项目

  1. 博客平台项目:构建一个功能完整的博客平台,包含以下功能:

    • 用户注册/登录(邮箱验证)
    • 文章发布/编辑/删除(Markdown支持)
    • 分类和标签管理
    • 评论系统(支持嵌套回复)
    • 全文搜索
    • 管理后台
    • RESTful API
    • 完整的测试覆盖
  2. 任务管理系统项目:构建一个团队协作任务管理系统,包含:

    • 用户认证与权限管理
    • 项目和任务CRUD
    • 任务分配与状态流转
    • 实时通知(WebSocket)
    • API接口
    • Docker部署配置

思考题

  1. Flask的”微框架”设计哲学在什么场景下优于Django的”全栈框架”设计?在什么场景下相反?请从技术架构、团队协作、项目规模三个维度分析。

  2. 在微服务架构中,Flask应用如何实现服务发现、配置中心、链路追踪等分布式系统必需的能力?请设计一个基于Flask的微服务技术栈方案。


下一章:第18章 Django Web开发