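Chapter 26 In Practice: Web Application Development

Learning Objectives

After completing this chapter, readers should be able to:

  1. Design a web application architecture: understand how layered architecture and domain-driven design apply to web applications, and build a maintainable, extensible application structure
  2. Manage configuration: implement multi-environment configuration, protection of sensitive values, and hot reloading of configuration
  3. Build the persistence layer: use SQLAlchemy for complex relationship modeling, data validation, migration management, and query optimization
  4. Implement a complete authentication and authorization system: build stateless JWT authentication, role-based access control, and OAuth2 integration
  5. Design RESTful APIs: follow the Richardson Maturity Model to implement resource-oriented APIs, versioning, and HATEOAS
  6. Apply security practices: defend against the OWASP Top 10 threats and implement CSP, CORS, rate limiting, and similar policies
  7. Carry out a testing strategy: build a full test pyramid of unit, integration, API, and end-to-end tests
  8. Deploy to production: work with Docker containerization, an Nginx reverse proxy, database tuning, and monitoring and alerting

26.1 Web Application Architecture: Theory

26.1.1 Layered Architecture

Production-grade web applications usually adopt a layered architecture that separates concerns into distinct layers: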

┌──────────────────────────────────────────────┐
│ Presentation Layer                           │
│ Templates / Static Assets / API Serializers  │
├──────────────────────────────────────────────┤
│ Business Logic Layer                         │
│ Services / Domain Models / Business Rules    │
├──────────────────────────────────────────────┤
│ Data Access Layer                            │
│ Repositories / ORM / Query Builders          │
├──────────────────────────────────────────────┤
│ Infrastructure Layer                         │
│ Database / Cache / Message Queue / Storage   │
└──────────────────────────────────────────────┘

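Responsibilities and interaction rules for each layer:

| Layer | Responsibilities | Depends on | Key patterns |
|---|---|---|---|
| Presentation | Request parsing, response serialization, input validation | → Business logic layer | MVC, serializers |
| Business logic | Core business rules, workflow orchestration | → Data access layer | Service pattern, domain model |
| Data access | Data persistence, query optimization | → Infrastructure layer | Repository pattern, Unit of Work |
| Infrastructure | External system integration, technical implementation | No dependency on upper layers | Adapter pattern |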
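
The dependency direction described in the table can be sketched in a few lines of plain Python. This is an illustrative sketch rather than flaskblog code: `InMemoryPostRepository`, `PostService`, and `render_post` are hypothetical names, and a dict stands in for the database.

```python
from dataclasses import dataclass


@dataclass
class Post:
    id: int
    title: str
    published: bool = False


class InMemoryPostRepository:
    """Data access layer: hides how posts are stored (a dict stands in for a DB)."""

    def __init__(self):
        self._rows = {}

    def add(self, post):
        self._rows[post.id] = post

    def get(self, post_id):
        return self._rows.get(post_id)


class PostService:
    """Business logic layer: depends only on the repository's interface."""

    def __init__(self, repository):
        self._repository = repository

    def publish(self, post_id):
        post = self._repository.get(post_id)
        if post is None:
            raise LookupError(f"post {post_id} not found")
        post.published = True
        return post


def render_post(post):
    """Presentation layer: serializes a domain object for the response."""
    return {"id": post.id, "title": post.title, "published": post.published}


repository = InMemoryPostRepository()
repository.add(Post(id=1, title="Hello"))
service = PostService(repository)
response = render_post(service.publish(1))
```

Because `PostService` receives its repository as an argument, a test can pass the in-memory version while production code passes one backed by the ORM.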

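26.1.2 Core Concepts of Domain-Driven Design

In complex web applications, domain-driven design (DDD) offers a systematic way to organize business logic:

  • Entity: a domain object with a unique identity, such as a user or a post
  • Value Object: an object with no identity of its own, defined entirely by its attributes, such as an email address
  • Aggregate Root: the entry point to a cluster of related entities; code outside the cluster reaches its members only through the aggregate root
  • Domain Service: a business operation that does not belong to any single entity
  • Repository: an abstract interface for persisting aggregate roots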
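
The first three concepts can be illustrated with standard-library dataclasses. The sketch below assumes hypothetical `EmailAddress`, `Comment`, and `Article` classes; it is not the model code used later in the chapter.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class EmailAddress:
    """Value object: no identity, compared by value, immutable."""

    value: str

    def __post_init__(self):
        if "@" not in self.value:
            raise ValueError(f"invalid email address: {self.value!r}")


@dataclass
class Comment:
    """Entity that lives inside the Article aggregate."""

    id: int
    body: str


@dataclass(eq=False)
class Article:
    """Aggregate root: comments are only created through the article."""

    id: int
    title: str
    author_email: EmailAddress
    comments: list = field(default_factory=list)

    def add_comment(self, body):
        if not body.strip():
            raise ValueError("comment body must not be empty")
        comment = Comment(id=len(self.comments) + 1, body=body)
        self.comments.append(comment)
        return comment

    def __eq__(self, other):
        # Entities are equal when their identities match, whatever their attributes.
        return isinstance(other, Article) and other.id == self.id
```

Two `EmailAddress` objects holding the same string compare equal, while two `Article` objects compare equal only when their `id` values match.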

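26.1.3 Technology Choices

| Component | Choice | Rationale |
|---|---|---|
| Web framework | Flask 3.x | Microframework flexibility; suits incremental architectural evolution |
| ORM | SQLAlchemy 2.x | The most mature ORM in the Python ecosystem; supports async and type annotations |
| Database migrations | Alembic | Migration tool from the SQLAlchemy project; supports branching and merging |
| Authentication | PyJWT + Flask-Login | JWT for API authentication, sessions for web authentication |
| Forms | WTForms | Server-side validation; integrates closely with Jinja2 |
| Task queue | Celery + Redis | Asynchronous task processing (sending email, computing statistics) |
| Caching | Flask-Caching + Redis | Multi-level caching; supports Redis and Memcached backends |
| Testing | pytest + Factory Boy | Factory pattern for creating test data, managed through pytest fixtures |
| Deployment | Docker + Gunicorn | Containerized deployment with Gunicorn as the WSGI server |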

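26.2 Project Overview

This chapter builds a production-grade blog platform, flaskblog, with the following core features:

  • User registration, login, and third-party login via OAuth2
  • Creating, reading, updating, and deleting posts; draft management; Markdown rendering
  • Post categories and tags
  • Comments and replies (with nesting)
  • Full-text search (Whoosh or PostgreSQL full-text indexes)
  • RESTful API (JWT authentication, versioning, pagination, filtering)
  • Admin dashboard (statistics, user management, content moderation)
  • Asynchronous tasks (email notifications, statistics)
  • Caching strategy
  • Containerized deployment with Docker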

26.3 Project Structure

flaskblog/
├── app/
│   ├── __init__.py            # Application factory
│   ├── extensions.py          # Extension instances
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py            # User model
│   │   ├── post.py            # Post model
│   │   ├── comment.py         # Comment model
│   │   └── mixins.py          # Shared mixins
│   ├── services/
│   │   ├── __init__.py
│   │   ├── auth.py            # Authentication service
│   │   ├── post.py            # Post service
│   │   ├── search.py          # Search service
│   │   └── email.py           # Email service
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py        # API authentication endpoints
│   │   │   ├── posts.py       # Post API
│   │   │   ├── users.py       # User API
│   │   │   └── comments.py    # Comment API
│   │   └── errors.py          # API error handling
│   ├── views/
│   │   ├── __init__.py
│   │   ├── auth.py            # Web authentication views
│   │   ├── posts.py           # Web post views
│   │   ├── admin.py           # Admin dashboard views
│   │   └── errors.py          # Error page views
│   ├── forms/
│   │   ├── __init__.py
│   │   ├── auth.py            # Authentication forms
│   │   └── post.py            # Post forms
│   ├── templates/             # Jinja2 templates
│   ├── static/                # Static assets
│   └── utils/
│       ├── __init__.py
│       ├── decorators.py      # Custom decorators
│       ├── pagination.py      # Pagination helpers
│       └── serializers.py     # Serialization helpers
├── migrations/                # Alembic migration files
├── tests/
│   ├── conftest.py            # pytest fixtures
│   ├── factories.py           # Factory Boy factories
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── celery_app.py              # Celery configuration
├── config.py                  # Configuration classes
├── pyproject.toml             # Project metadata and dependencies
├── Dockerfile                 # Container build
├── docker-compose.yml         # Service orchestration
├── nginx.conf                 # Nginx configuration
└── .env.example               # Example environment variables

26.4 Configuration Management

26.4.1 Multi-Environment Configuration

import os
from datetime import timedelta
from pathlib import Path


class BaseConfig:
    # The string defaults below are development fallbacks only; production
    # deployments should supply real values through the environment.
    SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_RECORD_QUERIES = True

    BASE_DIR = Path(__file__).resolve().parent

    SQLITE_DB_PATH = BASE_DIR / "instance" / "app.db"
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL",
        f"sqlite:///{SQLITE_DB_PATH}",
    )

    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

    CACHE_TYPE = "RedisCache"
    CACHE_REDIS_URL = REDIS_URL
    CACHE_DEFAULT_TIMEOUT = 300

    JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "jwt-dev-secret")
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)

    MAIL_SERVER = os.environ.get("MAIL_SERVER", "localhost")
    MAIL_PORT = int(os.environ.get("MAIL_PORT", 25))
    MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS", "false").lower() == "true"
    MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
    MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
    MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER", "noreply@flaskblog.local")

    CELERY_BROKER_URL = REDIS_URL
    CELERY_RESULT_BACKEND = REDIS_URL

    POSTS_PER_PAGE = 10
    COMMENTS_PER_PAGE = 20
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16 MiB upload limit

    ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
    UPLOAD_FOLDER = BASE_DIR / "app" / "static" / "uploads"

    SECURITY_PASSWORD_SALT = os.environ.get("SECURITY_PASSWORD_SALT", "password-salt")

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(BaseConfig):
    DEBUG = True
    SQLALCHEMY_ECHO = True
    CACHE_TYPE = "SimpleCache"


class TestingConfig(BaseConfig):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite://"  # in-memory database
    CACHE_TYPE = "NullCache"
    WTF_CSRF_ENABLED = False
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=5)

    @staticmethod
    def init_app(app):
        import tempfile
        app.config["UPLOAD_FOLDER"] = Path(tempfile.mkdtemp())


class ProductionConfig(BaseConfig):
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 10,
        "pool_recycle": 3600,
        "pool_pre_ping": True,
        "connect_args": {"connect_timeout": 5},
    }

    SESSION_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    SESSION_COOKIE_SAMESITE = "Lax"
    REMEMBER_COOKIE_SECURE = True
    REMEMBER_COOKIE_HTTPONLY = True

    @staticmethod
    def init_app(app):
        import logging
        from logging.handlers import RotatingFileHandler
        handler = RotatingFileHandler(
            "flaskblog.log", maxBytes=10 * 1024 * 1024, backupCount=5
        )
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s %(threadName)s %(message)s"
        ))
        handler.setLevel(logging.INFO)
        app.logger.addHandler(handler)


config_map = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "production": ProductionConfig,
    "default": DevelopmentConfig,
}


def get_config():
    # FLASK_ENV is read here as this project's own switch; Flask itself
    # no longer interprets it.
    env = os.environ.get("FLASK_ENV", "default")
    return config_map.get(env, DevelopmentConfig)

26.4.2 Managing Environment Variables

.env.example

FLASK_ENV=development
SECRET_KEY=your-secret-key-here
JWT_SECRET_KEY=your-jwt-secret-key-here
SECURITY_PASSWORD_SALT=your-password-salt-here
DATABASE_URL=postgresql://user:password@localhost:5432/flaskblog
REDIS_URL=redis://localhost:6379/0
MAIL_SERVER=smtp.example.com
MAIL_PORT=587
MAIL_USE_TLS=true
MAIL_USERNAME=your-email@example.com
MAIL_PASSWORD=your-email-password
MAIL_DEFAULT_SENDER=noreply@example.com
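
The configuration classes in 26.4.1 fall back to development defaults when a secret is missing from the environment. One way to protect production is to fail at startup instead. The sketch below is an assumption layered on top of the chapter's code: `require_env`, `load_production_secrets`, and `MissingSettingError` are hypothetical helpers, not part of config.py.

```python
import os


class MissingSettingError(RuntimeError):
    """Raised when a required environment variable is absent or empty."""


def require_env(name, environ=None):
    """Return the value of `name`, or raise instead of using a default."""
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise MissingSettingError(f"environment variable {name} must be set")
    return value


def load_production_secrets(environ=None):
    """Collect the secrets named in .env.example, failing on the first gap."""
    names = ("SECRET_KEY", "JWT_SECRET_KEY", "SECURITY_PASSWORD_SALT")
    return {name: require_env(name, environ) for name in names}
```

A production `init_app` could call `load_production_secrets()` so that a misconfigured deployment stops immediately rather than running with a well-known key.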

26.5 Application Factory and Extension Initialization

26.5.1 Extension Initialization

app/extensions.py

from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_migrate import Migrate
from flask_caching import Cache
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_cors import CORS
from flask_wtf.csrf import CSRFProtect


db = SQLAlchemy()
login_manager = LoginManager()
migrate = Migrate()
cache = Cache()
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379/1",
)
cors = CORS()
csrf = CSRFProtect()


login_manager.login_view = "auth.login"
login_manager.login_message_category = "info"
login_manager.session_protection = "strong"


@login_manager.user_loader
def load_user(user_id):
    from app.models.user import User
    return db.session.get(User, int(user_id))

26.5.2 Application Factory

app/__init__.py

from flask import Flask
from config import get_config
from app.extensions import db, login_manager, migrate, cache, limiter, cors, csrf


def create_app(config_class=None):
    app = Flask(__name__)

    if config_class is None:
        config_class = get_config()
    app.config.from_object(config_class)
    config_class.init_app(app)

    _init_extensions(app)
    _register_blueprints(app)
    _register_error_handlers(app)
    _register_template_filters(app)
    _register_context_processors(app)

    return app


def _init_extensions(app):
    db.init_app(app)
    login_manager.init_app(app)
    migrate.init_app(app, db)
    cache.init_app(app)
    limiter.init_app(app)
    cors.init_app(app, resources={r"/api/*": {"origins": "*"}})
    csrf.init_app(app)

    with app.app_context():
        from app import models  # noqa: F401
        # Convenient for a first run; once Alembic migrations are in use,
        # let them manage the schema instead.
        db.create_all()


def _register_blueprints(app):
    from app.views.auth import auth_bp
    from app.views.posts import posts_bp
    from app.views.admin import admin_bp
    from app.views.errors import errors_bp
    from app.api.v1 import api_v1_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(posts_bp)
    app.register_blueprint(admin_bp, url_prefix="/admin")
    app.register_blueprint(errors_bp)
    app.register_blueprint(api_v1_bp, url_prefix="/api/v1")

    # The JWT-authenticated API does not use CSRF tokens. The exemption is
    # applied here, on the blueprint object itself, because no blueprint is
    # registered yet when _init_extensions runs.
    csrf.exempt(api_v1_bp)


def _register_error_handlers(app):
    from app.views.errors import page_not_found, internal_error, forbidden

    app.register_error_handler(404, page_not_found)
    app.register_error_handler(500, internal_error)
    app.register_error_handler(403, forbidden)


def _register_template_filters(app):
    import markdown as md
    import bleach

    @app.template_filter("markdown")
    def markdown_filter(text):
        # Render Markdown, then strip any tag or attribute not on the allow-list.
        allowed_tags = [
            "a", "abbr", "acronym", "b", "blockquote", "br", "code",
            "dd", "del", "dl", "dt", "em", "h1", "h2", "h3", "h4",
            "h5", "h6", "hr", "i", "img", "li", "ol", "p", "pre",
            "span", "strong", "table", "tbody", "td", "th", "thead",
            "tr", "ul",
        ]
        allowed_attrs = {
            "*": ["class"],
            "a": ["href", "rel", "title"],
            "img": ["alt", "src", "title"],
        }
        html = md.markdown(text, extensions=[
            "fenced_code", "tables", "toc", "nl2br", "codehilite",
        ])
        return bleach.clean(html, tags=allowed_tags, attributes=allowed_attrs)

    @app.template_filter("truncate_chars")
    def truncate_chars_filter(text, length=200):
        if len(text) <= length:
            return text
        # Cut at the last space before the limit so words are not split.
        return text[:length].rsplit(" ", 1)[0] + "..."


def _register_context_processors(app):
    from datetime import datetime

    @app.context_processor
    def inject_globals():
        return {
            "now": datetime.utcnow,
            "site_name": app.config.get("SITE_NAME", "FlaskBlog"),
        }

26.6 Data Model Layer

26.6.1 Shared Mixins

app/models/mixins.py

from datetime import datetime
from app.extensions import db


class TimestampMixin:
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(
        db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
    )


class SoftDeleteMixin:
    is_deleted = db.Column(db.Boolean, default=False, nullable=False, index=True)
    deleted_at = db.Column(db.DateTime, nullable=True)

    def soft_delete(self):
        self.is_deleted = True
        self.deleted_at = datetime.utcnow()

    def restore(self):
        self.is_deleted = False
        self.deleted_at = None


class SlugMixin:
    slug = db.Column(db.String(200), unique=True, nullable=False, index=True)

    @staticmethod
    def generate_slug(title: str) -> str:
        import re
        import unicodedata
        # Decompose accented characters, then drop everything outside ASCII.
        # A title with no ASCII letters or digits therefore yields an empty slug.
        slug = unicodedata.normalize("NFKD", title)
        slug = slug.encode("ascii", "ignore").decode("ascii")
        slug = re.sub(r"[^\w\s-]", "", slug).strip().lower()
        slug = re.sub(r"[-\s]+", "-", slug)
        return slug[:200]
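
Because `generate_slug` discards every non-ASCII character, a title written entirely in Chinese produces an empty string, which would collide on the unique `slug` column. The sketch below repeats the same steps as a plain function and adds a hypothetical `slug_or_fallback` wrapper; the `post-` prefix and the random suffix are illustrative choices, not something the chapter prescribes.

```python
import re
import unicodedata
import uuid


def generate_slug(title):
    """Same steps as SlugMixin.generate_slug."""
    slug = unicodedata.normalize("NFKD", title)
    slug = slug.encode("ascii", "ignore").decode("ascii")
    slug = re.sub(r"[^\w\s-]", "", slug).strip().lower()
    slug = re.sub(r"[-\s]+", "-", slug)
    return slug[:200]


def slug_or_fallback(title):
    """Hypothetical wrapper: use a random token when no ASCII text survives."""
    slug = generate_slug(title).strip("-")
    return slug or f"post-{uuid.uuid4().hex[:8]}"
```

Titles that contain ASCII text pass through unchanged, so only titles that would otherwise produce an empty slug receive the generated token.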

26.6.2 User Model

app/models/user.py

from datetime import datetime, timedelta
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin


class UserRole:
    READER = "reader"
    AUTHOR = "author"
    EDITOR = "editor"
    ADMIN = "admin"


# Self-referential many-to-many association: who follows whom.
user_follows = db.Table(
    "user_follows",
    db.Column("follower_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
    db.Column("followed_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
    db.Column("created_at", db.DateTime, default=datetime.utcnow),
)


class User(UserMixin, TimestampMixin, SoftDeleteMixin, db.Model):
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256), nullable=False)
    role = db.Column(db.String(20), default=UserRole.READER, nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    email_verified = db.Column(db.Boolean, default=False, nullable=False)
    email_verified_at = db.Column(db.DateTime, nullable=True)

    avatar = db.Column(db.String(200), nullable=True)
    bio = db.Column(db.Text, nullable=True)
    website = db.Column(db.String(200), nullable=True)
    location = db.Column(db.String(100), nullable=True)

    last_login_at = db.Column(db.DateTime, nullable=True)
    last_login_ip = db.Column(db.String(45), nullable=True)
    login_count = db.Column(db.Integer, default=0)

    posts = db.relationship("Post", backref="author", lazy="dynamic")
    comments = db.relationship("Comment", backref="author", lazy="dynamic")

    following = db.relationship(
        "User",
        secondary=user_follows,
        primaryjoin=(user_follows.c.follower_id == id),
        secondaryjoin=(user_follows.c.followed_id == id),
        backref=db.backref("followers", lazy="dynamic"),
        lazy="dynamic",
    )

    __table_args__ = (
        db.Index("ix_users_username_email", "username", "email"),
    )

    def set_password(self, password: str) -> None:
        self.password_hash = generate_password_hash(password, method="pbkdf2:sha256", salt_length=16)

    def check_password(self, password: str) -> bool:
        return check_password_hash(self.password_hash, password)

    # Roles are cumulative: an admin is also an editor, an editor is also an author.
    @property
    def is_admin(self) -> bool:
        return self.role == UserRole.ADMIN

    @property
    def is_editor(self) -> bool:
        return self.role in (UserRole.EDITOR, UserRole.ADMIN)

    @property
    def is_author(self) -> bool:
        return self.role in (UserRole.AUTHOR, UserRole.EDITOR, UserRole.ADMIN)

    def follow(self, user: "User") -> None:
        if not self.is_following(user):
            self.following.append(user)

    def unfollow(self, user: "User") -> None:
        if self.is_following(user):
            self.following.remove(user)

    def is_following(self, user: "User") -> bool:
        return self.following.filter(user_follows.c.followed_id == user.id).count() > 0

    def update_login_info(self, ip_address: str) -> None:
        self.last_login_at = datetime.utcnow()
        self.last_login_ip = ip_address
        # The column default is only applied on INSERT, so guard against None.
        self.login_count = (self.login_count or 0) + 1

    def get_reset_password_token(self, expires_in: int = 3600) -> str:
        import jwt
        from flask import current_app
        return jwt.encode(
            {"reset_password": self.id, "exp": datetime.utcnow() + timedelta(seconds=expires_in)},
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )

    @staticmethod
    def verify_reset_password_token(token: str) -> "User | None":
        import jwt
        from flask import current_app
        try:
            data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=["HS256"])
            return db.session.get(User, data.get("reset_password"))
        except jwt.PyJWTError:
            return None

    def to_dict(self, include_email: bool = False) -> dict:
        data = {
            "id": self.id,
            "username": self.username,
            "role": self.role,
            "bio": self.bio,
            "avatar": self.avatar,
            "website": self.website,
            "location": self.location,
            "post_count": self.posts.filter_by(is_deleted=False).count(),
            "follower_count": self.followers.count(),
            "following_count": self.following.count(),
            "created_at": self.created_at.isoformat(),
        }
        if include_email:
            data["email"] = self.email
        return data

    def __repr__(self):
        return f"<User {self.username!r}>"

26.6.3 Post Model

app/models/post.py

from datetime import datetime
from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin, SlugMixin


post_tags = db.Table(
    "post_tags",
    db.Column("post_id", db.Integer, db.ForeignKey("posts.id"), primary_key=True),
    db.Column("tag_id", db.Integer, db.ForeignKey("tags.id"), primary_key=True),
)


class Post(TimestampMixin, SoftDeleteMixin, SlugMixin, db.Model):
    __tablename__ = "posts"

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    summary = db.Column(db.String(500), nullable=True)
    cover_image = db.Column(db.String(200), nullable=True)

    status = db.Column(db.String(20), default="draft", nullable=False, index=True)
    is_pinned = db.Column(db.Boolean, default=False, nullable=False)

    view_count = db.Column(db.Integer, default=0, nullable=False)
    like_count = db.Column(db.Integer, default=0, nullable=False)
    comment_count = db.Column(db.Integer, default=0, nullable=False)

    user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
    category_id = db.Column(db.Integer, db.ForeignKey("categories.id"), nullable=True)
    published_at = db.Column(db.DateTime, nullable=True, index=True)

    tags = db.relationship("Tag", secondary=post_tags, backref=db.backref("posts", lazy="dynamic"))
    comments = db.relationship(
        "Comment", backref="post", lazy="dynamic",
        order_by="Comment.created_at.desc()",
    )

    __table_args__ = (
        db.Index("ix_posts_status_published", "status", "published_at"),
        db.Index("ix_posts_user_status", "user_id", "status"),
    )

    STATUS_DRAFT = "draft"
    STATUS_PUBLISHED = "published"
    STATUS_ARCHIVED = "archived"

    def publish(self) -> None:
        if self.status != self.STATUS_PUBLISHED:
            self.status = self.STATUS_PUBLISHED
            self.published_at = datetime.utcnow()

    def archive(self) -> None:
        self.status = self.STATUS_ARCHIVED

    def increment_view(self) -> None:
        self.view_count += 1

    def to_dict(self, include_content: bool = True) -> dict:
        data = {
            "id": self.id,
            "slug": self.slug,
            "title": self.title,
            "summary": self.summary,
            "status": self.status,
            "is_pinned": self.is_pinned,
            "view_count": self.view_count,
            "like_count": self.like_count,
            "comment_count": self.comment_count,
            "author": self.author.to_dict() if self.author else None,
            "category": self.category.to_dict() if self.category else None,
            "tags": [tag.to_dict() for tag in self.tags],
            "published_at": self.published_at.isoformat() if self.published_at else None,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
        if include_content:
            data["content"] = self.content
        return data

    def __repr__(self):
        return f"<Post {self.title!r}>"


class Category(TimestampMixin, db.Model):
    __tablename__ = "categories"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False, index=True)
    description = db.Column(db.String(200), nullable=True)
    sort_order = db.Column(db.Integer, default=0, nullable=False)

    posts = db.relationship("Post", backref="category", lazy="dynamic")

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "slug": self.slug,
            "description": self.description,
            "post_count": self.posts.filter_by(status="published", is_deleted=False).count(),
        }


class Tag(TimestampMixin, db.Model):
    __tablename__ = "tags"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False, index=True)

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "slug": self.slug,
            "post_count": self.posts.filter_by(status="published", is_deleted=False).count(),
        }

26.6.4 Comment Model

app/models/comment.py

from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin


class Comment(TimestampMixin, SoftDeleteMixin, db.Model):
    __tablename__ = "comments"

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    is_approved = db.Column(db.Boolean, default=True, nullable=False)

    user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
    post_id = db.Column(db.Integer, db.ForeignKey("posts.id"), nullable=False, index=True)
    parent_id = db.Column(db.Integer, db.ForeignKey("comments.id"), nullable=True, index=True)

    replies = db.relationship(
        "Comment",
        backref=db.backref("parent", remote_side=[id]),
        lazy="dynamic",
        order_by="Comment.created_at.asc()",
    )

    __table_args__ = (
        db.Index("ix_comments_post_approved", "post_id", "is_approved"),
    )

    @property
    def is_reply(self) -> bool:
        return self.parent_id is not None

    @property
    def reply_count(self) -> int:
        return self.replies.filter_by(is_deleted=False, is_approved=True).count()

    def to_dict(self, include_replies: bool = False) -> dict:
        data = {
            "id": self.id,
            "content": self.content,
            "is_approved": self.is_approved,
            "author": self.author.to_dict() if self.author else None,
            "post_id": self.post_id,
            "parent_id": self.parent_id,
            "is_reply": self.is_reply,
            "reply_count": self.reply_count,
            "created_at": self.created_at.isoformat(),
        }
        if include_replies:
            data["replies"] = [
                reply.to_dict() for reply in self.replies.filter_by(
                    is_deleted=False, is_approved=True
                ).all()
            ]
        return data
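
`Comment.to_dict(include_replies=True)` serializes one level of replies and runs a separate query for each comment's `replies`. For deeper threads, an alternative is to load all of a post's comments in one query and assemble the nesting in memory. The sketch below assumes each row has already been reduced to a dict with `id` and `parent_id` keys; `build_comment_tree` is a hypothetical helper, not part of the chapter's code.

```python
from collections import defaultdict


def build_comment_tree(rows):
    """Group flat comment rows into a nested structure.

    `rows` is a list of dicts, each with "id" and "parent_id" keys;
    top-level comments have parent_id set to None.
    """
    children = defaultdict(list)
    for row in rows:
        children[row["parent_id"]].append(row)

    def attach(row):
        # Copy the row and recursively attach its direct replies.
        return {**row, "replies": [attach(child) for child in children[row["id"]]]}

    return [attach(root) for root in children[None]]
```

Rows keep their input order within each level, so sorting the query by `created_at` carries through to the tree.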

26.6.5 Model Registration

app/models/__init__.py

from app.models.user import User, UserRole, user_follows
from app.models.post import Post, Category, Tag, post_tags
from app.models.comment import Comment

__all__ = [
    "User", "UserRole", "user_follows",
    "Post", "Category", "Tag", "post_tags",
    "Comment",
]

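26.7 Service Layer

The service layer encapsulates business logic, which keeps the views thin and makes the logic easier to test and reuse.

26.7.1 Authentication Service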

app/services/auth.py

from datetime import datetime, timedelta
import jwt
from flask import current_app
from app.extensions import db
from app.models.user import User, UserRole


class AuthService:
    @staticmethod
    def register(username: str, email: str, password: str) -> User:
        if User.query.filter_by(username=username).first():
            raise ValueError(f"Username {username!r} already exists")
        if User.query.filter_by(email=email).first():
            raise ValueError(f"Email {email!r} already registered")

        user = User(
            username=username,
            email=email,
            role=UserRole.READER,
        )
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        return user

    @staticmethod
    def authenticate(username: str, password: str) -> User | None:
        # The login field accepts either a username or an email address.
        user = User.query.filter(
            (User.username == username) | (User.email == username)
        ).first()
        if user and user.check_password(password) and user.is_active:
            return user
        return None

    @staticmethod
    def generate_tokens(user: User) -> dict:
        now = datetime.utcnow()
        # "sub" is stored as a string: recent PyJWT releases reject a
        # non-string subject claim when decoding.
        access_payload = {
            "sub": str(user.id),
            "username": user.username,
            "role": user.role,
            "iat": now,
            "exp": now + current_app.config["JWT_ACCESS_TOKEN_EXPIRES"],
            "type": "access",
        }
        refresh_payload = {
            "sub": str(user.id),
            "iat": now,
            "exp": now + current_app.config["JWT_REFRESH_TOKEN_EXPIRES"],
            "type": "refresh",
        }

        access_token = jwt.encode(
            access_payload,
            current_app.config["JWT_SECRET_KEY"],
            algorithm="HS256",
        )
        refresh_token = jwt.encode(
            refresh_payload,
            current_app.config["JWT_SECRET_KEY"],
            algorithm="HS256",
        )
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "expires_in": int(current_app.config["JWT_ACCESS_TOKEN_EXPIRES"].total_seconds()),
        }

    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> dict | None:
        try:
            payload = jwt.decode(
                token,
                current_app.config["JWT_SECRET_KEY"],
                algorithms=["HS256"],
            )
        except jwt.InvalidTokenError:
            # Also covers ExpiredSignatureError, which is a subclass.
            return None
        # A refresh token must not be accepted where an access token is expected.
        if payload.get("type") != token_type:
            return None
        return payload

    @staticmethod
    def refresh_access_token(refresh_token: str) -> dict | None:
        payload = AuthService.verify_token(refresh_token, token_type="refresh")
        if payload is None:
            return None
        user = db.session.get(User, int(payload["sub"]))
        if user is None or not user.is_active:
            return None
        return AuthService.generate_tokens(user)

    @staticmethod
    def change_password(user: User, old_password: str, new_password: str) -> bool:
        if not user.check_password(old_password):
            return False
        user.set_password(new_password)
        db.session.commit()
        return True
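
To show what `jwt.encode` and `verify_token` are doing, the sketch below signs and checks an HS256 token using only the standard library. It is for illustration only and is not a replacement for PyJWT: `encode_token` and this standalone `verify_token` are hypothetical functions, and the `now` parameter exists purely to make expiry testable.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data):
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text):
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(payload, secret):
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    ]
    signing_input = ".".join(parts).encode("ascii")
    signature = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return ".".join(parts + [_b64url(signature)])


def verify_token(token, secret, token_type="access", now=None):
    """Return the payload, or None if the signature, expiry, or type is wrong."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            secret.encode("utf-8"),
            f"{header_b64}.{payload_b64}".encode("ascii"),
            hashlib.sha256,
        ).digest()
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except (ValueError, UnicodeError):
        return None
    if not isinstance(payload, dict):
        return None
    current = time.time() if now is None else now
    if payload.get("exp", 0) <= current or payload.get("type") != token_type:
        return None
    return payload
```

The `type` claim is what keeps a long-lived refresh token from being used as an access token: both are signed with the same key, so the signature alone cannot tell them apart.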

26.7.2 Post Service

app/services/post.py

from flask import current_app
from sqlalchemy import or_
from app.extensions import db
from app.models.post import Post, Category, Tag
from app.models.mixins import SlugMixin


class PostService:
    @staticmethod
    def create_post(user_id: int, title: str, content: str, **kwargs) -> Post:
        post = Post(
            title=title,
            slug=PostService._unique_slug(title),
            content=content,
            user_id=user_id,
            summary=kwargs.get("summary", content[:200] if content else ""),
            cover_image=kwargs.get("cover_image"),
            # Always start as a draft so that publish() below sets published_at.
            status=Post.STATUS_DRAFT,
            is_pinned=kwargs.get("is_pinned", False),
        )

        if kwargs.get("category_name"):
            post.category = PostService._get_or_create_category(kwargs["category_name"])

        if kwargs.get("tag_names"):
            for tag_name in kwargs["tag_names"]:
                tag = PostService._get_or_create_tag(tag_name)
                post.tags.append(tag)

        requested_status = kwargs.get("status", Post.STATUS_DRAFT)
        if requested_status == Post.STATUS_PUBLISHED:
            post.publish()
        else:
            post.status = requested_status

        db.session.add(post)
        db.session.commit()
        return post

    @staticmethod
    def update_post(post: Post, **kwargs) -> Post:
        if "title" in kwargs and kwargs["title"] != post.title:
            post.title = kwargs["title"]
            post.slug = PostService._unique_slug(kwargs["title"], exclude_id=post.id)

        for field in ("content", "summary", "cover_image", "is_pinned"):
            if field in kwargs:
                setattr(post, field, kwargs[field])

        if "status" in kwargs:
            if kwargs["status"] == Post.STATUS_PUBLISHED:
                post.publish()
            elif kwargs["status"] == Post.STATUS_ARCHIVED:
                post.archive()
            else:
                post.status = kwargs["status"]

        if "category_name" in kwargs:
            post.category = PostService._get_or_create_category(kwargs["category_name"])

        if "tag_names" in kwargs:
            post.tags = []
            for tag_name in kwargs["tag_names"]:
                tag = PostService._get_or_create_tag(tag_name)
                post.tags.append(tag)

        db.session.commit()
        return post

    @staticmethod
    def get_published_posts(page: int = 1, per_page: int | None = None):
        per_page = per_page or current_app.config["POSTS_PER_PAGE"]
        # Pinned posts come first, then newest first.
        return Post.query.filter_by(
            status=Post.STATUS_PUBLISHED, is_deleted=False
        ).order_by(
            Post.is_pinned.desc(), Post.published_at.desc()
        ).paginate(page=page, per_page=per_page, error_out=False)

    @staticmethod
    def get_post_by_slug(slug: str) -> Post | None:
        return Post.query.filter_by(slug=slug, is_deleted=False).first()

    @staticmethod
    def get_user_posts(user_id: int, page: int = 1, status: str | None = None):
        query = Post.query.filter_by(user_id=user_id, is_deleted=False)
        if status:
            query = query.filter_by(status=status)
        return query.order_by(Post.created_at.desc()).paginate(
            page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
        )

    @staticmethod
    def search_posts(query: str, page: int = 1):
        # Simple substring search over title, content, and summary.
        search_filter = or_(
            Post.title.contains(query),
            Post.content.contains(query),
            Post.summary.contains(query),
        )
        return Post.query.filter(
            search_filter, Post.status == Post.STATUS_PUBLISHED, Post.is_deleted.is_(False)
        ).order_by(Post.published_at.desc()).paginate(
            page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
        )

    @staticmethod
    def _unique_slug(title: str, exclude_id: int | None = None) -> str:
        # Append -2, -3, ... until no other post (deleted or not) uses the slug.
        # "post" is a fallback for titles that produce an empty slug.
        base = SlugMixin.generate_slug(title) or "post"
        slug, suffix = base, 2
        while True:
            query = Post.query.filter_by(slug=slug)
            if exclude_id is not None:
                query = query.filter(Post.id != exclude_id)
            if query.first() is None:
                return slug
            slug = f"{base}-{suffix}"
            suffix += 1

    @staticmethod
    def _get_or_create_category(name: str) -> Category:
        category = Category.query.filter_by(name=name).first()
        if not category:
            category = Category(name=name, slug=SlugMixin.generate_slug(name))
            db.session.add(category)
        return category

    @staticmethod
    def _get_or_create_tag(name: str) -> Tag:
        tag = Tag.query.filter_by(name=name).first()
        if not tag:
            tag = Tag(name=name, slug=SlugMixin.generate_slug(name))
            db.session.add(tag)
        return tag

26.7.3 Email Service

app/services/email.py

from threading import Thread

from flask import current_app, render_template
from flask_mail import Mail, Message

# app/extensions.py (section 26.5.1) does not define a Mail instance, so one is
# created here and bound to the application the first time an email is sent.
mail = Mail()


class EmailService:
    @staticmethod
    def send_async_email(app, msg):
        # Runs in a worker thread, which needs its own application context.
        with app.app_context():
            mail.send(msg)

    @staticmethod
    def send_email(to: str | list[str], subject: str, template: str, **kwargs):
        # Unwrap the proxy so the real app object can be handed to the thread.
        app = current_app._get_current_object()
        if "mail" not in app.extensions:
            mail.init_app(app)
        msg = Message(
            subject=app.config.get("MAIL_PREFIX", "[FlaskBlog] ") + subject,
            sender=app.config["MAIL_DEFAULT_SENDER"],
            recipients=[to] if isinstance(to, str) else to,
        )
        msg.body = render_template(f"emails/{template}.txt", **kwargs)
        msg.html = render_template(f"emails/{template}.html", **kwargs)

        thread = Thread(target=EmailService.send_async_email, args=(app, msg))
        thread.start()
        return thread

    @staticmethod
    def send_verification_email(user, token):
        EmailService.send_email(
            to=user.email, subject="Verify Your Email",
            template="verify_email", user=user, token=token,
        )

    @staticmethod
    def send_password_reset_email(user, token):
        EmailService.send_email(
            to=user.email, subject="Reset Your Password",
            template="reset_password", user=user, token=token,
        )

    @staticmethod
    def send_new_comment_notification(post_author, comment):
        EmailService.send_email(
            to=post_author.email,
            subject=f"New Comment on '{comment.post.title}'",
            template="new_comment", author=post_author, comment=comment,
        )

26.8 Form Validation

app/forms/auth.py

from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Email, EqualTo, Length, ValidationError
from app.models.user import User


class LoginForm(FlaskForm):
    username = StringField("Username", validators=[DataRequired(), Length(1, 64)])
    password = PasswordField("Password", validators=[DataRequired()])
    remember_me = BooleanField("Remember Me")
    submit = SubmitField("Sign In")


class RegistrationForm(FlaskForm):
    username = StringField("Username", validators=[DataRequired(), Length(3, 64)])
    email = StringField("Email", validators=[DataRequired(), Email(), Length(1, 120)])
    password = PasswordField("Password", validators=[DataRequired(), Length(8, 128)])
    password2 = PasswordField(
        "Repeat Password",
        validators=[DataRequired(), EqualTo("password", message="Passwords must match.")],
    )
    submit = SubmitField("Register")

    def validate_username(self, field):
        if User.query.filter_by(username=field.data).first():
            raise ValidationError("Username already in use.")

    def validate_email(self, field):
        if User.query.filter_by(email=field.data).first():
            raise ValidationError("Email already registered.")


class ChangePasswordForm(FlaskForm):
    old_password = PasswordField("Old Password", validators=[DataRequired()])
    new_password = PasswordField("New Password", validators=[DataRequired(), Length(8, 128)])
    new_password2 = PasswordField(
        "Repeat New Password",
        validators=[DataRequired(), EqualTo("new_password")],
    )
    submit = SubmitField("Update Password")


class PasswordResetRequestForm(FlaskForm):
    email = StringField("Email", validators=[DataRequired(), Email()])
    submit = SubmitField("Reset Password")


class PasswordResetForm(FlaskForm):
    password = PasswordField("New Password", validators=[DataRequired(), Length(8, 128)])
    password2 = PasswordField(
        "Repeat Password", validators=[DataRequired(), EqualTo("password")],
    )
    submit = SubmitField("Reset Password")

app/forms/post.py

from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, SelectField, SubmitField, BooleanField
from wtforms.validators import DataRequired, Length, Optional


class PostForm(FlaskForm):
    title = StringField("Title", validators=[DataRequired(), Length(1, 200)])
    content = TextAreaField("Content", validators=[DataRequired()])
    summary = TextAreaField("Summary", validators=[Optional(), Length(0, 500)])
    category = StringField("Category", validators=[Optional(), Length(0, 50)])
    tags = StringField("Tags (comma-separated)", validators=[Optional()])
    status = SelectField(
        "Status",
        choices=[("draft", "Draft"), ("published", "Published")],
        default="draft",
    )
    is_pinned = BooleanField("Pin this post")
    submit = SubmitField("Save")


class CommentForm(FlaskForm):
    content = TextAreaField("Comment", validators=[DataRequired(), Length(1, 2000)])
    submit = SubmitField("Submit Comment")

26.9 Web View Layer

26.9.1 Authentication Views

app/views/auth.py

from urllib.parse import urlsplit

from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, current_user
from app.extensions import db
from app.models.user import User
from app.forms.auth import (
    LoginForm, RegistrationForm, PasswordResetRequestForm, PasswordResetForm,
)
from app.services.auth import AuthService
from app.services.email import EmailService
from app.utils.decorators import anonymous_required

auth_bp = Blueprint("auth", __name__)


@auth_bp.route("/register", methods=["GET", "POST"])
@anonymous_required
def register():
    form = RegistrationForm()
    if form.validate_on_submit():
        try:
            AuthService.register(
                username=form.username.data,
                email=form.email.data,
                password=form.password.data,
            )
            flash("Registration successful! Please log in.", "success")
            return redirect(url_for("auth.login"))
        except ValueError as e:
            flash(str(e), "danger")
    return render_template("auth/register.html", form=form)


@auth_bp.route("/login", methods=["GET", "POST"])
@anonymous_required
def login():
    form = LoginForm()
    if form.validate_on_submit():
        user = AuthService.authenticate(form.username.data, form.password.data)
        if user:
            login_user(user, remember=form.remember_me.data)
            user.update_login_info(request.remote_addr)
            db.session.commit()
            # Follow "next" only when it is a local path; an absolute URL
            # here would turn the login page into an open redirect.
            next_page = request.args.get("next")
            if not next_page or not next_page.startswith("/") or urlsplit(next_page).netloc:
                next_page = url_for("posts.index")
            flash("Login successful!", "success")
            return redirect(next_page)
        flash("Invalid username or password.", "danger")
    return render_template("auth/login.html", form=form)


@auth_bp.route("/logout")
def logout():
    logout_user()
    flash("You have been logged out.", "info")
    return redirect(url_for("posts.index"))


@auth_bp.route("/reset-password", methods=["GET", "POST"])
def reset_password_request():
    if current_user.is_authenticated:
        return redirect(url_for("posts.index"))
    form = PasswordResetRequestForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            token = user.get_reset_password_token()
            EmailService.send_password_reset_email(user, token)
        # Show the same message whether or not the address exists, so the
        # form cannot be used to discover registered emails.
        flash("Check your email for instructions to reset your password.", "info")
        return redirect(url_for("auth.login"))
    return render_template("auth/reset_password_request.html", form=form)


@auth_bp.route("/reset-password/<token>", methods=["GET", "POST"])
def reset_password(token):
    if current_user.is_authenticated:
        return redirect(url_for("posts.index"))
    user = User.verify_reset_password_token(token)
    if not user:
        flash("The password reset link is invalid or has expired.", "warning")
        return redirect(url_for("posts.index"))
    form = PasswordResetForm()
    if form.validate_on_submit():
        user.set_password(form.password.data)
        db.session.commit()
        flash("Your password has been reset.", "success")
        return redirect(url_for("auth.login"))
    return render_template("auth/reset_password.html", form=form)
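The login view redirects to the `next` query parameter after a successful sign-in. Because that value comes straight from the request, it is usually validated before being passed to `redirect`. The helper below is a minimal sketch of one such check using only the standard library; `is_safe_redirect_target` is a hypothetical name and is not part of Flask or Flask-Login.

```python
from typing import Optional
from urllib.parse import urlsplit


def is_safe_redirect_target(target: Optional[str]) -> bool:
    """Return True only for local, path-only redirect targets."""
    if not target:
        return False
    # Some browsers normalise backslashes into slashes, and control
    # characters can hide a second URL, so reject both outright.
    if "\\" in target or any(ord(ch) < 0x20 for ch in target):
        return False
    parts = urlsplit(target)
    # A scheme or a network location means the URL points off-site.
    if parts.scheme or parts.netloc:
        return False
    # Require an absolute path on this site ("/posts", not "posts").
    return target.startswith("/") and not target.startswith("//")
```

In a view, a rejected target would simply fall back to a known endpoint such as `url_for("posts.index")`.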

26.9.2 Post Views

app/views/posts.py

from flask import Blueprint, render_template, redirect, url_for, flash, request, abort, current_app
from flask_login import login_required, current_user
from app.extensions import db, cache
from app.models.post import Post, Category, Tag
from app.models.comment import Comment
from app.forms.post import PostForm, CommentForm
from app.services.post import PostService

posts_bp = Blueprint("posts", __name__)


@posts_bp.route("/")
@posts_bp.route("/index")
@cache.cached(timeout=60, query_string=True)
def index():
    page = request.args.get("page", 1, type=int)
    pagination = PostService.get_published_posts(page=page)
    categories = Category.query.order_by(Category.sort_order).all()
    return render_template(
        "posts/index.html",
        posts=pagination.items,
        pagination=pagination,
        categories=categories,
    )


@posts_bp.route("/post/<slug>")
def detail(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    # Drafts are visible to their author only; everyone else gets a 404.
    if post.status != Post.STATUS_PUBLISHED and post.author != current_user:
        abort(404)

    post.increment_view()
    db.session.commit()

    comment_form = CommentForm()
    page = request.args.get("page", 1, type=int)
    comments = Comment.query.filter_by(
        post_id=post.id, is_deleted=False, is_approved=True, parent_id=None,
    ).order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=current_app.config.get("COMMENTS_PER_PAGE", 20),
        error_out=False,
    )
    return render_template(
        "posts/detail.html", post=post, comments=comments, comment_form=comment_form,
    )


@posts_bp.route("/create", methods=["GET", "POST"])
@login_required
def create():
    form = PostForm()
    if form.validate_on_submit():
        tag_names = (
            [t.strip() for t in form.tags.data.split(",") if t.strip()]
            if form.tags.data else []
        )
        post = PostService.create_post(
            user_id=current_user.id, title=form.title.data,
            content=form.content.data, summary=form.summary.data,
            category_name=form.category.data or None, tag_names=tag_names,
            status=form.status.data, is_pinned=form.is_pinned.data,
        )
        flash("Post created successfully!", "success")
        return redirect(url_for("posts.detail", slug=post.slug))
    return render_template("posts/create.html", form=form)


@posts_bp.route("/edit/<slug>", methods=["GET", "POST"])
@login_required
def edit(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    if post.author != current_user and not current_user.is_editor:
        abort(403)

    form = PostForm()
    if form.validate_on_submit():
        tag_names = (
            [t.strip() for t in form.tags.data.split(",") if t.strip()]
            if form.tags.data else []
        )
        PostService.update_post(
            post, title=form.title.data, content=form.content.data,
            summary=form.summary.data, category_name=form.category.data or None,
            tag_names=tag_names, status=form.status.data, is_pinned=form.is_pinned.data,
        )
        flash("Post updated successfully!", "success")
        return redirect(url_for("posts.detail", slug=post.slug))

    # Pre-fill the form on GET only; after a failed POST the user's own
    # input must be kept so it can be corrected.
    if request.method == "GET":
        form.title.data = post.title
        form.content.data = post.content
        form.summary.data = post.summary
        form.category.data = post.category.name if post.category else ""
        form.tags.data = ", ".join(tag.name for tag in post.tags)
        form.status.data = post.status
        form.is_pinned.data = post.is_pinned
    return render_template("posts/edit.html", form=form, post=post)


@posts_bp.route("/delete/<slug>", methods=["POST"])
@login_required
def delete(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    if post.author != current_user and not current_user.is_admin:
        abort(403)
    post.soft_delete()
    db.session.commit()
    flash("Post deleted.", "success")
    return redirect(url_for("posts.index"))


@posts_bp.route("/search")
def search():
    query = request.args.get("q", "").strip()
    page = request.args.get("page", 1, type=int)
    if not query:
        return render_template("posts/search.html", posts=[], query="", pagination=None)
    pagination = PostService.search_posts(query, page=page)
    return render_template(
        "posts/search.html", posts=pagination.items, pagination=pagination, query=query,
    )


@posts_bp.route("/category/<slug>")
def category(slug):
    category_obj = Category.query.filter_by(slug=slug).first_or_404()
    page = request.args.get("page", 1, type=int)
    pagination = Post.query.filter_by(
        category=category_obj, status=Post.STATUS_PUBLISHED, is_deleted=False,
    ).order_by(Post.published_at.desc()).paginate(
        page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False,
    )
    return render_template(
        "posts/category.html", category=category_obj,
        posts=pagination.items, pagination=pagination,
    )


@posts_bp.route("/tag/<slug>")
def tag(slug):
    tag_obj = Tag.query.filter_by(slug=slug).first_or_404()
    page = request.args.get("page", 1, type=int)
    # tag.posts must be a query-like (dynamic) relationship for filter_by to work.
    pagination = tag_obj.posts.filter_by(
        status=Post.STATUS_PUBLISHED, is_deleted=False,
    ).order_by(Post.published_at.desc()).paginate(
        page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False,
    )
    return render_template(
        "posts/tag.html", tag=tag_obj, posts=pagination.items, pagination=pagination,
    )


@posts_bp.route("/post/<int:post_id>/comment", methods=["POST"])
@login_required
def add_comment(post_id):
    post = db.get_or_404(Post, post_id)
    form = CommentForm()
    if form.validate_on_submit():
        comment = Comment(
            content=form.content.data, post_id=post.id,
            user_id=current_user.id, parent_id=request.form.get("parent_id", type=int),
        )
        db.session.add(comment)
        post.comment_count += 1
        db.session.commit()
        flash("Comment added!", "success")
    return redirect(url_for("posts.detail", slug=post.slug))
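Both the create and edit views split the comma-separated tags field inline. One way to keep that logic in a single place, and to drop duplicate tags at the same time, is a small helper. This is only a sketch: `parse_tag_names` and the `max_tags` limit are assumptions for illustration, not part of the project code shown in this chapter.

```python
def parse_tag_names(raw, max_tags=10):
    """Split a comma-separated string into unique, trimmed tag names.

    Duplicates are detected case-insensitively and the first spelling
    wins; the original order is preserved.
    """
    seen = set()
    names = []
    for part in (raw or "").split(","):
        name = part.strip()
        key = name.casefold()
        if not name or key in seen:
            continue
        seen.add(key)
        names.append(name)
    return names[:max_tags]
```

A view could then call `parse_tag_names(form.tags.data)` instead of repeating the list comprehension.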

26.9.3 Admin Views

app/views/admin.py

from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_required, current_user
from app.extensions import db
from app.models.user import User
from app.models.post import Post
from app.models.comment import Comment
from app.utils.decorators import admin_required

admin_bp = Blueprint("admin", __name__)


# Runs before every view in this blueprint, so each admin route is
# protected without repeating the decorators.
@admin_bp.before_request
@login_required
@admin_required
def check_admin():
    pass


@admin_bp.route("/dashboard")
def dashboard():
    stats = {
        "total_users": User.query.filter_by(is_deleted=False).count(),
        "total_posts": Post.query.filter_by(is_deleted=False).count(),
        "published_posts": Post.query.filter_by(status="published", is_deleted=False).count(),
        "draft_posts": Post.query.filter_by(status="draft", is_deleted=False).count(),
        "total_comments": Comment.query.filter_by(is_deleted=False).count(),
        "pending_comments": Comment.query.filter_by(is_approved=False, is_deleted=False).count(),
    }
    recent_posts = Post.query.order_by(Post.created_at.desc()).limit(5).all()
    recent_comments = Comment.query.order_by(Comment.created_at.desc()).limit(5).all()
    return render_template(
        "admin/dashboard.html", stats=stats,
        recent_posts=recent_posts, recent_comments=recent_comments,
    )


@admin_bp.route("/users")
def users():
    page = request.args.get("page", 1, type=int)
    pagination = User.query.filter_by(is_deleted=False).order_by(User.created_at.desc()).paginate(
        page=page, per_page=20, error_out=False,
    )
    return render_template("admin/users.html", users=pagination.items, pagination=pagination)


@admin_bp.route("/users/<int:user_id>/toggle-active", methods=["POST"])
def toggle_user_active(user_id):
    user = db.session.get(User, user_id)
    if user is None:
        flash("User not found.", "danger")
        return redirect(url_for("admin.users"))
    if user.id == current_user.id:
        flash("You cannot deactivate yourself.", "warning")
        return redirect(url_for("admin.users"))
    user.is_active = not user.is_active
    db.session.commit()
    status = "activated" if user.is_active else "deactivated"
    flash(f"User {user.username} has been {status}.", "success")
    return redirect(url_for("admin.users"))


@admin_bp.route("/comments")
def comments():
    page = request.args.get("page", 1, type=int)
    status_filter = request.args.get("status", "pending")
    query = Comment.query.filter_by(is_deleted=False)
    if status_filter == "pending":
        query = query.filter_by(is_approved=False)
    elif status_filter == "approved":
        query = query.filter_by(is_approved=True)
    pagination = query.order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=20, error_out=False,
    )
    return render_template(
        "admin/comments.html", comments=pagination.items,
        pagination=pagination, status_filter=status_filter,
    )


@admin_bp.route("/comments/<int:comment_id>/approve", methods=["POST"])
def approve_comment(comment_id):
    comment = db.session.get(Comment, comment_id)
    if comment:
        comment.is_approved = True
        db.session.commit()
        flash("Comment approved.", "success")
    return redirect(url_for("admin.comments"))


@admin_bp.route("/comments/<int:comment_id>/reject", methods=["POST"])
def reject_comment(comment_id):
    comment = db.session.get(Comment, comment_id)
    if comment:
        comment.soft_delete()
        db.session.commit()
        flash("Comment rejected and removed.", "success")
    return redirect(url_for("admin.comments"))

26.9.4 Error Page Views

app/views/errors.py

from flask import Blueprint, render_template

errors_bp = Blueprint("errors", __name__)


@errors_bp.app_errorhandler(404)
def page_not_found(e):
    return render_template("errors/404.html"), 404


@errors_bp.app_errorhandler(500)
def internal_error(e):
    # Imported here to avoid a circular import at module load time.
    from app.extensions import db
    # Roll back so a failed transaction does not leak into later requests.
    db.session.rollback()
    return render_template("errors/500.html"), 500


@errors_bp.app_errorhandler(403)
def forbidden(e):
    return render_template("errors/403.html"), 403

26.10 RESTful API Layer

26.10.1 API Error Handling

app/api/errors.py

from flask import current_app, jsonify
from werkzeug.exceptions import HTTPException


class APIError(Exception):
    """Base class for errors that are rendered as a JSON response."""

    def __init__(self, message: str, status_code: int = 400, payload: dict | None = None):
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.payload = payload or {}

    def to_dict(self) -> dict:
        result = dict(self.payload)
        result["error"] = self.message
        result["status"] = self.status_code
        return result


class NotFoundError(APIError):
    def __init__(self, message: str = "Resource not found"):
        super().__init__(message, status_code=404)


class UnauthorizedError(APIError):
    def __init__(self, message: str = "Authentication required"):
        super().__init__(message, status_code=401)


class ForbiddenError(APIError):
    def __init__(self, message: str = "Permission denied"):
        super().__init__(message, status_code=403)


class ValidationError(APIError):
    def __init__(self, message: str = "Validation error", errors: dict | None = None):
        payload = {"errors": errors} if errors else {}
        super().__init__(message, status_code=422, payload=payload)


def register_error_handlers(bp):
    @bp.errorhandler(APIError)
    def handle_api_error(e):
        return jsonify(e.to_dict()), e.status_code

    @bp.errorhandler(HTTPException)
    def handle_http_error(e):
        return jsonify({"error": e.description, "status": e.code}), e.code

    @bp.errorhandler(Exception)
    def handle_unexpected_error(e):
        # Log the traceback, but never expose internals to the client.
        current_app.logger.exception("Unexpected error occurred")
        return jsonify({"error": "Internal server error", "status": 500}), 500

26.10.2 API Authentication Decorators

app/utils/decorators.py

from functools import wraps
from flask import request, redirect, url_for, abort
from flask_login import current_user
from app.services.auth import AuthService
from app.api.errors import UnauthorizedError, ForbiddenError


def anonymous_required(f):
    """Redirect users who are already logged in away from the view."""
    @wraps(f)
    def decorated(*args, **kwargs):
        if current_user.is_authenticated:
            return redirect(url_for("posts.index"))
        return f(*args, **kwargs)
    return decorated


def admin_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if not current_user.is_authenticated or not current_user.is_admin:
            abort(403)
        return f(*args, **kwargs)
    return decorated


def jwt_required(f):
    """Authenticate an API request from its Bearer access token."""
    @wraps(f)
    def decorated(*args, **kwargs):
        token = _extract_token()
        if token is None:
            raise UnauthorizedError("Missing authentication token")

        payload = AuthService.verify_token(token, token_type="access")
        if payload is None:
            raise UnauthorizedError("Invalid or expired token")

        # Imported lazily to avoid circular imports.
        from app.models.user import User
        from app.extensions import db
        # Use a distinct name so Flask-Login's current_user proxy is not shadowed.
        user = db.session.get(User, payload["sub"])
        if user is None or not user.is_active:
            raise UnauthorizedError("User not found or inactive")

        request.current_user = user
        return f(*args, **kwargs)
    return decorated


def role_required(*roles):
    """Restrict a view to the given roles; apply it below jwt_required."""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            user = getattr(request, "current_user", None)
            if user is None:
                raise UnauthorizedError("Authentication required")
            if user.role not in roles:
                raise ForbiddenError("Insufficient permissions")
            return f(*args, **kwargs)
        return decorated
    return decorator


def _extract_token():
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        return auth_header[7:]
    return None
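The `_extract_token` helper above matches the exact prefix `Bearer `. HTTP authentication scheme names are case-insensitive, so some clients send `bearer` instead. A slightly more tolerant parser might look like the sketch below; `extract_bearer_token` is a hypothetical standalone function that takes the header value as an argument so it can be tried without a Flask request.

```python
def extract_bearer_token(header_value):
    """Return the token from an Authorization header value, or None."""
    # split() with no arguments also collapses repeated whitespace.
    parts = (header_value or "").split()
    if len(parts) == 2 and parts[0].lower() == "bearer":
        return parts[1]
    return None
```

Headers with the wrong scheme, a missing token, or extra fields all yield `None`, which the decorator would translate into a 401 response.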

26.10.3 API Serialization and Pagination

app/utils/serializers.py

from flask import request, url_for, current_app


def paginate_query(query, serializer_func, endpoint=None, **kwargs):
    """Paginate a query and return items, metadata and navigation links."""
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = request.args.get(
        "per_page", current_app.config.get("POSTS_PER_PAGE", 10), type=int,
    )
    # Clamp the page size so clients cannot request zero, negative or huge pages.
    per_page = max(1, min(per_page, 100))

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)

    result = {
        "items": [serializer_func(item) for item in pagination.items],
        "meta": {
            "page": pagination.page,
            "per_page": pagination.per_page,
            "total": pagination.total,
            "pages": pagination.pages,
            "has_next": pagination.has_next,
            "has_prev": pagination.has_prev,
        },
    }

    links = {}
    if endpoint:
        if pagination.has_next:
            links["next"] = url_for(
                endpoint, page=pagination.next_num, per_page=per_page, _external=True, **kwargs,
            )
        if pagination.has_prev:
            links["prev"] = url_for(
                endpoint, page=pagination.prev_num, per_page=per_page, _external=True, **kwargs,
            )
        links["self"] = url_for(endpoint, page=page, per_page=per_page, _external=True, **kwargs)
    result["_links"] = links

    return result
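The `meta` block returned by `paginate_query` comes from Flask-SQLAlchemy's pagination object. The arithmetic behind those fields is simple enough to sketch without a database; `page_meta` below is a hypothetical helper written for illustration, and the `offset` key is an addition that shows where the page starts in the result set.

```python
import math


def page_meta(total, page, per_page, max_per_page=100):
    """Compute pagination metadata for `total` items."""
    # Clamp the inputs the same way an API endpoint would.
    per_page = max(1, min(per_page, max_per_page))
    page = max(1, page)
    pages = math.ceil(total / per_page)
    return {
        "page": page,
        "per_page": per_page,
        "total": total,
        "pages": pages,
        "has_next": page < pages,
        "has_prev": page > 1,
        "offset": (page - 1) * per_page,
    }
```

For 45 items at 20 per page, page 2 starts at offset 20 and has both a previous and a next page.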

26.10.4 Authentication API

app/api/v1/auth.py

from flask import request, jsonify
from app.api.v1 import api_v1_bp
from app.services.auth import AuthService
from app.api.errors import UnauthorizedError, ValidationError
from app.utils.decorators import jwt_required
from app.extensions import db


@api_v1_bp.route("/auth/register", methods=["POST"])
def api_register():
    # silent=True returns None for a missing or malformed JSON body, so the
    # client receives the 422 below instead of a generic parser error.
    data = request.get_json(silent=True)
    if not data:
        raise ValidationError("Request body is required")

    required_fields = ["username", "email", "password"]
    missing = [f for f in required_fields if not data.get(f)]
    if missing:
        raise ValidationError(
            "Missing required fields",
            errors={f: "This field is required" for f in missing},
        )

    try:
        user = AuthService.register(
            username=data["username"], email=data["email"], password=data["password"],
        )
    except ValueError as e:
        raise ValidationError(str(e))

    tokens = AuthService.generate_tokens(user)
    return jsonify({"user": user.to_dict(include_email=True), **tokens}), 201


@api_v1_bp.route("/auth/login", methods=["POST"])
def api_login():
    data = request.get_json(silent=True)
    if not data or not data.get("username") or not data.get("password"):
        raise ValidationError("Username and password are required")

    user = AuthService.authenticate(data["username"], data["password"])
    if user is None:
        raise UnauthorizedError("Invalid credentials")

    user.update_login_info(request.remote_addr)
    db.session.commit()

    tokens = AuthService.generate_tokens(user)
    return jsonify({"user": user.to_dict(include_email=True), **tokens})


@api_v1_bp.route("/auth/refresh", methods=["POST"])
def api_refresh():
    data = request.get_json(silent=True)
    if not data or not data.get("refresh_token"):
        raise ValidationError("Refresh token is required")

    result = AuthService.refresh_access_token(data["refresh_token"])
    if result is None:
        raise UnauthorizedError("Invalid or expired refresh token")
    return jsonify(result)


@api_v1_bp.route("/auth/me", methods=["GET"])
@jwt_required
def api_me():
    return jsonify(request.current_user.to_dict(include_email=True))


@api_v1_bp.route("/auth/change-password", methods=["POST"])
@jwt_required
def api_change_password():
    data = request.get_json(silent=True)
    if not data or not data.get("old_password") or not data.get("new_password"):
        raise ValidationError("Old password and new password are required")

    success = AuthService.change_password(
        request.current_user, data["old_password"], data["new_password"],
    )
    if not success:
        raise ValidationError("Old password is incorrect")
    return jsonify({"message": "Password changed successfully"})

26.10.5 Posts API

app/api/v1/posts.py

from flask import request, jsonify
from sqlalchemy import or_
from app.api.v1 import api_v1_bp
from app.models.post import Post, Category, Tag
from app.services.post import PostService
from app.api.errors import NotFoundError, ForbiddenError, ValidationError
from app.utils.decorators import jwt_required
from app.utils.serializers import paginate_query
from app.extensions import db


@api_v1_bp.route("/posts", methods=["GET"])
def api_list_posts():
    query = Post.query.filter_by(status=Post.STATUS_PUBLISHED, is_deleted=False)

    category_slug = request.args.get("category")
    if category_slug:
        category = Category.query.filter_by(slug=category_slug).first()
        if category:
            query = query.filter_by(category_id=category.id)

    tag_slug = request.args.get("tag")
    if tag_slug:
        tag = Tag.query.filter_by(slug=tag_slug).first()
        if tag:
            query = query.filter(Post.tags.contains(tag))

    search = request.args.get("q")
    if search:
        query = query.filter(or_(Post.title.contains(search), Post.content.contains(search)))

    # Pinned posts first, then newest first.
    query = query.order_by(Post.is_pinned.desc(), Post.published_at.desc())
    return jsonify(paginate_query(
        query, lambda p: p.to_dict(include_content=False), endpoint="api_v1.api_list_posts",
    ))


@api_v1_bp.route("/posts/<slug>", methods=["GET"])
def api_get_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.status != Post.STATUS_PUBLISHED:
        # This route is not wrapped in jwt_required, so request.current_user
        # is normally unset here and unpublished posts are reported as 404.
        user = getattr(request, "current_user", None)
        if user is None or (post.author != user and not user.is_editor):
            raise NotFoundError("Post not found")
    return jsonify(post.to_dict())


@api_v1_bp.route("/posts", methods=["POST"])
@jwt_required
def api_create_post():
    data = request.get_json(silent=True)
    if not data or not data.get("title") or not data.get("content"):
        raise ValidationError("Title and content are required")

    post = PostService.create_post(
        user_id=request.current_user.id, title=data["title"], content=data["content"],
        summary=data.get("summary"), category_name=data.get("category"),
        tag_names=data.get("tags", []), status=data.get("status", "draft"),
    )
    return jsonify(post.to_dict()), 201


@api_v1_bp.route("/posts/<slug>", methods=["PUT"])
@jwt_required
def api_update_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.author != request.current_user and not request.current_user.is_editor:
        raise ForbiddenError("You can only edit your own posts")

    data = request.get_json(silent=True)
    if not data:
        raise ValidationError("Request body is required")

    post = PostService.update_post(
        post, title=data.get("title"), content=data.get("content"),
        summary=data.get("summary"), category_name=data.get("category"),
        tag_names=data.get("tags"), status=data.get("status"),
    )
    return jsonify(post.to_dict())


@api_v1_bp.route("/posts/<slug>", methods=["DELETE"])
@jwt_required
def api_delete_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.author != request.current_user and not request.current_user.is_admin:
        raise ForbiddenError("You can only delete your own posts")

    post.soft_delete()
    db.session.commit()
    return "", 204


@api_v1_bp.route("/categories", methods=["GET"])
def api_list_categories():
    categories = Category.query.order_by(Category.sort_order).all()
    return jsonify([c.to_dict() for c in categories])


@api_v1_bp.route("/tags", methods=["GET"])
def api_list_tags():
    tags = Tag.query.order_by(Tag.name).all()
    return jsonify([t.to_dict() for t in tags])

26.10.6 Users and Comments API

app/api/v1/users.py

from flask import jsonify
from app.api.v1 import api_v1_bp
from app.models.user import User
from app.api.errors import NotFoundError
from app.utils.serializers import paginate_query


@api_v1_bp.route("/users", methods=["GET"])
def api_list_users():
    query = User.query.filter_by(is_deleted=False).order_by(User.created_at.desc())
    return jsonify(paginate_query(query, lambda u: u.to_dict(), endpoint="api_v1.api_list_users"))


@api_v1_bp.route("/users/<username>", methods=["GET"])
def api_get_user(username):
    user = User.query.filter_by(username=username, is_deleted=False).first()
    if user is None:
        raise NotFoundError("User not found")
    return jsonify(user.to_dict())

app/api/v1/comments.py

from flask import request, jsonify
from app.api.v1 import api_v1_bp
from app.models.comment import Comment
from app.models.post import Post
from app.api.errors import ValidationError
from app.utils.decorators import jwt_required
from app.extensions import db


@api_v1_bp.route("/posts/<int:post_id>/comments", methods=["GET"])
def api_list_comments(post_id):
    # Respond with 404 if the post does not exist.
    db.get_or_404(Post, post_id)
    page = request.args.get("page", 1, type=int)
    per_page = max(1, min(request.args.get("per_page", 20, type=int), 50))

    # Only approved top-level comments are paginated; replies are nested
    # inside each item by to_dict(include_replies=True).
    pagination = Comment.query.filter_by(
        post_id=post_id, is_deleted=False, is_approved=True, parent_id=None,
    ).order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False,
    )

    return jsonify({
        "items": [c.to_dict(include_replies=True) for c in pagination.items],
        "meta": {
            "page": pagination.page, "per_page": pagination.per_page,
            "total": pagination.total, "pages": pagination.pages,
        },
    })


@api_v1_bp.route("/posts/<int:post_id>/comments", methods=["POST"])
@jwt_required
def api_create_comment(post_id):
    post = db.get_or_404(Post, post_id)
    data = request.get_json(silent=True)
    if not data or not data.get("content"):
        raise ValidationError("Comment content is required")

    comment = Comment(
        content=data["content"], post_id=post.id,
        user_id=request.current_user.id,
        parent_id=data.get("parent_id"),
    )
    db.session.add(comment)
    post.comment_count += 1
    db.session.commit()
    return jsonify(comment.to_dict()), 201
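The list endpoint returns top-level comments and lets the model attach replies through `to_dict(include_replies=True)`, which is defined elsewhere in the chapter. When a whole thread is loaded in a single query, the nesting can also be assembled in memory. The sketch below shows that approach on plain dictionaries; `build_comment_tree` and the `id`, `parent_id`, `content` keys are assumptions for illustration.

```python
from collections import defaultdict


def build_comment_tree(rows):
    """Turn flat comment rows into a nested list of top-level comments.

    Each row needs "id", "parent_id" (None for top-level) and "content".
    Parent links are assumed to be acyclic.
    """
    # Group rows by parent so every node's children are one lookup away.
    children = defaultdict(list)
    for row in rows:
        children[row["parent_id"]].append(row)

    def attach(node):
        return {
            "id": node["id"],
            "content": node["content"],
            "replies": [attach(child) for child in children[node["id"]]],
        }

    return [attach(root) for root in children[None]]
```

Grouping first keeps the work proportional to the number of comments and avoids issuing one query per reply level.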

26.10.7 API Blueprint Registration

app/api/v1/__init__.py

from flask import Blueprint
from app.api.errors import register_error_handlers

api_v1_bp = Blueprint("api_v1", __name__)

# Imported after the blueprint is created because the route modules import api_v1_bp.
from app.api.v1 import auth, posts, users, comments  # noqa: E402, F401

register_error_handlers(api_v1_bp)

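26.11 Security Practices

26.11.1 OWASP Top 10 Mitigations

| Threat | Mitigation | Implementation in this project |
| --- | --- | --- |
| A01 Broken Access Control | Role-based access control, principle of least privilege | role_required decorator |
| A02 Cryptographic Failures | pbkdf2:sha256 password hashing, enforced HTTPS | set_password method |
| A03 Injection | Parameterized queries, input validation, bleach sanitization | SQLAlchemy ORM + WTForms |
| A04 Insecure Design | Threat modeling, secure default configuration | ProductionConfig security settings |
| A05 Security Misconfiguration | Security headers, error pages that do not leak internals | Nginx security headers |
| A06 Vulnerable and Outdated Components | Regular dependency updates, security advisory monitoring | pip-audit |
| A07 Identification and Authentication Failures | Rate limiting, strong password policy, JWT expiry | flask_limiter + JWT |
| A08 Software and Data Integrity Failures | CSRF protection, CORS restrictions | flask_wtf.csrf |
| A09 Security Logging and Monitoring Failures | Structured logging, exception tracking | RotatingFileHandler |
| A10 Server-Side Request Forgery | URL allowlist, network isolation | Internal service validation |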
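The A02 row refers to pbkdf2:sha256 hashing behind `set_password`, which is defined with the user model elsewhere in the chapter. As a rough sketch of what salted, iterated hashing involves, the functions below use only `hashlib` and `hmac`. The function names, the default iteration count and the `method$salt$digest` storage layout are assumptions made for this example.

```python
import hashlib
import hmac
import os

DEFAULT_ITERATIONS = 600_000  # assumption: tune to your hardware and policy


def hash_password(password, salt=None, iterations=DEFAULT_ITERATIONS):
    """Derive a salted PBKDF2-SHA256 hash and encode it as one string."""
    salt = os.urandom(16) if salt is None else salt
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2:sha256:{iterations}${salt.hex()}${digest.hex()}"


def check_password(stored, candidate):
    """Recompute the hash with the stored salt and compare in constant time."""
    try:
        method, salt_hex, digest_hex = stored.split("$")
        iterations = int(method.rsplit(":", 1)[1])
        salt = bytes.fromhex(salt_hex)
    except (ValueError, IndexError):
        return False
    digest = hashlib.pbkdf2_hmac("sha256", candidate.encode("utf-8"), salt, iterations)
    return hmac.compare_digest(digest.hex(), digest_hex)
```

Storing the iteration count next to the hash lets the cost be raised later without invalidating existing passwords.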

26.11.2 Security Header Configuration

Nginx security headers:

add_header X-Content-Type-Options "nosniff" always;
add_header X-Frame-Options "SAMEORIGIN" always;
# Legacy header: current browsers have dropped their XSS filters and rely on CSP instead.
add_header X-XSS-Protection "1; mode=block" always;
# 'unsafe-inline' weakens the policy; remove it once inline scripts and styles are gone.
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:;" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
add_header Permissions-Policy "camera=(), microphone=(), geolocation=()" always;
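The headers above are added by Nginx, so they are absent when the application runs without the proxy, for example under the development server or in tests. One option is to add them in Python as well. The sketch below is a plain WSGI middleware that appends a fixed set of headers unless the application already set them; `SecurityHeadersMiddleware` and `SECURITY_HEADERS` are hypothetical names, and the header subset is chosen only for illustration.

```python
SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "SAMEORIGIN",
    "Referrer-Policy": "strict-origin-when-cross-origin",
}


class SecurityHeadersMiddleware:
    """WSGI middleware that adds default security headers to each response."""

    def __init__(self, wsgi_app, headers=None):
        self.wsgi_app = wsgi_app
        self.headers = dict(SECURITY_HEADERS if headers is None else headers)

    def __call__(self, environ, start_response):
        def patched_start_response(status, response_headers, exc_info=None):
            # Header names are case-insensitive; keep values the app set itself.
            present = {name.lower() for name, _ in response_headers}
            extra = [
                (name, value) for name, value in self.headers.items()
                if name.lower() not in present
            ]
            return start_response(status, list(response_headers) + extra, exc_info)

        return self.wsgi_app(environ, patched_start_response)
```

With Flask this would typically be wired up as `app.wsgi_app = SecurityHeadersMiddleware(app.wsgi_app)` inside the application factory.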

26.12 测试策略

26.12.1 测试Fixtures

tests/conftest.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pytest
from app import create_app
from app.extensions import db as _db
from app.models.user import User, UserRole


class TestUser:
def test_user_creation(self, app, db):
with app.app_context():
user = User(username="testuser", email="test@example.com")
user.set_password("TestPass123!")
db.session.add(user)
db.session.commit()
assert user.id is not None
assert user.check_password("TestPass123!")


class TestPost:
def test_post_creation(self, app, db, sample_user):
with app.app_context():
from app.models.post import Post
post = Post(title="Test Post", content="Content", author_id=sample_user.id)
db.session.add(post)
db.session.commit()
assert post.id is not None
assert post.slug is not None

26.13 本章小结

本章通过构建完整的博客系统,系统实践了以下核心知识:

  1. 项目架构:分层架构设计、蓝图模块化、工厂模式
  2. 数据模型:用户系统、文章系统、评论系统、分类标签
  3. 认证授权:Flask-Login会话认证、JWT API认证、权限装饰器
  4. 视图层:模板继承、表单处理、分页、搜索
  5. RESTful API:序列化、分页、错误处理、认证
  6. 安全防护:OWASP Top 10防护、安全Header、输入验证
  7. 测试策略:模型测试、视图测试、API测试
  8. 部署运维:Gunicorn、Nginx、Docker、监控

26.14 延伸阅读

26.14.1 Flask高级主题

  • Flask官方文档 (https://flask.palletsprojects.com/) — Flask权威指南
  • Flask Web Development (Miguel Grinberg) — Flask开发经典
  • Architecture Patterns with Python — Python架构模式

26.14.2 安全与性能

26.14.3 部署与运维

26.14.4 测试与质量


from config import TestingConfig


@pytest.fixture(scope="session")
def app():
    """Create one application instance for the whole test session."""
    app = create_app(TestingConfig)
    yield app


@pytest.fixture(scope="function")
def db(app):
    """Create a fresh schema for each test and drop it afterwards."""
    with app.app_context():
        _db.create_all()
        yield _db
        _db.session.rollback()
        _db.session.remove()
        _db.drop_all()


@pytest.fixture(scope="function")
def client(app, db):
    return app.test_client()


@pytest.fixture(scope="function")
def authenticated_client(client, db):
    """Test client with an AUTHOR user already logged in."""
    user = User(username="testuser", email="test@example.com", role=UserRole.AUTHOR)
    user.set_password("password123")
    _db.session.add(user)
    _db.session.commit()

    # Flask-Login keeps the logged-in user's id (as a string) under "_user_id".
    with client.session_transaction() as sess:
        sess["_user_id"] = str(user.id)

    return client


@pytest.fixture
def admin_client(client, db):
    """Test client with an ADMIN user already logged in."""
    admin = User(username="admin", email="admin@example.com", role=UserRole.ADMIN)
    admin.set_password("adminpass123")
    _db.session.add(admin)
    _db.session.commit()

    with client.session_transaction() as sess:
        sess["_user_id"] = str(admin.id)

    return client


@pytest.fixture
def auth_headers(app, db):
    """Authorization header carrying a JWT access token for an AUTHOR user."""
    from app.services.auth import AuthService

    user = User(username="apiuser", email="api@example.com", role=UserRole.AUTHOR)
    user.set_password("password123")
    _db.session.add(user)
    _db.session.commit()

    tokens = AuthService.generate_tokens(user)
    return {"Authorization": f"Bearer {tokens['access_token']}"}

26.12.2 Test Factories

tests/factories.py

import factory
from app.models.user import User, UserRole
from app.models.post import Post, Category, Tag
from app.models.comment import Comment
from app.extensions import db


class UserFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    username = factory.Sequence(lambda n: f"user{n}")
    email = factory.LazyAttribute(lambda obj: f"{obj.username}@example.com")
    role = UserRole.READER
    is_active = True

    @factory.post_generation
    def password(obj, create, extracted, **kwargs):
        # Use the password passed to the factory, or fall back to a default.
        obj.set_password(extracted or "defaultpassword")


class CategoryFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Category
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    name = factory.Sequence(lambda n: f"Category {n}")
    slug = factory.LazyAttribute(lambda obj: obj.name.lower().replace(" ", "-"))


class PostFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    title = factory.Sequence(lambda n: f"Post Title {n}")
    slug = factory.LazyAttribute(lambda obj: obj.title.lower().replace(" ", "-"))
    content = factory.Faker("text")
    status = Post.STATUS_PUBLISHED
    author = factory.SubFactory(UserFactory, role=UserRole.AUTHOR)
    category = factory.SubFactory(CategoryFactory)


class CommentFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Comment
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    content = factory.Faker("text")
    author = factory.SubFactory(UserFactory)
    post = factory.SubFactory(PostFactory)
    is_approved = True
26.12.3 Unit Tests

tests/unit/test_models.py

import pytest
from app.models.user import User, UserRole
from app.models.post import Post
from app.models.comment import Comment


class TestUserModel:
    def test_set_and_check_password(self, app, db):
        with app.app_context():
            user = User(username="test", email="test@example.com")
            user.set_password("securepassword")
            assert user.check_password("securepassword") is True
            assert user.check_password("wrongpassword") is False

    def test_role_properties(self, app, db):
        with app.app_context():
            admin = User(username="admin", email="admin@example.com", role=UserRole.ADMIN)
            assert admin.is_admin is True
            assert admin.is_editor is True
            assert admin.is_author is True

            reader = User(username="reader", email="reader@example.com", role=UserRole.READER)
            assert reader.is_admin is False
            assert reader.is_editor is False
            assert reader.is_author is False

    def test_follow_unfollow(self, app, db):
        with app.app_context():
            user1 = User(username="user1", email="user1@example.com")
            user1.set_password("password")
            user2 = User(username="user2", email="user2@example.com")
            user2.set_password("password")
            db.session.add_all([user1, user2])
            db.session.commit()

            user1.follow(user2)
            assert user1.is_following(user2) is True
            assert user2.is_following(user1) is False

            user1.unfollow(user2)
            assert user1.is_following(user2) is False


class TestPostModel:
    def test_publish(self, app, db):
        with app.app_context():
            user = User(username="author", email="author@example.com", role=UserRole.AUTHOR)
            user.set_password("password")
            db.session.add(user)
            db.session.commit()

            post = Post(title="Test", slug="test", content="Content", user_id=user.id)
            db.session.add(post)
            db.session.commit()

            # A new post starts as an unpublished draft.
            assert post.status == Post.STATUS_DRAFT
            assert post.published_at is None

            post.publish()
            assert post.status == Post.STATUS_PUBLISHED
            assert post.published_at is not None

    def test_soft_delete(self, app, db):
        with app.app_context():
            user = User(username="author2", email="author2@example.com", role=UserRole.AUTHOR)
            user.set_password("password")
            db.session.add(user)
            db.session.commit()

            post = Post(title="Delete Test", slug="delete-test", content="Content", user_id=user.id)
            db.session.add(post)
            db.session.commit()

            post.soft_delete()
            assert post.is_deleted is True
            assert post.deleted_at is not None

26.12.4 API Integration Tests

tests/integration/test_api.py

import json
import pytest


class TestAuthAPI:
    def test_register(self, client, app):
        with app.app_context():
            response = client.post("/api/v1/auth/register", json={
                "username": "newuser",
                "email": "new@example.com",
                "password": "securepass123",
            })
            assert response.status_code == 201
            data = response.get_json()
            assert "access_token" in data
            assert "refresh_token" in data
            assert data["user"]["username"] == "newuser"

    def test_register_duplicate_username(self, client, app, db):
        with app.app_context():
            from app.models.user import User
            user = User(username="existing", email="existing@example.com")
            user.set_password("password")
            db.session.add(user)
            db.session.commit()

            response = client.post("/api/v1/auth/register", json={
                "username": "existing",
                "email": "another@example.com",
                "password": "securepass123",
            })
            assert response.status_code == 422

    def test_login(self, client, app, db):
        with app.app_context():
            from app.models.user import User
            user = User(username="loginuser", email="login@example.com")
            user.set_password("password123")
            db.session.add(user)
            db.session.commit()

            response = client.post("/api/v1/auth/login", json={
                "username": "loginuser",
                "password": "password123",
            })
            assert response.status_code == 200
            data = response.get_json()
            assert "access_token" in data

    def test_login_invalid_credentials(self, client, app, db):
        with app.app_context():
            response = client.post("/api/v1/auth/login", json={
                "username": "nonexistent",
                "password": "wrongpassword",
            })
            assert response.status_code == 401


class TestPostsAPI:
    def test_list_posts(self, client, app, db):
        with app.app_context():
            response = client.get("/api/v1/posts")
            assert response.status_code == 200
            data = response.get_json()
            assert "items" in data
            assert "meta" in data

    def test_create_post(self, client, app, db, auth_headers):
        with app.app_context():
            response = client.post("/api/v1/posts", json={
                "title": "API Test Post",
                "content": "This is a test post created via API.",
                "status": "published",
            }, headers=auth_headers)
            assert response.status_code == 201
            data = response.get_json()
            assert data["title"] == "API Test Post"

    def test_create_post_unauthorized(self, client, app, db):
        with app.app_context():
            response = client.post("/api/v1/posts", json={
                "title": "Unauthorized Post",
                "content": "Should fail.",
            })
            assert response.status_code == 401

    def test_delete_post(self, client, app, db, auth_headers):
        with app.app_context():
            create_resp = client.post("/api/v1/posts", json={
                "title": "To Delete", "content": "Will be deleted", "status": "published",
            }, headers=auth_headers)
            slug = create_resp.get_json()["slug"]

            response = client.delete(f"/api/v1/posts/{slug}", headers=auth_headers)
            assert response.status_code == 204

26.13 Production Deployment

26.13.1 Containerizing with Docker

Dockerfile

# Build stage: install the dependencies with Poetry
FROM python:3.12-slim AS builder

WORKDIR /app

RUN pip install --no-cache-dir poetry

COPY pyproject.toml poetry.lock ./
# --only main skips development dependencies (it replaces the older --no-dev flag).
# --no-root skips installing the project itself, whose source is not copied into this stage.
RUN poetry config virtualenvs.create false && poetry install --only main --no-root --no-interaction --no-ansi

# Runtime stage: copy only the installed packages and the application code
FROM python:3.12-slim

WORKDIR /app

# Run as an unprivileged user
RUN addgroup --system appgroup && adduser --system --ingroup appgroup appuser

COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY . .

RUN mkdir -p instance app/static/uploads && chown -R appuser:appgroup /app

USER appuser

EXPOSE 8000

CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "--threads", "2", "--timeout", "120", "--access-logfile", "-", "app:create_app()"]

26.13.2 Orchestration with Docker Compose

docker-compose.yml

# The top-level "version" key is ignored by Docker Compose v2 and may be omitted.
version: "3.8"

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=postgresql://flaskblog:${DB_PASSWORD}@db:5432/flaskblog
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    restart: unless-stopped

  db:
    image: postgres:16-alpine
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=flaskblog
      - POSTGRES_USER=flaskblog
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U flaskblog"]
      interval: 5s
      timeout: 5s
      retries: 5
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

  celery_worker:
    build: .
    command: celery -A celery_app.celery worker --loglevel=info --concurrency=2
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - DATABASE_URL=postgresql://flaskblog:${DB_PASSWORD}@db:5432/flaskblog
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
      - ./app/static:/usr/share/nginx/static:ro
      # nginx.conf reads its certificate and key from /etc/nginx/ssl, so that
      # directory must be mounted as well, for example (host path is an assumption):
      # - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - web
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

26.13.3 Nginx Reverse Proxy

nginx.conf

upstream flask_app {
    server web:8000;
}

# Redirect all plain HTTP traffic to HTTPS
server {
    listen 80;
    server_name example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name example.com;

    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;

    client_max_body_size 16M;

    add_header X-Content-Type-Options "nosniff" always;
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Referrer-Policy "strict-origin-when-cross-origin" always;

    location /static/ {
        alias /usr/share/nginx/static/;
        expires 30d;
        # A location that declares its own add_header does not inherit the
        # server-level add_header directives above; repeat them here if the
        # security headers should also apply to static files.
        add_header Cache-Control "public, immutable";
    }

    location / {
        proxy_pass http://flask_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_redirect off;
    }
}

26.13.4 Asynchronous Tasks with Celery

celery_app.py

from celery import Celery


def make_celery(app):
    """Create a Celery instance whose tasks run inside the Flask app context."""
    celery = Celery(
        app.import_name,
        broker=app.config["CELERY_BROKER_URL"],
        backend=app.config["CELERY_RESULT_BACKEND"],
    )
    celery.conf.update({
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        "timezone": "UTC",
        "enable_utc": True,
        "task_track_started": True,
        "worker_prefetch_multiplier": 1,
        "task_acks_late": True,
    })

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            # Push an application context so tasks can use db, config, and so on.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


from app import create_app  # noqa: E402

flask_app = create_app()
celery = make_celery(flask_app)


@celery.task(bind=True, max_retries=3)
def send_email_task(self, to, subject, template, **kwargs):
    from app.services.email import EmailService
    try:
        EmailService.send_email(to=to, subject=subject, template=template, **kwargs)
    except Exception as exc:
        # Retry after 60 seconds, up to max_retries times.
        raise self.retry(exc=exc, countdown=60)


@celery.task
def update_post_stats():
    from app.extensions import db
    from app.models.post import Post
    with flask_app.app_context():
        posts = Post.query.filter_by(status="published", is_deleted=False).all()
        for post in posts:
            post.comment_count = post.comments.filter_by(is_deleted=False, is_approved=True).count()
        db.session.commit()

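26.14 Performance Optimization

26.14.1 Database Query Optimization

| Strategy | Implementation | Effect |
| --- | --- | --- |
| Index optimization | Composite indexes covering high-frequency queries | Fewer full table scans |
| Query optimization | Eager loading with joinedload/subqueryload | Eliminates N+1 queries |
| Pagination | Cursor-based pagination instead of OFFSET | Stable performance on large datasets |
| Connection pooling | pool_size + pool_pre_ping | Reuses connections and discards stale ones before use |
| Slow query logging | SQLALCHEMY_RECORD_QUERIES | Pinpoints performance bottlenecks |

Eager-loading example: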

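from sqlalchemy.orm import joinedload, selectinload

from app.models.post import Post

posts = Post.query.options(
    # Many-to-one relationships: a JOIN fetches them in the same query.
    joinedload(Post.author),
    joinedload(Post.category),
    # Collection: a second SELECT ... IN query avoids multiplying the joined rows.
    selectinload(Post.tags),
).filter_by(status="published").all()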
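
The cursor-based pagination listed in the table can be illustrated without the project's models. The following is a minimal sketch using the standard-library sqlite3 module; the `posts` table and the `fetch_page` helper are hypothetical and only stand in for the real schema. Instead of skipping rows with OFFSET, each request filters on the last id already seen, so the database can seek directly through the primary-key index.

```python
import sqlite3


def fetch_page(conn, after_id=None, limit=3):
    """Return (rows, next_cursor) for posts ordered by id, newest first.

    `after_id` is the id of the last row on the previous page; None means
    the first page. `next_cursor` is None when the page was not full.
    """
    if after_id is None:
        rows = conn.execute(
            "SELECT id, title FROM posts ORDER BY id DESC LIMIT ?",
            (limit,),
        ).fetchall()
    else:
        # Keyset condition: continue strictly below the last id seen.
        rows = conn.execute(
            "SELECT id, title FROM posts WHERE id < ? ORDER BY id DESC LIMIT ?",
            (after_id, limit),
        ).fetchall()
    next_cursor = rows[-1][0] if len(rows) == limit else None
    return rows, next_cursor


# Hypothetical demo data: seven posts with ids 1..7.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO posts (id, title) VALUES (?, ?)",
    [(i, f"Post {i}") for i in range(1, 8)],
)

first_page, cursor = fetch_page(conn)
second_page, cursor2 = fetch_page(conn, after_id=cursor)
```

In this sketch a full final page still returns a cursor, and the following request then comes back empty; fetching `limit + 1` rows is a common way to avoid that extra round trip.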

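26.14.2 Caching Strategy

from app.extensions import cache
from app.services.post import PostService  # module path assumed from the project layout


# memoize() includes the arguments in the cache key, so each page is cached
# separately; cached() with a fixed key_prefix would return the first cached
# page for every value of `page`.
@cache.memoize(timeout=300)
def get_cached_posts(page=1):
    return PostService.get_published_posts(page=page)


@cache.memoize(timeout=600)
def get_cached_post(slug):
    return PostService.get_post_by_slug(slug)


def invalidate_post_cache(slug=None):
    # Without arguments, delete_memoized() drops every cached page of the list.
    cache.delete_memoized(get_cached_posts)
    if slug:
        cache.delete_memoized(get_cached_post, slug)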
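
When a cached function takes arguments, the cache key has to include them; otherwise every page or slug shares a single entry. The following framework-independent sketch shows the idea of argument-keyed caching with a time limit and targeted invalidation. It is not how Flask-Caching is implemented; the `memoize` decorator, its `invalidate` helper, and `get_posts` are hypothetical names used only for illustration.

```python
import time
from functools import wraps


def memoize(timeout):
    """Cache results per positional-argument tuple for `timeout` seconds."""
    def decorator(func):
        store = {}  # maps args -> (expiry time, cached value)

        @wraps(func)
        def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]
            value = func(*args)
            store[args] = (now + timeout, value)
            return value

        def invalidate(*args):
            # With arguments: drop one entry. Without: drop everything.
            if args:
                store.pop(args, None)
            else:
                store.clear()

        wrapper.invalidate = invalidate
        return wrapper
    return decorator


calls = []  # records every real (uncached) call


@memoize(timeout=300)
def get_posts(page):
    calls.append(page)
    return [f"post-{page}-{i}" for i in range(2)]


get_posts(1)
get_posts(1)             # served from the cache
get_posts(2)             # different arguments, different cache entry
get_posts.invalidate(1)  # drop only the entry for page 1
get_posts(1)             # computed again
```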

26.14.3 Tuning Gunicorn

gunicorn \
    --bind 0.0.0.0:8000 \
    --workers 4 \
    --threads 2 \
    --worker-class gthread \
    --timeout 120 \
    --graceful-timeout 30 \
    --max-requests 1000 \
    --max-requests-jitter 50 \
    --access-logfile - \
    --error-logfile - \
    "app:create_app()"

Recommended number of workers: (2 × CPU cores) + 1
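
Because a Gunicorn configuration file is ordinary Python, the formula can be computed at startup rather than hard-coded. The sketch below assumes a file named gunicorn.conf.py passed with `-c`; the `recommended_workers` helper is a hypothetical name, and the remaining values simply mirror the command line above.

```python
import multiprocessing


def recommended_workers(cpu_cores: int) -> int:
    """Apply the (2 x CPU cores) + 1 rule of thumb."""
    if cpu_cores < 1:
        raise ValueError("cpu_cores must be at least 1")
    return 2 * cpu_cores + 1


# Settings read by Gunicorn when started with: gunicorn -c gunicorn.conf.py "app:create_app()"
bind = "0.0.0.0:8000"
workers = recommended_workers(multiprocessing.cpu_count())
threads = 2
worker_class = "gthread"
timeout = 120
graceful_timeout = 30
max_requests = 1000
max_requests_jitter = 50
accesslog = "-"
errorlog = "-"
```

Inside a container, `multiprocessing.cpu_count()` may report the host's cores rather than the container's CPU limit, so the computed value is a starting point to adjust under load testing.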


26.15 Emerging Trends

26.15.1 Asynchronous Web Frameworks

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


@app.post("/items/")
async def create_item(item: Item):
    return {"item": item, "status": "created"}


@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}

26.15.2 Integrating a Modern Frontend

import os

from flask import Flask, send_from_directory

app = Flask(__name__, static_folder='../frontend/dist')


@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def serve_spa(path):
    # Serve an existing build artifact; otherwise fall back to the SPA entry point
    # so that client-side routes resolve to index.html.
    if path != "" and os.path.exists(os.path.join(app.static_folder, path)):
        return send_from_directory(app.static_folder, path)
    return send_from_directory(app.static_folder, 'index.html')

26.15.3 Containerized Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 8000
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "app:create_app()"]

26.15.4 Modern ORM Practices

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """Declarative base class required by the SQLAlchemy 2.x typed mapping style."""


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)

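26.16 Chapter Summary

This chapter built a production-grade blog platform from scratch and covered the following core practices:

  1. Architecture: a layered architecture (presentation → business logic → data access → infrastructure), with DDD concepts used to organize the domain model
  2. Configuration management: per-environment configuration (development/testing/production), secrets injected through environment variables, secure cookies enforced in production
  3. Data models: SQLAlchemy mixins provide shared capabilities such as timestamps, soft deletion, and slugs, and the models support complex relationships (many-to-many, self-referential)
  4. Authentication and authorization: a dual-track scheme (sessions for the web UI, JWT for the API), role-based access control, and a token-based password reset
  5. RESTful API: a versioned API (/api/v1), unified error handling, HATEOAS pagination links, JWT Bearer authentication
  6. Security: CSRF protection, XSS sanitization (bleach), CORS restrictions, rate limiting, security headers
  7. Testing strategy: the test pyramid (unit → integration → end-to-end), Factory Boy data factories, fixtures managed with pytest
  8. Production deployment: multi-stage Docker builds, Docker Compose orchestration (Web + DB + Redis + Celery + Nginx), SSL termination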
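
As a reminder of what the HATEOAS pagination links in item 5 look like, here is a minimal sketch that builds them with the standard library only. The `pagination_links` function, its link names, and the `page`/`per_page` query parameters are assumptions made for illustration and may differ from the project's serializers.

```python
from urllib.parse import urlencode


def pagination_links(base_url, page, per_page, total):
    """Build self/first/last/prev/next links for a paginated collection."""
    last_page = max(1, -(-total // per_page))  # ceiling division, at least one page

    def url(p):
        return f"{base_url}?{urlencode({'page': p, 'per_page': per_page})}"

    links = {"self": url(page), "first": url(1), "last": url(last_page)}
    if page > 1:
        links["prev"] = url(page - 1)
    if page < last_page:
        links["next"] = url(page + 1)
    return links


# Page 2 of 35 items at 10 per page: four pages in total.
links = pagination_links("/api/v1/posts", page=2, per_page=10, total=35)
```

A client can follow `links["next"]` until the key is absent, without constructing URLs itself.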

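26.17 Exercises

Basic

  1. Add avatar upload to the user model, with image cropping and thumbnail generation
  2. Implement automatic draft saving for posts (periodic saves from the frontend plus a backend API)
  3. Add a "like" feature for posts, using Redis to cache the counts

Intermediate

  1. Implement OAuth2 third-party login (GitHub/Google) with the Authlib library
  2. Build a real-time notification system over WebSocket (Flask-SocketIO) that pushes new comments as they arrive
  3. Implement search on top of PostgreSQL full-text search, with support for Chinese word segmentation

Advanced

  1. Design and implement a sliding-window algorithm for API rate limiting to replace the fixed window
  2. Build a complete CI/CD pipeline (GitHub Actions) covering automated testing, building, and deployment
  3. Implement a Redis-based distributed lock to keep data consistent under concurrency
  4. Design a plan for splitting the system into microservices, deploying the user, post, and notification services independently

Next chapter: Chapter 27, Hands-On: Data Analysis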