Taming late-arriving data with watermarks
In streaming systems, late data isn’t a bug — it’s a guarantee. Mobile devices go offline, network partitions delay events, batch micro-jobs flush hours after the fact. Building a pipeline that falls apart when events arrive out of order means rebuilding it the moment you hit production.
Event time vs processing time
Every streaming event carries two timestamps. Processing time is when your job sees the record — wall-clock time on the broker or the worker node. Event time is when the event actually happened, embedded in the payload by the producer.
The gap between the two is called skew. On a healthy pipeline serving a single datacenter, skew might be 200–400 ms. On a mobile app that syncs after regaining connectivity, skew routinely hits 15–30 minutes. A ride-hailing service once reported GPS pings arriving up to 4 hours late from devices in tunnels.
If you window on processing time you get fast results, but they drift with load spikes and network jitter. If you window on event time you get correct aggregations — but you need a mechanism to decide when “enough” events for a window have arrived. That mechanism is the watermark.
Watermarks: a heuristic for completeness
A watermark at time W is a claim: all events with event-time ≤ W have been observed. The runtime advances W as it ingests records. The simplest strategy is:
W(t) = max_seen_event_time − max_out_of_orderness
If the latest event you’ve seen has timestamp t=20 and you configure a 5-minute lag, your watermark sits at t=15. Any window that ends before t=15 can now be finalized.
In Apache Flink you wire this up with a WatermarkStrategy:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.common.watermark_strategy import WatermarkStrategy, Duration
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
watermark_strategy = (
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_minutes(5))
.with_timestamp_assigner(
lambda event, _: event["event_ts_ms"] # extract event time from payload
)
.with_idleness(Duration.of_minutes(2)) # advance WM even when a partition goes quiet
)
source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("clickstream")
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
stream = env.from_source(source, watermark_strategy, "clickstream-source")
Two details matter here. First, with_idleness prevents a stalled Kafka partition from pinning the global watermark — without it, one quiet shard blocks every downstream window. Second, the lag value (5 minutes) is a business decision, not a technical one: it trades result latency for completeness. Tighten it and you emit faster but drop more stragglers; loosen it and results arrive late but miss fewer records.
Windowing and allowed lateness
Once the watermark passes a window’s end timestamp, Flink fires the window and emits a result. But a 5-minute out-of-orderness bound doesn’t mean events can’t be even later than that. Allowed lateness is a second buffer: after the watermark fires a window, you keep the window state alive for an additional duration and re-fire on every late record that arrives within that buffer.
from pyflink.datastream.window import TumblingEventTimeWindows, Time
from pyflink.datastream.functions import ReduceFunction
result = (
stream
.key_by(lambda e: e["user_id"])
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.allowed_lateness(Time.minutes(2)) # keep window state open 2 min after WM fires it
.reduce(SumReducer())
)
With Spark Structured Streaming the equivalent is the watermark + dropDuplicates pattern, and the sink must be append- or update-mode aware. A common production setup:
-- Spark Structured Streaming DDL equivalent (Databricks Delta sink)
CREATE OR REPLACE STREAMING TABLE click_counts
TBLPROPERTIES ('pipelines.autoOptimize.zOrderCols' = 'user_id')
AS
SELECT
user_id,
window(event_ts, '10 minutes').start AS window_start,
COUNT(*) AS clicks
FROM STREAM(LIVE.raw_clicks)
WATERMARK event_ts DELAY OF INTERVAL 5 MINUTES
GROUP BY user_id, window(event_ts, '10 minutes');
<rect x="240" y="50" width="170" height="60" rx="6" fill="none" stroke="#e23a45" stroke-width="2.2"/>
<text x="325" y="85" font-family="JetBrains Mono, monospace" font-size="11" fill="#ff8088" text-anchor="middle">window [10,20)</text>
<text x="325" y="100" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">OPEN</text>
<!-- allowed lateness extension -->
<rect x="220" y="50" width="20" height="60" rx="0" fill="rgba(226,58,69,0.15)" stroke="#e23a45" stroke-width="1.2" stroke-dasharray="4 3"/>
<text x="230" y="40" font-family="JetBrains Mono, monospace" font-size="10" fill="#e23a45" text-anchor="middle">+2 min</text>
<text x="230" y="30" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">allowed</text>
<text x="230" y="20" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">lateness</text>
<!-- side output box -->
<rect x="530" y="50" width="180" height="60" rx="6" fill="none" stroke="#8a7e7b" stroke-width="1.5" stroke-dasharray="6 4"/>
<text x="620" y="78" font-family="JetBrains Mono, monospace" font-size="11" fill="#8a7e7b" text-anchor="middle">side output</text>
<text x="620" y="95" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">(stragglers)</text>
<!-- arrow from window to side output -->
<line x1="430" y1="80" x2="526" y2="80" stroke="#8a7e7b" stroke-width="1.5" stroke-dasharray="4 3"/>
<polygon points="526,75 538,80 526,85" fill="#8a7e7b"/>
<text x="460" y="70" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">too late</text>
<!-- watermark label -->
<line x1="415" y1="30" x2="415" y2="145" stroke="#e23a45" stroke-width="2" stroke-dasharray="6 4"/>
<text x="420" y="155" font-family="JetBrains Mono, monospace" font-size="11" fill="#e23a45">WM</text>
<!-- horizontal base line -->
<line x1="40" y1="140" x2="720" y2="140" stroke="#8a7e7b" stroke-width="1" stroke-dasharray="2 4"/>
What to do with stragglers (side outputs)
Records that arrive after both the watermark and the allowed lateness window are dropped by default. That’s almost always wrong in production. Flink’s side output API lets you route them to a separate stream instead:
from pyflink.datastream.output_tag import OutputTag
late_tag = OutputTag("late-events")
windowed = (
stream
.key_by(lambda e: e["user_id"])
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.allowed_lateness(Time.minutes(2))
.side_output_late_data(late_tag)
.reduce(SumReducer())
)
late_stream = windowed.get_side_output(late_tag)
late_stream.sink_to(late_events_sink) # write to a correction table or DLQ
The late_stream carries the full original record. A common pattern is to sink it to a dead-letter queue (Kafka topic clicks.late) and run a daily reconciliation job that re-aggregates and upserts corrections into the main table. This keeps your real-time path fast and your data complete over a longer horizon.
Three things to monitor on this setup:
- Late-record rate (
late_events / total_events). A sudden spike — say from 0.3 % to 4 % — usually points to a producer clock drift or a deployment that flushed a large backlog. - Side-output lag: if
late_streamis growing and not draining, your correction job is behind. - Watermark stall: if the watermark stops advancing, check for an idle or stalled Kafka partition. The
with_idlenessguard handles transient stalls; persistent stalls need an alert.
Takeaway
Event time + watermarks + allowed lateness + side outputs is a four-part contract. Each part handles a specific failure mode:
- Event time gives you correct semantics regardless of arrival order.
- Watermarks let you close windows without waiting forever.
- Allowed lateness catches near-misses without reprocessing the full window.
- Side outputs preserve every record so you can audit and correct downstream.
The late-record rate is the single number that tells you how well this contract is holding. Track it per topic, per consumer group, and per window size. If it drifts, the problem is almost always upstream — a producer change, a mobile SDK update, or a timezone bug baked into an event-time field.
Trong hệ thống streaming, dữ liệu đến muộn không phải là lỗi — đó là điều chắc chắn xảy ra. Thiết bị di động mất kết nối, phân vùng mạng làm trễ sự kiện, các micro-batch job flush sau nhiều giờ. Xây dựng pipeline sụp đổ khi sự kiện đến không theo thứ tự đồng nghĩa với việc phải làm lại từ đầu ngay khi lên production.
Event time vs processing time
Mỗi sự kiện streaming mang hai timestamp. Processing time là thời điểm job của bạn nhìn thấy bản ghi — thời gian thực trên broker hoặc worker node. Event time là thời điểm sự kiện thực sự xảy ra, được nhúng vào payload bởi producer.
Khoảng cách giữa hai giá trị này gọi là skew. Trên một pipeline lành mạnh phục vụ một datacenter, skew có thể là 200–400 ms. Trên ứng dụng di động đồng bộ sau khi lấy lại kết nối, skew thường đạt 15–30 phút. Một dịch vụ gọi xe từng ghi nhận GPS ping đến muộn tới 4 giờ từ thiết bị trong hầm.
Nếu bạn window theo processing time, kết quả ra nhanh nhưng bị lệch theo load spike và network jitter. Nếu window theo event time, bạn có aggregation đúng — nhưng cần một cơ chế để xác định khi nào “đủ” sự kiện cho một window đã đến. Cơ chế đó là watermark.
Watermark: heuristic về tính đầy đủ
Watermark tại thời điểm W là một khẳng định: tất cả sự kiện có event-time ≤ W đã được quan sát. Runtime đẩy W tiến khi nhận bản ghi. Chiến lược đơn giản nhất là:
W(t) = max_seen_event_time − max_out_of_orderness
Nếu sự kiện mới nhất bạn thấy có timestamp t=20 và bạn cấu hình độ trễ 5 phút, watermark của bạn đứng ở t=15. Bất kỳ window nào kết thúc trước t=15 đều có thể được finalize.
Trong Apache Flink, bạn cấu hình điều này bằng WatermarkStrategy:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.common.watermark_strategy import WatermarkStrategy, Duration
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
watermark_strategy = (
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_minutes(5))
.with_timestamp_assigner(
lambda event, _: event["event_ts_ms"] # lấy event time từ payload
)
.with_idleness(Duration.of_minutes(2)) # đẩy WM tiến kể cả khi partition im lặng
)
source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("clickstream")
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
stream = env.from_source(source, watermark_strategy, "clickstream-source")
Hai chi tiết quan trọng ở đây. Thứ nhất, with_idleness ngăn một Kafka partition bị treo kéo ghim global watermark — nếu không có nó, một shard im lặng sẽ chặn mọi window downstream. Thứ hai, giá trị độ trễ (5 phút) là quyết định nghiệp vụ, không phải kỹ thuật: nó đánh đổi latency kết quả với tính đầy đủ. Rút ngắn thì emit nhanh hơn nhưng bỏ nhiều straggler hơn; nới rộng thì kết quả đến muộn hơn nhưng bỏ ít bản ghi hơn.
Windowing và allowed lateness
Khi watermark vượt qua timestamp kết thúc của một window, Flink fire window đó và emit kết quả. Nhưng giới hạn out-of-orderness 5 phút không có nghĩa là sự kiện không thể đến muộn hơn thế. Allowed lateness là buffer thứ hai: sau khi watermark fire một window, bạn giữ trạng thái window đó sống thêm một khoảng và re-fire với mỗi bản ghi trễ đến trong buffer đó.
from pyflink.datastream.window import TumblingEventTimeWindows, Time
from pyflink.datastream.functions import ReduceFunction
result = (
stream
.key_by(lambda e: e["user_id"])
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.allowed_lateness(Time.minutes(2)) # giữ trạng thái window 2 phút sau khi WM fire
.reduce(SumReducer())
)
Với Spark Structured Streaming, tương đương là pattern watermark + dropDuplicates, và sink phải hỗ trợ append- hoặc update-mode. Một setup production phổ biến:
-- Spark Structured Streaming DDL tương đương (Databricks Delta sink)
CREATE OR REPLACE STREAMING TABLE click_counts
TBLPROPERTIES ('pipelines.autoOptimize.zOrderCols' = 'user_id')
AS
SELECT
user_id,
window(event_ts, '10 minutes').start AS window_start,
COUNT(*) AS clicks
FROM STREAM(LIVE.raw_clicks)
WATERMARK event_ts DELAY OF INTERVAL 5 MINUTES
GROUP BY user_id, window(event_ts, '10 minutes');
<rect x="240" y="50" width="170" height="60" rx="6" fill="none" stroke="#e23a45" stroke-width="2.2"/>
<text x="325" y="85" font-family="JetBrains Mono, monospace" font-size="11" fill="#ff8088" text-anchor="middle">window [10,20)</text>
<text x="325" y="100" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">OPEN</text>
<!-- allowed lateness extension -->
<rect x="220" y="50" width="20" height="60" rx="0" fill="rgba(226,58,69,0.15)" stroke="#e23a45" stroke-width="1.2" stroke-dasharray="4 3"/>
<text x="230" y="40" font-family="JetBrains Mono, monospace" font-size="10" fill="#e23a45" text-anchor="middle">+2 min</text>
<text x="230" y="30" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">allowed</text>
<text x="230" y="20" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">lateness</text>
<!-- side output box -->
<rect x="530" y="50" width="180" height="60" rx="6" fill="none" stroke="#8a7e7b" stroke-width="1.5" stroke-dasharray="6 4"/>
<text x="620" y="78" font-family="JetBrains Mono, monospace" font-size="11" fill="#8a7e7b" text-anchor="middle">side output</text>
<text x="620" y="95" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">(stragglers)</text>
<!-- arrow from window to side output -->
<line x1="430" y1="80" x2="526" y2="80" stroke="#8a7e7b" stroke-width="1.5" stroke-dasharray="4 3"/>
<polygon points="526,75 538,80 526,85" fill="#8a7e7b"/>
<text x="460" y="70" font-family="JetBrains Mono, monospace" font-size="10" fill="#8a7e7b" text-anchor="middle">quá muộn</text>
<!-- watermark label -->
<line x1="415" y1="30" x2="415" y2="145" stroke="#e23a45" stroke-width="2" stroke-dasharray="6 4"/>
<text x="420" y="155" font-family="JetBrains Mono, monospace" font-size="11" fill="#e23a45">WM</text>
<!-- horizontal base line -->
<line x1="40" y1="140" x2="720" y2="140" stroke="#8a7e7b" stroke-width="1" stroke-dasharray="2 4"/>
Xử lý straggler bằng side output
Bản ghi đến sau cả watermark lẫn cửa sổ allowed lateness mặc định bị bỏ. Điều đó hầu như luôn sai trong production. API side output của Flink cho phép bạn định tuyến chúng sang một stream riêng thay thế:
from pyflink.datastream.output_tag import OutputTag
late_tag = OutputTag("late-events")
windowed = (
stream
.key_by(lambda e: e["user_id"])
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.allowed_lateness(Time.minutes(2))
.side_output_late_data(late_tag)
.reduce(SumReducer())
)
late_stream = windowed.get_side_output(late_tag)
late_stream.sink_to(late_events_sink) # ghi vào correction table hoặc DLQ
late_stream mang toàn bộ bản ghi gốc. Pattern phổ biến là sink nó vào dead-letter queue (Kafka topic clicks.late) và chạy một reconciliation job hàng ngày để re-aggregate và upsert bản sửa vào bảng chính. Cách này giữ real-time path nhanh và dữ liệu đầy đủ trên horizon dài hơn.
Ba điều cần monitor cho setup này:
- Tỷ lệ bản ghi trễ (
late_events / total_events). Tăng đột biến — ví dụ từ 0,3 % lên 4 % — thường chỉ vào clock drift của producer hoặc deployment đã flush một backlog lớn. - Side-output lag: nếu
late_streamđang tăng và không drain được, reconciliation job đang bị tụt hậu. - Watermark stall: nếu watermark ngừng tiến, hãy kiểm tra Kafka partition bị idle hoặc stall. Guard
with_idlenessxử lý stall tạm thời; stall dai dẳng cần alert.
Tổng kết
Event time + watermark + allowed lateness + side output là hợp đồng bốn phần. Mỗi phần xử lý một failure mode cụ thể:
- Event time cho bạn ngữ nghĩa đúng bất kể thứ tự đến.
- Watermark cho phép đóng window mà không chờ mãi mãi.
- Allowed lateness bắt các miss gần mà không cần reprocess toàn bộ window.
- Side output giữ lại mọi bản ghi để bạn có thể audit và sửa downstream.
Tỷ lệ bản ghi trễ là con số duy nhất cho bạn biết hợp đồng này đang giữ tốt đến đâu. Theo dõi nó theo từng topic, consumer group, và window size. Nếu nó trôi dạt, vấn đề hầu như luôn ở upstream — thay đổi producer, cập nhật mobile SDK, hoặc bug timezone bị bake vào event-time field.