Skip to content

Runtime data contracts and quarantine

Runtime data contracts are the execution layer for schema contracts. Type inference and physical design decide what the target should look like; runtime enforcement decides which rows are safe to stage, which rows need diagnostics, and whether state may advance.

Execution flow

flowchart TD
    Rows["Extracted rows"] --> Contract["SchemaContract"]
    Contract --> Enforcer["ContractEnforcementService"]
    Enforcer -->|valid| Staging["Target staging rows"]
    Enforcer -->|bad + strict| Fail["Fail load before state commit"]
    Enforcer -->|bad + quarantine| Quarantine["etl_state.__dpone__quarantine / JSONL evidence"]
    Enforcer -->|bad + variant_column| Variant["__dpone__nc__<column>"]
    Staging --> Finalizer["Sink finalizer"]
    Quarantine --> Evidence["Data contract evidence bundle"]
    Finalizer --> State["Advance state only when committed"]

Enforcement modes

Mode Target rows Bad rows State advance
strict only valid rows reject and fail run no
coerce valid and safely coerced rows reject unsafe rows no when rejects exist
quarantine only valid rows quarantine with diagnostics yes when load succeeds
warn best-effort rows warning diagnostics yes when load succeeds

strict is the production default. Use quarantine for dirty REST/API, file, and semi-structured feeds where good rows should continue while bad rows are auditable.

Conflict policies

Policy Behavior
fail Stop before target mutation.
variant_column Put incompatible values in __dpone__nc__<column> and keep the original target column clean.
quarantine Store bad rows with diagnostics outside the target table.

Example

schema_contract:
  enforcement: quarantine
  columns:
    amount:
      type: decimal
      precision: 18
      scale: 2
      nullable: false
    updated_at:
      type: timestamp
      timezone: true
      nullable: false

sink:
  options:
    type_inference:
      conflict_policy: quarantine

Python API

from dpone.ops.quarantine import QuarantineService
from dpone.readiness.schema_contracts import SchemaContract
from dpone.type_system import ContractEnforcementService

contract = SchemaContract.from_config({
    "enforcement": "quarantine",
    "columns": {"amount": {"type": "decimal", "precision": 18, "scale": 2}},
})

result = ContractEnforcementService(
    quarantine=QuarantineService(".dpone/quarantine"),
).enforce(
    rows=[{"amount": "12.30"}, {"amount": "bad"}],
    contract=contract,
    run_id="01J...",
    load_id="01J...",
)

assert result.target_rows == [{"amount": "12.30"}]
assert result.state_commit_allowed

Evidence bundle

Use the data contract evidence writer when a release gate or catalog needs a single artifact with enforcement, DDL, compatibility, and OpenLineage facets.

from dpone.ops.data_contract_evidence import DataContractEvidenceBundleWriter

artifact = DataContractEvidenceBundleWriter(".dpone/evidence/orders").write(
    run_id="01J...",
    pipeline="orders",
    enforcement=result,
    ddl_apply={"applied": True, "blockers": []},
    compatibility={"passed": True, "decisions": []},
)

Generated files:

File Purpose
data_contract_evidence.json Machine-readable certification and lineage evidence.
data_contract_evidence.md Human runbook-friendly release artifact.

Streaming and native fast paths

Streaming and native paths are covered by Streaming-safe contracts. Row streams are validated chunk-by-chunk. Opaque file and partitioned native artifacts fail closed unless the source export step marks them as prevalidated.

Runbook

Symptom Action
strict run fails Inspect diagnostics, fix source data or schema contract, rerun without advancing state.
Quarantine grows Export quarantine rows, fix parsing/source contract, replay after validation.
Empty strings look like NULL Keep type_inference.empty_string_is_null: false unless the source contract explicitly says otherwise.
Variant column appears Treat __dpone__nc__<column> as an expand-contract migration, update downstream consumers, then clean up manually.
State advanced with bad rows This is valid only for quarantine or warn; strict/coerce rejects block state advancement.