Object storage staging

dpone supports a common object storage staging contract for S3, Google Cloud Storage, Azure Blob Storage, and a local filesystem emulator for CI.

The goal is simple: write large extracts once as files, upload them to a durable staging area with checksums, then load them through target-native fast paths. The staging manifest produced by the upload doubles as evidence for replay, certification, and run reports.

Install

pip install "dpone[object_storage]"

Or install only one cloud SDK:

pip install "dpone[s3]"
pip install "dpone[gcp]"
pip install "dpone[azure]"

Supported URI forms

| Provider | URI | Notes |
|---|---|---|
| S3 | s3://bucket/path/prefix | Uses boto3 when S3ObjectStorageClient is used. |
| GCS | gs://bucket/path/prefix | Uses google-cloud-storage when GCSObjectStorageClient is used. |
| Azure (short) | az://container/path/prefix | Account comes from the injected BlobServiceClient or connection string. |
| Azure (explicit) | azure://account/container/path/prefix | Account is preserved in the URI model for diagnostics. |
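
To make the four URI forms concrete, here is a minimal parsing sketch built only on the standard library. It is not dpone's own URI model: StagingUri, parse_staging_uri, and the provider labels "gcs" and "azure" are hypothetical names chosen for illustration (only "s3" appears in the manifest example).

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit


@dataclass(frozen=True)
class StagingUri:
    """Hypothetical parsed form of a staging URI (not a dpone class)."""
    provider: str                  # assumed labels: "s3", "gcs", "azure"
    container: str                 # bucket or container name
    prefix: str                    # path below the bucket/container
    account: Optional[str] = None  # only set for azure://account/...


def parse_staging_uri(uri: str) -> StagingUri:
    parts = urlsplit(uri)
    path = parts.path.strip("/")
    if not parts.netloc:
        raise ValueError(f"missing bucket/container in {uri!r}")
    if parts.scheme == "s3":
        return StagingUri("s3", parts.netloc, path)
    if parts.scheme == "gs":
        return StagingUri("gcs", parts.netloc, path)
    if parts.scheme == "az":
        # Short form: the account is supplied by the client, not the URI.
        return StagingUri("azure", parts.netloc, path)
    if parts.scheme == "azure":
        # Explicit form: the first component is the account,
        # the second is the container.
        container, _, prefix = path.partition("/")
        if not container:
            raise ValueError(f"missing container in {uri!r}")
        return StagingUri("azure", container, prefix, account=parts.netloc)
    raise ValueError(f"unsupported scheme in {uri!r}")
```

With this sketch, the short and explicit Azure forms parse to the same container and prefix; only the explicit form carries an account value.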

How staging works

flowchart LR
    Export["Source native export files"]
    Plan["ObjectStorageStagingPlan"]
    Client["ObjectStorageClient\nS3/GCS/Azure/local"]
    Upload["Upload files"]
    Manifest["ObjectStorageStagingManifest"]
    Sink["Target native loader"]
    Evidence["Run artifacts / certification"]

    Export --> Plan
    Plan --> Client
    Client --> Upload
    Upload --> Manifest
    Manifest --> Sink
    Manifest --> Evidence

Every staged object records:

  • object URI;
  • file name;
  • byte size;
  • SHA-256 checksum;
  • optional content type.
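
The fields above can be derived from a local file with the standard library alone. The sketch below is illustrative rather than dpone's implementation: describe_staged_object and its run_prefix_uri parameter are hypothetical, the "content_type" key name is an assumption, and the other keys follow the manifest example.

```python
import hashlib
import mimetypes
from pathlib import Path


def describe_staged_object(local_path: Path, run_prefix_uri: str) -> dict:
    """Build one staged-object record for a local file (hypothetical helper)."""
    digest = hashlib.sha256()
    size = 0
    # Read in 1 MiB chunks so large extracts are never loaded whole.
    with local_path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(1024 * 1024), b""):
            digest.update(chunk)
            size += len(chunk)

    record = {
        "uri": f"{run_prefix_uri.rstrip('/')}/{local_path.name}",
        "file_name": local_path.name,
        "size_bytes": size,
        "sha256": digest.hexdigest(),
    }
    # Content type is optional; include it only when it can be guessed.
    content_type, _ = mimetypes.guess_type(local_path.name)
    if content_type is not None:
        record["content_type"] = content_type  # key name is an assumption
    return record
```

Hashing while counting bytes means the size and checksum always describe the same read of the file, which matters if a file could change between two separate passes.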

Python API

Local CI example:

from pathlib import Path

from dpone.staging.object_storage import ObjectStorageStagingPlan, ObjectStorageStagingService
from dpone.storage import LocalObjectStorageClient

client = LocalObjectStorageClient(root_dir=".dpone/object-storage-local")
service = ObjectStorageStagingService(client=client)

manifest = service.stage_files(
    ObjectStorageStagingPlan(
        base_uri="s3://dpone-stage/orders",
        run_id="01JZDPONEOBJECTSTAGING000",
        dataset="landing",
        table="orders",
        file_format="tsv",
        compression="none",
        cleanup_policy="delete_on_success",
    ),
    [Path("part-000.tsv"), Path("part-001.tsv")],
)

print(manifest.to_dict())

Cloud adapters accept an injected SDK client, which supports dependency injection in tests. The examples below let each adapter build its own SDK client:

from dpone.storage import S3ObjectStorageClient

client = S3ObjectStorageClient()  # uses boto3.client("s3")

from dpone.storage import GCSObjectStorageClient

client = GCSObjectStorageClient()  # uses google.cloud.storage.Client()

from dpone.storage import AzureBlobObjectStorageClient

client = AzureBlobObjectStorageClient(connection_string="...")

Manifest shape

{
  "provider": "s3",
  "base_uri": "s3://dpone-stage/orders",
  "run_id": "01JZDPONEOBJECTSTAGING000",
  "dataset": "landing",
  "table": "orders",
  "file_format": "tsv",
  "compression": "none",
  "cleanup_policy": "delete_on_success",
  "object_count": 2,
  "total_size_bytes": 128,
  "objects": [
    {
      "uri": "s3://dpone-stage/orders/01JZDPONEOBJECTSTAGING000/part-000.tsv",
      "file_name": "part-000.tsv",
      "size_bytes": 64,
      "sha256": "..."
    }
  ]
}
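
Because the manifest carries both totals and per-object entries, a consumer can check that they agree before trusting it as evidence. The following is a minimal sketch of such a check, assuming the key names shown above and the URI layout base_uri/run_id/file_name seen in the example; check_manifest is a hypothetical helper, not part of dpone.

```python
from typing import List


def check_manifest(manifest: dict) -> List[str]:
    """Return a list of inconsistencies; an empty list means none were found."""
    problems: List[str] = []
    objects = manifest.get("objects", [])

    if manifest.get("object_count") != len(objects):
        problems.append(
            f"object_count={manifest.get('object_count')} "
            f"but {len(objects)} objects listed"
        )

    total = sum(obj["size_bytes"] for obj in objects)
    if manifest.get("total_size_bytes") != total:
        problems.append(
            f"total_size_bytes={manifest.get('total_size_bytes')} "
            f"but object sizes sum to {total}"
        )

    # Assumed layout: every object lives directly under base_uri/run_id/.
    run_prefix = f"{manifest['base_uri'].rstrip('/')}/{manifest['run_id']}/"
    for obj in objects:
        if obj["uri"] != run_prefix + obj["file_name"]:
            problems.append(f"unexpected URI for {obj['file_name']}: {obj['uri']}")

    return problems
```

A check like this is cheap to run before a target load and before archiving the manifest as a run artifact.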

Cleanup policy

| Policy | Behavior |
|---|---|
| retain | Keep staged files for replay/debugging. |
| delete_on_success | Delete run prefix after successful sink commit. |
| delete_always | Delete run prefix when cleanup is called, regardless of run outcome. |

Production recommendation:

  • use retain for CDC replay, incident recovery, and certification runs;
  • use delete_on_success for routine high-volume batch jobs when replay artifacts are stored elsewhere;
  • avoid delete_always unless data is also recoverable from source.
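
The three policies reduce to a small decision at cleanup time. The sketch below restates the documented behavior as code; should_delete_run_prefix and its run_succeeded flag are hypothetical names, and dpone's actual cleanup entry point may be shaped differently.

```python
def should_delete_run_prefix(cleanup_policy: str, run_succeeded: bool) -> bool:
    """Decide whether the run prefix is deleted when cleanup is called."""
    if cleanup_policy == "retain":
        # Staged files are kept for replay and debugging.
        return False
    if cleanup_policy == "delete_on_success":
        # Delete only after a successful sink commit.
        return run_succeeded
    if cleanup_policy == "delete_always":
        # Delete regardless of run outcome.
        return True
    raise ValueError(f"unknown cleanup policy: {cleanup_policy!r}")
```

Rejecting unknown policy names, rather than defaulting to one, avoids a typo silently turning into either data loss or unbounded retention.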

Runbook

| Symptom | Likely cause | Action |
|---|---|---|
| Upload fails with auth error | Cloud SDK credentials are missing or scoped incorrectly. | Run dpone doctor, verify IAM/SAS/account roles, and retry with a single small file. |
| Checksum mismatch in downstream evidence | File changed after staging or wrong local path uploaded. | Re-export files, stage again, and compare sha256 values before target load. |
| Target cannot read staged files | Bucket/container permissions differ from uploader permissions. | Grant target service account read/list on the staging prefix. |
| Cleanup deletes too much | Base URI points too high in the namespace. | Use per-pipeline prefixes and verify the generated run_id prefix before cleanup. |
| Azure URI cannot resolve account | az:// form was used without configured BlobServiceClient. | Use azure://account/container/prefix for diagnostics or inject a configured service client. |
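
For the "Cleanup deletes too much" case, a pre-cleanup guard can refuse to act on a prefix that is not a per-run prefix. This is a sketch under stated assumptions: the run prefix is taken to be base_uri/run_id (as in the manifest example), and expected_run_prefix and assert_safe_cleanup are hypothetical helpers rather than dpone functions.

```python
from urllib.parse import urlsplit


def expected_run_prefix(base_uri: str, run_id: str) -> str:
    """Return base_uri/run_id, rejecting base URIs that sit too high."""
    parts = urlsplit(base_uri)
    if not parts.netloc:
        raise ValueError(f"missing bucket/container in {base_uri!r}")
    segments = [s for s in parts.path.split("/") if s]
    # azure://account/container/prefix spends one path segment on the
    # container, so it needs two segments to include a pipeline prefix.
    required = 2 if parts.scheme == "azure" else 1
    if len(segments) < required:
        raise ValueError(
            f"{base_uri!r} has no per-pipeline prefix; refusing to clean up"
        )
    if not run_id or "/" in run_id:
        raise ValueError(f"invalid run_id: {run_id!r}")
    return f"{base_uri.rstrip('/')}/{run_id}"


def assert_safe_cleanup(base_uri: str, run_id: str, prefix_to_delete: str) -> None:
    """Raise unless prefix_to_delete is exactly the expected run prefix."""
    expected = expected_run_prefix(base_uri, run_id)
    if prefix_to_delete.rstrip("/") != expected:
        raise ValueError(
            f"cleanup target {prefix_to_delete!r} does not match {expected!r}"
        )
```

Comparing against an exact expected prefix, instead of checking that the target merely starts with the base URI, keeps a misconfigured base from widening the deletion scope.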