Why your pipeline should assume failure
Every pipeline fails eventually. A broker drops, a deploy ships a bad schema, a job runs out of memory at 3 am. The question isn’t if but when — and what your system does about it.
After enough late-night pages, I settled on one principle: design for failure from the start, don’t bolt it on later. The pipelines I’ve seen get retired weren’t the clever ones — they were the ones that couldn’t recover gracefully. Here are the three pillars I build on every time.
Failure is the default, not the exception
In a distributed system, every network call can fail. Every disk write can be interrupted. Every schema change can surface a bug that wasn’t visible in staging. Treating failure as an edge case is the mistake — it’s the steady state you operate inside.
A Kafka broker restarts. Your consumer group re-balances. The transient timeout that looked harmless gets retried — and suddenly you have duplicate rows in your aggregate table because nobody made the write idempotent.
The cost of that assumption compounds: engineers add DISTINCT hacks, dashboards silently report inflated numbers, and on-call runbooks grow a new section titled “manual dedup if count drifts.” None of that work would exist if the write had been safe to repeat from day one.
The mindset shift is small but load-bearing: assume the message will be delivered more than once, assume the job will be interrupted mid-run, assume the network will partition at the worst possible moment — and design the code to be correct under all three.
Idempotency: make every write safe to repeat
An idempotent operation produces the same result whether it runs once or ten times. It’s the property that makes retries safe — and retries are how distributed systems recover without human intervention.
The practical move: write by natural key with an upsert, never a blind INSERT. When a job re-runs after a crash at row 40,000, it overwrites the correct rows rather than appending duplicates.
-- idempotent upsert: safe to run on replay
INSERT INTO daily_metrics (event_date, user_id, event_count)
SELECT
DATE(event_ts) AS event_date,
user_id,
COUNT(*) AS event_count
FROM staging_events
WHERE batch_id = :batch_id
GROUP BY 1, 2
ON CONFLICT (event_date, user_id)
DO UPDATE SET
event_count = EXCLUDED.event_count,
updated_at = NOW();
The batch_id parameter is the key detail. Every run gets the same batch_id for the same logical window — so re-running the query produces identical output. No cleanup step, no manual dedup, no incident.
For streaming, the same pattern applies at the consumer level: commit the offset together with the output inside one atomic transaction, or use an idempotent producer with a stable transaction_id. The two writes must be inseparable or you trade one failure mode for another.
Checkpointing: know exactly where to resume
The difference between resume and restart is a checkpoint. Without it, a crash forces a full replay from the beginning — every record re-processed, every downstream system hit again, every API quota burned.
With checkpoints, recovery is surgical. The job reads the last committed position, skips everything before it, and continues from exactly that offset. A 6-hour backfill that crashes at hour 5 costs one hour of re-work, not six.
Checkpoint design has two rules:
-
Write atomically. The offset and the output go in together. If you commit the output first and then crash before committing the offset, you’ll skip records on the next run. If you commit the offset first and crash before the output, you’ll silently drop data. Neither is acceptable — use a transaction or an idempotent write that naturally ties the two together.
-
Make checkpoints granular enough to matter. A checkpoint every 100,000 rows costs almost nothing and caps your worst-case replay at 100,000 rows. A checkpoint per batch of 10 million rows means a crash at row 9.9M replays 9.9M rows on recovery.
# resume from the last committed checkpoint
state = store.load(key)
for ev in stream.since(state.offset):
out = transform(ev)
# atomic: write result and advance offset in one call
store.commit(ev.offset, out)
The store.commit call here is the load-bearing line. It must be atomic — either both the offset and the output are persisted, or neither is. In practice this means: a single MULTI/EXEC in Redis, a single BEGIN/COMMIT in Postgres, or a transactional Kafka producer with send_offsets_to_transaction.
Self-healing recovery
Checkpoints and idempotency solve the mechanics. The last layer is making the system invoke those mechanics automatically, without a human waking up to trigger them.
A self-healing pipeline has three properties:
- It retries with backoff. Transient errors — network hiccups, a downstream rate-limit — resolve themselves if you wait. Retry immediately and you amplify the problem. Retry with exponential backoff (capped at a sane ceiling, say 5 minutes) and most incidents close themselves.
- It writes to a dead-letter queue. Records that fail after N retries go to a DLQ rather than halting the pipeline. The main flow keeps moving; bad records are inspected and replayed separately when the root cause is fixed.
- It surfaces the right metrics.
retry_count,dlq_size,checkpoint_lag_seconds— these three numbers tell you everything about pipeline health. Alert on them, not on raw error logs.
The recovery loop I reach for in practice:
MAX_RETRIES = 5
BACKOFF_BASE = 2 # seconds
for ev in stream.since(state.offset):
retries = 0
while retries < MAX_RETRIES:
try:
out = transform(ev)
store.commit(ev.offset, out)
break
except TransientError:
wait = BACKOFF_BASE ** retries
time.sleep(wait)
retries += 1
else:
dlq.send(ev) # exhausted retries → dead-letter
store.commit(ev.offset, None) # still advance offset
The store.commit after dlq.send is important: you advance past the bad record so the pipeline doesn’t stall. The DLQ is where the investigation happens; the main stream keeps flowing.
The one-line takeaway
Design for failure from the start — idempotent writes, granular checkpoints, automatic retry with a dead-letter escape valve. The result isn’t a more complex system. It’s a system where a crash at 3 am is a logged metric, not a pager alert.
“Designing for failure isn’t pessimism — it’s respect for how systems actually behave in production.”
Start small: pick one pipeline, make each stage retryable, then measure your page count before and after. The number will tell the story.
Mọi pipeline rồi sẽ hỏng. Một broker mất kết nối, một deploy đưa vào schema sai, một job hết bộ nhớ lúc 3 giờ sáng. Câu hỏi không phải là nếu mà là khi nào — và hệ thống của bạn ứng xử ra sao khi điều đó xảy ra.
Sau nhiều năm bị gọi dậy giữa đêm, tôi rút ra một nguyên tắc: thiết kế cho thất bại ngay từ đầu, đừng vá nó về sau. Những pipeline tôi thấy bị khai tử không phải là những cái phức tạp nhất — mà là những cái không thể phục hồi gọn gàng. Dưới đây là ba trụ cột tôi luôn áp dụng.
Lỗi là mặc định, không phải ngoại lệ
Trong hệ thống phân tán, mỗi lệnh gọi mạng đều có thể thất bại. Mỗi lần ghi đĩa đều có thể bị gián đoạn. Mỗi thay đổi schema đều có thể lộ ra một bug không nhìn thấy trên staging. Xem thất bại là trường hợp ngoại lệ chính là sai lầm — đó là trạng thái ổn định bạn đang vận hành bên trong.
Một Kafka broker khởi động lại. Consumer group của bạn re-balance. Cái timeout thoáng qua trông có vẻ vô hại được retry — và đột nhiên bạn có các hàng trùng lặp trong bảng aggregate vì không ai làm cho lệnh ghi trở nên idempotent.
Chi phí của giả định đó cộng dồn: kỹ sư thêm hack DISTINCT, dashboard âm thầm báo con số phồng lên, và runbook on-call mọc thêm mục “manual dedup if count drifts”. Tất cả những công việc đó sẽ không tồn tại nếu lệnh ghi đã an toàn để lặp lại ngay từ ngày đầu.
Sự thay đổi tư duy nhỏ nhưng quan trọng: giả sử message sẽ được giao hơn một lần, giả sử job sẽ bị ngắt giữa chừng, giả sử mạng sẽ phân vùng vào lúc tệ nhất — và thiết kế code để đúng trong cả ba trường hợp.
Idempotency: làm cho mỗi lệnh ghi an toàn khi lặp lại
Một thao tác idempotent tạo ra kết quả giống nhau dù chạy một lần hay mười lần. Đó là thuộc tính giúp retry an toàn — và retry là cách hệ thống phân tán phục hồi mà không cần con người can thiệp.
Mẹo thực dụng: ghi theo khoá tự nhiên với upsert, đừng dùng INSERT mù quáng. Khi job chạy lại sau crash ở hàng thứ 40.000, nó ghi đè đúng hàng thay vì thêm bản trùng.
-- idempotent upsert: safe to run on replay
INSERT INTO daily_metrics (event_date, user_id, event_count)
SELECT
DATE(event_ts) AS event_date,
user_id,
COUNT(*) AS event_count
FROM staging_events
WHERE batch_id = :batch_id
GROUP BY 1, 2
ON CONFLICT (event_date, user_id)
DO UPDATE SET
event_count = EXCLUDED.event_count,
updated_at = NOW();
Tham số batch_id là chi tiết then chốt. Mỗi lần chạy nhận cùng batch_id cho cùng cửa sổ thời gian — nên chạy lại query tạo ra kết quả giống hệt. Không cần bước cleanup, không cần dedup thủ công, không có incident.
Với streaming, cùng pattern áp dụng ở cấp consumer: commit offset cùng với kết quả trong một transaction atomic, hoặc dùng idempotent producer với transaction_id ổn định. Hai lệnh ghi phải không thể tách rời — nếu không bạn đổi một failure mode này lấy cái khác.
Checkpoint: biết chính xác nơi cần tiếp tục
Sự khác biệt giữa tiếp tục và khởi động lại là checkpoint. Không có checkpoint, một crash buộc phải replay toàn bộ từ đầu — mọi bản ghi được xử lý lại, mọi hệ thống downstream bị gọi lại, mọi quota API bị tiêu hết.
Với checkpoint, phục hồi rất chính xác. Job đọc vị trí đã commit cuối cùng, bỏ qua tất cả những gì trước đó, và tiếp tục từ đúng offset đó. Một backfill 6 tiếng crash ở giờ thứ 5 chỉ tốn thêm một giờ xử lý lại, không phải sáu.
Thiết kế checkpoint có hai quy tắc:
-
Ghi nguyên tử. Offset và kết quả phải vào cùng nhau. Nếu bạn commit kết quả trước rồi crash trước khi commit offset, lần chạy tiếp theo sẽ bỏ qua các bản ghi đó. Nếu commit offset trước và crash trước khi có kết quả, bạn âm thầm mất dữ liệu. Cả hai đều không chấp nhận được — dùng transaction hoặc lệnh ghi idempotent gắn hai thứ lại với nhau.
-
Làm cho checkpoint đủ chi tiết để có ý nghĩa. Checkpoint mỗi 100.000 hàng gần như không tốn gì và giới hạn replay tệ nhất là 100.000 hàng. Checkpoint mỗi batch 10 triệu hàng nghĩa là crash ở hàng 9,9 triệu phải replay 9,9 triệu hàng khi phục hồi.
# resume from the last committed checkpoint
state = store.load(key)
for ev in stream.since(state.offset):
out = transform(ev)
# atomic: write result and advance offset in one call
store.commit(ev.offset, out)
Lời gọi store.commit ở đây là dòng quan trọng nhất. Nó phải nguyên tử — hoặc cả offset lẫn kết quả đều được lưu, hoặc không cái nào cả. Trong thực tế điều này có nghĩa là: một lệnh MULTI/EXEC trong Redis, một lệnh BEGIN/COMMIT trong Postgres, hoặc Kafka producer có giao dịch với send_offsets_to_transaction.
Phục hồi tự lành
Checkpoint và idempotency giải quyết cơ chế. Lớp cuối cùng là làm cho hệ thống tự động gọi những cơ chế đó — không cần con người thức dậy để kích hoạt.
Một pipeline tự lành có ba thuộc tính:
- Retry với backoff. Các lỗi thoáng qua — mạng chập chờn, downstream bị rate-limit — tự giải quyết nếu bạn chờ. Retry ngay lập tức sẽ khuếch đại vấn đề. Retry với exponential backoff (giới hạn ở mức hợp lý, ví dụ 5 phút) và hầu hết incident sẽ tự đóng lại.
- Ghi vào dead-letter queue. Các bản ghi thất bại sau N lần retry đi vào DLQ thay vì làm dừng pipeline. Luồng chính tiếp tục chạy; các bản ghi xấu được kiểm tra và replay riêng khi tìm ra nguyên nhân gốc rễ.
- Đo đúng metric.
retry_count,dlq_size,checkpoint_lag_seconds— ba con số này cho bạn biết mọi thứ về sức khỏe pipeline. Cảnh báo dựa trên chúng, không phải trên raw error log.
Vòng phục hồi tôi hay dùng trong thực tế:
MAX_RETRIES = 5
BACKOFF_BASE = 2 # seconds
for ev in stream.since(state.offset):
retries = 0
while retries < MAX_RETRIES:
try:
out = transform(ev)
store.commit(ev.offset, out)
break
except TransientError:
wait = BACKOFF_BASE ** retries
time.sleep(wait)
retries += 1
else:
dlq.send(ev) # exhausted retries → dead-letter
store.commit(ev.offset, None) # still advance offset
Lệnh store.commit sau dlq.send rất quan trọng: bạn tiến qua bản ghi xấu để pipeline không bị kẹt. DLQ là nơi điều tra xảy ra; luồng chính tiếp tục chạy.
Điều cần ghi nhớ trong một dòng
Thiết kế cho thất bại ngay từ đầu — lệnh ghi idempotent, checkpoint đủ chi tiết, retry tự động với van thoát dead-letter. Kết quả không phải là hệ thống phức tạp hơn. Mà là hệ thống mà một crash lúc 3 giờ sáng chỉ là một metric được log, không phải một cảnh báo pager.
“Thiết kế cho thất bại không phải bi quan — đó là tôn trọng thực tế vận hành.”
Bắt đầu nhỏ: chọn một pipeline, làm cho mỗi tầng có thể retry, rồi đo số lần bị gọi trước và sau. Con số sẽ tự nói lên điều cần nói.