Runtime data contracts and quarantine¶
Runtime data contracts are the execution layer for schema contracts. Type inference and physical design decide what the target should look like; runtime enforcement decides which rows are safe to stage, which rows need diagnostics, and whether state may advance.
Execution flow¶
flowchart TD
Rows["Extracted rows"] --> Contract["SchemaContract"]
Contract --> Enforcer["ContractEnforcementService"]
Enforcer -->|valid| Staging["Target staging rows"]
Enforcer -->|bad + strict| Fail["Fail load before state commit"]
Enforcer -->|bad + quarantine| Quarantine["etl_state.__dpone__quarantine / JSONL evidence"]
Enforcer -->|bad + variant_column| Variant["__dpone__nc__<column>"]
Staging --> Finalizer["Sink finalizer"]
Quarantine --> Evidence["Data contract evidence bundle"]
Finalizer --> State["Advance state only when committed"]
Enforcement modes¶
| Mode | Target rows | Bad rows | State advance |
|---|---|---|---|
strict |
only valid rows | reject and fail run | no |
coerce |
valid and safely coerced rows | reject unsafe rows | no when rejects exist |
quarantine |
only valid rows | quarantine with diagnostics | yes when load succeeds |
warn |
best-effort rows | warning diagnostics | yes when load succeeds |
strict is the production default. Use quarantine for dirty REST/API, file,
and semi-structured feeds where good rows should continue while bad rows are
auditable.
Conflict policies¶
| Policy | Behavior |
|---|---|
fail |
Stop before target mutation. |
variant_column |
Put incompatible values in __dpone__nc__<column> and keep the original target column clean. |
quarantine |
Store bad rows with diagnostics outside the target table. |
Example¶
schema_contract:
enforcement: quarantine
columns:
amount:
type: decimal
precision: 18
scale: 2
nullable: false
updated_at:
type: timestamp
timezone: true
nullable: false
sink:
options:
type_inference:
conflict_policy: quarantine
Python API¶
from dpone.ops.quarantine import QuarantineService
from dpone.readiness.schema_contracts import SchemaContract
from dpone.type_system import ContractEnforcementService
contract = SchemaContract.from_config({
"enforcement": "quarantine",
"columns": {"amount": {"type": "decimal", "precision": 18, "scale": 2}},
})
result = ContractEnforcementService(
quarantine=QuarantineService(".dpone/quarantine"),
).enforce(
rows=[{"amount": "12.30"}, {"amount": "bad"}],
contract=contract,
run_id="01J...",
load_id="01J...",
)
assert result.target_rows == [{"amount": "12.30"}]
assert result.state_commit_allowed
Evidence bundle¶
Use the data contract evidence writer when a release gate or catalog needs a single artifact with enforcement, DDL, compatibility, and OpenLineage facets.
from dpone.ops.data_contract_evidence import DataContractEvidenceBundleWriter
artifact = DataContractEvidenceBundleWriter(".dpone/evidence/orders").write(
run_id="01J...",
pipeline="orders",
enforcement=result,
ddl_apply={"applied": True, "blockers": []},
compatibility={"passed": True, "decisions": []},
)
Generated files:
| File | Purpose |
|---|---|
data_contract_evidence.json |
Machine-readable certification and lineage evidence. |
data_contract_evidence.md |
Human runbook-friendly release artifact. |
Streaming and native fast paths¶
Streaming and native paths are covered by Streaming-safe contracts. Row streams are validated chunk-by-chunk. Opaque file and partitioned native artifacts fail closed unless the source export step marks them as prevalidated.
Runbook¶
| Symptom | Action |
|---|---|
strict run fails |
Inspect diagnostics, fix source data or schema contract, rerun without advancing state. |
| Quarantine grows | Export quarantine rows, fix parsing/source contract, replay after validation. |
| Empty strings look like NULL | Keep type_inference.empty_string_is_null: false unless the source contract explicitly says otherwise. |
| Variant column appears | Treat __dpone__nc__<column> as an expand-contract migration, update downstream consumers, then clean up manually. |
| State advanced with bad rows | This is valid only for quarantine or warn; strict/coerce rejects block state advancement. |