Write-Audit-Publish (WAP) Pattern
Implement WAP by writing a Python script using the bauplan SDK. Do NOT use CLI commands.
The three steps: Write (ingest to temp branch) → Audit (quality checks) → Publish (merge to main)
Branch safety: All operations happen on a temporary branch, NEVER on main. By default, branches are kept open for inspection after success or failure.
Atomic multi-table operations: merge_branch is atomic. You can create or modify multiple tables on a branch, and when you merge, either all changes apply to main or none do. This enables safe multi-table ingestion workflows.
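Because merge_branch is atomic, several tables can be written and audited on one branch and then published with a single merge. The following is a minimal sketch that assumes the same SDK calls and keyword arguments the template uses. `wap_ingest_many` and its `tables` mapping (table name to S3 URI) are hypothetical, and the template's existence guards and cleanup are left out for brevity. The client is passed in, so anything that behaves like `bauplan.Client()` works.

```python
def wap_ingest_many(client, branch_name, tables, namespace="bauplan"):
    """Write several tables on one branch, audit each, then merge once.

    Hypothetical helper: `client` behaves like bauplan.Client(), and
    `tables` maps table name -> S3 URI pattern.
    """
    client.create_branch(branch_name, from_ref="main")

    # WRITE: every table lands on the same temporary branch
    for table_name, s3_path in tables.items():
        client.create_table(
            table=table_name, search_uri=s3_path, namespace=namespace, branch=branch_name
        )
        client.import_data(
            table=table_name, search_uri=s3_path, namespace=namespace, branch=branch_name
        )

    # AUDIT: check every table before anything is published
    for table_name in tables:
        result = client.query(
            query=f"SELECT COUNT(*) AS row_count FROM {namespace}.{table_name}",
            ref=branch_name,
        )
        if result.column("row_count")[0].as_py() == 0:
            raise ValueError(
                f"{namespace}.{table_name} has no rows; '{branch_name}' was not merged"
            )

    # PUBLISH: one atomic merge applies all tables to main, or none of them
    client.merge_branch(source_ref=branch_name, into_branch="main")
```

If any audit fails, the function raises before merge_branch runs, so main receives none of the tables.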
Before writing the WAP script, you MUST ask the user for the following parameters:
- S3 path (required): The S3 URI pattern for the source data (e.g., s3://bucket/path/*.parquet)
- Table name (required): The name for the target table
- On success behavior (optional):
  - inspect (default): Keep the branch open for user inspection before merging
  - merge: Automatically merge to main and delete the branch
- On failure behavior (optional):
  - keep (default): Leave the branch open for inspection/debugging
  - delete: Delete the failed branch
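Check the user's answers before any branch is created. This matters because the template below treats any on_success value other than "merge" as "inspect", and any on_failure value other than "delete" as "keep". Without a check, a typo such as on_success="publish" would pass silently. The following is a minimal sketch. `validate_wap_params` is a hypothetical helper, and the s3:// prefix check assumes plain S3 URIs like the examples in this document. The returned dict uses the same keyword names as wap_ingest, so it can be passed straight through as `wap_ingest(**params)`.

```python
VALID_ON_SUCCESS = {"inspect", "merge"}
VALID_ON_FAILURE = {"keep", "delete"}


def validate_wap_params(s3_path, table_name, on_success="inspect", on_failure="keep"):
    """Validate user-supplied WAP parameters; raise ValueError on bad input.

    Hypothetical helper. Returns a dict whose keys match wap_ingest's kwargs.
    """
    if not s3_path or not s3_path.startswith("s3://"):
        raise ValueError(f"S3 path must be an s3:// URI, got {s3_path!r}")
    if not table_name or not table_name.strip():
        raise ValueError("Table name is required")
    if on_success not in VALID_ON_SUCCESS:
        raise ValueError(f"on_success must be one of {sorted(VALID_ON_SUCCESS)}")
    if on_failure not in VALID_ON_FAILURE:
        raise ValueError(f"on_failure must be one of {sorted(VALID_ON_FAILURE)}")
    return {
        "s3_path": s3_path,
        "table_name": table_name.strip(),
        "on_success": on_success,
        "on_failure": on_failure,
    }
```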
WAP Script Template
This is the complete template for WAP:
```python
"""
WAP (Write-Audit-Publish) Template for bauplan data ingestion.

Usage:
    python wap_template.py

Or import and call wap_ingest() with your parameters.
"""
import bauplan
from datetime import datetime


def wap_ingest(
    table_name: str,
    s3_path: str,
    namespace: str = "bauplan",
    on_success: str = "inspect",  # "inspect" (default) or "merge"
    on_failure: str = "keep",  # "keep" (default) or "delete"
):
    """
    Write-Audit-Publish flow for safe data ingestion.

    Args:
        table_name: Target table name
        s3_path: S3 URI pattern (e.g., 's3://bucket/path/*.parquet')
        namespace: Target namespace (default: 'bauplan')
        on_success: "inspect" to keep branch for review, "merge" to auto-merge
        on_failure: "keep" to preserve branch for debugging, "delete" to clean up

    Returns:
        tuple: (branch_name, success)
    """
    client = bauplan.Client()

    # Generate a unique branch name prefixed with the username
    info = client.info()
    username = info.user.username
    branch_name = f"{username}.wap_{table_name}_{int(datetime.now().timestamp())}"

    success = False
    branch_created = False  # True once this run has created (and owns) the branch
    try:
        # === WRITE PHASE ===
        # 1. Create temporary branch from main (must not exist)
        assert not client.has_branch(branch_name), (
            f"Branch '{branch_name}' already exists - this should be an ephemeral branch"
        )
        client.create_branch(branch_name, from_ref="main")
        branch_created = True

        # 2. Verify table doesn't exist on branch before creating
        assert not client.has_table(
            table=table_name, ref=branch_name, namespace=namespace
        ), f"Table '{namespace}.{table_name}' already exists on branch - refusing to overwrite"

        # 3. Create table (schema inferred from S3 files)
        client.create_table(
            table=table_name,
            search_uri=s3_path,
            namespace=namespace,
            branch=branch_name,
        )

        # 4. Import data into table
        client.import_data(
            table=table_name,
            search_uri=s3_path,
            namespace=namespace,
            branch=branch_name,
        )

        # === AUDIT PHASE ===
        # 5. Run quality check: verify data was imported
        fq_table = f"{namespace}.{table_name}"
        result = client.query(
            query=f"SELECT COUNT(*) AS row_count FROM {fq_table}", ref=branch_name
        )
        row_count = result.column("row_count")[0].as_py()
        assert row_count > 0, "No data was imported"
        print(f"Imported {row_count} rows")

        success = True

        # === PUBLISH PHASE ===
        if on_success == "merge":
            # 6. Merge to main and clean up
            client.merge_branch(source_ref=branch_name, into_branch="main")
            print(f"Successfully published {table_name} to main")
            client.delete_branch(branch_name)
            print(f"Cleaned up branch: {branch_name}")
        else:
            # Keep branch for inspection
            print(
                f"WAP completed successfully. Branch '{branch_name}' ready for inspection."
            )
            print(
                f"To merge manually: client.merge_branch(source_ref='{branch_name}', into_branch='main')"
            )

    except Exception as e:
        print(f"WAP failed: {e}")
        if on_failure == "delete":
            # Only delete a branch this run created, never a pre-existing one
            if branch_created and client.has_branch(branch_name):
                client.delete_branch(branch_name)
                print(f"Cleaned up failed branch: {branch_name}")
        else:
            print(f"Branch '{branch_name}' preserved for inspection/debugging.")
        raise

    return branch_name, success


if __name__ == "__main__":
    # Example: customize these parameters
    branch, success = wap_ingest(
        table_name="my_table",
        s3_path="s3://my-bucket/data/*.parquet",
        namespace="bauplan",
        on_success="inspect",
        on_failure="keep",
    )
```
Minimal usage:
```python
from wap_template import wap_ingest

branch, success = wap_ingest(
    table_name="orders",
    s3_path="s3://my-bucket/data/*.parquet",
    namespace="bauplan",
    on_success="inspect",  # or "merge"
    on_failure="keep",  # or "delete"
)
```
Key SDK Methods
| Method | Description |
|---|---|
| bauplan.Client() | Initialize the bauplan client |
| client.info() | Get client info; the username is available as .user.username |
| client.create_branch(name, from_ref="main") | Create a new branch from the specified ref |
| client.has_branch(name) | Check whether a branch exists |
| client.delete_branch(name) | Delete a branch |
| client.create_table(table, search_uri, ...) | Create a table with its schema inferred from the S3 files |
| client.import_data(table, search_uri, ...) | Import data from S3 into a table |
| client.query(query, ref) | Run a SQL query; returns a PyArrow Table |
| client.merge_branch(source_ref, into_branch) | Merge a branch into the target branch |
| client.has_table(table, ref, namespace) | Check whether a table exists on a branch |
SDK Reference: For detailed method signatures, check https://docs.bauplanlabs.com/reference/bauplan
Workflow Checklist
Copy and track progress:
WAP Progress:
Example Output
Successful run (on_success="inspect"):
```bash
$ python wap_script.py
Imported 15234 rows
WAP completed successfully. Branch 'alice.wap_orders_1704067200' ready for inspection.
To merge manually: client.merge_branch(source_ref='alice.wap_orders_1704067200', into_branch='main')
```
Successful run (on_success="merge"):
```bash
$ python wap_script.py
Imported 15234 rows
Successfully published orders to main
Cleaned up branch: alice.wap_orders_1704067200
```
Failed run (on_failure="keep"):
```bash
$ python wap_script.py
WAP failed: No data was imported
Branch 'alice.wap_orders_1704067200' preserved for inspection/debugging.
```
WAP on Existing Tables
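To append data to an existing table, skip both the has_table guard and the create_table call, and only call import_data. The guard would fail here, because a branch created from main already contains the table.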
```python
# Table already exists on main - just import new data
client.import_data(
    table=table_name,
    search_uri=s3_path,
    namespace=namespace,
    branch=branch_name,
)
```
This appends rows to the existing table, so the new files need to be compatible with its schema. The audit and publish phases remain the same: the new rows stay sandboxed on the branch until it is merged.
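The create and append paths can share one WRITE phase by adding an explicit mode switch. The following is a sketch that assumes import_data appends when the table already exists, as described above. `write_phase` and its `mode` parameter are hypothetical and not part of the template or the SDK. The client is anything that behaves like `bauplan.Client()`.

```python
def write_phase(client, table_name, s3_path, branch_name, namespace="bauplan", mode="create"):
    """WRITE phase for new tables (mode="create") or appends (mode="append").

    Hypothetical helper built on the same SDK calls as the template.
    """
    if mode not in ("create", "append"):
        raise ValueError(f"Unknown mode {mode!r}; expected 'create' or 'append'")

    exists = client.has_table(table=table_name, ref=branch_name, namespace=namespace)
    if mode == "create" and exists:
        raise ValueError(f"Table '{namespace}.{table_name}' already exists - refusing to overwrite")
    if mode == "append" and not exists:
        raise ValueError(f"Table '{namespace}.{table_name}' not found on '{branch_name}'")

    if mode == "create":
        # New table: schema is inferred from the S3 files
        client.create_table(
            table=table_name, search_uri=s3_path, namespace=namespace, branch=branch_name
        )

    # Both modes finish with the import; rows stay on the branch until merged
    client.import_data(
        table=table_name, search_uri=s3_path, namespace=namespace, branch=branch_name
    )
```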
CLI Merge After Inspection
When on_success="inspect" (default), the branch is left open for user review. If the user asks to merge after inspecting the data, use the CLI:
```bash
# 1. Check out main first (required before merging)
bauplan checkout main

# 2. Merge the WAP branch into main
bauplan branch merge <username>.wap_<table_name>_<timestamp>

# 3. Optionally delete the branch after a successful merge
bauplan branch rm <username>.wap_<table_name>_<timestamp>
```
Note: You must be on main to run bauplan branch merge. The branch name is printed by the WAP script upon completion.
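When the caller is already in Python, the same merge and cleanup can be done with the SDK calls the template uses in its merge path and prints in its "To merge manually" hint. merge_branch names its target through into_branch, so this sketch has no separate checkout step. The helper name `publish_branch` and its `delete_after` flag are hypothetical. The client is anything that behaves like `bauplan.Client()`.

```python
def publish_branch(client, branch_name, into_branch="main", delete_after=True):
    """Merge an inspected WAP branch, then optionally delete it.

    Hypothetical helper mirroring the CLI steps with SDK calls.
    """
    if not client.has_branch(branch_name):
        raise ValueError(f"Branch '{branch_name}' does not exist")
    # Atomic: every table changed on the branch lands in into_branch together
    client.merge_branch(source_ref=branch_name, into_branch=into_branch)
    if delete_after:
        # Equivalent of the optional `bauplan branch rm` step
        client.delete_branch(branch_name)
```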