Exactly-once is a lie (and what to do instead)
Every Kafka tutorial, every Flink docs page, and every streaming vendor pitch eventually mentions exactly-once semantics. It sounds like a guarantee the framework provides so you can stop thinking about duplicates. It isn’t. What those systems actually deliver is at-least-once delivery plus idempotent producers and transactional writes within a single hop. The moment data crosses more than one hop — broker to stream processor to database sink — you are back to at-least-once. The duplicate problem doesn’t disappear; it moves to wherever your pipeline ends.
What “exactly-once” actually means
Kafka’s exactly-once semantics (EOS) guarantees that a producer will not write the same sequence of records more than once to a partition, and a transactional consumer-producer pair will either commit both the offset advance and the downstream write or neither. That is a strong guarantee, but it is scoped to one producer, one broker, one consumer-producer pair in the same transaction.
The sink at the other end of your pipeline is almost never inside that transaction boundary. A Postgres INSERT, a BigQuery streaming write, a REST call to an external service — none of them participate in Kafka’s transaction protocol. When the consumer crashes after writing to Postgres but before committing the Kafka offset, the record is reprocessed. Your sink sees it twice.
Flink’s “exactly-once checkpointing” is the same idea scoped differently: it guarantees that operator state is consistent with the checkpoint, not that external side effects are deduplicated. If the job fails mid-checkpoint and restores, any writes that happened after the last checkpoint are replayed.
The honest framing: exactly-once delivery is unsolvable across heterogeneous systems without distributed transactions, and distributed transactions are expensive. The practical answer is at-least-once delivery plus idempotent consumers.
At-least-once + idempotency = effectively-once
An idempotent operation produces the same result whether it runs once or ten times. An upsert keyed on a natural identifier is idempotent. An INSERT that appends a new row every time is not.
“Effectively-once” is the term some papers use for the combination: let the transport retry freely, design the sink so retries are harmless. The result is indistinguishable from exactly-once from the consumer’s perspective, without the coordination overhead of distributed transactions.
The shift in design posture is significant. Instead of asking “how do I prevent the message from being delivered twice?”, you ask “how do I make this write safe to replay?” That question has tractable answers.
Designing idempotent sinks: natural keys and upserts
The pattern is straightforward: identify a natural key that makes a record unique by business meaning, then make every write a MERGE or ON CONFLICT DO UPDATE against that key.
A natural key is not a surrogate sequence. It is something derived from the event itself: (order_id, event_type), (device_id, reading_ts), (user_id, date, metric_name). If you cannot find a natural key, that is a schema signal — the upstream event probably needs a stable event_id.
-- PostgreSQL: idempotent upsert for a device readings sink
INSERT INTO device_readings (device_id, reading_ts, value_celsius, source_topic, source_offset)
VALUES (%(device_id)s, %(reading_ts)s, %(value)s, %(topic)s, %(offset)s)
ON CONFLICT (device_id, reading_ts)
DO UPDATE SET
value_celsius = EXCLUDED.value_celsius,
source_offset = EXCLUDED.source_offset,
updated_at = now()
WHERE device_readings.source_offset < EXCLUDED.source_offset;
The WHERE clause in the DO UPDATE matters: it rejects a stale retry that arrives after a newer record has already been written for the same key. Without it, a late-arriving duplicate from an older Kafka offset could overwrite a more recent value.
For BigQuery or columnar stores that don’t support row-level upserts natively, the pattern shifts to periodic MERGE jobs: stream into a staging table with appends, then merge staging into the final table on a schedule. The staging table absorbs duplicates; the merge deduplicates by keeping the record with the latest source offset per key.
A few practical notes:
- Unique indexes are your enforcement layer. Declare
UNIQUE (device_id, reading_ts)explicitly; don’t rely only on application logic. - Include source offset in the sink row. It costs 8 bytes and gives you an audit trail plus the
WHEREguard shown above. - Composite keys with timestamps need timezone clarity. Store everything in UTC. A
reading_tsthat means different things in Europe and Asia will produce phantom duplicates.
Deduplication windows and their cost
Sometimes you don’t control the sink schema — you’re landing raw events into a data lake and a downstream model will query them. In that case, deduplication happens at read time via a window query or a periodic compaction job.
The canonical read-time dedup query uses a window function:
-- BigQuery / Spark SQL: deduplicate by keeping the record with the
-- highest source_offset per natural key
SELECT * EXCEPT(rn)
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY device_id, reading_ts
ORDER BY source_offset DESC
) AS rn
FROM device_readings_raw
)
WHERE rn = 1;
The cost of this approach is real: every query that uses the raw table must apply the dedup layer, or you build a materialized view that refreshes on a schedule. Latency goes up; compute goes up. The upsert approach at write time is almost always cheaper at query time — pay the dedup cost once at ingest, not on every read.
The window approach is still valuable for late-arriving corrections: if an upstream system emits a corrected record for the same event_id with a higher version number or offset, the window query picks it up automatically on the next refresh. An upsert sink would need an explicit version check in its DO UPDATE WHERE clause to achieve the same.
Takeaway
Exactly-once is not a property you receive from a framework. It is a property you build into your sinks. The mechanical steps are: find the natural key, make every write a MERGE or ON CONFLICT DO UPDATE, include source offset in the target row, and declare a UNIQUE constraint. Do those four things and you get effectively-once semantics regardless of how many times the broker retries. The framework’s exactly-once mode is still worth enabling — it reduces duplicate volume and simplifies auditing — but it is not the guarantee it sounds like. Treat it as a performance optimization, not a correctness guarantee.
Mọi bài hướng dẫn Kafka, mọi trang tài liệu Flink, và mọi bài pitch của vendor streaming đều đề cập tới exactly-once semantics. Nghe như một đảm bảo mà framework cung cấp để bạn không cần lo về duplicate. Thực tế không phải vậy. Điều những hệ thống đó thực sự cung cấp là at-least-once delivery cộng với idempotent producer và transactional write trong phạm vi một hop. Khi dữ liệu vượt qua nhiều hơn một hop — broker → stream processor → database sink — bạn quay lại at-least-once. Vấn đề duplicate không biến mất; nó dịch chuyển đến điểm cuối của pipeline.
”Exactly-once” thực sự có nghĩa là gì
Exactly-once semantics (EOS) của Kafka đảm bảo rằng một producer sẽ không ghi cùng một chuỗi record vào một partition quá một lần, và một cặp consumer-producer transactional sẽ commit cả hai — offset advance lẫn downstream write — hoặc không cái nào. Đây là đảm bảo mạnh, nhưng nó chỉ có phạm vi một producer, một broker, một cặp consumer-producer trong cùng transaction.
Sink ở đầu kia của pipeline của bạn hầu như không bao giờ nằm trong ranh giới transaction đó. Một INSERT vào Postgres, một streaming write vào BigQuery, một REST call tới dịch vụ bên ngoài — không cái nào tham gia vào giao thức transaction của Kafka. Khi consumer crash sau khi ghi vào Postgres nhưng trước khi commit offset Kafka, record được xử lý lại. Sink của bạn thấy nó hai lần.
“Exactly-once checkpointing” của Flink cũng là ý tưởng tương tự với phạm vi khác: nó đảm bảo operator state nhất quán với checkpoint, không phải là external side effect được dedup. Nếu job fail giữa chừng và restore, mọi write xảy ra sau checkpoint cuối cùng đều được replay.
Cách nói thẳng: exactly-once delivery là bài toán không giải được trên các hệ thống không đồng nhất mà không cần distributed transaction, và distributed transaction thì tốn kém. Đáp án thực tế là at-least-once delivery cộng idempotent consumer.
At-least-once + idempotency = effectively-once
Thao tác idempotent tạo ra cùng một kết quả dù chạy một lần hay mười lần. Upsert keyed trên natural identifier là idempotent. INSERT append một row mới mỗi lần thì không.
“Effectively-once” là thuật ngữ một số paper dùng cho tổ hợp này: để transport tự do retry, thiết kế sink sao cho retry vô hại. Kết quả không thể phân biệt với exactly-once từ góc nhìn của consumer, mà không cần overhead coordination của distributed transaction.
Sự thay đổi tư duy thiết kế là đáng kể. Thay vì hỏi “làm sao tôi ngăn message được deliver hai lần?”, bạn hỏi “làm sao tôi làm cho write này an toàn khi replay?” Câu hỏi đó có đáp án thực tế.
Thiết kế idempotent sink: natural key và upsert
Pattern này đơn giản: xác định natural key làm cho một record là duy nhất theo nghĩa nghiệp vụ, sau đó làm cho mỗi lần write là MERGE hoặc ON CONFLICT DO UPDATE trên key đó.
Natural key không phải surrogate sequence. Đó là thứ được dẫn xuất từ chính event: (order_id, event_type), (device_id, reading_ts), (user_id, date, metric_name). Nếu bạn không tìm được natural key, đó là tín hiệu từ schema — event upstream có lẽ cần một event_id ổn định.
-- PostgreSQL: idempotent upsert cho sink device readings
INSERT INTO device_readings (device_id, reading_ts, value_celsius, source_topic, source_offset)
VALUES (%(device_id)s, %(reading_ts)s, %(value)s, %(topic)s, %(offset)s)
ON CONFLICT (device_id, reading_ts)
DO UPDATE SET
value_celsius = EXCLUDED.value_celsius,
source_offset = EXCLUDED.source_offset,
updated_at = now()
WHERE device_readings.source_offset < EXCLUDED.source_offset;
Mệnh đề WHERE trong DO UPDATE quan trọng: nó từ chối retry cũ đến sau khi một record mới hơn đã được ghi cho cùng key. Nếu không có nó, một duplicate đến muộn từ offset Kafka cũ hơn có thể ghi đè lên giá trị mới hơn.
Với BigQuery hoặc columnar store không hỗ trợ row-level upsert tự nhiên, pattern chuyển sang các MERGE job định kỳ: stream vào staging table bằng append, sau đó merge staging vào table cuối theo lịch. Staging table hấp thụ duplicate; merge dedup bằng cách giữ record có source offset cao nhất trên mỗi key.
Một số lưu ý thực tế:
- Unique index là lớp enforcement. Khai báo
UNIQUE (device_id, reading_ts)tường minh; đừng chỉ dựa vào logic ứng dụng. - Đưa source offset vào sink row. Tốn 8 byte và cho bạn audit trail cộng guard
WHEREở trên. - Composite key có timestamp cần rõ timezone. Lưu tất cả theo UTC.
reading_tscó nghĩa khác nhau ở châu Âu và châu Á sẽ tạo ra phantom duplicate.
Deduplication window và chi phí của nó
Đôi khi bạn không kiểm soát được schema sink — bạn đang đổ raw event vào data lake và downstream model sẽ query chúng. Trong trường hợp đó, dedup xảy ra tại query time thông qua window query hoặc compaction job định kỳ.
Query dedup tại read time dùng window function:
-- BigQuery / Spark SQL: dedup bằng cách giữ record có
-- source_offset cao nhất trên mỗi natural key
SELECT * EXCEPT(rn)
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY device_id, reading_ts
ORDER BY source_offset DESC
) AS rn
FROM device_readings_raw
)
WHERE rn = 1;
Chi phí của cách này là thực: mỗi query dùng raw table phải áp dụng lớp dedup, hoặc bạn dựng materialized view refresh theo lịch. Latency tăng; compute tăng. Cách upsert tại write time hầu như luôn rẻ hơn tại query time — trả chi phí dedup một lần lúc ingest, không phải mỗi lần đọc.
Window approach vẫn có giá trị cho correction đến muộn: nếu upstream system emit một record đã sửa cho cùng event_id với version number hoặc offset cao hơn, window query tự động chọn nó trong lần refresh tiếp theo. Upsert sink cần một DO UPDATE WHERE với version check tường minh để đạt được điều tương tự.
Kết luận
Exactly-once không phải thuộc tính bạn nhận từ framework. Đó là thuộc tính bạn xây vào sink. Các bước cụ thể: tìm natural key, làm mỗi lần write thành MERGE hoặc ON CONFLICT DO UPDATE, đưa source offset vào target row, và khai báo constraint UNIQUE. Làm bốn điều đó và bạn có effectively-once semantics bất kể broker retry bao nhiêu lần. Exactly-once mode của framework vẫn nên bật — nó giảm duplicate volume và đơn giản hóa audit — nhưng nó không phải đảm bảo mạnh như nghe có vẻ. Hãy xem đó là tối ưu hiệu năng, không phải đảm bảo tính đúng đắn.