
Connections and credentials

This guide describes every supported way to pass source, sink, and state credentials into dpone.

dpone normalizes all connection providers into one runtime DTO: CredentialsConfig. Runtime connectors should not know whether a secret came from environment variables, Airflow, Vault, or inline params.

Supported providers

| connection_type | Best for | Secret storage | Notes |
| --- | --- | --- | --- |
| env | local dev, Docker, GitHub Actions, Kubernetes secrets | process environment | Recommended for OSS examples and CI without Vault. |
| airflow | Airflow production deployments | Airflow Connections | Default runtime provider when connection_type is omitted in direct factory usage. |
| vault | enterprise production | HashiCorp Vault-compatible KV | Uses the public vault-kv-client package. |
| params | tests, smoke runs, generated examples | inline JSON string | Do not commit real secrets with this mode. |

Connector coverage matrix

| Runtime family | env | airflow | vault | params |
| --- | --- | --- | --- | --- |
| Postgres source/sink/state | yes | yes | yes | yes |
| MSSQL source/sink/state | yes | yes | yes | yes |
| ClickHouse source/sink | yes | yes | yes | yes |
| BigQuery sink/state | yes | yes | yes | yes |
| Kafka source/sink/state offsets | yes | yes | yes | yes |
| Generic REST API source | yes | yes | yes | yes |

Specialized API connectors can keep their own credential contract, but generic REST uses the same four providers. For a specialized API connector that only implements from_vault(), use connection_type: vault until that connector adds from_credentials().

Manifest shape

Every source and sink uses the same connection fields:

source:
  type: postgres
  connection_id: postgres_source
  connection_type: env

sink:
  type: mssql
  connection_id: mssql_dwh
  connection_type: env

For Vault, add the secret location:

source:
  type: postgres
  connection_id: postgres_source
  connection_type: vault
  vault_mount_point: secret
  vault_path: postgres/source

For params, connection_id carries the credentials inline as a JSON object serialized to a string:

sink:
  type: mssql
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":1433,"database":"dpone","username":"sa","password":"secret","driver":"ODBC Driver 18 for SQL Server","trust_server_certificate":"yes","bcp_path":"bcp"}

Environment variables

The environment provider uppercases connection_id and reads fields from <CONNECTION_ID>_<FIELD>.

Use underscores in connection_id values, for example mssql_dwh, so shell variable names stay portable.
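The naming rule above can be sketched in a few lines of Python. This is only an illustration of the <CONNECTION_ID>_<FIELD> convention, not dpone's actual provider code; the function name read_env_credentials and the explicit field list are assumptions made for the example.

```python
import os


def read_env_credentials(connection_id, fields, environ=None):
    """Collect <CONNECTION_ID>_<FIELD> variables into a dict (illustrative helper)."""
    env = os.environ if environ is None else environ
    prefix = connection_id.upper() + "_"
    found = {}
    for field in fields:
        value = env.get(prefix + field.upper())
        if value is not None:
            # Environment values are always strings; type conversion is left out here.
            found[field] = value
    return found


# A fake environment keeps the example independent of the real process environment.
example_env = {"MSSQL_DWH_HOST": "localhost", "MSSQL_DWH_PORT": "1433"}
creds = read_env_credentials("mssql_dwh", ["host", "port", "password"], example_env)
print(creds)  # {'host': 'localhost', 'port': '1433'}
```

A hyphen in connection_id would produce a name such as MSSQL-DWH_HOST, which most shells cannot export, which is why underscores are the safer choice.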

Postgres env example

export POSTGRES_SOURCE_HOST=localhost
export POSTGRES_SOURCE_PORT=5432
export POSTGRES_SOURCE_DATABASE=dpone
export POSTGRES_SOURCE_USERNAME=dpone
export POSTGRES_SOURCE_PASSWORD='secret'
export POSTGRES_SOURCE_SCHEMA=public

source:
  type: postgres
  connection_id: postgres_source
  connection_type: env

MSSQL env example

export MSSQL_DWH_HOST=localhost
export MSSQL_DWH_PORT=1433
export MSSQL_DWH_DATABASE=dpone
export MSSQL_DWH_USERNAME=sa
export MSSQL_DWH_PASSWORD='secret'
export MSSQL_DWH_DRIVER='ODBC Driver 18 for SQL Server'
export MSSQL_DWH_ENCRYPT=yes
export MSSQL_DWH_TRUST_SERVER_CERTIFICATE=yes
export MSSQL_DWH_BCP_PATH=/opt/mssql-tools18/bin/bcp

sink:
  type: mssql
  connection_id: mssql_dwh
  connection_type: env

ClickHouse env example

export CLICKHOUSE_DWH_HOST=localhost
export CLICKHOUSE_DWH_PORT=9000
export CLICKHOUSE_DWH_DATABASE=dpone
export CLICKHOUSE_DWH_USERNAME=default
export CLICKHOUSE_DWH_PASSWORD='secret'
export CLICKHOUSE_DWH_SECURE=false
export CLICKHOUSE_DWH_COMPRESSION=true
export CLICKHOUSE_DWH_CONNECT_TIMEOUT=10
export CLICKHOUSE_DWH_SEND_RECEIVE_TIMEOUT=300
export CLICKHOUSE_DWH_SETTINGS='{"max_threads":4}'

sink:
  type: clickhouse
  connection_id: clickhouse_dwh
  connection_type: env

BigQuery env example

Use either inline service-account JSON:

export BIGQUERY_DWH_PROJECT_ID=demo-project
export BIGQUERY_DWH_SERVICE_ACCOUNT_INFO='{"type":"service_account","project_id":"demo-project","client_email":"svc@example.com","private_key":"-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"}'

Or a key file path:

export BIGQUERY_DWH_PROJECT_ID=demo-project
export BIGQUERY_DWH_SERVICE_ACCOUNT_KEY_FILE=/secrets/bigquery-service-account.json

sink:
  type: bigquery
  connection_id: bigquery_dwh
  connection_type: env

Kafka env example

export KAFKA_CLUSTER_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_CLUSTER_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_CLUSTER_SASL_MECHANISM=PLAIN
export KAFKA_CLUSTER_SASL_USERNAME=dpone
export KAFKA_CLUSTER_SASL_PASSWORD='secret'
export KAFKA_CLUSTER_SCHEMA_REGISTRY_URL=http://localhost:8081
export KAFKA_CLUSTER_SCHEMA_REGISTRY_USERNAME=dpone
export KAFKA_CLUSTER_SCHEMA_REGISTRY_PASSWORD='secret'

source:
  type: kafka
  connection_id: kafka_cluster
  connection_type: env

Generic REST env example

export REST_ORDERS_ENDPOINT=https://api.example.com
export REST_ORDERS_TOKEN='bearer-token'
export REST_ORDERS_API_KEY='api-key-if-needed'

source:
  type: api
  api_type: rest
  connection_id: rest_orders
  connection_type: env
  options:
    path: /v1/orders
    records_path: data.items

Airflow Connections

The Airflow provider reads standard fields from BaseHook.get_connection() and connector-specific settings from the connection's extra JSON.

Supported Airflow conn_type values

| Runtime family | Airflow conn_type values |
| --- | --- |
| Postgres | postgres |
| MSSQL | mssql, microsoft mssql, sqlserver, odbc |
| ClickHouse | clickhouse, ch |
| BigQuery | bigquery, google_cloud_platform, gcp, google_cloud |
| Kafka | kafka, confluent |
| Generic REST | http, https, rest, api, generic_rest |
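One way to picture the mapping above is a plain lookup table. The sketch below is hypothetical: the names CONN_TYPE_TO_FAMILY and family_for are invented for illustration, and the case-insensitive, whitespace-trimming match is an assumption rather than documented dpone behavior.

```python
# Values copied from the conn_type table; the structure itself is illustrative.
CONN_TYPE_TO_FAMILY = {
    "postgres": "postgres",
    "mssql": "mssql",
    "microsoft mssql": "mssql",
    "sqlserver": "mssql",
    "odbc": "mssql",
    "clickhouse": "clickhouse",
    "ch": "clickhouse",
    "bigquery": "bigquery",
    "google_cloud_platform": "bigquery",
    "gcp": "bigquery",
    "google_cloud": "bigquery",
    "kafka": "kafka",
    "confluent": "kafka",
    "http": "rest",
    "https": "rest",
    "rest": "rest",
    "api": "rest",
    "generic_rest": "rest",
}


def family_for(conn_type):
    """Return the runtime family for an Airflow conn_type, or None if unknown."""
    # Assumption: matching ignores case and surrounding whitespace.
    return CONN_TYPE_TO_FAMILY.get(conn_type.strip().lower())


print(family_for("google_cloud_platform"))  # bigquery
print(family_for("sqlserver"))              # mssql
```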

MSSQL Airflow extra

{
  "driver": "ODBC Driver 18 for SQL Server",
  "encrypt": "yes",
  "trust_server_certificate": "yes",
  "query_timeout": 300,
  "bcp_path": "/opt/mssql-tools18/bin/bcp"
}

ClickHouse Airflow extra

{
  "secure": false,
  "compression": true,
  "connect_timeout": 10,
  "send_receive_timeout": 300,
  "settings": {"max_threads": 4}
}

BigQuery Airflow extra

Inline service account:

{
  "project_id": "demo-project",
  "keyfile_dict": {
    "type": "service_account",
    "project_id": "demo-project",
    "client_email": "svc@example.com",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
  }
}

Key file path:

{
  "project_id": "demo-project",
  "keyfile_path": "/opt/airflow/secrets/bigquery-service-account.json"
}

REST Airflow extra

{
  "endpoint": "https://api.example.com",
  "token": "bearer-token",
  "api_key": "api-key-if-needed"
}

Vault secrets

Vault secrets use the same logical field names as env and params. Common aliases are also accepted for database fields.

{
  "host": "localhost",
  "port": 1433,
  "database": "dpone",
  "username": "sa",
  "password": "secret",
  "driver": "ODBC Driver 18 for SQL Server",
  "trust_server_certificate": "yes",
  "bcp_path": "bcp"
}

Database aliases accepted by Vault:

| Canonical field | Vault aliases |
| --- | --- |
| host | db_host |
| port | db_port |
| database | db_database |
| username | user, db_user |
| password | db_password |
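The alias handling above could look roughly like the following. This is a sketch under stated assumptions, not the code dpone ships: the helper name normalize_vault_secret is invented, and the rule that a canonical field wins over its alias when both are present is an assumption the guide does not confirm.

```python
# Alias list taken from the table above.
VAULT_ALIASES = {
    "host": ("db_host",),
    "port": ("db_port",),
    "database": ("db_database",),
    "username": ("user", "db_user"),
    "password": ("db_password",),
}


def normalize_vault_secret(secret):
    """Return a copy of the secret with aliases renamed to canonical field names."""
    normalized = dict(secret)
    for canonical, aliases in VAULT_ALIASES.items():
        for alias in aliases:
            if alias in normalized:
                value = normalized.pop(alias)
                # Assumption: an explicitly set canonical field is never overwritten.
                normalized.setdefault(canonical, value)
    return normalized


raw = {"db_host": "localhost", "user": "sa", "password": "secret"}
print(normalize_vault_secret(raw))
```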

BigQuery Vault fields:

{
  "project_id": "demo-project",
  "service_account_info": {
    "type": "service_account",
    "project_id": "demo-project",
    "client_email": "svc@example.com",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
  }
}

Generic REST Vault fields:

{
  "endpoint": "https://api.example.com",
  "token": "bearer-token",
  "api_key": "api-key-if-needed"
}

Params JSON

params is useful for tests and smoke runs. Avoid real secrets in committed manifests.

Postgres:

source:
  type: postgres
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":5432,"database":"dpone","username":"dpone","password":"secret"}

MSSQL:

sink:
  type: mssql
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":1433,"database":"dpone","username":"sa","password":"secret","driver":"ODBC Driver 18 for SQL Server","trust_server_certificate":"yes","bcp_path":"bcp"}

ClickHouse:

sink:
  type: clickhouse
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":9000,"database":"dpone","username":"default","password":"secret","secure":false,"settings":{"max_threads":4}}

BigQuery:

sink:
  type: bigquery
  connection_type: params
  connection_id: >-
    {"project_id":"demo-project","service_account_key_file":"/secrets/bigquery-service-account.json"}

Kafka:

sink:
  type: kafka
  connection_type: params
  connection_id: >-
    {"bootstrap_servers":"localhost:9092","schema_registry_url":"http://localhost:8081"}

Generic REST:

source:
  type: api
  api_type: rest
  connection_type: params
  connection_id: >-
    {"endpoint":"https://api.example.com","token":"bearer-token"}
  options:
    path: /v1/orders
    records_path: data.items
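In the params manifests above, the YAML folded scalar (>-) makes connection_id a single string, so the provider has to decode it as JSON before use. A minimal sketch of that step, assuming a hypothetical helper named parse_params_connection and error messages of my own choosing:

```python
import json


def parse_params_connection(connection_id):
    """Decode an inline params connection_id into a dict of credential fields."""
    try:
        data = json.loads(connection_id)
    except json.JSONDecodeError as exc:
        raise ValueError(f"connection_id is not valid JSON: {exc.msg}") from exc
    if not isinstance(data, dict):
        # A bare string, number, or list cannot be mapped to named fields.
        raise ValueError("connection_id must decode to a JSON object")
    return data


params = parse_params_connection(
    '{"host":"localhost","port":5432,"database":"dpone","username":"dpone"}'
)
print(params["port"])  # 5432
```

Unlike env values, JSON keeps types, so port arrives as an integer and nested objects such as ClickHouse settings stay structured.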

Runbooks

Incomplete credentials

Check the canonical required fields:

| Runtime family | Required fields |
| --- | --- |
| Postgres | host, database, username, password |
| MSSQL | host, database; username/password optional for trusted auth |
| ClickHouse | host, database, username |
| BigQuery | project_id plus service_account_info or service_account_key_file |
| Kafka | bootstrap_servers |
| Generic REST | endpoint through credentials or source.options.endpoint |
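When debugging, the required-field list above can be checked by hand with a small script. The sketch below is a hypothetical helper, not a dpone API; it leaves out Generic REST because its endpoint may come from source.options rather than from credentials.

```python
# Required fields copied from the table above.
REQUIRED_FIELDS = {
    "postgres": ("host", "database", "username", "password"),
    "mssql": ("host", "database"),
    "clickhouse": ("host", "database", "username"),
    "bigquery": ("project_id",),
    "kafka": ("bootstrap_servers",),
}


def missing_fields(family, credentials):
    """List required fields that are absent or empty for a runtime family."""

    def present(name):
        return credentials.get(name) not in (None, "")

    missing = [name for name in REQUIRED_FIELDS.get(family, ()) if not present(name)]
    # BigQuery additionally needs one of two credential sources.
    if family == "bigquery" and not (
        present("service_account_info") or present("service_account_key_file")
    ):
        missing.append("service_account_info or service_account_key_file")
    return missing


print(missing_fields("postgres", {"host": "localhost", "database": "dpone"}))
# ['username', 'password']
```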

Env connection works locally but fails in CI

Run:

dpone doctor --profile ci --format md

Then verify the exact uppercase prefix. For connection_id: mssql_dwh, the prefix is MSSQL_DWH_.

Airflow connection extra is ignored

Validate that extra is valid JSON. Invalid JSON is intentionally ignored with a warning so a broken extra block does not crash unrelated imports.
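The ignore-with-warning behavior described above can be approximated as follows. This is a sketch, assuming a hypothetical parse_extra helper and logger name; the real provider may log differently or handle more cases.

```python
import json
import logging

logger = logging.getLogger("example.airflow_extra")


def parse_extra(extra):
    """Parse a connection's extra string, returning {} when it is unusable."""
    if not extra:
        return {}
    try:
        data = json.loads(extra)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass; warn instead of raising.
        logger.warning("Ignoring connection extra: not valid JSON")
        return {}
    # Assumption: only JSON objects carry connector settings.
    return data if isinstance(data, dict) else {}


print(parse_extra('{"secure": false, "connect_timeout": 10}'))
print(parse_extra('{"secure": false,'))  # logs a warning, returns {}
```

Pasting the extra value into such a function, or into any JSON validator, quickly shows whether a trailing comma or single quotes are the reason the settings never reach the connector.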

REST source still asks for Vault

Generic REST supports all providers. Specialized API connectors may still only support Vault until they implement from_credentials(). For new API connectors, implement both from_vault() and from_credentials().

BigQuery cannot find credentials

Use one of these two patterns:

BIGQUERY_DWH_SERVICE_ACCOUNT_INFO='{"project_id":"demo-project",...}'

or:

BIGQUERY_DWH_SERVICE_ACCOUNT_KEY_FILE=/absolute/path/key.json

If both are present, the key file path takes precedence in the runtime factory.
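The precedence rule can be expressed as a short selection function. The sketch below is illustrative only: pick_bigquery_credentials and its return shape are assumptions, and it stops short of building a real Google credentials object.

```python
def pick_bigquery_credentials(credentials):
    """Return ("key_file", path) or ("info", dict), preferring the key file path."""
    key_file = credentials.get("service_account_key_file")
    if key_file:
        # The key file wins even when inline JSON is also configured.
        return ("key_file", key_file)
    info = credentials.get("service_account_info")
    if info:
        return ("info", info)
    raise ValueError(
        "BigQuery needs service_account_info or service_account_key_file"
    )


both = {
    "project_id": "demo-project",
    "service_account_info": {"type": "service_account"},
    "service_account_key_file": "/absolute/path/key.json",
}
print(pick_bigquery_credentials(both))  # ('key_file', '/absolute/path/key.json')
```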

Developer contract for new connectors

New connectors should accept a normalized CredentialsConfig and provide a small factory method when credentials are not passed directly:

class ExampleConnector:
    @classmethod
    def from_credentials(cls, credentials: CredentialsConfig, **kwargs):
        return cls(
            endpoint=credentials.endpoint,
            token=credentials.token,
            **kwargs,
        )

This keeps provider-specific logic inside dpone.runtime.credentials and avoids copying Vault/env/Airflow parsing into connector modules.
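To show how the factory is meant to be called, here is a self-contained sketch. The CredentialsConfig dataclass below is a two-field stand-in invented for this example (the real DTO is expected to have more fields), and the constructor signature with a timeout argument is likewise an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CredentialsConfig:
    """Hypothetical stand-in for the normalized runtime DTO."""

    endpoint: Optional[str] = None
    token: Optional[str] = None


class ExampleConnector:
    def __init__(self, endpoint, token, timeout=30):
        self.endpoint = endpoint
        self.token = token
        self.timeout = timeout

    @classmethod
    def from_credentials(cls, credentials, **kwargs):
        # The connector only sees normalized fields, never the provider.
        return cls(endpoint=credentials.endpoint, token=credentials.token, **kwargs)


creds = CredentialsConfig(endpoint="https://api.example.com", token="bearer-token")
connector = ExampleConnector.from_credentials(creds, timeout=10)
print(connector.endpoint, connector.timeout)  # https://api.example.com 10
```

The same call works whether creds was built from env, Airflow, Vault, or params, which is the point of the contract.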