Apr 09, 2026 · 5 min read

Data contracts before dashboards

By the time a broken pipeline shows up in a business dashboard, the damage is done: executives have seen wrong numbers, reports have been shared, and you’re spending your afternoon explaining why revenue dropped 40% yesterday. Monitoring at the metric layer is the last line of defense — and you want the problem caught much earlier.

Data contracts enforce agreements at the schema boundary, the moment a producer publishes data a consumer depends on. A contract specifies the expected fields, types, and invariants, and a violation fails fast before the bad data propagates. Treating contracts as a deployment gate — no merge without a passing contract check — means data quality shifts left, from incident response to continuous prevention.

Monitoring metrics is too late

You already know the failure mode: an upstream team renames user_id to userId, or drops the event_ts column, or starts emitting null where a NOT NULL invariant was assumed. Your dbt model runs, your Airflow DAG finishes green, and 6 hours later a stakeholder notices the retention chart flatlined.

By that point you’re doing archaeology. Which pipeline? Which table version? Which commit? The blast radius of a schema change that slipped through undetected is proportional to how long it ran unnoticed — often measured in hours, sometimes days.

The real cost isn’t the fix. It’s the trust erosion. Every unexplained anomaly from here forward gets blamed on data engineering, regardless of root cause.

Metric-level monitoring (Great Expectations row counts, dbt tests on final marts) catches symptoms. Contracts catch causes, at the source.

What a data contract specifies

A data contract is a versioned, machine-readable document that codifies the agreement between a producer (the team that owns and publishes a dataset) and every consumer (any pipeline, model, or service that reads it).

Three layers matter:

  • Schema — field names, types, nullability, primary/foreign key constraints. Non-negotiable; any deviation is a breaking change.
  • Semantics — what values mean. status can be active | churned | suspended — not 0 | 1 | 2. Units: revenue_usd is in dollars, not cents. event_ts is UTC, not local time.
  • SLA — freshness guarantee (data arrives by 06:00 UTC), row-count floor (>50k events/day), p99 latency for streaming topics.

A minimal YAML contract looks like this:

# contracts/orders.v2.yaml
schema: orders
version: "2.1.0"
owner: "platform-data@company.com"
consumers:
  - analytics-mart
  - billing-service

fields:
  - name: order_id
    type: STRING
    nullable: false
    description: "UUID v4, globally unique"
  - name: user_id
    type: STRING
    nullable: false
  - name: status
    type: STRING
    nullable: false
    enum: [pending, confirmed, shipped, cancelled]
  - name: revenue_usd
    type: NUMERIC(12,2)
    nullable: false
    description: "Gross revenue in USD, pre-tax"
  - name: created_at
    type: TIMESTAMP
    nullable: false
    description: "UTC"

sla:
  freshness_minutes: 30
  min_rows_per_day: 10000
PRODUCER orders.v2.yaml CONTRACT GATE schema · semantics · SLA CI enforced analytics-mart consumer billing-service consumer dbt-staging consumer REJECTED — breaking change
Figure 1 — contract gate blocks breaking changes before they reach consumers

Enforce at the boundary in CI

Defining contracts in YAML does nothing unless something enforces them. The enforcement point is CI — specifically, a schema compatibility check that runs on every pull request that touches a table or topic definition.

The check has two jobs: validate the proposed schema against the contract spec, and detect whether the change is breaking or backward-compatible.

# ci/check_contract.py
import yaml
import sys
from dataclasses import dataclass
from typing import List

BREAKING_TYPE_CHANGES = {
    ("STRING", "INTEGER"),
    ("NUMERIC", "STRING"),
    ("TIMESTAMP", "STRING"),
}

@dataclass
class Field:
    name: str
    type: str
    nullable: bool

def load_fields(contract_path: str) -> dict[str, Field]:
    with open(contract_path) as f:
        contract = yaml.safe_load(f)
    return {
        field["name"]: Field(
            name=field["name"],
            type=field["type"],
            nullable=field.get("nullable", True),
        )
        for field in contract["fields"]
    }

def check_compatibility(old_path: str, new_path: str) -> List[str]:
    old = load_fields(old_path)
    new = load_fields(new_path)
    errors = []

    for name, old_field in old.items():
        if name not in new:
            errors.append(f"BREAKING: field '{name}' removed")
            continue
        new_field = new[name]
        if (old_field.type, new_field.type) in BREAKING_TYPE_CHANGES:
            errors.append(
                f"BREAKING: field '{name}' type changed "
                f"{old_field.type} -> {new_field.type}"
            )
        if old_field.nullable is False and new_field.nullable is True:
            errors.append(
                f"BREAKING: field '{name}' changed NOT NULL -> nullable"
            )

    return errors

if __name__ == "__main__":
    old_contract, new_contract = sys.argv[1], sys.argv[2]
    errors = check_compatibility(old_contract, new_contract)
    if errors:
        print("Contract compatibility check FAILED:")
        for e in errors:
            print(f"  {e}")
        sys.exit(1)
    print("Contract compatibility check PASSED")

Wire this into your GitHub Actions workflow: compare the current contract YAML against the main-branch version on every PR. A breaking change fails the check and blocks the merge. A backward-compatible addition — new nullable field, new enum value, tightened SLA — passes.

This costs one step in CI. It saves you hours of incident triage.

PULL REQUEST CONTRACT CHECK check_contract.py compare vs main PASS merge allowed FAIL merge blocked compat breaking
Figure 2 — CI pipeline: compatible changes merge; breaking changes are blocked at PR time

Versioning and breaking-change policy

Contracts follow semver with explicit rules for what constitutes a major version bump:

ChangeClassification
Add nullable fieldbackward-compatible (minor)
Add new enum valuebackward-compatible (minor)
Remove fieldbreaking (major)
Rename fieldbreaking (major)
Change NOT NULL → nullablebreaking (major)
Tighten type (STRING → UUID format)breaking (major)
Loosen SLAbreaking (major — notify consumers)

When a breaking change is genuinely required, the workflow is:

  1. Publish orders.v3.yaml alongside orders.v2.yaml.
  2. Notify registered consumers (the consumers: list in the contract).
  3. Run both versions in parallel for a migration window — typically 2–4 weeks.
  4. Deprecate v2 only after all consumers have migrated and confirmed in writing.

This is not bureaucracy. It is the operational cost of having multiple teams depend on a shared dataset. The alternative — silent schema drift — is more expensive.

Closing takeaway

Start with your highest-traffic, most downstream-depended-on table. Write a contract YAML for it. Add a CI check that runs git diff main -- contracts/ and validates any changed file. Do this before you build the next dashboard.

The first time the CI check catches a breaking change before it merges, you’ll understand why this belongs at the start of the pipeline, not the end.

Khi một pipeline hỏng xuất hiện trên dashboard kinh doanh, thiệt hại đã xong: lãnh đạo đã thấy số sai, báo cáo đã được chia sẻ, và bạn đang dành cả buổi chiều giải thích tại sao doanh thu hôm qua giảm 40%. Monitoring ở tầng metric là tuyến phòng thủ cuối cùng — và bạn muốn bắt được vấn đề sớm hơn nhiều.

Data contract thực thi thỏa thuận tại ranh giới schema, ngay lúc producer phát hành dữ liệu mà consumer phụ thuộc vào. Một contract chỉ định các trường, kiểu dữ liệu và bất biến mong đợi, và khi vi phạm sẽ fail nhanh trước khi dữ liệu xấu lan ra. Coi contract như một deployment gate — không merge nếu contract check chưa pass — có nghĩa là chất lượng dữ liệu dịch chuyển sang trái, từ xử lý sự cố sang phòng ngừa liên tục.

Monitoring metric là quá muộn

Bạn đã biết kiểu lỗi này rồi: team upstream đổi tên user_id thành userId, hoặc xóa cột event_ts, hoặc bắt đầu emit null ở nơi được giả định là NOT NULL. DAG Airflow của bạn chạy xong, model dbt hoàn thành với trạng thái xanh, và 6 tiếng sau stakeholder nhận ra biểu đồ retention phẳng lì.

Đến lúc đó bạn đang làm công việc khảo cổ. Pipeline nào? Phiên bản bảng nào? Commit nào? Mức độ thiệt hại của một schema change lọt qua không bị phát hiện tỷ lệ thuận với thời gian nó chạy không ai hay — thường tính bằng giờ, đôi khi tính bằng ngày.

Chi phí thực sự không phải là việc sửa lỗi. Mà là sự xói mòn niềm tin. Mỗi bất thường không giải thích được từ đây về sau đều bị đổ lỗi cho data engineering, bất kể nguyên nhân gốc rễ là gì.

Monitoring ở tầng metric (kiểm tra row count bằng Great Expectations, dbt test trên final mart) bắt triệu chứng. Contract bắt nguyên nhân, tại nguồn.

Data contract chỉ định những gì

Một data contract là tài liệu được versioned, machine-readable, mã hóa thỏa thuận giữa producer (team sở hữu và phát hành dataset) và mọi consumer (bất kỳ pipeline, model, hoặc service nào đọc nó).

Ba tầng quan trọng:

  • Schema — tên trường, kiểu dữ liệu, nullable, ràng buộc primary/foreign key. Không thể thương lượng; bất kỳ thay đổi nào là breaking change.
  • Semantics — ý nghĩa của các giá trị. status có thể là active | churned | suspended — không phải 0 | 1 | 2. Đơn vị: revenue_usd tính bằng đô la, không phải cent. event_ts là UTC, không phải giờ địa phương.
  • SLA — cam kết về độ tươi (dữ liệu đến trước 06:00 UTC), ngưỡng row count tối thiểu (>50k event/ngày), p99 latency cho streaming topic.

Một contract YAML tối giản trông như thế này:

# contracts/orders.v2.yaml
schema: orders
version: "2.1.0"
owner: "platform-data@company.com"
consumers:
  - analytics-mart
  - billing-service

fields:
  - name: order_id
    type: STRING
    nullable: false
    description: "UUID v4, globally unique"
  - name: user_id
    type: STRING
    nullable: false
  - name: status
    type: STRING
    nullable: false
    enum: [pending, confirmed, shipped, cancelled]
  - name: revenue_usd
    type: NUMERIC(12,2)
    nullable: false
    description: "Gross revenue in USD, pre-tax"
  - name: created_at
    type: TIMESTAMP
    nullable: false
    description: "UTC"

sla:
  freshness_minutes: 30
  min_rows_per_day: 10000
PRODUCER orders.v2.yaml CONTRACT GATE schema · semantics · SLA CI enforced analytics-mart consumer billing-service consumer dbt-staging consumer REJECTED — breaking change
Hình 1 — contract gate chặn breaking change trước khi đến tay consumer

Thực thi tại ranh giới trong CI

Định nghĩa contract trong YAML không làm được gì nếu không có gì thực thi chúng. Điểm thực thi là CI — cụ thể là một kiểm tra tính tương thích schema chạy trên mọi pull request chạm vào định nghĩa bảng hoặc topic.

Kiểm tra có hai nhiệm vụ: xác nhận schema được đề xuất so với spec trong contract, và phát hiện xem thay đổi đó là breaking hay backward-compatible.

# ci/check_contract.py
import yaml
import sys
from dataclasses import dataclass
from typing import List

BREAKING_TYPE_CHANGES = {
    ("STRING", "INTEGER"),
    ("NUMERIC", "STRING"),
    ("TIMESTAMP", "STRING"),
}

@dataclass
class Field:
    name: str
    type: str
    nullable: bool

def load_fields(contract_path: str) -> dict[str, Field]:
    with open(contract_path) as f:
        contract = yaml.safe_load(f)
    return {
        field["name"]: Field(
            name=field["name"],
            type=field["type"],
            nullable=field.get("nullable", True),
        )
        for field in contract["fields"]
    }

def check_compatibility(old_path: str, new_path: str) -> List[str]:
    old = load_fields(old_path)
    new = load_fields(new_path)
    errors = []

    for name, old_field in old.items():
        if name not in new:
            errors.append(f"BREAKING: field '{name}' removed")
            continue
        new_field = new[name]
        if (old_field.type, new_field.type) in BREAKING_TYPE_CHANGES:
            errors.append(
                f"BREAKING: field '{name}' type changed "
                f"{old_field.type} -> {new_field.type}"
            )
        if old_field.nullable is False and new_field.nullable is True:
            errors.append(
                f"BREAKING: field '{name}' changed NOT NULL -> nullable"
            )

    return errors

if __name__ == "__main__":
    old_contract, new_contract = sys.argv[1], sys.argv[2]
    errors = check_compatibility(old_contract, new_contract)
    if errors:
        print("Contract compatibility check FAILED:")
        for e in errors:
            print(f"  {e}")
        sys.exit(1)
    print("Contract compatibility check PASSED")

Kết nối script này vào GitHub Actions workflow: so sánh contract YAML hiện tại với phiên bản trên nhánh main trên mỗi PR. Một breaking change làm fail kiểm tra và chặn merge. Một thêm mới backward-compatible — trường nullable mới, giá trị enum mới, SLA chặt hơn — sẽ pass.

Điều này tốn một bước trong CI. Nó giúp bạn tiết kiệm hàng giờ điều tra sự cố.

PULL REQUEST CONTRACT CHECK check_contract.py compare vs main PASS merge allowed FAIL merge blocked compat breaking
Hình 2 — CI pipeline: thay đổi tương thích được merge; breaking change bị chặn tại PR

Versioning và chính sách breaking change

Contract theo semver với quy tắc rõ ràng về những gì là major version bump:

Thay đổiPhân loại
Thêm trường nullablebackward-compatible (minor)
Thêm giá trị enum mớibackward-compatible (minor)
Xóa trườngbreaking (major)
Đổi tên trườngbreaking (major)
Đổi NOT NULL → nullablebreaking (major)
Thắt chặt kiểu (STRING → UUID format)breaking (major)
Nới lỏng SLAbreaking (major — thông báo consumer)

Khi một breaking change thực sự cần thiết, quy trình là:

  1. Phát hành orders.v3.yaml song song với orders.v2.yaml.
  2. Thông báo cho các consumer đã đăng ký (danh sách consumers: trong contract).
  3. Chạy cả hai phiên bản song song trong thời gian migration — thường 2–4 tuần.
  4. Deprecate v2 chỉ sau khi tất cả consumer đã migrate và xác nhận bằng văn bản.

Đây không phải quan liêu. Đây là chi phí vận hành khi nhiều team phụ thuộc vào một dataset dùng chung. Phương án thay thế — schema drift âm thầm — tốn kém hơn.

Kết luận

Bắt đầu với bảng có lưu lượng cao nhất, bị phụ thuộc nhiều nhất về phía hạ nguồn. Viết một contract YAML cho nó. Thêm một CI check chạy git diff main -- contracts/ và xác nhận bất kỳ file nào bị thay đổi. Làm điều này trước khi bạn xây dựng dashboard tiếp theo.

Lần đầu tiên CI check bắt được một breaking change trước khi nó được merge, bạn sẽ hiểu tại sao nó cần đứng ở đầu pipeline, không phải ở cuối.