BigQuery Integration
dpone supports BigQuery as an analytical sink and state backend through the gcp extra.
Recommended manifest
source:
  type: postgres
  connection_id: postgres_oltp
  connection_type: vault
  table: {schema: public, name: orders}
  options:
    incremental_column: updated_at
sink:
  type: bigquery
  connection_id: bigquery_dwh
  connection_type: vault
  table: {schema: landing, name: orders}
strategy:
  mode: incremental_merge
  unique_key: [id]
  merge_policy: delete_insert
  duplicate_policy: fail
Load strategy behavior
| Strategy | BigQuery sink behavior |
|---|---|
| full_refresh | Create/replace via staging and configured exchange path. |
| incremental_append | Insert staged rows, optionally insert only missing keys. |
| incremental_merge | Default merge_policy: delete_insert; optional CTAS shadow_swap. |
| replace | Delete rows matching custom_predicate, then insert staged rows. |
| partition_replace | Native time-partition overwrite through partition decorators when possible; partition-column scoped delete/insert fallback otherwise. |
incremental_merge SQL
Default delete_insert:
DELETE FROM `project.landing.orders` AS t
WHERE EXISTS (
SELECT 1
FROM `project.staging.orders__tmp` AS s
WHERE s.id = t.id
);
INSERT INTO `project.landing.orders` (...)
SELECT ... FROM `project.staging.orders__tmp`;
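The way unique_key maps onto the join predicate can be sketched in Python. This is an illustrative sketch only, not dpone's actual implementation: the function names `quote_ident` and `build_delete_insert` and the explicit `columns` argument are hypothetical. It shows how a composite key would expand into one equality per key column.

```python
from __future__ import annotations


def quote_ident(name: str) -> str:
    """Backtick-quote a BigQuery identifier or dotted table path."""
    if "`" in name:
        raise ValueError(f"backtick not allowed in identifier: {name!r}")
    return f"`{name}`"


def build_delete_insert(
    target: str, staging: str, unique_key: list[str], columns: list[str]
) -> list[str]:
    """Return [DELETE, INSERT] statements for a delete_insert style merge.

    Hypothetical helper: the names and signature are assumptions made
    for illustration, not part of dpone.
    """
    if not unique_key:
        raise ValueError("unique_key must name at least one column")
    # One equality per key column, joined with AND for composite keys.
    match = " AND ".join(
        f"s.{quote_ident(k)} = t.{quote_ident(k)}" for k in unique_key
    )
    cols = ", ".join(quote_ident(c) for c in columns)
    delete_sql = (
        f"DELETE FROM {quote_ident(target)} AS t\n"
        f"WHERE EXISTS (\n"
        f"  SELECT 1\n"
        f"  FROM {quote_ident(staging)} AS s\n"
        f"  WHERE {match}\n"
        f")"
    )
    insert_sql = (
        f"INSERT INTO {quote_ident(target)} ({cols})\n"
        f"SELECT {cols} FROM {quote_ident(staging)}"
    )
    return [delete_sql, insert_sql]


if __name__ == "__main__":
    for stmt in build_delete_insert(
        "project.landing.orders",
        "project.staging.orders__tmp",
        ["id"],
        ["id", "updated_at"],
    ):
        print(stmt + ";\n")
```

Because the predicate uses plain equality, staged rows whose key columns are NULL never match existing target rows; they are only inserted.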
Optional shadow_swap:
CREATE TABLE `project.landing.orders__dpone_shadow_ab12cd34` AS
SELECT t.*
FROM `project.landing.orders` AS t
WHERE NOT EXISTS (
SELECT 1 FROM `project.staging.orders__tmp` AS s WHERE s.id = t.id
);
INSERT INTO `project.landing.orders__dpone_shadow_ab12cd34` (...)
SELECT ... FROM `project.staging.orders__tmp`;
ALTER TABLE `project.landing.orders` RENAME TO orders__dpone_backup_ab12cd34;
ALTER TABLE `project.landing.orders__dpone_shadow_ab12cd34` RENAME TO orders;
DROP TABLE `project.landing.orders__dpone_backup_ab12cd34`;
A BigQuery table rename can fail for tables that recently received streamed rows or that are in a restricted metadata state. In that case, use the default delete_insert policy, or rerun once the streaming-related rename restrictions on the table no longer apply.
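The order of the shadow_swap statements matters: the shadow table must be fully populated before either rename runs, and the backup is dropped only after the shadow has taken the target's name. The sketch below generates that sequence in Python. It is an assumption-laden illustration rather than dpone's code: the function name `build_shadow_swap`, its parameters, and the use of `uuid` for the eight-character suffix are all hypothetical.

```python
from __future__ import annotations

import uuid


def build_shadow_swap(
    project: str,
    dataset: str,
    table: str,
    staging: str,
    unique_key: list[str],
    columns: list[str],
    suffix: str | None = None,
) -> list[str]:
    """Return the ordered statements for a shadow_swap style merge.

    Hypothetical helper for illustration; not part of dpone.
    """
    # Assumption: a short random hex suffix keeps shadow/backup names unique.
    suffix = suffix or uuid.uuid4().hex[:8]
    shadow_name = f"{table}__dpone_shadow_{suffix}"
    backup_name = f"{table}__dpone_backup_{suffix}"
    target = f"`{project}.{dataset}.{table}`"
    shadow = f"`{project}.{dataset}.{shadow_name}`"
    backup = f"`{project}.{dataset}.{backup_name}`"
    match = " AND ".join(f"s.`{k}` = t.`{k}`" for k in unique_key)
    cols = ", ".join(f"`{c}`" for c in columns)
    return [
        # 1. Copy target rows that are NOT being replaced by staged rows.
        f"CREATE TABLE {shadow} AS SELECT t.* FROM {target} AS t "
        f"WHERE NOT EXISTS (SELECT 1 FROM `{staging}` AS s WHERE {match})",
        # 2. Add every staged row to the shadow table.
        f"INSERT INTO {shadow} ({cols}) SELECT {cols} FROM `{staging}`",
        # 3. Move the live table aside, 4. promote the shadow, 5. clean up.
        f"ALTER TABLE {target} RENAME TO {backup_name}",
        f"ALTER TABLE {shadow} RENAME TO {table}",
        f"DROP TABLE {backup}",
    ]


if __name__ == "__main__":
    for stmt in build_shadow_swap(
        "project", "landing", "orders",
        "project.staging.orders__tmp", ["id"], ["id", "updated_at"],
    ):
        print(stmt + ";")
```

If the first rename fails, nothing has been swapped yet and the shadow table can simply be dropped, which is why a rename failure is safe to retry or to replace with delete_insert.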
partition_replace SQL
Native time-partition overwrite runs a BigQuery query job whose destination table carries a partition decorator and whose write disposition is WRITE_TRUNCATE:
-- Runtime submits this SELECT as a BigQuery query job.
-- Destination table: project.landing.orders$20260603
-- Write disposition: WRITE_TRUNCATE
SELECT ...
FROM `project.staging.orders__tmp`
WHERE CAST(`business_date` AS STRING) = @partition_value;
Native prerequisites:
- Target table has BigQuery time partitioning.
- Staged partition values can be converted to YYYYMMDD decorators.
- The source emits a complete replacement slice for every staged partition value.
native_mode: required fails before fallback when any partition cannot use the native path.
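As a rough illustration of how these prerequisites could gate the native path, the Python sketch below converts staged partition values into YYYYMMDD decorators and chooses between the native and fallback paths. Everything here is an assumption for illustration: the function names, the `"auto"` default for the mode, and the all-or-nothing decision are hypothetical and are not taken from dpone. The last function shows how such a job might be submitted with the google-cloud-bigquery client, which is imported lazily so the rest of the sketch runs without it.

```python
from __future__ import annotations

from datetime import date


def to_partition_decorator(value) -> str | None:
    """Return a YYYYMMDD decorator for a staged value, or None if impossible."""
    if isinstance(value, date):  # also covers datetime, a subclass of date
        return value.strftime("%Y%m%d")
    if isinstance(value, str):
        try:
            # Accept 'YYYY-MM-DD' and timestamp strings that start with it.
            return date.fromisoformat(value[:10]).strftime("%Y%m%d")
        except ValueError:
            return None
    return None


def plan_partition_replace(
    table: str, partition_values, time_partitioned: bool, native_mode: str = "auto"
):
    """Return ("native", [destinations]) or ("fallback", []).

    Assumption: "auto" is a hypothetical name for the default mode, and the
    plan is all-or-nothing across the staged partition values.
    """
    if native_mode == "fallback":
        return "fallback", []
    decorators = [to_partition_decorator(v) for v in partition_values]
    if time_partitioned and all(d is not None for d in decorators):
        return "native", [f"{table}${d}" for d in decorators]
    if native_mode == "required":
        raise RuntimeError("native partition overwrite required but not possible")
    return "fallback", []


def run_native_overwrite(client, select_sql: str, destination: str, partition_value: str):
    """Submit one partition overwrite as a query job (needs google-cloud-bigquery)."""
    from google.cloud import bigquery  # lazy import; optional dependency

    job_config = bigquery.QueryJobConfig(
        destination=destination,  # e.g. "project.landing.orders$20260603"
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        query_parameters=[
            bigquery.ScalarQueryParameter("partition_value", "STRING", partition_value)
        ],
    )
    return client.query(select_sql, job_config=job_config).result()


if __name__ == "__main__":
    print(plan_partition_replace("project.landing.orders", ["2026-06-03"], True))
```

Casting a DATE column to STRING in BigQuery yields the `YYYY-MM-DD` form, so the same staged value can serve both as the `@partition_value` parameter and, with the dashes removed, as the decorator.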
Fallback SQL:
DELETE FROM `project.landing.orders`
WHERE CAST(`business_date` AS STRING) IN (
SELECT DISTINCT CAST(`business_date` AS STRING)
FROM `project.staging.orders__tmp`
);
INSERT INTO `project.landing.orders` (...)
SELECT ... FROM `project.staging.orders__tmp`;
Use native_mode: fallback when you intentionally want the safe SQL fallback and do not want query-job partition decorators.