Kafka -> Kafka
This guide is a copy/paste-ready starting point for loading data from Kafka into Kafka with dpone.
When to use this path
Use this path when one Kafka topic is the system of record or ingestion boundary and another Kafka topic is the landing, warehouse, event-log, or downstream replication target.
Copy/paste manifest
```yaml
name: kafka_to_kafka_example
source:
  type: kafka
  connection_id: kafka_cluster
  connection_type: vault
  topic: orders_events
  options:
    group_id: dpone.orders.batch
    read_mode: offsets
    offset_storage: dpone
    start_from: stored
    message_format: json
    envelope: auto
    batch_size: 50000
sink:
  type: kafka
  connection_id: kafka_cluster
  connection_type: vault
  topic: dwh.orders
  strategy:
    mode: incremental_merge
    unique_key: order_id
    merge_policy: event_upsert
    duplicate_policy: fail
  options:
    message_format: json
    envelope: flat
    key:
      mode: unique_key
    delivery:
      mode: at_least_once
      compression_type: zstd
      linger_ms: 20
state:
  type: postgres
  connection_id: postgres_state
  table:
    schema: etl_state
    name: dpone_state
quality:
  mode: fail
  checks:
    - type: min_rows
      threshold: 1
    - type: source_target_count
      tolerance_pct: 0.1
observability:
  artifacts:
    enabled: true
    path: .dpone/runs/kafka_to_kafka
```
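The `quality` block in the manifest lists two checks, `min_rows` and `source_target_count`. The Python sketch below shows one plausible reading of them. It is not dpone's implementation: the function names are hypothetical, and it assumes `tolerance_pct: 0.1` means a relative difference of 0.1 percent measured against the source row count.

```python
def check_min_rows(row_count: int, threshold: int) -> bool:
    """Pass when the batch contains at least `threshold` rows."""
    return row_count >= threshold


def check_source_target_count(source_rows: int, target_rows: int,
                              tolerance_pct: float) -> bool:
    """Pass when the source/target difference is within tolerance_pct percent.

    Assumption: the tolerance is a percentage of the source row count.
    """
    if source_rows == 0:
        return target_rows == 0
    diff_pct = abs(source_rows - target_rows) / source_rows * 100.0
    return diff_pct <= tolerance_pct


def run_quality(source_rows: int, target_rows: int) -> list:
    """Return the names of failed checks, using the thresholds from the manifest."""
    failures = []
    if not check_min_rows(source_rows, threshold=1):
        failures.append("min_rows")
    if not check_source_target_count(source_rows, target_rows, tolerance_pct=0.1):
        failures.append("source_target_count")
    return failures
```

With `mode: fail`, a non-empty result would presumably abort the run before state advances.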
Run it locally:
Supported load strategies
| Strategy | Status | Notes |
|---|---|---|
| `full_refresh` | Supported | Uses staging first, then applies the target-specific finalization plan. |
| `incremental_append` | Supported | Uses staging first, then applies the target-specific finalization plan. |
| `incremental_merge` | Supported | Produces keyed upsert/delete events with `merge_policy: event_upsert`; no topic mutation occurs. |
| `replace` | Supported | Uses staging first, then applies the target-specific finalization plan. |
| `partition_replace` | Not supported | Kafka topics are append-only logs; use `incremental_merge` with `merge_policy: event_upsert`. |
| `snapshot_reconciliation` | Not recommended | The target does not expose a safe native finalization path for this strategy. |
| `kafka_offsets` | Kafka source only | Consumes a bounded offset window and persists offsets after sink success. |
See Load strategies for the detailed algorithm for each strategy.
Runtime algorithm
```mermaid
flowchart TD
    A["Resolve manifest and registry entries"] --> B["Create Kafka source"]
    B --> C["Plan bounded extract"]
    C --> D["Read through bounded offset, timestamp, or max-record Kafka batch consume"]
    D --> E["Emit ExtractResult with schema and artifact"]
    E --> F["Plan schema evolution"]
    F --> G["Create Kafka staging or event batch"]
    G --> H["Load through bounded producer with delivery aggregation and optional idempotence"]
    H --> I["Apply finalization strategy"]
    I --> J["Run quality and reconciliation checks"]
    J --> K["Advance state only after success"]
```
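The last step of the flow, advancing state only after success, is what gives the pipeline its at-least-once behavior. The sketch below illustrates that ordering with in-memory stand-ins. `OffsetState`, `plan_window`, and `run_once` are hypothetical names, the list `log` plays the role of a single topic partition, and nothing here calls a real Kafka client or dpone API.

```python
from dataclasses import dataclass


@dataclass
class OffsetState:
    """In-memory stand-in for the persisted offset store (hypothetical)."""
    next_offset: int = 0


def plan_window(state: OffsetState, high_watermark: int, batch_size: int) -> range:
    """Bound the read: start at the stored offset, stop at the watermark or batch size."""
    end = min(high_watermark, state.next_offset + batch_size)
    return range(state.next_offset, end)


def run_once(log, state, sink, batch_size):
    """Consume one bounded window, hand it to the sink, and advance state on success."""
    window = plan_window(state, len(log), batch_size)
    batch = [log[i] for i in window]
    sink(batch)                      # raises on failure, so the next line is skipped
    state.next_offset = window.stop  # advance only after the sink accepted the batch
    return len(batch)
```

If the sink raises, the stored offset is unchanged and the next run re-reads the same window. The same happens if the sink succeeds but the state write fails, which is why downstream consumers of an at-least-once flow should tolerate duplicates.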
Strategy behavior
- `full_refresh`: extract the selected source boundary, load into staging, and replace the target according to the target's safe finalization path.
- `incremental_append`: extract only the incremental boundary and append rows through staging or event production.
- `incremental_merge`: produce keyed upsert/delete events with `merge_policy: event_upsert`; requires `unique_key`.
- `replace`: reload a bounded predicate window through staging and then atomically replace the matching target slice.
- `snapshot_reconciliation`: compare the latest source snapshot with the target key set and apply configured physical-delete or soft-delete behavior through staging-first plans.
- `partition_replace`: not supported for Kafka sinks because a topic is an append-only event log.
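To make the `incremental_merge` behavior concrete, here is a minimal sketch of turning rows into keyed upsert/delete events. It rests on several assumptions that the source does not confirm: deletes are encoded as a key with a null value (the usual Kafka tombstone convention), a hypothetical `_deleted` field marks deleted rows, and `duplicate_policy: fail` rejects a key that repeats within one batch. The function name is illustrative only.

```python
import json


def to_keyed_events(rows, unique_key, deleted_field="_deleted"):
    """Turn rows into (key, value) byte pairs; a delete becomes a key with a None value."""
    seen = set()
    events = []
    for row in rows:
        if unique_key not in row:
            raise KeyError(f"row is missing unique_key {unique_key!r}")
        key = str(row[unique_key])
        if key in seen:
            # Assumed reading of duplicate_policy: fail for keys repeated in a batch.
            raise ValueError(f"duplicate key in batch: {key}")
        seen.add(key)
        if row.get(deleted_field):
            # Tombstone-style delete event (assumed encoding).
            events.append((key.encode(), None))
        else:
            payload = {k: v for k, v in row.items() if k != deleted_field}
            value = json.dumps(payload, sort_keys=True).encode()
            events.append((key.encode(), value))
    return events
```

Because every event carries the `unique_key` as its message key, the topic itself is never mutated; consumers (or log compaction, if enabled on the topic) resolve the latest value per key.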
Schema evolution and type mapping
Schema evolution is enabled by default and runs before the staging/final load path:
- Read source schema from `ExtractResult.schema`.
- Introspect the Kafka target schema.
- Apply safe additions and widening operations.
- Fail breaking changes by default.
- If configured, route incompatible type changes to `__dpone__nc__<column>`.
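The steps above can be sketched as a small planning function. This is an illustration under stated assumptions, not dpone's planner: schemas are plain `{column: type}` dictionaries, the `WIDENING` set is a hypothetical subset of safe promotions, and `route_incompatible` stands in for whatever setting enables the `__dpone__nc__<column>` route.

```python
# Hypothetical widening rules as (target_type, source_type) pairs;
# the real type matrix may differ.
WIDENING = {("int32", "int64"), ("float32", "float64"), ("int32", "float64")}


def plan_schema_evolution(source, target, route_incompatible=False):
    """Compare {column: type} maps and return a list of (action, column, type)."""
    plan = []
    for column, src_type in source.items():
        tgt_type = target.get(column)
        if tgt_type is None:
            # Column exists only in the source: a safe addition.
            plan.append(("add_column", column, src_type))
        elif tgt_type == src_type:
            continue
        elif (tgt_type, src_type) in WIDENING:
            # Target type can be promoted without losing values.
            plan.append(("widen_column", column, src_type))
        elif route_incompatible:
            # Keep the original column and land new values in a side column.
            plan.append(("add_column", f"__dpone__nc__{column}", src_type))
        else:
            raise ValueError(f"breaking change on {column}: {tgt_type} -> {src_type}")
    return plan
```

For example, a source schema of `{"id": "int64", "note": "string"}` against a target of `{"id": "int32"}` yields one widening and one addition, while `string` arriving for an `int64` target raises unless routing is enabled.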
Use Schema evolution and Type mapping matrix when adding columns or changing source types.
Runbook
- Start with `dpone doctor --profile local` and fix missing extras or native clients.
- Run `dpone plan <manifest> --format md` and review source boundary, staging path, schema evolution, state, and quality checks.
- Run a small bounded window first.
- Inspect the run artifact under `.dpone/runs/kafka_to_kafka`.
- For incremental jobs, verify state before enabling a schedule.
- For delete-aware jobs, run reconciliation in report-only mode before enabling physical deletes.
- Promote the manifest through GitOps after the plan and artifact are reviewed.
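The first two runbook steps lend themselves to a small preflight wrapper, for instance in CI. The sketch below only assembles the two commands quoted above and stops at the first failure. It assumes `dpone` is on `PATH` and signals failure through a non-zero exit code; the helper names and the injectable `runner` parameter are hypothetical.

```python
import subprocess


def preflight_commands(manifest_path):
    """Build the two runbook commands as argument lists."""
    return [
        ["dpone", "doctor", "--profile", "local"],
        ["dpone", "plan", manifest_path, "--format", "md"],
    ]


def run_preflight(manifest_path, runner=subprocess.run):
    """Run each command in order and return the first one that fails, else None."""
    for argv in preflight_commands(manifest_path):
        result = runner(argv, check=False)
        if result.returncode != 0:
            return argv  # the command that exited non-zero
    return None
```

Passing a fake `runner` makes the wrapper testable without the CLI installed; the remaining runbook steps still need human review of the plan and the run artifact.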
Cross-links
- Source -> sink matrix
- Load strategies
- Schema evolution
- Type mapping matrix
- Reconciliation and CDC
- Performance guide
Type contracts and physical design
This flow supports the shared dpone type-governance stack:
- Type inference for source metadata, sampled profiling, confidence, and empty string vs `NULL` behavior.
- Schema contracts for explicit logical column types, enforcement modes, and `__dpone__nc__*` variant columns.
- Physical design for target-specific DDL such as concrete SQL types, indexes, partitioning, compression, ClickHouse `LowCardinality`, and BigQuery clustering.
Use `dpone schema infer --manifest ...` and `dpone schema physical-plan --manifest ...` before enabling new table DDL in production.