Object storage staging
dpone supports a common object storage staging contract for S3, Google Cloud Storage, Azure Blob Storage, and a local filesystem emulator for CI.
The goal is simple: large extracts are written once as files, uploaded with checksums to a durable staging area, and then loaded through target-native fast paths. The manifest produced during staging also serves as evidence for replay, certification, and run reports.
Contents
- Install
- Supported URI forms
- How staging works
- Python API
- Manifest shape
- Cleanup policy
- Runbook
- Related docs
Install
Or install only one cloud SDK:
Supported URI forms
| Provider | URI | Notes |
|---|---|---|
| S3 | `s3://bucket/path/prefix` | Uses boto3 when `S3ObjectStorageClient` is used. |
| GCS | `gs://bucket/path/prefix` | Uses google-cloud-storage when `GCSObjectStorageClient` is used. |
| Azure short | `az://container/path/prefix` | The account comes from the injected `BlobServiceClient` or connection string. |
| Azure explicit | `azure://account/container/path/prefix` | The account is preserved in the URI model for diagnostics. |
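The four forms differ mainly in where the account, bucket, or container sits in the URI. Below is a minimal sketch of how they could be split into components using only the standard library. `StagingUri`, `parse_staging_uri`, and the `"gcs"` provider label are hypothetical names chosen for illustration. They are not dpone's URI model.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse


@dataclass(frozen=True)
class StagingUri:
    """Hypothetical parsed staging URI (illustration only, not dpone's model)."""
    provider: str            # "s3", "gcs", or "azure" (labels are assumptions)
    account: Optional[str]   # only present in the explicit azure:// form
    bucket: str              # S3/GCS bucket or Azure container
    prefix: str              # key prefix without leading/trailing "/"


def parse_staging_uri(uri: str) -> StagingUri:
    parsed = urlparse(uri)
    parts = [p for p in parsed.path.split("/") if p]
    if parsed.scheme == "s3":
        return StagingUri("s3", None, parsed.netloc, "/".join(parts))
    if parsed.scheme == "gs":
        return StagingUri("gcs", None, parsed.netloc, "/".join(parts))
    if parsed.scheme == "az":
        # Short form: netloc is the container; the account must come from an
        # injected BlobServiceClient or a connection string.
        return StagingUri("azure", None, parsed.netloc, "/".join(parts))
    if parsed.scheme == "azure":
        # Explicit form: azure://account/container/path/prefix
        if not parts:
            raise ValueError(f"missing container in {uri!r}")
        return StagingUri("azure", parsed.netloc, parts[0], "/".join(parts[1:]))
    raise ValueError(f"unsupported staging URI scheme: {parsed.scheme!r}")
```

For example, `parse_staging_uri("azure://acct/stage/orders")` keeps `acct` as the account. The short `az://stage/orders` form leaves the account empty, which matches the note in the table.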
How staging works
```mermaid
flowchart LR
    Export["Source native export files"]
    Plan["ObjectStorageStagingPlan"]
    Client["ObjectStorageClient\nS3/GCS/Azure/local"]
    Upload["Upload files"]
    Manifest["ObjectStorageStagingManifest"]
    Sink["Target native loader"]
    Evidence["Run artifacts / certification"]
    Export --> Plan
    Plan --> Client
    Client --> Upload
    Upload --> Manifest
    Manifest --> Sink
    Manifest --> Evidence
```
Every staged object records:
- object URI;
- file name;
- byte size;
- SHA-256 checksum;
- optional content type.
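Here is a minimal sketch of how those fields could be computed for one local file before upload, using only the standard library. `StagedObjectRecord`, `sha256_of`, and `describe_local_file` are hypothetical names. Guessing the content type from the file extension is also an assumption: dpone may set it differently or leave it empty.

```python
import hashlib
import mimetypes
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass(frozen=True)
class StagedObjectRecord:
    """Hypothetical record mirroring the fields listed above."""
    uri: str
    file_name: str
    size_bytes: int
    sha256: str
    content_type: Optional[str] = None


def sha256_of(path: Path, chunk_size: int = 1024 * 1024) -> str:
    # Stream the file in chunks so large extracts never sit fully in memory.
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def describe_local_file(path: Path, object_uri: str) -> StagedObjectRecord:
    # Assumption: content type guessed from the extension; None if unknown.
    content_type, _ = mimetypes.guess_type(path.name)
    return StagedObjectRecord(
        uri=object_uri,
        file_name=path.name,
        size_bytes=path.stat().st_size,
        sha256=sha256_of(path),
        content_type=content_type,
    )
```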
Python API
Local CI example:
```python
from pathlib import Path

from dpone.staging.object_storage import ObjectStorageStagingPlan, ObjectStorageStagingService
from dpone.storage import LocalObjectStorageClient

# Local filesystem emulator for CI: objects land under root_dir, not in a real bucket.
client = LocalObjectStorageClient(root_dir=".dpone/object-storage-local")
service = ObjectStorageStagingService(client=client)

manifest = service.stage_files(
    ObjectStorageStagingPlan(
        base_uri="s3://dpone-stage/orders",
        run_id="01JZDPONEOBJECTSTAGING000",
        dataset="landing",
        table="orders",
        file_format="tsv",
        compression="none",
        cleanup_policy="delete_on_success",
    ),
    # Local export files to upload.
    [Path("part-000.tsv"), Path("part-001.tsv")],
)
print(manifest.to_dict())
```
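In the manifest example in the next section, each object URI is built from the plan's `base_uri`, then the `run_id`, then the file name. The sketch below shows that composition. It also shows one possible way a local emulator could map such a URI onto `root_dir`. Both helpers are hypothetical, and the `<root_dir>/<scheme>/<bucket>/<key>` layout is an assumption. Check where `LocalObjectStorageClient` actually writes before asserting on paths in CI.

```python
from pathlib import Path, PurePosixPath
from urllib.parse import urlparse


def object_uri(base_uri: str, run_id: str, file_name: str) -> str:
    # <base_uri>/<run_id>/<file_name>, tolerating a trailing "/" on base_uri.
    return f"{base_uri.rstrip('/')}/{run_id}/{file_name}"


def local_emulator_path(root_dir: str, uri: str) -> Path:
    # ASSUMPTION: hypothetical layout <root_dir>/<scheme>/<bucket>/<key>;
    # LocalObjectStorageClient may use a different layout.
    parsed = urlparse(uri)
    key = PurePosixPath(parsed.path.lstrip("/"))
    if ".." in key.parts:
        raise ValueError(f"refusing path traversal in {uri!r}")
    return Path(root_dir, parsed.scheme, parsed.netloc, *key.parts)
```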
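Cloud adapters wrap the provider SDKs and accept injected clients for dependency injection and testing. Without an injected client, the S3 and GCS adapters build a default SDK client: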
```python
from dpone.storage import (
    AzureBlobObjectStorageClient,
    GCSObjectStorageClient,
    S3ObjectStorageClient,
)

s3_client = S3ObjectStorageClient()  # uses boto3.client("s3")
gcs_client = GCSObjectStorageClient()  # uses google.cloud.storage.Client()
azure_client = AzureBlobObjectStorageClient(connection_string="...")
```
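Because clients are injected, tests can substitute an in-memory fake for a cloud adapter. The sketch below is built on a hypothetical two-method protocol (`upload_file`, `delete_prefix`) and a hypothetical `stage` helper. dpone's real `ObjectStorageClient` interface may differ, so treat these method names as placeholders that illustrate the injection pattern.

```python
from pathlib import Path
from typing import Dict, List, Protocol


class UploadClient(Protocol):
    """Hypothetical minimal interface; not dpone's ObjectStorageClient."""
    def upload_file(self, local_path: Path, uri: str) -> None: ...
    def delete_prefix(self, prefix_uri: str) -> int: ...


class InMemoryClient:
    """Test double that keeps uploaded bytes in a dict keyed by URI."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def upload_file(self, local_path: Path, uri: str) -> None:
        self.objects[uri] = local_path.read_bytes()

    def delete_prefix(self, prefix_uri: str) -> int:
        # Match whole path segments so ".../RUN" does not also delete ".../RUN2".
        prefix = prefix_uri.rstrip("/") + "/"
        doomed = [u for u in self.objects if u.startswith(prefix)]
        for u in doomed:
            del self.objects[u]
        return len(doomed)


def stage(client: UploadClient, base_uri: str, run_id: str, files: List[Path]) -> List[str]:
    # Code under test depends only on the protocol, so production callers
    # can pass an S3, GCS, or Azure adapter instead of the fake.
    uris = []
    for path in files:
        uri = f"{base_uri.rstrip('/')}/{run_id}/{path.name}"
        client.upload_file(path, uri)
        uris.append(uri)
    return uris
```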
Manifest shape
Example manifest, abbreviated: `object_count` is 2, but only the first entry of `objects` is shown.
```json
{
  "provider": "s3",
  "base_uri": "s3://dpone-stage/orders",
  "run_id": "01JZDPONEOBJECTSTAGING000",
  "dataset": "landing",
  "table": "orders",
  "file_format": "tsv",
  "compression": "none",
  "cleanup_policy": "delete_on_success",
  "object_count": 2,
  "total_size_bytes": 128,
  "objects": [
    {
      "uri": "s3://dpone-stage/orders/01JZDPONEOBJECTSTAGING000/part-000.tsv",
      "file_name": "part-000.tsv",
      "size_bytes": 64,
      "sha256": "..."
    }
  ]
}
```
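The manifest's summary fields can be cross-checked against its `objects` list, and each entry can be checked against the local file that was staged. The runbook below recommends comparing `sha256` values before the target load. Here is a sketch that operates on a dictionary shaped like the example above. `check_manifest` is a hypothetical helper, and it assumes the full `objects` list is present rather than the abbreviated one shown.

```python
import hashlib
from pathlib import Path
from typing import Any, List, Mapping


def check_manifest(manifest: Mapping[str, Any], local_dir: Path) -> List[str]:
    """Return a list of problems; an empty list means everything matched."""
    problems: List[str] = []
    objects = manifest["objects"]
    if manifest["object_count"] != len(objects):
        problems.append(f"object_count={manifest['object_count']} but {len(objects)} objects listed")
    total = sum(obj["size_bytes"] for obj in objects)
    if manifest["total_size_bytes"] != total:
        problems.append(f"total_size_bytes={manifest['total_size_bytes']} but objects sum to {total}")
    for obj in objects:
        local = local_dir / obj["file_name"]
        if not local.exists():
            problems.append(f"{obj['file_name']}: local file missing")
            continue
        data = local.read_bytes()  # fine for a sketch; stream very large files
        if len(data) != obj["size_bytes"]:
            problems.append(f"{obj['file_name']}: size changed since staging")
        if hashlib.sha256(data).hexdigest() != obj["sha256"]:
            problems.append(f"{obj['file_name']}: sha256 mismatch")
    return problems
```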
Cleanup policy
| Policy | Behavior |
|---|---|
| `retain` | Keep staged files for replay/debugging. |
| `delete_on_success` | Delete the run prefix after a successful sink commit. |
| `delete_always` | Delete the run prefix when cleanup is called, regardless of run outcome. |
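Read as a decision rule, the policy table reduces to a few lines of code. This is a sketch only: `should_delete_run_prefix` and its `sink_committed` flag are hypothetical names. The code encodes just the three behaviors in the table, not how or when dpone invokes cleanup.

```python
from typing import Literal

CleanupPolicy = Literal["retain", "delete_on_success", "delete_always"]


def should_delete_run_prefix(policy: CleanupPolicy, sink_committed: bool) -> bool:
    """Hypothetical helper: does cleanup remove the run prefix under this policy?"""
    if policy == "retain":
        return False  # keep files for replay/debugging
    if policy == "delete_on_success":
        return sink_committed  # only after a successful sink commit
    if policy == "delete_always":
        return True  # regardless of run outcome
    raise ValueError(f"unknown cleanup policy: {policy!r}")
```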
Production recommendation:
- keep `retain` for CDC replay, incident recovery, and certification runs;
- use `delete_on_success` for routine high-volume batch jobs when replay artifacts are stored elsewhere;
- avoid `delete_always` unless the data is also recoverable from the source.
Runbook
| Symptom | Likely cause | Action |
|---|---|---|
| Upload fails with auth error | Cloud SDK credentials are missing or scoped incorrectly. | Run dpone doctor, verify IAM/SAS/account roles, and retry with a single small file. |
| Checksum mismatch in downstream evidence | File changed after staging or wrong local path uploaded. | Re-export files, stage again, and compare sha256 values before target load. |
| Target cannot read staged files | Bucket/container permissions differ from uploader permissions. | Grant target service account read/list on the staging prefix. |
| Cleanup deletes too much | Base URI points too high in the namespace. | Use per-pipeline prefixes and verify the generated run_id prefix before cleanup. |
| Azure URI cannot resolve account | The `az://` form was used without a configured `BlobServiceClient`. | Use `azure://account/container/prefix` for diagnostics or inject a configured service client. |
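The "Cleanup deletes too much" action can be enforced in code before any delete call is made. Below is a minimal sketch. `check_cleanup_prefix` is a hypothetical guard. The rule that a base URI needs at least one key segment below the bucket or container is an assumption about what counts as a per-pipeline prefix.

```python
from urllib.parse import urlparse


def check_cleanup_prefix(base_uri: str, run_id: str, prefix_to_delete: str) -> str:
    """Return the normalized prefix if it is safe to delete, else raise ValueError."""
    if not run_id or "/" in run_id:
        raise ValueError(f"suspicious run_id: {run_id!r}")
    parsed = urlparse(base_uri)
    segments = [s for s in parsed.path.split("/") if s]
    # azure://account/container/... needs the container plus one more segment;
    # the other forms need at least one segment below the bucket/container.
    min_segments = 2 if parsed.scheme == "azure" else 1
    if len(segments) < min_segments:
        raise ValueError(f"base URI {base_uri!r} has no per-pipeline prefix")
    expected = f"{base_uri.rstrip('/')}/{run_id}/"
    normalized = prefix_to_delete.rstrip("/") + "/"
    if normalized != expected:
        raise ValueError(f"refusing to delete {normalized!r}; expected {expected!r}")
    return normalized
```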