事务性 Outbox 模式在 Django 里的落地:消息绝不丢,也绝不提前到

分布式系统里有个经典问题:业务数据写数据库成功了,但通知消息还没发出去——进程就挂了。或者反过来,消息先发了,业务数据入库失败——消费端收到了一个不存在的事件。

Django 项目里这问题尤其常见。你在一个 transaction.atomic() 里做了这两件事:

1
2
3
4
5
6
with transaction.atomic():
order = Order.objects.create(...)
channel_layer.group_send(f"order_{order.id}", {
"type": "order.created",
"order_id": order.id,
})

看似没问题,但如果 group_send 执行了而事务回滚了——消费端收到一个已经被回滚的 order_id,去数据库里查,查不到。反过来,如果事务提交了但 group_send 之前进程崩了——消息就丢了,消费端永远不知道这个订单被创建了。

ServerManager 的 UnitOfWorkOutboxPublisher 把这个问题彻底解决了。

UnitOfWork:transaction.atomic() 的门面

UnitOfWork 只有一层薄包装:

1
2
3
4
5
6
7
8
9
10
class UnitOfWork:
@contextmanager
def atomic(self) -> Iterator[None]:
with transaction.atomic():
yield

def on_commit(self, callback: Callable[[], object]) -> None:
transaction.on_commit(callback)

default_unit_of_work = UnitOfWork()

你没看错,就这。但 on_commit() 是关键——它注册的回调只在事务成功提交后才执行。回滚了?回调不执行。这让业务代码可以在事务里”预约”事件,不必担心事件提前泄露。

OutboxPublisher:写 DB 才是真的发了

1
2
3
4
5
6
7
8
9
@dataclass(frozen=True)
class OutboxEvent:
event_id: str
event_type: str
aggregate_type: str
aggregate_id: str
idempotency_key: str
payload: Any
status: Literal["pending", "sent", "failed"] = "pending"

OutboxEvent 也是 frozen dataclass——不可变,创建后谁也改不了。

核心方法是 publish_after_commit()

1
2
3
4
5
6
7
8
9
10
11
12
class OutboxPublisher:
def __init__(self, *, use_persistent: bool = True):
self._use_persistent = use_persistent
self._memory_queue: deque[OutboxEvent] = deque(maxlen=10000)
self._handlers: dict[str, list[Callable]] = {}

def publish_after_commit(self, event: OutboxEvent) -> OutboxEvent:
if self._use_persistent:
transaction.on_commit(lambda: self._persist_event(event))
else:
transaction.on_commit(lambda: self._enqueue_memory(event))
return event

transaction.on_commit() 是 Django 提供的功能——注册的回调在当前事务提交后才执行。所以 self._persist_event(event) 只会在数据库写入成功后被调用。

持久模式的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _persist_event(self, event: OutboxEvent) -> None:
try:
OutboxRecord.objects.update_or_create(
idempotency_key=event.idempotency_key,
defaults={
"event_id": event.event_id,
"event_type": event.event_type,
"aggregate_type": event.aggregate_type,
"aggregate_id": event.aggregate_id,
"payload": event.payload if isinstance(event.payload, dict) else {"data": str(event.payload)},
"status": "pending",
},
)
except Exception:
logger.exception("Failed to persist outbox event: %s", event.event_id)

update_or_create 配合 idempotency_key——相同幂等键的事件不会重复创建。即使 publish_after_commit() 被调用两次(比如业务代码写了两次同一个事件),数据库里也只有一条记录。

OutboxRecord:持久化的保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class OutboxRecord(models.Model):
event_id = models.CharField(max_length=64, unique=True, primary_key=True)
event_type = models.CharField(max_length=128, db_index=True)
aggregate_type = models.CharField(max_length=128)
aggregate_id = models.CharField(max_length=128, db_index=True)
idempotency_key = models.CharField(max_length=128, unique=True)
payload = models.JSONField()
status = models.CharField(max_length=16, choices=[("pending", "Pending"), ("sent", "Sent"), ("failed", "Failed")], default="pending", db_index=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
retry_count = models.IntegerField(default=0)

class Meta:
db_table = "core_outbox_event"
indexes = [
models.Index(fields=["status", "created_at"], name="outbox_status_created_idx"),
]

联合索引 (status, created_at) 让 worker 可以高效地查出所有 status=pending 的记录,按创建时间排序消费。idempotency_key 的唯一约束防止重复。

Worker 消费:从 pending 到 sent

Worker 的逻辑很直白——轮询 pending 状态的事件,处理后标记为 sent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def consume_pending(self, limit: int = 100) -> list[OutboxEvent]:
if not self._use_persistent:
return list(self._memory_queue)
records = list(
OutboxRecord.objects.filter(status="pending")
.order_by("created_at")[:limit]
)
return [r.to_event() for r in records]

def mark_sent(self, event_id: str) -> None:
if self._use_persistent:
OutboxRecord.objects.filter(event_id=event_id).update(status="sent")
# 内存模式直接从队列里移除
self._memory_queue = deque(
(e for e in self._memory_queue if e.event_id != event_id),
maxlen=self._memory_queue.maxlen,
)

消息处理成功就 mark_sent,事件从 pendingsent,完成一次投递生命周期。

失败重试:三次机会

万一 Worker 处理失败了?mark_failed()retry_failed() 配合使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def mark_failed(self, event_id: str) -> None:
if self._use_persistent:
OutboxRecord.objects.filter(event_id=event_id).update(
status="failed",
retry_count=models.F("retry_count") + 1,
)

def retry_failed(self, max_retries: int = 3) -> list[OutboxEvent]:
if not self._use_persistent:
return []
records = list(
OutboxRecord.objects.filter(
status="failed",
retry_count__lt=max_retries,
).order_by("created_at")[:100]
)
ids = [r.event_id for r in records]
OutboxRecord.objects.filter(event_id__in=ids).update(status="pending")
return [r.to_event() for r in records]

失败后 retry_count 加 1,状态改为 failed。定时任务调用 retry_failed(3) 把重试次数不到 3 次的失败事件捞出来,状态改回 pending,交给 Worker 重新处理。超过 3 次的?留在 failed 状态,等人工排查。

内存模式:开发环境的简单回退

不是所有场景都需要持久化——开发环境可能跑的是 SQLite,起 Worker 太重了。use_persistent=False 切到内存模式:

1
2
3
4
5
6
7
8
9
10
11
def _enqueue_memory(self, event: OutboxEvent) -> None:
self._memory_queue.append(event)
self._dispatch_memory(event)

def _dispatch_memory(self, event: OutboxEvent) -> None:
handlers = self._handlers.get(event.event_type, [])
for handler in handlers:
try:
handler(event)
except Exception:
logger.exception("Handler failed for event: %s", event.event_id)

内存模式用 deque(maxlen=10000) 存事件,同时直接调注册的 handler。不写数据库,不启动 Worker,事务里 on_commit 触发后同步执行。当然,进程挂了就没了——开发环境可以接受。

注册 handler:

1
2
def register_handler(self, event_type: str, handler: Callable[[OutboxEvent], None]) -> None:
self._handlers.setdefault(event_type, []).append(handler)

DomainEventBus 类似,但 Outbox 的 handler 处理的是投递层面的事(发 WebSocket、调 webhooks 等),而 EventBus 的 handler 处理的是业务逻辑层面的响应。

业务代码怎么用

一个完整的例子——创建集群后发送事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from apps.core.persistence.outbox import default_outbox_publisher, OutboxEvent
from apps.core.persistence.unit_of_work import default_unit_of_work
from apps.core.contracts.auth import AuthPrincipal
import uuid
from datetime import datetime

def create_group(name, user_principal: AuthPrincipal):
with default_unit_of_work.atomic():
group = Node_Group.objects.create(name=name)

# 事务内预约事件
default_outbox_publisher.publish_after_commit(OutboxEvent(
event_id=str(uuid.uuid4()),
event_type="group.created",
occurred_at=datetime.now(),
actor=user_principal,
payload={"group_id": group.id, "name": group.name},
aggregate_type="group",
aggregate_id=str(group.id),
idempotency_key=f"group.created:{group.id}",
))

# 事务提交后,OutboxPublisher 才把事件持久化
# Worker 捞取后发 WebSocket 通知

idempotency_keygroup.created:{group.id} 格式——同一个集群的创建事件只会有一条记录,即使 publish_after_commit 被意外调用两次。

为什么不用消息队列?

Redis 的 list 已经做了任务队列(apps/workers/task_queue.py),为什么 Outbox 还要用数据库?

因为数据库事务和消息队列是两个不同的保障域。Redis list 的 LPUSH 是即时操作,没法跟 Django 的 transaction.atomic() 绑定——如果事务回滚,已经 LPUSH 的消息没法自动撤回。但 OutboxRecord 是数据库表,跟业务数据在同一个事务里——要么一起提交,要么一起回滚。这是只有关系型数据库才能给的保证。

Redis 任务队列适合”即发即忘”的任务——发邮件、生成报表、下载文件。Outbox 适合”业务数据不丢,事件就不丢”的场景——订单创建、节点上线、集群删除这种核心业务事件。

两者不冲突,各管各的。

欢迎关注我的其它发布渠道