Connections and credentials¶
This guide describes every supported way to pass source, sink, and state
credentials into dpone.
dpone normalizes all connection providers into one runtime DTO:
CredentialsConfig. Runtime connectors should not know whether a secret came
from environment variables, Airflow, Vault, or inline params.
Supported providers¶
| connection_type | Best for | Secret storage | Notes |
|---|---|---|---|
| env | local dev, Docker, GitHub Actions, Kubernetes secrets | process environment | Recommended for OSS examples and CI without Vault. |
| airflow | Airflow production deployments | Airflow Connections | Default runtime provider when connection_type is omitted in direct factory usage. |
| vault | enterprise production | HashiCorp Vault-compatible KV | Uses the public vault-kv-client package. |
| params | tests, smoke runs, generated examples | inline JSON string | Do not commit real secrets with this mode. |
Connector coverage matrix¶
| Runtime family | env | airflow | vault | params |
|---|---|---|---|---|
| Postgres source/sink/state | yes | yes | yes | yes |
| MSSQL source/sink/state | yes | yes | yes | yes |
| ClickHouse source/sink | yes | yes | yes | yes |
| BigQuery sink/state | yes | yes | yes | yes |
| Kafka source/sink/state offsets | yes | yes | yes | yes |
| Generic REST API source | yes | yes | yes | yes |
Specialized API connectors can keep their own credential contract, but generic
REST uses the same four providers. For a specialized API connector that only
implements from_vault(), use connection_type: vault until that connector
adds from_credentials().
Manifest shape¶
Every source and sink uses the same connection fields:
source:
  type: postgres
  connection_id: postgres_source
  connection_type: env
sink:
  type: mssql
  connection_id: mssql_dwh
  connection_type: env
For Vault, add the secret location:
source:
  type: postgres
  connection_id: postgres_source
  connection_type: vault
  vault_mount_point: secret
  vault_path: postgres/source
For params, connection_id carries the credentials themselves as an inline JSON object:
sink:
  type: mssql
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":1433,"database":"dpone","username":"sa","password":"secret","driver":"ODBC Driver 18 for SQL Server","trust_server_certificate":"yes","bcp_path":"bcp"}
Environment variables¶
The environment provider uppercases connection_id and reads fields from
<CONNECTION_ID>_<FIELD>.
Use underscores in connection_id values, for example mssql_dwh, so shell
variable names stay portable.
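The lookup rule can be sketched in a few lines of Python. This is a minimal illustration, not dpone's actual provider: the helper name `read_env_credentials` and its `fields` and `environ` parameters are assumptions introduced here.

```python
from __future__ import annotations

import os
from typing import Mapping


def read_env_credentials(
    connection_id: str,
    fields: list[str],
    environ: Mapping[str, str] | None = None,
) -> dict[str, str]:
    """Collect <CONNECTION_ID>_<FIELD> variables into a plain dict.

    Hypothetical helper for illustration only.
    """
    env = os.environ if environ is None else environ
    # The prefix is the uppercased connection_id followed by an underscore.
    prefix = connection_id.upper() + "_"
    found: dict[str, str] = {}
    for field in fields:
        value = env.get(prefix + field.upper())
        if value is not None:
            found[field] = value
    return found


# Demo with an explicit mapping instead of the real process environment.
demo_env = {"MSSQL_DWH_HOST": "localhost", "MSSQL_DWH_PORT": "1433"}
demo_credentials = read_env_credentials("mssql_dwh", ["host", "port", "password"], demo_env)
```

Fields without a matching variable are simply absent from the result, so a separate validation step can report them by name.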
Postgres env example¶
export POSTGRES_SOURCE_HOST=localhost
export POSTGRES_SOURCE_PORT=5432
export POSTGRES_SOURCE_DATABASE=dpone
export POSTGRES_SOURCE_USERNAME=dpone
export POSTGRES_SOURCE_PASSWORD='secret'
export POSTGRES_SOURCE_SCHEMA=public
MSSQL env example¶
export MSSQL_DWH_HOST=localhost
export MSSQL_DWH_PORT=1433
export MSSQL_DWH_DATABASE=dpone
export MSSQL_DWH_USERNAME=sa
export MSSQL_DWH_PASSWORD='secret'
export MSSQL_DWH_DRIVER='ODBC Driver 18 for SQL Server'
export MSSQL_DWH_ENCRYPT=yes
export MSSQL_DWH_TRUST_SERVER_CERTIFICATE=yes
export MSSQL_DWH_BCP_PATH=/opt/mssql-tools18/bin/bcp
ClickHouse env example¶
export CLICKHOUSE_DWH_HOST=localhost
export CLICKHOUSE_DWH_PORT=9000
export CLICKHOUSE_DWH_DATABASE=dpone
export CLICKHOUSE_DWH_USERNAME=default
export CLICKHOUSE_DWH_PASSWORD='secret'
export CLICKHOUSE_DWH_SECURE=false
export CLICKHOUSE_DWH_COMPRESSION=true
export CLICKHOUSE_DWH_CONNECT_TIMEOUT=10
export CLICKHOUSE_DWH_SEND_RECEIVE_TIMEOUT=300
export CLICKHOUSE_DWH_SETTINGS='{"max_threads":4}'
BigQuery env example¶
Use either inline service-account JSON:
export BIGQUERY_DWH_PROJECT_ID=demo-project
export BIGQUERY_DWH_SERVICE_ACCOUNT_INFO='{"type":"service_account","project_id":"demo-project","client_email":"svc@example.com","private_key":"-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"}'
Or a key file path:
export BIGQUERY_DWH_PROJECT_ID=demo-project
export BIGQUERY_DWH_SERVICE_ACCOUNT_KEY_FILE=/secrets/bigquery-service-account.json
Kafka env example¶
export KAFKA_CLUSTER_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_CLUSTER_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_CLUSTER_SASL_MECHANISM=PLAIN
export KAFKA_CLUSTER_SASL_USERNAME=dpone
export KAFKA_CLUSTER_SASL_PASSWORD='secret'
export KAFKA_CLUSTER_SCHEMA_REGISTRY_URL=http://localhost:8081
export KAFKA_CLUSTER_SCHEMA_REGISTRY_USERNAME=dpone
export KAFKA_CLUSTER_SCHEMA_REGISTRY_PASSWORD='secret'
Generic REST env example¶
export REST_ORDERS_ENDPOINT=https://api.example.com
export REST_ORDERS_TOKEN='bearer-token'
export REST_ORDERS_API_KEY='api-key-if-needed'
source:
  type: api
  api_type: rest
  connection_id: rest_orders
  connection_type: env
  options:
    path: /v1/orders
    records_path: data.items
Airflow Connections¶
The Airflow provider reads standard fields from BaseHook.get_connection() and
connector-specific settings from the connection's extra JSON.
Supported Airflow conn_type values¶
| Runtime family | Airflow conn_type values |
|---|---|
| Postgres | postgres |
| MSSQL | mssql, microsoft mssql, sqlserver, odbc |
| ClickHouse | clickhouse, ch |
| BigQuery | bigquery, google_cloud_platform, gcp, google_cloud |
| Kafka | kafka, confluent |
| Generic REST | http, https, rest, api, generic_rest |
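One way to picture the table above is a plain lookup from conn_type to runtime family. The dictionary below is transcribed from the table; the `resolve_family` helper, the family labels, and the case-insensitive matching are assumptions made for this sketch and may differ from how dpone resolves types.

```python
from __future__ import annotations

# Transcribed from the conn_type table; family labels are illustrative names.
CONN_TYPE_TO_FAMILY = {
    "postgres": "postgres",
    "mssql": "mssql",
    "microsoft mssql": "mssql",
    "sqlserver": "mssql",
    "odbc": "mssql",
    "clickhouse": "clickhouse",
    "ch": "clickhouse",
    "bigquery": "bigquery",
    "google_cloud_platform": "bigquery",
    "gcp": "bigquery",
    "google_cloud": "bigquery",
    "kafka": "kafka",
    "confluent": "kafka",
    "http": "rest",
    "https": "rest",
    "rest": "rest",
    "api": "rest",
    "generic_rest": "rest",
}


def resolve_family(conn_type: str) -> str | None:
    """Return the runtime family for an Airflow conn_type, or None if unknown.

    Hypothetical helper; trimming and lowercasing the input is an assumption.
    """
    return CONN_TYPE_TO_FAMILY.get(conn_type.strip().lower())
```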
MSSQL Airflow extra¶
{
  "driver": "ODBC Driver 18 for SQL Server",
  "encrypt": "yes",
  "trust_server_certificate": "yes",
  "query_timeout": 300,
  "bcp_path": "/opt/mssql-tools18/bin/bcp"
}
ClickHouse Airflow extra¶
{
  "secure": false,
  "compression": true,
  "connect_timeout": 10,
  "send_receive_timeout": 300,
  "settings": {"max_threads": 4}
}
BigQuery Airflow extra¶
Inline service account:
{
  "project_id": "demo-project",
  "keyfile_dict": {
    "type": "service_account",
    "project_id": "demo-project",
    "client_email": "svc@example.com",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
  }
}
Key file path:
{
  "project_id": "demo-project",
  "keyfile_path": "/opt/airflow/secrets/bigquery-service-account.json"
}
REST Airflow extra¶
Vault secrets¶
Vault secrets use the same logical field names as env and params. Common aliases are also accepted for database fields.
{
  "host": "localhost",
  "port": 1433,
  "database": "dpone",
  "username": "sa",
  "password": "secret",
  "driver": "ODBC Driver 18 for SQL Server",
  "trust_server_certificate": "yes",
  "bcp_path": "bcp"
}
Database aliases accepted by Vault:
| Canonical field | Vault aliases |
|---|---|
| host | db_host |
| port | db_port |
| database | db_database |
| username | user, db_user |
| password | db_password |
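Alias handling of this kind can be sketched as a small normalization pass over the secret payload. The alias map is taken from the table; the `normalize_vault_secret` helper and the rule that a canonical key wins when both it and an alias are present are assumptions of this sketch, not documented dpone behavior.

```python
from __future__ import annotations

# Canonical field -> accepted aliases, transcribed from the table.
VAULT_ALIASES = {
    "host": ("db_host",),
    "port": ("db_port",),
    "database": ("db_database",),
    "username": ("user", "db_user"),
    "password": ("db_password",),
}


def normalize_vault_secret(secret: dict) -> dict:
    """Return a copy of the secret with aliases renamed to canonical fields.

    Hypothetical helper. Assumption: an existing canonical key is kept
    when an alias for the same field is also present.
    """
    normalized = dict(secret)
    for canonical, aliases in VAULT_ALIASES.items():
        for alias in aliases:
            if alias in normalized:
                value = normalized.pop(alias)
                normalized.setdefault(canonical, value)
    return normalized
```

Keys that are not database fields, such as driver or bcp_path, pass through unchanged.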
BigQuery Vault fields:
{
"project_id": "demo-project",
"service_account_info": {
"type": "service_account",
"project_id": "demo-project",
"client_email": "svc@example.com",
"private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
}
}
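Generic REST Vault fields use the same names as the env and params examples: endpoint, plus token or api_key as the API requires.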
Params JSON¶
params is useful for tests and smoke runs. Avoid real secrets in committed
manifests.
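Because connection_id holds a JSON string in this mode, the first step is decoding it into a mapping. The sketch below shows one defensive way to do that; the `parse_params_connection` name is hypothetical, and the choice to keep the raw payload out of error messages is a design assumption so that inline secrets do not leak into logs.

```python
import json


def parse_params_connection(connection_id: str) -> dict:
    """Decode an inline params connection_id into a dict of credential fields.

    Hypothetical helper for illustration only.
    """
    try:
        payload = json.loads(connection_id)
    except json.JSONDecodeError as exc:
        # Report only the parser message, never the raw string.
        raise ValueError(f"connection_id is not valid JSON: {exc.msg}") from None
    if not isinstance(payload, dict):
        raise ValueError("connection_id must decode to a JSON object")
    return payload
```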
Postgres:
source:
  type: postgres
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":5432,"database":"dpone","username":"dpone","password":"secret"}
MSSQL:
sink:
  type: mssql
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":1433,"database":"dpone","username":"sa","password":"secret","driver":"ODBC Driver 18 for SQL Server","trust_server_certificate":"yes","bcp_path":"bcp"}
ClickHouse:
sink:
  type: clickhouse
  connection_type: params
  connection_id: >-
    {"host":"localhost","port":9000,"database":"dpone","username":"default","password":"secret","secure":false,"settings":{"max_threads":4}}
BigQuery:
sink:
  type: bigquery
  connection_type: params
  connection_id: >-
    {"project_id":"demo-project","service_account_key_file":"/secrets/bigquery-service-account.json"}
Kafka:
sink:
  type: kafka
  connection_type: params
  connection_id: >-
    {"bootstrap_servers":"localhost:9092","schema_registry_url":"http://localhost:8081"}
Generic REST:
source:
  type: api
  api_type: rest
  connection_type: params
  connection_id: >-
    {"endpoint":"https://api.example.com","token":"bearer-token"}
  options:
    path: /v1/orders
    records_path: data.items
Runbooks¶
Incomplete credentials¶
Check the canonical required fields:
| Runtime family | Required fields |
|---|---|
| Postgres | host, database, username, password |
| MSSQL | host, database; username/password optional for trusted auth |
| ClickHouse | host, database, username |
| BigQuery | project_id plus service_account_info or service_account_key_file |
| Kafka | bootstrap_servers |
| Generic REST | endpoint through credentials or source.options.endpoint |
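The required-field table can be turned into a quick pre-flight check. The sketch below is an assumption-laden illustration: the family keys, the `missing_fields` helper, and the `options` parameter (standing in for source.options) are hypothetical names, and empty strings are treated as missing.

```python
from __future__ import annotations

# Unconditionally required fields per family, transcribed from the table.
REQUIRED_FIELDS = {
    "postgres": ("host", "database", "username", "password"),
    "mssql": ("host", "database"),  # username/password optional for trusted auth
    "clickhouse": ("host", "database", "username"),
    "bigquery": ("project_id",),
    "kafka": ("bootstrap_servers",),
}


def missing_fields(family: str, credentials: dict, options: dict | None = None) -> list[str]:
    """List required fields that are absent or empty for a runtime family."""
    options = options or {}
    missing = [
        name
        for name in REQUIRED_FIELDS.get(family, ())
        if credentials.get(name) in (None, "")
    ]
    if family == "bigquery" and not (
        credentials.get("service_account_info")
        or credentials.get("service_account_key_file")
    ):
        missing.append("service_account_info or service_account_key_file")
    if family == "rest" and not (credentials.get("endpoint") or options.get("endpoint")):
        missing.append("endpoint")
    return missing
```

An empty list means the credentials satisfy the table; anything else names exactly what to add.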
Env connection works locally but fails in CI¶
Print the names of the environment variables visible to the CI job, then verify
the exact uppercase prefix. For connection_id: mssql_dwh, the prefix is
MSSQL_DWH_.
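A small diagnostic can make the prefix check concrete. The sketch below lists only variable names, never values, so it is safe to run in CI logs; the `list_prefixed_names` helper is hypothetical and not a dpone command.

```python
from __future__ import annotations

import os
from typing import Mapping


def list_prefixed_names(connection_id: str, environ: Mapping[str, str] | None = None) -> list[str]:
    """Return the sorted names of variables that start with <CONNECTION_ID>_.

    Hypothetical diagnostic helper; values are deliberately not returned.
    """
    env = os.environ if environ is None else environ
    prefix = connection_id.upper() + "_"
    return sorted(name for name in env if name.startswith(prefix))


if __name__ == "__main__":
    # An empty result usually means the secret was not exported to this job
    # or the prefix does not match the connection_id.
    for name in list_prefixed_names("mssql_dwh"):
        print(name)
```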
Airflow connection extra is ignored¶
Validate that extra is valid JSON. Invalid JSON is intentionally ignored with a
warning so a broken extra block does not crash unrelated imports.
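The tolerant behavior described here can be sketched without Airflow installed by working on the raw extra string. The `parse_extra` helper and the warning text are assumptions for illustration; dpone's actual handling and log message may differ.

```python
from __future__ import annotations

import json
import logging

logger = logging.getLogger(__name__)


def parse_extra(raw_extra: str | None) -> dict:
    """Parse a connection's extra JSON, returning {} when it is missing or invalid.

    Hypothetical helper mirroring the "warn and ignore" behavior.
    """
    if not raw_extra:
        return {}
    try:
        parsed = json.loads(raw_extra)
    except json.JSONDecodeError:
        # Log without echoing the payload, which may contain secrets.
        logger.warning("Ignoring Airflow connection extra: not valid JSON")
        return {}
    # Only a JSON object can carry connector settings.
    return parsed if isinstance(parsed, dict) else {}
```

When settings such as encrypt or bcp_path appear to be ignored, a warning like this in the logs is the first thing to look for.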
REST source still asks for Vault¶
Generic REST supports all providers. Specialized API connectors may still only
support Vault until they implement from_credentials(). For new API connectors,
implement both from_vault() and from_credentials().
BigQuery cannot find credentials¶
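Use one of these two patterns: inline service-account JSON in
service_account_info, or a key file path in service_account_key_file.
If both are present, the key file path takes precedence in the runtime factory.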
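The precedence rule can be expressed as a short selection function. This is a sketch under stated assumptions: `pick_bigquery_auth` and its return labels are hypothetical, and only the ordering (key file before inline info) comes from the text.

```python
from __future__ import annotations


def pick_bigquery_auth(credentials: dict) -> tuple[str, object]:
    """Choose which BigQuery credential source to use.

    Hypothetical helper. The key file path is checked first, so it wins
    when both fields are present.
    """
    key_file = credentials.get("service_account_key_file")
    if key_file:
        return ("key_file", key_file)
    info = credentials.get("service_account_info")
    if info:
        return ("inline", info)
    raise ValueError(
        "BigQuery credentials need service_account_info or service_account_key_file"
    )
```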
Developer contract for new connectors¶
New connectors should accept a normalized CredentialsConfig and provide a small
factory method when credentials are not passed directly:
class ExampleConnector:
    @classmethod
    def from_credentials(cls, credentials: CredentialsConfig, **kwargs):
        return cls(
            endpoint=credentials.endpoint,
            token=credentials.token,
            **kwargs,
        )
This keeps provider-specific logic inside dpone.runtime.credentials and avoids
copying Vault/env/Airflow parsing into connector modules.
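To see the contract end to end, the following self-contained sketch pairs the factory method with a stand-in DTO. The `CredentialsConfig` dataclass here is a simplified placeholder with only two fields, and the `timeout` argument is invented for the demo; the real dpone class has its own definition.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CredentialsConfig:
    """Stand-in for dpone's normalized DTO (assumed fields, illustration only)."""

    endpoint: str | None = None
    token: str | None = None


class ExampleConnector:
    def __init__(self, endpoint: str | None, token: str | None, timeout: float = 30.0):
        self.endpoint = endpoint
        self.token = token
        self.timeout = timeout  # hypothetical connector option

    @classmethod
    def from_credentials(cls, credentials: CredentialsConfig, **kwargs):
        # The connector sees only normalized fields, never the provider.
        return cls(endpoint=credentials.endpoint, token=credentials.token, **kwargs)


connector = ExampleConnector.from_credentials(
    CredentialsConfig(endpoint="https://api.example.com", token="bearer-token"),
    timeout=10.0,
)
```

The same call works whether the DTO was built from env, Airflow, Vault, or params, which is the point of keeping parsing out of the connector.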