Data contracts before dashboards
By the time a broken pipeline shows up in a business dashboard, the damage is done: executives have seen wrong numbers, reports have been shared, and you’re spending your afternoon explaining why revenue dropped 40% yesterday. Monitoring at the metric layer is the last line of defense — and you want the problem caught much earlier.
Data contracts enforce agreements at the schema boundary, the moment a producer publishes data a consumer depends on. A contract specifies the expected fields, types, and invariants, and a violation fails fast before the bad data propagates. Treating contracts as a deployment gate — no merge without a passing contract check — means data quality shifts left, from incident response to continuous prevention.
Monitoring metrics is too late
You already know the failure mode: an upstream team renames user_id to userId, or drops the event_ts column, or starts emitting null where a NOT NULL invariant was assumed. Your dbt model runs, your Airflow DAG finishes green, and 6 hours later a stakeholder notices the retention chart flatlined.
By that point you’re doing archaeology. Which pipeline? Which table version? Which commit? The blast radius of a schema change that slipped through undetected is proportional to how long it ran unnoticed — often measured in hours, sometimes days.
The real cost isn’t the fix. It’s the trust erosion. Every unexplained anomaly from here forward gets blamed on data engineering, regardless of root cause.
Metric-level monitoring (Great Expectations row counts, dbt tests on final marts) catches symptoms. Contracts catch causes, at the source.
What a data contract specifies
A data contract is a versioned, machine-readable document that codifies the agreement between a producer (the team that owns and publishes a dataset) and every consumer (any pipeline, model, or service that reads it).
Three layers matter:
- Schema — field names, types, nullability, primary/foreign key constraints. Non-negotiable; any deviation is a breaking change.
- Semantics — what values mean.
statuscan beactive | churned | suspended— not0 | 1 | 2. Units:revenue_usdis in dollars, not cents.event_tsis UTC, not local time. - SLA — freshness guarantee (data arrives by 06:00 UTC), row-count floor (>50k events/day), p99 latency for streaming topics.
A minimal YAML contract looks like this:
# contracts/orders.v2.yaml
schema: orders
version: "2.1.0"
owner: "platform-data@company.com"
consumers:
- analytics-mart
- billing-service
fields:
- name: order_id
type: STRING
nullable: false
description: "UUID v4, globally unique"
- name: user_id
type: STRING
nullable: false
- name: status
type: STRING
nullable: false
enum: [pending, confirmed, shipped, cancelled]
- name: revenue_usd
type: NUMERIC(12,2)
nullable: false
description: "Gross revenue in USD, pre-tax"
- name: created_at
type: TIMESTAMP
nullable: false
description: "UTC"
sla:
freshness_minutes: 30
min_rows_per_day: 10000
Enforce at the boundary in CI
Defining contracts in YAML does nothing unless something enforces them. The enforcement point is CI — specifically, a schema compatibility check that runs on every pull request that touches a table or topic definition.
The check has two jobs: validate the proposed schema against the contract spec, and detect whether the change is breaking or backward-compatible.
# ci/check_contract.py
import yaml
import sys
from dataclasses import dataclass
from typing import List
BREAKING_TYPE_CHANGES = {
("STRING", "INTEGER"),
("NUMERIC", "STRING"),
("TIMESTAMP", "STRING"),
}
@dataclass
class Field:
name: str
type: str
nullable: bool
def load_fields(contract_path: str) -> dict[str, Field]:
with open(contract_path) as f:
contract = yaml.safe_load(f)
return {
field["name"]: Field(
name=field["name"],
type=field["type"],
nullable=field.get("nullable", True),
)
for field in contract["fields"]
}
def check_compatibility(old_path: str, new_path: str) -> List[str]:
old = load_fields(old_path)
new = load_fields(new_path)
errors = []
for name, old_field in old.items():
if name not in new:
errors.append(f"BREAKING: field '{name}' removed")
continue
new_field = new[name]
if (old_field.type, new_field.type) in BREAKING_TYPE_CHANGES:
errors.append(
f"BREAKING: field '{name}' type changed "
f"{old_field.type} -> {new_field.type}"
)
if old_field.nullable is False and new_field.nullable is True:
errors.append(
f"BREAKING: field '{name}' changed NOT NULL -> nullable"
)
return errors
if __name__ == "__main__":
old_contract, new_contract = sys.argv[1], sys.argv[2]
errors = check_compatibility(old_contract, new_contract)
if errors:
print("Contract compatibility check FAILED:")
for e in errors:
print(f" {e}")
sys.exit(1)
print("Contract compatibility check PASSED")
Wire this into your GitHub Actions workflow: compare the current contract YAML against the main-branch version on every PR. A breaking change fails the check and blocks the merge. A backward-compatible addition — new nullable field, new enum value, tightened SLA — passes.
This costs one step in CI. It saves you hours of incident triage.
Versioning and breaking-change policy
Contracts follow semver with explicit rules for what constitutes a major version bump:
| Change | Classification |
|---|---|
| Add nullable field | backward-compatible (minor) |
| Add new enum value | backward-compatible (minor) |
| Remove field | breaking (major) |
| Rename field | breaking (major) |
| Change NOT NULL → nullable | breaking (major) |
| Tighten type (STRING → UUID format) | breaking (major) |
| Loosen SLA | breaking (major — notify consumers) |
When a breaking change is genuinely required, the workflow is:
- Publish
orders.v3.yamlalongsideorders.v2.yaml. - Notify registered consumers (the
consumers:list in the contract). - Run both versions in parallel for a migration window — typically 2–4 weeks.
- Deprecate v2 only after all consumers have migrated and confirmed in writing.
This is not bureaucracy. It is the operational cost of having multiple teams depend on a shared dataset. The alternative — silent schema drift — is more expensive.
Closing takeaway
Start with your highest-traffic, most downstream-depended-on table. Write a contract YAML for it. Add a CI check that runs git diff main -- contracts/ and validates any changed file. Do this before you build the next dashboard.
The first time the CI check catches a breaking change before it merges, you’ll understand why this belongs at the start of the pipeline, not the end.
Khi một pipeline hỏng xuất hiện trên dashboard kinh doanh, thiệt hại đã xong: lãnh đạo đã thấy số sai, báo cáo đã được chia sẻ, và bạn đang dành cả buổi chiều giải thích tại sao doanh thu hôm qua giảm 40%. Monitoring ở tầng metric là tuyến phòng thủ cuối cùng — và bạn muốn bắt được vấn đề sớm hơn nhiều.
Data contract thực thi thỏa thuận tại ranh giới schema, ngay lúc producer phát hành dữ liệu mà consumer phụ thuộc vào. Một contract chỉ định các trường, kiểu dữ liệu và bất biến mong đợi, và khi vi phạm sẽ fail nhanh trước khi dữ liệu xấu lan ra. Coi contract như một deployment gate — không merge nếu contract check chưa pass — có nghĩa là chất lượng dữ liệu dịch chuyển sang trái, từ xử lý sự cố sang phòng ngừa liên tục.
Monitoring metric là quá muộn
Bạn đã biết kiểu lỗi này rồi: team upstream đổi tên user_id thành userId, hoặc xóa cột event_ts, hoặc bắt đầu emit null ở nơi được giả định là NOT NULL. DAG Airflow của bạn chạy xong, model dbt hoàn thành với trạng thái xanh, và 6 tiếng sau stakeholder nhận ra biểu đồ retention phẳng lì.
Đến lúc đó bạn đang làm công việc khảo cổ. Pipeline nào? Phiên bản bảng nào? Commit nào? Mức độ thiệt hại của một schema change lọt qua không bị phát hiện tỷ lệ thuận với thời gian nó chạy không ai hay — thường tính bằng giờ, đôi khi tính bằng ngày.
Chi phí thực sự không phải là việc sửa lỗi. Mà là sự xói mòn niềm tin. Mỗi bất thường không giải thích được từ đây về sau đều bị đổ lỗi cho data engineering, bất kể nguyên nhân gốc rễ là gì.
Monitoring ở tầng metric (kiểm tra row count bằng Great Expectations, dbt test trên final mart) bắt triệu chứng. Contract bắt nguyên nhân, tại nguồn.
Data contract chỉ định những gì
Một data contract là tài liệu được versioned, machine-readable, mã hóa thỏa thuận giữa producer (team sở hữu và phát hành dataset) và mọi consumer (bất kỳ pipeline, model, hoặc service nào đọc nó).
Ba tầng quan trọng:
- Schema — tên trường, kiểu dữ liệu, nullable, ràng buộc primary/foreign key. Không thể thương lượng; bất kỳ thay đổi nào là breaking change.
- Semantics — ý nghĩa của các giá trị.
statuscó thể làactive | churned | suspended— không phải0 | 1 | 2. Đơn vị:revenue_usdtính bằng đô la, không phải cent.event_tslà UTC, không phải giờ địa phương. - SLA — cam kết về độ tươi (dữ liệu đến trước 06:00 UTC), ngưỡng row count tối thiểu (>50k event/ngày), p99 latency cho streaming topic.
Một contract YAML tối giản trông như thế này:
# contracts/orders.v2.yaml
schema: orders
version: "2.1.0"
owner: "platform-data@company.com"
consumers:
- analytics-mart
- billing-service
fields:
- name: order_id
type: STRING
nullable: false
description: "UUID v4, globally unique"
- name: user_id
type: STRING
nullable: false
- name: status
type: STRING
nullable: false
enum: [pending, confirmed, shipped, cancelled]
- name: revenue_usd
type: NUMERIC(12,2)
nullable: false
description: "Gross revenue in USD, pre-tax"
- name: created_at
type: TIMESTAMP
nullable: false
description: "UTC"
sla:
freshness_minutes: 30
min_rows_per_day: 10000
Thực thi tại ranh giới trong CI
Định nghĩa contract trong YAML không làm được gì nếu không có gì thực thi chúng. Điểm thực thi là CI — cụ thể là một kiểm tra tính tương thích schema chạy trên mọi pull request chạm vào định nghĩa bảng hoặc topic.
Kiểm tra có hai nhiệm vụ: xác nhận schema được đề xuất so với spec trong contract, và phát hiện xem thay đổi đó là breaking hay backward-compatible.
# ci/check_contract.py
import yaml
import sys
from dataclasses import dataclass
from typing import List
BREAKING_TYPE_CHANGES = {
("STRING", "INTEGER"),
("NUMERIC", "STRING"),
("TIMESTAMP", "STRING"),
}
@dataclass
class Field:
name: str
type: str
nullable: bool
def load_fields(contract_path: str) -> dict[str, Field]:
with open(contract_path) as f:
contract = yaml.safe_load(f)
return {
field["name"]: Field(
name=field["name"],
type=field["type"],
nullable=field.get("nullable", True),
)
for field in contract["fields"]
}
def check_compatibility(old_path: str, new_path: str) -> List[str]:
old = load_fields(old_path)
new = load_fields(new_path)
errors = []
for name, old_field in old.items():
if name not in new:
errors.append(f"BREAKING: field '{name}' removed")
continue
new_field = new[name]
if (old_field.type, new_field.type) in BREAKING_TYPE_CHANGES:
errors.append(
f"BREAKING: field '{name}' type changed "
f"{old_field.type} -> {new_field.type}"
)
if old_field.nullable is False and new_field.nullable is True:
errors.append(
f"BREAKING: field '{name}' changed NOT NULL -> nullable"
)
return errors
if __name__ == "__main__":
old_contract, new_contract = sys.argv[1], sys.argv[2]
errors = check_compatibility(old_contract, new_contract)
if errors:
print("Contract compatibility check FAILED:")
for e in errors:
print(f" {e}")
sys.exit(1)
print("Contract compatibility check PASSED")
Kết nối script này vào GitHub Actions workflow: so sánh contract YAML hiện tại với phiên bản trên nhánh main trên mỗi PR. Một breaking change làm fail kiểm tra và chặn merge. Một thêm mới backward-compatible — trường nullable mới, giá trị enum mới, SLA chặt hơn — sẽ pass.
Điều này tốn một bước trong CI. Nó giúp bạn tiết kiệm hàng giờ điều tra sự cố.
Versioning và chính sách breaking change
Contract theo semver với quy tắc rõ ràng về những gì là major version bump:
| Thay đổi | Phân loại |
|---|---|
| Thêm trường nullable | backward-compatible (minor) |
| Thêm giá trị enum mới | backward-compatible (minor) |
| Xóa trường | breaking (major) |
| Đổi tên trường | breaking (major) |
| Đổi NOT NULL → nullable | breaking (major) |
| Thắt chặt kiểu (STRING → UUID format) | breaking (major) |
| Nới lỏng SLA | breaking (major — thông báo consumer) |
Khi một breaking change thực sự cần thiết, quy trình là:
- Phát hành
orders.v3.yamlsong song vớiorders.v2.yaml. - Thông báo cho các consumer đã đăng ký (danh sách
consumers:trong contract). - Chạy cả hai phiên bản song song trong thời gian migration — thường 2–4 tuần.
- Deprecate v2 chỉ sau khi tất cả consumer đã migrate và xác nhận bằng văn bản.
Đây không phải quan liêu. Đây là chi phí vận hành khi nhiều team phụ thuộc vào một dataset dùng chung. Phương án thay thế — schema drift âm thầm — tốn kém hơn.
Kết luận
Bắt đầu với bảng có lưu lượng cao nhất, bị phụ thuộc nhiều nhất về phía hạ nguồn. Viết một contract YAML cho nó. Thêm một CI check chạy git diff main -- contracts/ và xác nhận bất kỳ file nào bị thay đổi. Làm điều này trước khi bạn xây dựng dashboard tiếp theo.
Lần đầu tiên CI check bắt được một breaking change trước khi nó được merge, bạn sẽ hiểu tại sao nó cần đứng ở đầu pipeline, không phải ở cuối.