Django 项目的整洁架构重构:从面条代码到契约先行

Django 的开发哲学是”快”——一个 models.py,一个 views.py,加几行 URL 配置,功能就上线了。这里面没有业务层的概念,Model 既管持久化又管业务逻辑,View 既管 HTTP 又管权限又管数据组装。小程序这么做没问题,但 ServerManager 的代码量滚到了十几万行,面条代码开始反噬了——改一个 WebSocket 动作的权限逻辑,得在 Consumer 里翻三个文件才能找到 auth 在哪里做的。

所以我们做了一次彻底的重构,把核心层(apps/core/)从业务代码里抽出来,用 frozen dataclass 做契约、用 command bus 做调度、用 event bus 做解耦、用 outbox 做可靠投递。下面把每一块的思路和为什么这么做聊一下。

契约先行:frozen dataclass

所有跨层传递的数据结构一律用 @dataclass(frozen=True) 定义成契约。frozen=True 不是摆设——它保证对象创建后不可变,不会有谁在半路偷偷改掉 user_id 或者塞一个新字段进去。

最核心的是 AuthPrincipal——一个统一的身份类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@dataclass(frozen=True)
class AuthPrincipal:
principal_type: Literal["user", "node", "download_ticket", "anonymous"]
user_id: int | None = None
username: str = ""
node_uuid: str | None = None
role_ids: tuple[int, ...] = ()
permission_codes: tuple[str, ...] = ()
is_superuser: bool = False
is_disabled: bool = False

@property
def is_node(self) -> bool:
return self.principal_type == "node"

def has_permission(self, code: str) -> bool:
return self.is_superuser or "all" in self.permission_codes or code in self.permission_codes

无论你是 HTTP 请求里的登录用户、WebSocket 连接上的节点、一次性下载票据、还是匿名访问——都用同一个 AuthPrincipal。中间件构造它,后续所有层只读它。不需要再判断”这个 request 是用户还是节点”,看 principal_type 就行。

同级的还有 RequestContext(携带 request_id、IP、语言等)和 DomainEvent(领域事件),都是 frozen 的。在代码规范里甚至有明确禁令:不可以在 DTO 上使用 getattr/setattr

为什么用 dataclass 而不是 Pydantic?因为 dataclass 是标准库、零依赖、快。Pydantic 的验证和序列化能力在这里用不上——这些契约对象只在进程内传递,不序列化。

模块清单:声明式的权限和路由注册

Django 项目长大的一个征兆就是 URL 配置变成一坨意大利面。urls.py 里几百行 path(),你搞不清哪个 URL 要什么权限、哪个 WebSocket 动作归哪个模块管。

重构后,每个模块用 ModuleManifest 声明自己的一切:

1
2
3
4
5
6
7
8
9
10
11
12
@dataclass(frozen=True)
class ModuleManifest:
module_id: str
label: str
url_prefix: str = ""
urls_module: str = ""
permissions: tuple[PermissionSpec, ...] = ()
permission_categories: tuple[PermissionCategorySpec, ...] = ()
routes: tuple[RouteSpec, ...] = ()
websocket_actions: tuple[WebSocketActionSpec, ...] = ()
dashboard_providers: tuple[DashboardProviderSpec, ...] = ()
scheduler_jobs: tuple[SchedulerJobSpec, ...] = ()

ModuleManifestRegistry 在启动时做全量校验——模块 ID 不能重复、URL 前缀不能冲突、权限码不能重复、路由引用的权限必须已声明、WebSocket 动作不能重复……

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ModuleManifestRegistry:
def validate(self) -> list[ManifestViolation]:
violations = []
seen_modules = set()
seen_permissions = set()
seen_routes = set()
seen_ws = set()
# ...
for manifest in self._manifests:
if manifest.module_id in seen_modules:
violations.append(ManifestViolation("DUP_MODULE", ...))
# 权限引用检查:路由声明的权限必须存在于某个模块的 permissions 里
for route in manifest.routes:
violations.extend(
_validate_requirement(route.permission, all_permissions, ...)
)
return violations

这样做的好处是权限变成了一张全局表,哪些模块有哪些权限、谁依赖谁,一目了然。代码审查不需要再去找零散的 @permission_required 装饰器。

Command Bus:把 WebSocket 消息调度收口

重构之前,业务代码直接调 channel_layer.group_send() 发 WebSocket 消息,散落在各个 view 和 consumer 里。重构后,所有到节点的命令走同一条路——NodeCommandBus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class NodeCommandBus:
def __init__(self, adapter: ChannelsNodeAdapter | None = None):
self._adapter = adapter or default_channels_node_adapter

async def send(self, command: NodeCommand) -> NodeCommandResult:
payload = command.payload
if is_dataclass(payload):
payload = asdict(payload)
event = {
"type": "send.message",
"action": command.action,
"request_id": command.request_id,
"payload": payload,
"requires_signature": command.requires_signature,
}
await self._adapter.send_to_node(command.node_uuid, event)
return NodeCommandResult(request_id=command.request_id, status="sent")

业务代码只跟 NodeCommandNodeCommandResult 打交道,不碰 Channels 的底层 API。好处在哪?

  1. 单一出口——所有发往节点的消息都经过 bus,日志、签名、权限检查统一处理
  2. 可测试——测试时 inject 一个 mock adapter 就行,不需要起 Channels layer
  3. 可追踪——每条命令都有 request_id,从产生到送达全链路可追踪

新代码规范很明确:不允许在 bus 之外直接调 channel_layer.group_send()

Event Bus:模块间松耦合

有了 Command Bus 管节点方向,模块之间的横向通知就要靠 Event Bus 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DomainEventBus:
def __init__(self):
self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)

def subscribe(self, event_type: str, subscriber: Subscriber) -> None:
self._subscribers[event_type].append(subscriber)

def publish(self, event: DomainEvent) -> list[object]:
results = []
for subscriber in tuple(self._subscribers.get(event.event_type, ())):
results.append(subscriber(event))
for subscriber in tuple(self._subscribers.get("*", ())):
results.append(subscriber(event))
return results

简单到只有两个方法,但这就够了。* 通配符订阅让审计模块可以监听所有事件做日志,不需要每个事件都显式通知它。

事件也是 frozen dataclass:

1
2
3
4
5
6
7
8
9
10
@dataclass(frozen=True)
class DomainEvent:
event_id: str
event_type: str
occurred_at: datetime
actor: AuthPrincipal | None
payload: object
aggregate_type: str = ""
aggregate_id: str = ""
idempotency_key: str = ""

idempotency_key 是给消费端用的——同一个事件处理两次不会出事。这个后面 Outbox 模式会用到。

Unit of Work + Outbox:把事务做对

Django 的 transaction.atomic() 够用,但有个经典问题:业务逻辑在事务里写了一堆东西到数据库,事务提交前发了 WebSocket 消息。结果消息先到了消费端,消费端查数据库——还没提交呢,查不到。

UnitOfWork 极其简单,就是 transaction.atomic() 的薄封装:

1
2
3
4
5
6
7
8
9
10
class UnitOfWork:
@contextmanager
def atomic(self) -> Iterator[None]:
with transaction.atomic():
yield

def on_commit(self, callback: Callable[[], object]) -> None:
transaction.on_commit(callback)

default_unit_of_work = UnitOfWork()

但配合 OutboxPublisher 就有意思了:

1
2
3
4
5
6
7
class OutboxPublisher:
def publish_after_commit(self, event: OutboxEvent) -> OutboxEvent:
if self._use_persistent:
transaction.on_commit(lambda: self._persist_event(event))
else:
transaction.on_commit(lambda: self._enqueue_memory(event))
return event

on_commit 的回调在事务成功提交后执行。如果事务回滚了,事件也不会被persist——数据一致性天然保证。

OutboxPublisher 有两种模式:持久模式写入 OutboxRecord 数据库表,供 worker 消费;内存模式用 deque 做进程内分发,适合开发环境和测试。

1
2
3
4
5
6
7
8
9
@dataclass(frozen=True)
class OutboxEvent:
event_id: str
event_type: str
aggregate_type: str
aggregate_id: str
idempotency_key: str
payload: Any
status: Literal["pending", "sent", "failed"] = "pending"

Worker 消费后改状态为 sent;失败了改 failedretry_count 加 1。有 retry_failed(max_retries=3) 方法捞回失败的事件重试。整套机制保证了”业务操作成功则事件一定投递,业务操作失败则事件一定不投递”。

不只是代码,更是约束

这次重构最有价值的不是代码本身,而是写在项目规范文档里的约束规则:

  • DTO 必须用 frozen=True 的 dataclass
  • 不允许在 DTO 上使用 getattr/setattr
  • 发往节点的消息必须走 NodeCommandBus,禁止直接 channel_layer.group_send()
  • 所有跨模块事件走 DomainEventBus
  • 有副作用的操作必须包在 UnitOfWork.atomic()
  • 事件投递必须用 OutboxPublisher.publish_after_commit()

代码是人写的,人会犯错。但架构约束写进了开发规范,lint 也能检查,新来的人只要按规范走,就不会写出面条代码。

Django 不会告诉你”这里应该分层”——它甚至鼓励你把所有东西塞进 View。但当一个项目长到十几万行,分层就是刚需,不是可选。把契约、总线、事件、Outbox 这些概念从业务代码里提取出来放进 core/ 层,剩下的业务模块只用关心自己的事。

欢迎关注我的其它发布渠道