Django 的开发哲学是”快”——一个 models.py,一个 views.py,加几行 URL 配置,功能就上线了。这里面没有业务层的概念,Model 既管持久化又管业务逻辑,View 既管 HTTP 又管权限又管数据组装。小程序这么做没问题,但 ServerManager 的代码量滚到了十几万行,面条代码开始反噬了——改一个 WebSocket 动作的权限逻辑,得在 Consumer 里翻三个文件才能找到 auth 在哪里做的。
所以我们做了一次彻底的重构,把核心层(apps/core/)从业务代码里抽出来,用 frozen dataclass 做契约、用 command bus 做调度、用 event bus 做解耦、用 outbox 做可靠投递。下面把每一块的思路和为什么这么做聊一下。
契约先行:frozen dataclass
所有跨层传递的数据结构一律用 @dataclass(frozen=True) 定义成契约。frozen=True 不是摆设——它保证对象创建后不可变,不会有谁在半路偷偷改掉 user_id 或者塞一个新字段进去。
最核心的是 AuthPrincipal——一个统一的身份类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @dataclass(frozen=True) class AuthPrincipal: principal_type: Literal["user", "node", "download_ticket", "anonymous"] user_id: int | None = None username: str = "" node_uuid: str | None = None role_ids: tuple[int, ...] = () permission_codes: tuple[str, ...] = () is_superuser: bool = False is_disabled: bool = False
@property def is_node(self) -> bool: return self.principal_type == "node"
def has_permission(self, code: str) -> bool: return self.is_superuser or "all" in self.permission_codes or code in self.permission_codes
|
无论你是 HTTP 请求里的登录用户、WebSocket 连接上的节点、一次性下载票据、还是匿名访问——都用同一个 AuthPrincipal。中间件构造它,后续所有层只读它。不需要再判断”这个 request 是用户还是节点”,看 principal_type 就行。
同级的还有 RequestContext(携带 request_id、IP、语言等)和 DomainEvent(领域事件),都是 frozen 的。在代码规范里甚至有明确禁令:不可以在 DTO 上使用 getattr/setattr。
为什么用 dataclass 而不是 Pydantic?因为 dataclass 是标准库、零依赖、快。Pydantic 的验证和序列化能力在这里用不上——这些契约对象只在进程内传递,不序列化。
模块清单:声明式的权限和路由注册
Django 项目长大的一个征兆就是 URL 配置变成一坨意大利面。urls.py 里几百行 path(),你搞不清哪个 URL 要什么权限、哪个 WebSocket 动作归哪个模块管。
重构后,每个模块用 ModuleManifest 声明自己的一切:
1 2 3 4 5 6 7 8 9 10 11 12
| @dataclass(frozen=True) class ModuleManifest: module_id: str label: str url_prefix: str = "" urls_module: str = "" permissions: tuple[PermissionSpec, ...] = () permission_categories: tuple[PermissionCategorySpec, ...] = () routes: tuple[RouteSpec, ...] = () websocket_actions: tuple[WebSocketActionSpec, ...] = () dashboard_providers: tuple[DashboardProviderSpec, ...] = () scheduler_jobs: tuple[SchedulerJobSpec, ...] = ()
|
ModuleManifestRegistry 在启动时做全量校验——模块 ID 不能重复、URL 前缀不能冲突、权限码不能重复、路由引用的权限必须已声明、WebSocket 动作不能重复……
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class ModuleManifestRegistry: def validate(self) -> list[ManifestViolation]: violations = [] seen_modules = set() seen_permissions = set() seen_routes = set() seen_ws = set() for manifest in self._manifests: if manifest.module_id in seen_modules: violations.append(ManifestViolation("DUP_MODULE", ...)) for route in manifest.routes: violations.extend( _validate_requirement(route.permission, all_permissions, ...) ) return violations
|
这样做的好处是权限变成了一张全局表,哪些模块有哪些权限、谁依赖谁,一目了然。代码审查不需要再去找零散的 @permission_required 装饰器。
Command Bus:把 WebSocket 消息调度收口
重构之前,业务代码直接调 channel_layer.group_send() 发 WebSocket 消息,散落在各个 view 和 consumer 里。重构后,所有到节点的命令走同一条路——NodeCommandBus:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class NodeCommandBus: def __init__(self, adapter: ChannelsNodeAdapter | None = None): self._adapter = adapter or default_channels_node_adapter
async def send(self, command: NodeCommand) -> NodeCommandResult: payload = command.payload if is_dataclass(payload): payload = asdict(payload) event = { "type": "send.message", "action": command.action, "request_id": command.request_id, "payload": payload, "requires_signature": command.requires_signature, } await self._adapter.send_to_node(command.node_uuid, event) return NodeCommandResult(request_id=command.request_id, status="sent")
|
业务代码只跟 NodeCommand 和 NodeCommandResult 打交道,不碰 Channels 的底层 API。好处在哪?
- 单一出口——所有发往节点的消息都经过 bus,日志、签名、权限检查统一处理
- 可测试——测试时 inject 一个 mock adapter 就行,不需要起 Channels layer
- 可追踪——每条命令都有
request_id,从产生到送达全链路可追踪
新代码规范很明确:不允许在 bus 之外直接调 channel_layer.group_send()。
Event Bus:模块间松耦合
有了 Command Bus 管节点方向,模块之间的横向通知就要靠 Event Bus 了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class DomainEventBus: def __init__(self): self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)
def subscribe(self, event_type: str, subscriber: Subscriber) -> None: self._subscribers[event_type].append(subscriber)
def publish(self, event: DomainEvent) -> list[object]: results = [] for subscriber in tuple(self._subscribers.get(event.event_type, ())): results.append(subscriber(event)) for subscriber in tuple(self._subscribers.get("*", ())): results.append(subscriber(event)) return results
|
简单到只有两个方法,但这就够了。* 通配符订阅让审计模块可以监听所有事件做日志,不需要每个事件都显式通知它。
事件也是 frozen dataclass:
1 2 3 4 5 6 7 8 9 10
| @dataclass(frozen=True) class DomainEvent: event_id: str event_type: str occurred_at: datetime actor: AuthPrincipal | None payload: object aggregate_type: str = "" aggregate_id: str = "" idempotency_key: str = ""
|
idempotency_key 是给消费端用的——同一个事件处理两次不会出事。这个后面 Outbox 模式会用到。
Unit of Work + Outbox:把事务做对
Django 的 transaction.atomic() 够用,但有个经典问题:业务逻辑在事务里写了一堆东西到数据库,事务提交前发了 WebSocket 消息。结果消息先到了消费端,消费端查数据库——还没提交呢,查不到。
UnitOfWork 极其简单,就是 transaction.atomic() 的薄封装:
1 2 3 4 5 6 7 8 9 10
| class UnitOfWork: @contextmanager def atomic(self) -> Iterator[None]: with transaction.atomic(): yield
def on_commit(self, callback: Callable[[], object]) -> None: transaction.on_commit(callback)
default_unit_of_work = UnitOfWork()
|
但配合 OutboxPublisher 就有意思了:
1 2 3 4 5 6 7
| class OutboxPublisher: def publish_after_commit(self, event: OutboxEvent) -> OutboxEvent: if self._use_persistent: transaction.on_commit(lambda: self._persist_event(event)) else: transaction.on_commit(lambda: self._enqueue_memory(event)) return event
|
on_commit 的回调在事务成功提交后执行。如果事务回滚了,事件也不会被persist——数据一致性天然保证。
OutboxPublisher 有两种模式:持久模式写入 OutboxRecord 数据库表,供 worker 消费;内存模式用 deque 做进程内分发,适合开发环境和测试。
1 2 3 4 5 6 7 8 9
| @dataclass(frozen=True) class OutboxEvent: event_id: str event_type: str aggregate_type: str aggregate_id: str idempotency_key: str payload: Any status: Literal["pending", "sent", "failed"] = "pending"
|
Worker 消费后改状态为 sent;失败了改 failed,retry_count 加 1。有 retry_failed(max_retries=3) 方法捞回失败的事件重试。整套机制保证了”业务操作成功则事件一定投递,业务操作失败则事件一定不投递”。
不只是代码,更是约束
这次重构最有价值的不是代码本身,而是写在项目规范文档里的约束规则:
- DTO 必须用
frozen=True 的 dataclass - 不允许在 DTO 上使用
getattr/setattr - 发往节点的消息必须走
NodeCommandBus,禁止直接 channel_layer.group_send() - 所有跨模块事件走
DomainEventBus - 有副作用的操作必须包在
UnitOfWork.atomic() 里 - 事件投递必须用
OutboxPublisher.publish_after_commit()
代码是人写的,人会犯错。但架构约束写进了开发规范,lint 也能检查,新来的人只要按规范走,就不会写出面条代码。
Django 不会告诉你”这里应该分层”——它甚至鼓励你把所有东西塞进 View。但当一个项目长到十几万行,分层就是刚需,不是可选。把契约、总线、事件、Outbox 这些概念从业务代码里提取出来放进 core/ 层,剩下的业务模块只用关心自己的事。