
MSSQL -> Kafka

This guide is a copy/paste-ready starting point for loading data from MSSQL into Kafka with dpone.

When to use this path

Use this path when MSSQL is the system of record or ingestion boundary and a Kafka topic is the landing zone, event log, or downstream replication target.

Copy/paste manifest

name: mssql_to_kafka_example

source:
  type: mssql
  connection_id: mssql_dwh
  connection_type: vault
  table:
    schema: dbo
    name: orders
  options:
    incremental_column: updated_at
    batch_size: 50000
    export_mode: bcp

sink:
  type: kafka
  connection_id: kafka_cluster
  connection_type: vault
  topic: dwh.orders
  strategy:
    mode: incremental_merge
    unique_key: order_id
    merge_policy: event_upsert
    duplicate_policy: fail
  options:
    message_format: json
    envelope: flat
    key:
      mode: unique_key
    delivery:
      mode: at_least_once
      compression_type: zstd
      linger_ms: 20

state:
  type: postgres
  connection_id: postgres_state
  table:
    schema: etl_state
    name: dpone_state

quality:
  mode: fail
  checks:
    - type: min_rows
      threshold: 1
    - type: source_target_count
      tolerance_pct: 0.1

observability:
  artifacts:
    enabled: true
    path: .dpone/runs/mssql_to_kafka

Run it locally:

dpone plan examples/mssql_to_kafka.yaml --format md
dpone batch run examples/mssql_to_kafka.yaml
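
The quality block in the manifest can be read as two row-count gates. The sketch below is illustrative only and is not dpone's implementation: the function names and the `warn` mode are hypothetical, and it assumes `tolerance_pct` is a percentage (so `0.1` means 0.1 %).

```python
from dataclasses import dataclass


@dataclass
class CheckResult:
    name: str
    passed: bool
    detail: str


def check_min_rows(row_count: int, threshold: int) -> CheckResult:
    """Pass when the extract produced at least `threshold` rows."""
    passed = row_count >= threshold
    return CheckResult("min_rows", passed, f"rows={row_count}, threshold={threshold}")


def check_source_target_count(
    source_rows: int, target_rows: int, tolerance_pct: float
) -> CheckResult:
    """Pass when the relative count drift stays within tolerance_pct percent.

    Assumption: tolerance_pct is a percentage, so 0.1 means 0.1 %.
    """
    if source_rows == 0:
        drift_pct = 0.0 if target_rows == 0 else 100.0
    else:
        drift_pct = abs(source_rows - target_rows) / source_rows * 100.0
    passed = drift_pct <= tolerance_pct
    return CheckResult("source_target_count", passed, f"drift={drift_pct:.4f}%")


def run_quality(mode: str, results: list) -> bool:
    """With mode 'fail', any failed check fails the run; other modes only report."""
    failed = [r for r in results if not r.passed]
    return not (mode == "fail" and failed)
```

Under these assumptions, a run that reads 100,000 source rows and produces 99,950 events drifts by 0.05 % and passes, while 99,800 events drifts by 0.2 % and fails.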

Supported load strategies

| Strategy | Status | Notes |
| --- | --- | --- |
| full_refresh | Supported | Uses staging first, then applies the target-specific finalization plan. |
| incremental_append | Supported | Uses staging first, then applies the target-specific finalization plan. |
| incremental_merge | Supported | Produces keyed upsert/delete events with merge_policy: event_upsert; no topic mutation occurs. |
| replace | Supported | Uses staging first, then applies the target-specific finalization plan. |
| partition_replace | Not supported | Kafka topics are append-only logs; use incremental_merge with merge_policy: event_upsert. |
| snapshot_reconciliation | Not recommended | The target does not expose a safe native finalization path for this strategy. |
| cdc | Source-specific | Uses typed CDC offsets and advances state only after sink success. |

See Load strategies for the detailed algorithm for each strategy.
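
The support table can be encoded as a small pre-flight check on the sink's `strategy` block. This is a minimal sketch, not dpone's validator: `KAFKA_SINK_STRATEGIES` and `validate_strategy` are hypothetical names, and the statuses are copied from the table above.

```python
# Status of each load strategy for Kafka sinks, as listed in the table above.
KAFKA_SINK_STRATEGIES = {
    "full_refresh": "supported",
    "incremental_append": "supported",
    "incremental_merge": "supported",
    "replace": "supported",
    "partition_replace": "not_supported",
    "snapshot_reconciliation": "not_recommended",
    "cdc": "source_specific",
}


def validate_strategy(strategy: dict) -> list:
    """Return a list of problems; an empty list means the strategy looks usable."""
    problems = []
    mode = strategy.get("mode")
    status = KAFKA_SINK_STRATEGIES.get(mode)

    if status is None:
        problems.append(f"unknown strategy mode: {mode!r}")
    elif status == "not_supported":
        problems.append(
            f"{mode} is not supported for Kafka sinks; "
            "consider incremental_merge with merge_policy: event_upsert"
        )
    elif status == "not_recommended":
        problems.append(f"{mode} is not recommended for Kafka sinks")

    if mode == "incremental_merge":
        # incremental_merge produces keyed events, so it needs a key column.
        if not strategy.get("unique_key"):
            problems.append("incremental_merge requires unique_key")
        if strategy.get("merge_policy") != "event_upsert":
            problems.append("incremental_merge on Kafka expects merge_policy: event_upsert")
    return problems
```

The `strategy` block from the example manifest passes this check with no problems reported.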

Runtime algorithm

flowchart TD
    A["Resolve manifest and registry entries"] --> B["Create MSSQL source"]
    B --> C["Plan bounded extract"]
    C --> D["Read through BCP queryout or pyodbc streaming cursor"]
    D --> E["Emit ExtractResult with schema and artifact"]
    E --> F["Plan schema evolution"]
    F --> G["Create Kafka staging or event batch"]
    G --> H["Load through bounded producer with delivery aggregation and optional idempotence"]
    H --> I["Apply finalization strategy"]
    I --> J["Run quality and reconciliation checks"]
    J --> K["Advance state only after success"]
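
The last step of the flowchart, advancing state only after success, is what makes a failed run safe to retry. The sketch below illustrates that ordering with an in-memory stand-in for the state table; `InMemoryState`, `run_batch`, and the callback parameters are hypothetical and do not reflect dpone's internal API.

```python
from typing import Callable, List, Optional


class InMemoryState:
    """Stand-in for the state table; holds the last committed watermark."""

    def __init__(self, watermark: Optional[int] = None):
        self.watermark = watermark

    def advance(self, new_watermark: int) -> None:
        self.watermark = new_watermark


def run_batch(
    rows: List[dict],
    state: InMemoryState,
    produce: Callable[[List[dict]], None],
    quality_ok: Callable[[List[dict]], bool],
    incremental_column: str = "updated_at",
) -> int:
    """Extract rows past the watermark, load them, check them, then commit state."""
    # Bounded extract: only rows newer than the last committed watermark.
    pending = [
        r for r in rows
        if state.watermark is None or r[incremental_column] > state.watermark
    ]
    if not pending:
        return 0

    # Load: the producer is expected to raise if delivery fails.
    produce(pending)

    # Quality gate: a failed check aborts before state is touched.
    if not quality_ok(pending):
        raise RuntimeError("quality checks failed; state not advanced")

    # Only now move the watermark forward.
    state.advance(max(r[incremental_column] for r in pending))
    return len(pending)
```

If `produce` or the quality gate raises, the watermark is unchanged and the next run re-reads the same window. That can re-send events, which is consistent with the `at_least_once` delivery mode in the example manifest.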

Strategy behavior

  • full_refresh: extract the selected source boundary, load into staging, and replace the target according to the target's safe finalization path.
  • incremental_append: extract only the incremental boundary and append rows through staging or event production.
  • incremental_merge: produce keyed upsert/delete events with merge_policy: event_upsert; requires unique_key.
  • replace: reload a bounded predicate window through staging and then atomically replace the matching target slice.
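  • snapshot_reconciliation: compare the latest source snapshot with the target key set and apply configured physical-delete or soft-delete behavior through staging-first plans. Not recommended for Kafka sinks, because the target does not expose a safe native finalization path for this strategy.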
  • partition_replace: not supported for Kafka sinks because a topic is an append-only event log.
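
To make the incremental_merge bullet concrete, the sketch below turns rows into keyed events using the manifest's `unique_key`, `message_format: json`, and `envelope: flat` settings. It is an illustration under assumptions, not dpone's wire format: `build_event` and `build_batch` are hypothetical, deletes are modeled as Kafka tombstones (a null value for the key), and `duplicate_policy: fail` is interpreted as rejecting a batch that repeats a key.

```python
import json
from typing import List, Optional, Tuple


def build_event(
    row: dict, unique_key: str, deleted: bool = False
) -> Tuple[bytes, Optional[bytes]]:
    """Build a (key, value) pair for one row.

    Assumption: a delete is sent as a tombstone, i.e. a null value for the key.
    """
    if row.get(unique_key) is None:
        raise ValueError(f"row is missing unique_key column {unique_key!r}")
    key = str(row[unique_key]).encode("utf-8")
    if deleted:
        return key, None
    # Flat envelope: the row itself is the JSON payload, with no wrapper object.
    value = json.dumps(row, sort_keys=True, default=str).encode("utf-8")
    return key, value


def build_batch(rows: List[dict], unique_key: str) -> List[Tuple[bytes, Optional[bytes]]]:
    """Build upsert events for a batch, failing on a repeated key."""
    seen = set()
    events = []
    for row in rows:
        key_value = row.get(unique_key)
        if key_value in seen:
            raise ValueError(f"duplicate {unique_key}={key_value!r} in batch")
        seen.add(key_value)
        events.append(build_event(row, unique_key))
    return events
```

Keying every event by `order_id` means all events for one order land on the same partition under Kafka's default partitioner, so consumers see them in order and can apply the latest one as the upsert.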

Schema evolution and type mapping

Schema evolution is enabled by default and runs before the staging/final load path:

  1. Read source schema from ExtractResult.schema.
  2. Introspect the Kafka target schema.
  3. Apply safe additions and widening operations.
  4. Fail breaking changes by default.
  5. If configured, route incompatible type changes to __dpone__nc__<column>.

Use Schema evolution and Type mapping matrix when adding columns or changing source types.
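
The five steps above amount to a diff between the source and target schemas. The sketch below shows one way to plan that diff; it is not dpone's planner. `plan_schema_evolution`, the `WIDENINGS` table, and the type names are hypothetical, and schemas are simplified to column-name-to-type dictionaries.

```python
# Hypothetical set of (current_type, new_type) pairs treated as safe widenings.
WIDENINGS = {("int", "bigint"), ("float", "double"), ("varchar", "text")}


def plan_schema_evolution(source: dict, target: dict, route_incompatible: bool = False) -> list:
    """Compare source and target column types and return the planned actions.

    Each action is a tuple: (operation, column_name, type).
    Raises ValueError on a breaking change unless route_incompatible is set.
    """
    actions = []
    for column, src_type in source.items():
        tgt_type = target.get(column)
        if tgt_type is None:
            # Safe addition: the column is new on the source side.
            actions.append(("add_column", column, src_type))
        elif tgt_type == src_type:
            continue
        elif (tgt_type, src_type) in WIDENINGS:
            # Safe widening: existing values still fit the new type.
            actions.append(("widen", column, src_type))
        elif route_incompatible:
            # Keep the original column and route new values to a variant column.
            actions.append(("add_column", f"__dpone__nc__{column}", src_type))
        else:
            raise ValueError(f"breaking change on {column}: {tgt_type} -> {src_type}")
    return actions
```

With `route_incompatible=False`, the default here, an incompatible type change stops the run before anything is loaded, which matches the fail-by-default behavior in step 4.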

Runbook

  1. Start with dpone doctor --profile local and fix missing extras or native clients.
  2. Run dpone plan <manifest> --format md and review source boundary, staging path, schema evolution, state, and quality checks.
  3. Run a small bounded window first.
  4. Inspect the run artifact under .dpone/runs/mssql_to_kafka.
  5. For incremental jobs, verify state before enabling a schedule.
  6. For delete-aware jobs, run reconciliation in report-only mode before enabling physical deletes.
  7. Promote the manifest through GitOps after the plan and artifact are reviewed.

Type contracts and physical design

This flow supports the shared dpone type-governance stack:

  • Type inference for source metadata, sampled profiling, confidence, and empty string vs NULL behavior.
  • Schema contracts for explicit logical column types, enforcement modes, and __dpone__nc__* variant columns.
  • Physical design for target-specific DDL such as concrete SQL types, indexes, partitioning, compression, ClickHouse LowCardinality, and BigQuery clustering.

Use dpone schema infer --manifest ... and dpone schema physical-plan --manifest ... before enabling new table DDL in production.