分布式系统里有个经典问题:业务数据写数据库成功了,但通知消息还没发出去——进程就挂了。或者反过来,消息先发了,业务数据入库失败——消费端收到了一个不存在的事件。
Django 项目里这问题尤其常见。你在一个 transaction.atomic() 里做了这两件事:
1 2 3 4 5 6 with transaction.atomic(): order = Order.objects.create(...) channel_layer.group_send(f"order_{order.id } " , { "type" : "order.created" , "order_id" : order.id , })
看似没问题,但如果 group_send 执行了而事务回滚了——消费端收到一个已经被回滚的 order_id,去数据库里查,查不到。反过来,如果事务提交了但 group_send 之前进程崩了——消息就丢了,消费端永远不知道这个订单被创建了。
ServerManager 的 UnitOfWork 和 OutboxPublisher 把这个问题彻底解决了。
UnitOfWork:transaction.atomic() 的门面 UnitOfWork 只有一层薄包装:
1 2 3 4 5 6 7 8 9 10 class UnitOfWork : @contextmanager def atomic (self ) -> Iterator[None ]: with transaction.atomic(): yield def on_commit (self, callback: Callable [[], object ] ) -> None : transaction.on_commit(callback) default_unit_of_work = UnitOfWork()
你没看错,就这。但 on_commit() 是关键——它注册的回调只在事务成功提交后才执行。回滚了?回调不执行。这让业务代码可以在事务里”预约”事件,不必担心事件提前泄露。
OutboxPublisher:写 DB 才是真的发了 1 2 3 4 5 6 7 8 9 @dataclass(frozen=True ) class OutboxEvent : event_id: str event_type: str aggregate_type: str aggregate_id: str idempotency_key: str payload: Any status: Literal ["pending" , "sent" , "failed" ] = "pending"
OutboxEvent 也是 frozen dataclass——不可变,创建后谁也改不了。
核心方法是 publish_after_commit():
1 2 3 4 5 6 7 8 9 10 11 12 class OutboxPublisher : def __init__ (self, *, use_persistent: bool = True ): self ._use_persistent = use_persistent self ._memory_queue: deque[OutboxEvent] = deque(maxlen=10000 ) self ._handlers: dict [str , list [Callable ]] = {} def publish_after_commit (self, event: OutboxEvent ) -> OutboxEvent: if self ._use_persistent: transaction.on_commit(lambda : self ._persist_event(event)) else : transaction.on_commit(lambda : self ._enqueue_memory(event)) return event
transaction.on_commit() 是 Django 提供的功能——注册的回调在当前事务提交后才执行。所以 self._persist_event(event) 只会在数据库写入成功后被调用。
持久模式的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def _persist_event (self, event: OutboxEvent ) -> None : try : OutboxRecord.objects.update_or_create( idempotency_key=event.idempotency_key, defaults={ "event_id" : event.event_id, "event_type" : event.event_type, "aggregate_type" : event.aggregate_type, "aggregate_id" : event.aggregate_id, "payload" : event.payload if isinstance (event.payload, dict ) else {"data" : str (event.payload)}, "status" : "pending" , }, ) except Exception: logger.exception("Failed to persist outbox event: %s" , event.event_id)
update_or_create 配合 idempotency_key——相同幂等键的事件不会重复创建。即使 publish_after_commit() 被调用两次(比如业务代码写了两次同一个事件),数据库里也只有一条记录。
OutboxRecord:持久化的保证 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class OutboxRecord (models.Model): event_id = models.CharField(max_length=64 , unique=True , primary_key=True ) event_type = models.CharField(max_length=128 , db_index=True ) aggregate_type = models.CharField(max_length=128 ) aggregate_id = models.CharField(max_length=128 , db_index=True ) idempotency_key = models.CharField(max_length=128 , unique=True ) payload = models.JSONField() status = models.CharField(max_length=16 , choices=[("pending" , "Pending" ), ("sent" , "Sent" ), ("failed" , "Failed" )], default="pending" , db_index=True ) created_at = models.DateTimeField(auto_now_add=True ) updated_at = models.DateTimeField(auto_now=True ) retry_count = models.IntegerField(default=0 ) class Meta : db_table = "core_outbox_event" indexes = [ models.Index(fields=["status" , "created_at" ], name="outbox_status_created_idx" ), ]
联合索引 (status, created_at) 让 worker 可以高效地查出所有 status=pending 的记录,按创建时间排序消费。idempotency_key 的唯一约束防止重复。
Worker 消费:从 pending 到 sent Worker 的逻辑很直白——轮询 pending 状态的事件,处理后标记为 sent:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def consume_pending (self, limit: int = 100 ) -> list [OutboxEvent]: if not self ._use_persistent: return list (self ._memory_queue) records = list ( OutboxRecord.objects.filter (status="pending" ) .order_by("created_at" )[:limit] ) return [r.to_event() for r in records]def mark_sent (self, event_id: str ) -> None : if self ._use_persistent: OutboxRecord.objects.filter (event_id=event_id).update(status="sent" ) self ._memory_queue = deque( (e for e in self ._memory_queue if e.event_id != event_id), maxlen=self ._memory_queue.maxlen, )
消息处理成功就 mark_sent,事件从 pending → sent,完成一次投递生命周期。
失败重试:三次机会 万一 Worker 处理失败了?mark_failed() 和 retry_failed() 配合使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def mark_failed (self, event_id: str ) -> None : if self ._use_persistent: OutboxRecord.objects.filter (event_id=event_id).update( status="failed" , retry_count=models.F("retry_count" ) + 1 , )def retry_failed (self, max_retries: int = 3 ) -> list [OutboxEvent]: if not self ._use_persistent: return [] records = list ( OutboxRecord.objects.filter ( status="failed" , retry_count__lt=max_retries, ).order_by("created_at" )[:100 ] ) ids = [r.event_id for r in records] OutboxRecord.objects.filter (event_id__in=ids).update(status="pending" ) return [r.to_event() for r in records]
失败后 retry_count 加 1,状态改为 failed。定时任务调用 retry_failed(3) 把重试次数不到 3 次的失败事件捞出来,状态改回 pending,交给 Worker 重新处理。超过 3 次的?留在 failed 状态,等人工排查。
内存模式:开发环境的简单回退 不是所有场景都需要持久化——开发环境可能跑的是 SQLite,起 Worker 太重了。use_persistent=False 切到内存模式:
1 2 3 4 5 6 7 8 9 10 11 def _enqueue_memory (self, event: OutboxEvent ) -> None : self ._memory_queue.append(event) self ._dispatch_memory(event)def _dispatch_memory (self, event: OutboxEvent ) -> None : handlers = self ._handlers.get(event.event_type, []) for handler in handlers: try : handler(event) except Exception: logger.exception("Handler failed for event: %s" , event.event_id)
内存模式用 deque(maxlen=10000) 存事件,同时直接调注册的 handler。不写数据库,不启动 Worker,事务里 on_commit 触发后同步执行。当然,进程挂了就没了——开发环境可以接受。
注册 handler:
1 2 def register_handler (self, event_type: str , handler: Callable [[OutboxEvent], None ] ) -> None : self ._handlers.setdefault(event_type, []).append(handler)
跟 DomainEventBus 类似,但 Outbox 的 handler 处理的是投递层面的事(发 WebSocket、调 webhooks 等),而 EventBus 的 handler 处理的是业务逻辑层面的响应。
业务代码怎么用 一个完整的例子——创建集群后发送事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from apps.core.persistence.outbox import default_outbox_publisher, OutboxEventfrom apps.core.persistence.unit_of_work import default_unit_of_workfrom apps.core.contracts.auth import AuthPrincipalimport uuidfrom datetime import datetimedef create_group (name, user_principal: AuthPrincipal ): with default_unit_of_work.atomic(): group = Node_Group.objects.create(name=name) default_outbox_publisher.publish_after_commit(OutboxEvent( event_id=str (uuid.uuid4()), event_type="group.created" , occurred_at=datetime.now(), actor=user_principal, payload={"group_id" : group.id , "name" : group.name}, aggregate_type="group" , aggregate_id=str (group.id ), idempotency_key=f"group.created:{group.id } " , ))
idempotency_key 用 group.created:{group.id} 格式——同一个集群的创建事件只会有一条记录,即使 publish_after_commit 被意外调用两次。
为什么不用消息队列? Redis 的 list 已经做了任务队列(apps/workers/task_queue.py),为什么 Outbox 还要用数据库?
因为数据库事务和消息队列是两个不同的保障域。Redis list 的 LPUSH 是即时操作,没法跟 Django 的 transaction.atomic() 绑定——如果事务回滚,已经 LPUSH 的消息没法自动撤回。但 OutboxRecord 是数据库表,跟业务数据在同一个事务里——要么一起提交,要么一起回滚。这是只有关系型数据库才能给的保证。
Redis 任务队列适合”即发即忘”的任务——发邮件、生成报表、下载文件。Outbox 适合”业务数据不丢,事件就不丢”的场景——订单创建、节点上线、集群删除这种核心业务事件。
两者不冲突,各管各的。