Plan: Bootstrap Gating for External Source Population

Date: 2026-03-04
Status: IMPLEMENTED
Last Updated: 2026-03-14


1. Problem Statement

Initial Population Creates Unnecessary Churn

When external ETL processes initially populate source tables, the pg_trickle scheduler sees changes arriving and eagerly refreshes downstream stream tables (STs). If two (or more) source tables feed into a shared downstream ST and their ETL processes run independently, the scheduler will:

  1. Detect changes in the first source and refresh the downstream ST, joining partially loaded data from source A with an empty source B.
  2. Repeat on every scheduler tick as batches land — producing many intermediate results, all incomplete.
  3. Only converge to the correct result after both ETL processes finish.

This wastes compute and produces misleading intermediate states that may be visible to downstream consumers.

Motivating Example

An analytics platform bootstraps two tables from an external data warehouse:

ETL-A: COPY orders FROM 's3://…'          (takes 10 min)
ETL-B: COPY order_lines FROM 's3://…'     (takes 20 min)

The DAG:

orders ──→ order_summary ──────┐
                               ├──→ order_report
order_lines ──→ line_summary ──┘

Without bootstrap gating:

| Time | Event | Result |
|---|---|---|
| T+0 | ETL-A starts loading orders | |
| T+1m | Scheduler tick; orders has changes, order_lines is empty | order_summary refreshed (partial), line_summary empty, order_report has orders without lines |
| T+2m | More orders batches land | Another churn cycle |
| T+10m | ETL-A finishes | order_summary nearly correct, line_summary still empty |
| T+11m | First ETL-B batches land | order_report joins full orders with partial lines |
| T+20m | ETL-B finishes | Finally correct |

Over 20 minutes the scheduler has executed ~20 unnecessary refresh cycles, each producing incomplete results.

Why Existing Mechanisms Don’t Help

| Mechanism | Why insufficient |
|---|---|
| initialize => false | Prevents initial population of the ST itself, but doesn’t prevent the scheduler from refreshing it once changes appear in sources. Also requires the user to trigger initialization manually later. |
| Diamond consistency | No shared PostgreSQL ancestor: the sources are independent base tables. |
| Cross-source snapshot | Ensures PG-level snapshot coherence, but external temporal skew is already baked into the base table contents. |
| Watermark gating (PLAN_WATERMARK_GATING.md) | Addresses ongoing temporal alignment of external sources. Bootstrap is a one-time readiness problem: simpler, but related. See §7 for how they compose. |

Core Insight

The scheduler needs a gate that prevents downstream processing until all required sources have completed their initial population. Only the external process knows when its initial load is complete. The gate should:

  • Be configurable — not all source tables need bootstrap gating.
  • Be temporary — once all sources are ready, it lifts permanently.
  • Be simple — binary ready/not-ready per source, no temporal algebra.
  • Compose naturally with watermark gating for the steady-state case.

2. Design Options

2.1 Option A — Explicit mark_source_ready() Function

The external ETL process calls a SQL function after the initial load:

-- ETL-A finishes initial population
BEGIN;
  COPY orders FROM 's3://warehouse/orders.parquet';
  SELECT pgtrickle.mark_source_ready('orders');
COMMIT;

The scheduler checks: for each ST, if any transitive source has a bootstrap gate configured but has not been marked ready, skip the ST.

Configuration, at the source level or per ST:

-- Before ETL starts, declare that these sources need bootstrap gating
SELECT pgtrickle.set_bootstrap_gate('orders');
SELECT pgtrickle.set_bootstrap_gate('order_lines');

-- Or at stream table creation time
SELECT pgtrickle.create_stream_table('order_report', '...', '1m',
    await_sources => ARRAY['orders', 'order_lines']
);

Readiness signal semantics:

  • Transactional: mark_source_ready() is part of the caller’s transaction. The readiness signal becomes visible only when the data does — preventing the scheduler from seeing “ready” before the data commits.
  • Idempotent: Calling mark_source_ready() on an already-ready source is a no-op.
  • Sticky: Once marked ready, the gate stays lifted for the lifetime of the source’s participation in the DAG, unless it is explicitly re-armed via set_bootstrap_gate() for a re-bootstrap scenario.
  • Signal: Notifies the scheduler (via pg_notify or shared-memory signal) so gating is re-evaluated on the next tick.
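Option A's bookkeeping can be sketched as a per-source readiness map. This is a minimal in-memory model under stated assumptions. `BootstrapGates`, `Readiness`, and `st_is_blocked` are hypothetical names, and sources are keyed by table name. The transactional visibility and the scheduler signal described above are left to PostgreSQL and are not modelled.

```rust
use std::collections::HashMap;

/// Bootstrap state of one gated source (hypothetical model of Option A).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Readiness {
    AwaitingLoad, // gate configured, initial load not yet signalled
    Ready,        // mark_source_ready() has been called
}

/// Sources without an entry have no bootstrap gate configured.
#[derive(Default)]
struct BootstrapGates {
    state: HashMap<String, Readiness>,
}

impl BootstrapGates {
    /// set_bootstrap_gate(): arms the gate, or re-arms it for a re-bootstrap.
    fn set_bootstrap_gate(&mut self, source: &str) {
        self.state.insert(source.to_string(), Readiness::AwaitingLoad);
    }

    /// mark_source_ready(): idempotent. Assumption: calling it on a source
    /// that has no configured gate is also a no-op.
    fn mark_source_ready(&mut self, source: &str) {
        if let Some(r) = self.state.get_mut(source) {
            *r = Readiness::Ready;
        }
    }

    /// Skip the ST if any transitive source is gated but not yet ready.
    fn st_is_blocked(&self, transitive_sources: &[&str]) -> bool {
        transitive_sources
            .iter()
            .any(|s| self.state.get(*s) == Some(&Readiness::AwaitingLoad))
    }
}
```

Calling `set_bootstrap_gate()` again on a source that is already ready simply re-arms it. That is how this model covers the re-bootstrap case.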

2.2 Option B — Compose with Watermark Gating

Treat bootstrap as a special case of watermarks: a source in a watermark group whose watermark is NULL (never advanced) is considered “not ready.”

The current watermark gating plan (§9, question #9) proposes that NULL watermarks mean “ungated.” This option inverts that default for sources that are members of a watermark group: NULL = gated, first advance_watermark() = ready.

-- Create watermark group — gating is active immediately
SELECT pgtrickle.create_watermark_group('order_pipeline',
    sources   => ARRAY['orders', 'order_lines'],
    tolerance => '0 seconds'
);

-- ETL-A finishes, advances watermark for the first time
SELECT pgtrickle.advance_watermark('orders', '2026-03-01');

-- ETL-B finishes, advances watermark for the first time
SELECT pgtrickle.advance_watermark('order_lines', '2026-03-01');

-- Both sources now have non-NULL watermarks → alignment evaluates → STs refresh

Bootstrap naturally transitions to steady-state temporal alignment — no separate API needed.
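Option B's readiness rule can be sketched as a single evaluation over a group's watermarks. In this sketch:

- `None` stands in for a NULL (never advanced) watermark.
- Watermarks are assumed to be epoch seconds.
- `evaluate_group` and `GroupGate` are hypothetical names.
- The tolerance comparison is a simplified placeholder for whatever alignment rule PLAN_WATERMARK_GATING.md settles on.

```rust
/// Watermark of one group member; `None` models a NULL (never advanced).
type Watermark = Option<u64>; // hypothetical unit: epoch seconds

/// Gate outcome for a watermark group under Option B.
#[derive(Debug, PartialEq, Eq)]
enum GroupGate {
    Bootstrapping, // some member has never advanced its watermark
    Misaligned,    // all advanced, but their spread exceeds the tolerance
    Open,          // all advanced and within tolerance
}

fn evaluate_group(members: &[Watermark], tolerance_secs: u64) -> GroupGate {
    // NULL = gated: any never-advanced member keeps the whole group closed.
    let Some(marks) = members.iter().copied().collect::<Option<Vec<u64>>>() else {
        return GroupGate::Bootstrapping;
    };
    // Simplified alignment rule (assumption): the spread between the most
    // and least advanced members must not exceed the tolerance.
    let (min, max) = match (marks.iter().min(), marks.iter().max()) {
        (Some(lo), Some(hi)) => (*lo, *hi),
        _ => return GroupGate::Open, // empty group: nothing to wait for
    };
    if max - min <= tolerance_secs {
        GroupGate::Open
    } else {
        GroupGate::Misaligned
    }
}
```

In the example above, the group reports `Bootstrapping` after ETL-A's call. It opens only once order_lines also advances to 2026-03-01.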

2.3 Option C — initialize => false + Deferred Initialization

Pair the existing initialize parameter with a new await_sources list. The ST is created but not scheduled until all listed sources signal readiness:

SELECT pgtrickle.create_stream_table('order_report', '...', '1m',
    initialize => false,
    await_sources => ARRAY['orders', 'order_lines']
);

When all awaited sources are marked ready, the scheduler auto-initializes the ST (runs the initial full refresh) and begins normal scheduling.
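A sketch of the Option C transition, assuming the scheduler already knows which sources have signalled readiness. `StState`, `TickAction`, and `on_tick` are hypothetical names. The sketch models only the per-tick decision, not the refresh itself.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq, Eq)]
enum StState {
    /// Created with initialize => false; waiting on these sources.
    AwaitingSources(Vec<String>),
    /// Initial full refresh done; normal scheduling applies.
    Scheduled,
}

#[derive(Debug, PartialEq, Eq)]
enum TickAction {
    Skip,
    InitialFullRefresh,
    NormalSchedule,
}

/// Decides what the scheduler does with one ST on this tick, and moves the
/// ST to `Scheduled` once every awaited source is ready.
fn on_tick(state: &mut StState, ready: &HashSet<String>) -> TickAction {
    let all_ready = match state {
        StState::Scheduled => return TickAction::NormalSchedule,
        StState::AwaitingSources(awaited) => awaited.iter().all(|s| ready.contains(s)),
    };
    if all_ready {
        *state = StState::Scheduled;
        TickAction::InitialFullRefresh // auto-initialize exactly once
    } else {
        TickAction::Skip
    }
}
```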

2.4 Option D — Source-Level DAG Gating

Model readiness as a property of source nodes in the DAG. A gated source blocks all transitive downstream STs from refreshing.

-- Register sources as gated before ETL starts
SELECT pgtrickle.gate_source('orders');
SELECT pgtrickle.gate_source('order_lines');

-- ETL runs...

-- Signal completion
SELECT pgtrickle.ungate_source('orders');
SELECT pgtrickle.ungate_source('order_lines');

Any ST with a transitive dependency on a gated source is automatically skipped. No per-ST configuration needed — gating flows through the DAG.
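Because readiness propagates through the DAG, the set of blocked STs is exactly the set of nodes reachable downstream from a gated source. Below is a minimal breadth-first sketch. It assumes the DAG is available as an adjacency map from each node to its direct consumers, with nodes identified by name. `blocked_sts` is a hypothetical helper, not an existing pg_trickle function.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Returns every node reachable from a gated source, i.e. every ST that
/// has a transitive dependency on at least one gated source.
fn blocked_sts(
    consumers: &HashMap<&str, Vec<&str>>, // node -> direct downstream nodes
    gated_sources: &[&str],
) -> HashSet<String> {
    let mut blocked = HashSet::new();
    let mut queue: VecDeque<&str> = gated_sources.iter().copied().collect();
    // Breadth-first walk downstream from all gated sources at once.
    while let Some(node) = queue.pop_front() {
        for &next in consumers.get(node).map(Vec::as_slice).unwrap_or(&[]) {
            // insert() returns false if already visited (handles diamonds).
            if blocked.insert(next.to_string()) {
                queue.push_back(next);
            }
        }
    }
    blocked
}
```

Consider the DAG from §6C with only customers gated. This returns customer_stats and customer_report, and order_summary and order_dashboard remain free to refresh. The visited set also prevents diamond-shaped DAGs from being walked twice.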


3. Comparison

| Criterion | A: mark_source_ready() | B: Watermark composition | C: Deferred init | D: DAG source gating |
|---|---|---|---|---|
| API surface | New: set_bootstrap_gate(), mark_source_ready() | Reuses watermark API | Extends existing create_stream_table | New: gate_source(), ungate_source() |
| Configuration granularity | Per-source or per-ST (await_sources) | Per watermark group | Per-ST | Per-source (DAG propagation) |
| Ongoing overhead | None after bootstrap | Full watermark evaluation on every tick | None after init | None after ungating |
| Composes with watermarks | Orthogonal; both can apply | Native composition | Orthogonal | Orthogonal; subsumable by watermarks |
| Requires watermark infra | No | Yes | No | No |
| Handles re-bootstrap | Yes (set_bootstrap_gate() re-arms) | Yes (reset watermark to NULL; needs new API) | Partially (needs_reinit + re-create) | Yes (gate_source() re-arms) |
| Intermediate ST behavior | Configurable (per-ST await_sources) | Depends on watermark gating mode (§5 of watermark plan) | Only the ST with await_sources is gated | All transitive descendants blocked |
| Implementation complexity | Low | Medium (depends on watermark plan) | Low | Low-Medium |

4. Recommendation

Phased Approach

Phase 1 — Option D (Source-Level DAG Gating): Implement as a standalone, low-overhead feature. It’s the simplest model with the strongest automatic propagation — gating a source automatically gates everything downstream without per-ST configuration.

The API is two functions:

-- Gate a source (blocks all downstream STs)
SELECT pgtrickle.gate_source('orders');

-- Ungate a source (allows downstream processing)
SELECT pgtrickle.ungate_source('orders');

Plus an introspection function:

-- Show current gate status for all tracked sources
SELECT * FROM pgtrickle.source_gates();

Phase 2 — Watermark Subsumption (Option B): When the watermark gating plan (PLAN_WATERMARK_GATING.md) is implemented, bootstrap gating becomes a natural subset: a source with no watermark (NULL) in a watermark group is effectively gated. The gate_source() / ungate_source() API can be retained as a convenience for users who don’t need ongoing temporal alignment — internally mapped to a lightweight boolean flag rather than the full watermark machinery.

Why Option D First

  1. DAG-native propagation. Gating a source automatically blocks all transitive descendants. The user doesn’t need to know the DAG structure — they just declare which sources aren’t ready yet.

  2. Minimal API surface. Two functions plus one introspection function. No new parameters on create_stream_table.

  3. Negligible ongoing overhead. The gate check is one small catalog query per scheduler tick plus a set lookup per ST. Once all sources are ungated, the gated set is empty and the check is trivial.

  4. Clean composition path. When watermarks land, the bootstrap gate becomes “has this source ever had a watermark advanced?” — no API breakage, just a richer underlying mechanism.

  5. Independent of watermark timeline. Works equally well whether or not the external data has a meaningful timestamp dimension (e.g., a one-time CSV import has no temporal axis).


5. Detailed Design (Phase 1)

5.1 Catalog

Add a gated column to pgtrickle.pgt_change_tracking:

ALTER TABLE pgtrickle.pgt_change_tracking
  ADD COLUMN gated BOOLEAN NOT NULL DEFAULT false;

Alternatively, a standalone table:

CREATE TABLE pgtrickle.pgt_source_gates (
    source_relid  OID PRIMARY KEY,
    gated         BOOLEAN NOT NULL DEFAULT true,
    gated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    ungated_at    TIMESTAMPTZ,
    gated_by      TEXT  -- session_user / application_name
);

The standalone table is cleaner — it tracks gating history and doesn’t pollute the CDC tracking table with unrelated state.

5.2 SQL Functions

-- Gate a source table. All downstream STs are blocked from refreshing.
-- Idempotent: gating an already-gated source is a no-op.
pgtrickle.gate_source(source_table TEXT) → void

-- Ungate a source table. Downstream STs are unblocked if all their
-- transitive sources are ungated. Transactional: the ungate becomes
-- visible when the caller's transaction commits.
-- Idempotent: ungating an already-ungated source is a no-op.
pgtrickle.ungate_source(source_table TEXT) → void

-- Introspection: current gate state for all tracked sources.
pgtrickle.source_gates() → TABLE (
    source_table  TEXT,
    schema_name   TEXT,
    gated         BOOLEAN,
    gated_at      TIMESTAMPTZ,
    ungated_at    TIMESTAMPTZ,
    gated_by      TEXT
)
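The idempotency rules above map onto an upsert against pgt_source_gates. Below is a minimal in-memory sketch. It assumes OIDs are plain `u32` values and timestamps are caller-supplied integers. `SourceGates` and its methods are hypothetical stand-ins for the SQL functions. Two behaviours are assumptions that this plan does not fix:

- Re-gating an ungated source resets gated_at and clears ungated_at.
- Ungating an unknown source is a no-op.

```rust
use std::collections::BTreeMap;

type Oid = u32; // stand-in for pg_sys::Oid

/// One row of pgtrickle.pgt_source_gates (§5.1).
#[derive(Debug, Clone, PartialEq)]
struct SourceGate {
    gated: bool,
    gated_at: i64,
    ungated_at: Option<i64>,
    gated_by: String,
}

#[derive(Default)]
struct SourceGates {
    rows: BTreeMap<Oid, SourceGate>,
}

impl SourceGates {
    /// gate_source(): a no-op if already gated; otherwise arms or re-arms.
    fn gate_source(&mut self, relid: Oid, now: i64, who: &str) {
        let row = self.rows.entry(relid).or_insert_with(|| SourceGate {
            gated: false,
            gated_at: now,
            ungated_at: None,
            gated_by: String::new(),
        });
        if row.gated {
            return; // idempotent: already gated
        }
        // New row, or re-arming an ungated source (assumed to reset history).
        row.gated = true;
        row.gated_at = now;
        row.ungated_at = None;
        row.gated_by = who.to_string();
    }

    /// ungate_source(): a no-op if not gated or unknown; else lifts the gate.
    fn ungate_source(&mut self, relid: Oid, now: i64) {
        if let Some(row) = self.rows.get_mut(&relid) {
            if row.gated {
                row.gated = false;
                row.ungated_at = Some(now);
            }
        }
    }

    /// The set the scheduler would cache per tick: rows with gated = true.
    fn gated_relids(&self) -> Vec<Oid> {
        self.rows.iter().filter(|(_, r)| r.gated).map(|(id, _)| *id).collect()
    }
}
```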

5.3 Scheduler Integration

In run_scheduler_tick(), before processing each ST in topological order:

use std::collections::HashSet;
use pgrx::pg_sys;

fn is_source_gated(dag: &StDag, node_id: NodeId, gated: &HashSet<pg_sys::Oid>) -> bool {
    // Fast path: no active gates (the common steady state).
    if gated.is_empty() {
        return false;
    }
    // Transitive base-table sources of this ST, checked against the set of
    // gated source OIDs cached once per tick (§5.5).
    dag.transitive_sources(node_id)
        .iter()
        .any(|src| gated.contains(&src.relid))
}

If any transitive source is gated:

  • Skip the ST for this tick.
  • Log: "pg_trickle: skipping {schema}.{name} — source {source} is gated (bootstrap)".
  • Record action = 'SKIP' with status = 'SKIPPED' in pgt_refresh_history, with a descriptive skip reason.
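The per-tick check and the skip bookkeeping can be combined into one pass over the STs in topological order. Below is a minimal sketch. It assumes each ST's transitive source set is precomputed and that the gated set was loaded once for the tick. `StInfo`, `Decision`, and `plan_tick` are hypothetical. The sketch also exempts IMMEDIATE-mode STs, as described in §7.

```rust
use std::collections::HashSet;

type Oid = u32; // stand-in for pg_sys::Oid

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RefreshMode {
    Differential,
    Immediate,
}

struct StInfo {
    qualified_name: String,                 // "{schema}.{name}"
    mode: RefreshMode,
    transitive_sources: Vec<(Oid, String)>, // (relid, qualified source name)
}

#[derive(Debug, PartialEq, Eq)]
enum Decision {
    Proceed,
    Skip { reason: String },
}

/// For each ST (already in topological order), decides whether the
/// bootstrap gate blocks it on this tick. `gated` is the per-tick cache.
fn plan_tick(sts: &[StInfo], gated: &HashSet<Oid>) -> Vec<(String, Decision)> {
    sts.iter()
        .map(|st| {
            // IMMEDIATE STs are maintained inside the DML transaction and
            // are never gated.
            let blocking = if st.mode == RefreshMode::Immediate {
                None
            } else {
                st.transitive_sources.iter().find(|(relid, _)| gated.contains(relid))
            };
            let decision = match blocking {
                Some((_, src)) => Decision::Skip {
                    reason: format!("source {} is gated (bootstrap)", src),
                },
                None => Decision::Proceed,
            };
            (st.qualified_name.clone(), decision)
        })
        .collect()
}
```

The returned reason strings would supply the descriptive skip reason recorded in pgt_refresh_history.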

5.4 Signal Mechanism

ungate_source() should signal the scheduler so the gating predicate is re-evaluated promptly rather than waiting for the next natural tick:

  • Option 1: pg_notify('pgtrickle_gate_change', source_relid::text) — lightweight, the scheduler already listens for notifications.
  • Option 2: Bump the shared-memory DAG version counter — triggers a re-evaluation on the next tick. Heavier than needed since the DAG structure hasn’t changed, only gating state.

Recommendation: Use pg_notify for a lightweight signal. The scheduler can subscribe to the pgtrickle_gate_change channel and re-evaluate gating predicates without a full DAG rebuild.

5.5 Caching

The scheduler should cache gate status at the start of each tick rather than querying the catalog per-ST. A single query fetches all gated sources:

SELECT source_relid FROM pgtrickle.pgt_source_gates WHERE gated = true

This set is intersected with each ST’s transitive source set during the tick. The cache is invalidated on the next tick (or immediately if a pg_notify signal is received).
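A sketch of the per-tick cache with notification-driven invalidation. `GateCache` is hypothetical. The `load` closure stands in for the single `SELECT source_relid ... WHERE gated = true` query. How the scheduler actually receives the pg_notify payload is not modelled.

```rust
use std::collections::HashSet;

type Oid = u32; // stand-in for pg_sys::Oid

struct GateCache {
    gated: HashSet<Oid>,
    loaded_for_tick: Option<u64>, // tick number the cache was filled for
}

impl GateCache {
    fn new() -> Self {
        GateCache { gated: HashSet::new(), loaded_for_tick: None }
    }

    /// Called when a notification arrives on the gate-change channel.
    fn on_notify(&mut self) {
        self.loaded_for_tick = None; // force a reload on next access
    }

    /// Returns the gated set for `tick`, running `load` (one catalog query)
    /// at most once per tick unless a notification invalidated the cache.
    fn gated_for_tick(&mut self, tick: u64, load: impl FnOnce() -> HashSet<Oid>) -> &HashSet<Oid> {
        if self.loaded_for_tick != Some(tick) {
            self.gated = load();
            self.loaded_for_tick = Some(tick);
        }
        &self.gated
    }
}
```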

5.6 Interaction with initialize => false

The initialize parameter on create_stream_table controls whether the ST is populated at creation time. Bootstrap gating is orthogonal:

| initialize | Source gated | Behavior |
|---|---|---|
| true | No | Normal: ST is created and immediately populated. |
| true | Yes | ST creation succeeds and the initial population runs (with partial data). Subsequent refreshes are blocked until sources are ungated. Recommendation: warn at creation time that gated sources may produce incomplete initial results. (Open Question 4 and Step 5 propose deferring initialization automatically in this case.) |
| false | No | ST created empty. First scheduler tick triggers the initial refresh. |
| false | Yes | ST created empty. Scheduler skips it until all sources are ungated; the first refresh after ungating is the initial population. This is the recommended bootstrap pattern. |

Recommended pattern for bootstrap:

-- 1. Gate sources before ETL starts
SELECT pgtrickle.gate_source('orders');
SELECT pgtrickle.gate_source('order_lines');

-- 2. Create stream tables (initialize => false avoids premature population)
SELECT pgtrickle.create_stream_table('order_summary',
    'SELECT region, SUM(amount) FROM orders GROUP BY region',
    '1m', initialize => false);
SELECT pgtrickle.create_stream_table('order_report',
    'SELECT ... FROM order_summary JOIN line_summary ...',
    '1m', initialize => false);

-- 3. Run ETL processes (can be parallel, any order)
BEGIN;
  COPY orders FROM 's3://…';
  SELECT pgtrickle.ungate_source('orders');
COMMIT;

BEGIN;
  COPY order_lines FROM 's3://…';
  SELECT pgtrickle.ungate_source('order_lines');
COMMIT;

-- 4. Next scheduler tick: all sources ungated → STs initialize and refresh

6. Extended Use Cases

A. Re-Bootstrap After Schema Change

An external system changes its schema, requiring a full re-import:

-- Gate the source, truncate, re-import
SELECT pgtrickle.gate_source('orders');
TRUNCATE orders;

-- Full re-import
COPY orders FROM 's3://warehouse/orders_v2.parquet';
SELECT pgtrickle.ungate_source('orders');

During the truncate + re-import window, downstream STs are not refreshed, avoiding the churn of processing intermediate states during re-population.

B. Coordinated Multi-Source Bootstrap

Multiple sources with different load times:

-- Gate all sources upfront
SELECT pgtrickle.gate_source('customers');
SELECT pgtrickle.gate_source('orders');
SELECT pgtrickle.gate_source('products');

-- ETL processes run independently, potentially in parallel
-- Each ungates its source upon completion
-- Downstream STs that depend on all three only start refreshing
-- when the last source is ungated

C. Partial DAG Gating

Only gate sources that feed into a specific part of the DAG:

customers ──→ customer_stats ──┐
                               ├──→ customer_report
orders ──→ order_summary ──────┘
                │
                └──→ order_dashboard (only depends on orders)

If only customers is gated:

  • customer_stats and customer_report are blocked (transitive dependency).
  • order_summary and order_dashboard refresh normally (no dependency on the gated source).

D. Bootstrap with Existing Stream Tables

Adding a new source to an existing pipeline:

-- New source table, needs initial population
CREATE TABLE inventory (sku TEXT, qty INT);
SELECT pgtrickle.gate_source('inventory');

-- Create a new ST that joins existing and new sources
SELECT pgtrickle.create_stream_table('inventory_report',
    'SELECT o.*, i.qty FROM order_summary o JOIN inventory i ON ...',
    '1m', initialize => false);

-- ETL populates inventory
COPY inventory FROM 's3://…';
SELECT pgtrickle.ungate_source('inventory');

Existing STs (order_summary, etc.) continue refreshing normally since they don’t depend on the gated source.


7. Composability with Other Mechanisms

Bootstrap Gating + Watermark Gating

Bootstrap gating and watermark gating serve different lifecycle phases:

┌──────────────────┐     ┌────────────────────────────────┐
│  BOOTSTRAP PHASE │     │       STEADY-STATE PHASE       │
│                  │     │                                │
│  gate_source()   │────▶│  advance_watermark(T) on each  │
│  ETL loads data  │     │  load cycle; watermark group   │
│  ungate_source() │     │  ensures temporal alignment    │
└──────────────────┘     └────────────────────────────────┘

Both can be active simultaneously. The scheduler evaluates gates in order:

  1. Bootstrap gate: Is any transitive source gated? → Skip.
  2. Watermark gate: Are watermark groups aligned? → Skip if misaligned.
  3. Normal scheduling: Check schedule, changes, etc. → Refresh.
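The evaluation order amounts to a short-circuiting chain in which the first failing stage decides. This is a minimal sketch with hypothetical names. The three booleans stand in for:

- the bootstrap-gate lookup;
- the watermark alignment result, which only exists once watermark gating ships;
- the normal schedule/change check.

```rust
#[derive(Debug, PartialEq, Eq)]
enum Verdict {
    SkipBootstrapGate,
    SkipWatermarkMisaligned,
    SkipNotDue,
    Refresh,
}

/// Evaluates the gates in the order listed above. The first failing stage
/// wins, so a gated source is reported even if watermarks are misaligned.
fn evaluate(any_source_gated: bool, watermarks_aligned: bool, due_with_changes: bool) -> Verdict {
    if any_source_gated {
        Verdict::SkipBootstrapGate
    } else if !watermarks_aligned {
        Verdict::SkipWatermarkMisaligned
    } else if !due_with_changes {
        Verdict::SkipNotDue
    } else {
        Verdict::Refresh
    }
}
```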

Once watermark gating is implemented, the bootstrap gate for a source could optionally be lifted automatically when its first watermark is advanced (configurable):

SELECT pgtrickle.gate_source('orders',
    auto_ungate_on_watermark => true  -- lift gate on first advance_watermark()
);
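One possible reading of `auto_ungate_on_watermark`: the hook on `advance_watermark()` lifts the gate only when two conditions hold. The source's previous watermark must be NULL, and the flag must have been set at gate time. `GateRow` and `on_advance_watermark` are hypothetical. This plan leaves open whether a later (non-first) advance should ever lift a gate; the sketch assumes it does not.

```rust
#[derive(Debug, PartialEq, Eq)]
struct GateRow {
    gated: bool,
    auto_ungate_on_watermark: bool,
}

/// Hook run when advance_watermark() is called for a source.
/// `previous` is the source's watermark before this call (None = NULL).
fn on_advance_watermark(gate: &mut GateRow, previous: Option<u64>) {
    let first_advance = previous.is_none();
    if first_advance && gate.gated && gate.auto_ungate_on_watermark {
        gate.gated = false; // same effect as calling ungate_source()
    }
}
```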

Bootstrap Gating + Diamond Consistency

Orthogonal. Diamond consistency governs how structurally related STs are refreshed atomically. Bootstrap gating governs whether they refresh at all. A diamond group where any source is gated simply doesn’t refresh as a unit.

Bootstrap Gating + refresh_mode = 'IMMEDIATE'

IMMEDIATE mode STs are refreshed synchronously within the source DML transaction. Bootstrap gating doesn’t apply to IMMEDIATE STs — they are maintained on every write by definition. If the user doesn’t want IMMEDIATE STs to see partial bootstrap data, they should use DIFFERENTIAL mode during bootstrap and switch to IMMEDIATE after ungating.


8. Open Questions

  1. Should gate_source() work on tables not yet tracked by pg_trickle? If the user gates a table before creating any ST that depends on it, should pg_trickle record the gate in pgt_source_gates speculatively? This is useful for the “gate first, create STs second” pattern (§5.6). Proposed: yes — the gate table is independent of pgt_change_tracking.

  2. Should gate rows be cleaned up after the first successful refresh? Once all sources are ungated and the downstream ST completes its first full refresh, the gate has served its purpose. Should the gate rows be deleted automatically, or kept for audit? Proposed: keep the rows with their ungated_at timestamp; the scheduler ignores rows where gated = false.

  3. Should there be a timeout / stale gate warning? If a source has been gated for longer than a configurable duration (e.g., 1 hour), pg_trickle could emit a warning: “source X has been gated for 1h — is the ETL still running?” This helps detect stuck ETL processes.

  4. Should create_stream_table auto-detect gated sources and set initialize => false? If the user creates a ST whose sources include a gated source, should pg_trickle automatically skip initial population (since it would produce incomplete results)? Proposed: yes, with a NOTICE explaining why.

  5. Per-ST override? Should a ST be able to opt out of bootstrap gating even if one of its sources is gated? Use case: a monitoring ST that intentionally shows partial progress during bootstrap. Proposed: add bootstrap_gating => 'ignore' on create_stream_table for this case.

  6. Bulk gate/ungate? For pipelines with many sources, a convenience function:

     SELECT pgtrickle.gate_sources(ARRAY['orders', 'order_lines', 'products']);
     SELECT pgtrickle.ungate_sources(ARRAY['orders', 'order_lines', 'products']);
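If the stale-gate warning from Open Question 3 is adopted, the check reduces to an age comparison over rows where gated = true. Below is a minimal sketch using `std::time::{Duration, SystemTime}`. `stale_gate_warnings` and its `(name, gated_at)` input shape are hypothetical. The plan does not name a setting for the threshold.

```rust
use std::time::{Duration, SystemTime};

/// Returns a warning for every source gated longer than `threshold`.
/// `gated` holds (qualified source name, gated_at) for rows with gated = true.
fn stale_gate_warnings(
    gated: &[(&str, SystemTime)],
    now: SystemTime,
    threshold: Duration,
) -> Vec<String> {
    gated
        .iter()
        .filter_map(|&(name, gated_at)| {
            // duration_since fails if gated_at is in the future (clock skew);
            // treat that as "not stale" rather than erroring.
            let age = now.duration_since(gated_at).ok()?;
            (age > threshold).then(|| {
                format!(
                    "source {} has been gated for {}m; is the ETL still running?",
                    name,
                    age.as_secs() / 60
                )
            })
        })
        .collect()
}
```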


9. Implementation Steps

Step 1 — Catalog: pgt_source_gates

Create table (§5.1), add to upgrade migration SQL. Structs in catalog.rs.

Step 2 — gate_source() / ungate_source() SQL Functions

#[pg_extern(schema = "pgtrickle")] in api.rs. Validation (source must be a valid relation), idempotency, transactional semantics, pg_notify signal.

Step 3 — Scheduler Gate Check

In scheduler.rs, at the start of each ST evaluation in topological order:

  1. Load the gated source set (cached per tick).
  2. Compute the transitive source set for the ST.
  3. Intersect the two. If the intersection is non-empty, skip the ST with a reason.

Step 4 — source_gates() Introspection Function

Return current gate status for all tracked sources.

Step 5 — create_stream_table Integration

When a new ST is created, check whether any of its sources are gated. If so:

  • Set initialize => false automatically (skip initial population).
  • Emit a NOTICE: “Source table X is currently gated; initial population deferred until all sources are ungated.”
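Step 5 can be sketched as a pure function of two inputs: the requested `initialize` flag and the ST's currently gated sources. It returns the effective flag and one NOTICE per gated source. `effective_initialize` is a hypothetical name. In the extension, the messages would be raised as NOTICEs rather than returned.

```rust
/// Returns the effective `initialize` flag and the NOTICE lines to emit.
fn effective_initialize(requested: bool, gated_sources: &[&str]) -> (bool, Vec<String>) {
    let notices: Vec<String> = gated_sources
        .iter()
        .map(|src| {
            format!("Source table {src} is currently gated; initial population deferred until all sources are ungated.")
        })
        .collect();
    // Any gated source forces deferral; otherwise honour the caller's choice.
    let initialize = requested && gated_sources.is_empty();
    (initialize, notices)
}
```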

Step 6 — Tests

| Test | Type | What it proves |
|---|---|---|
| test_gate_source_blocks_downstream | E2E | Gated source prevents all downstream ST refreshes |
| test_ungate_source_allows_refresh | E2E | Ungating triggers refresh on next tick |
| test_gate_multiple_sources_all_must_ungate | E2E | ST with two gated sources waits for both |
| test_gate_partial_dag | E2E | Only STs depending on a gated source are blocked |
| test_gate_idempotent | Unit | Double gate / double ungate are no-ops |
| test_gate_transactional | E2E | Ungate not visible until caller commits |
| test_gate_create_st_auto_skip_init | E2E | ST created with a gated source skips initialization |
| test_gate_with_immediate_mode | E2E | IMMEDIATE STs unaffected by source gates |

Step 7 — Documentation

  • docs/SQL_REFERENCE.md: gate_source(), ungate_source(), source_gates().
  • docs/tutorials/: “Bootstrapping External Data Sources” tutorial.
  • CHANGELOG.md.

10. Prior Art

  1. Apache Airflow Sensors — Airflow’s ExternalTaskSensor and S3KeySensor gate downstream tasks until upstream dependencies are satisfied. The bootstrap gate is analogous: a sensor that waits for “source data is fully loaded” before allowing downstream processing.

  2. dbt ref() Dependencies + --defer — dbt’s --defer flag allows models to reference artifacts from a prior run when upstream models haven’t been rebuilt yet. The bootstrap gate is a stronger guarantee: don’t run the model at all until inputs are ready, rather than using stale data.

  3. Kafka Streams State Restoration — Kafka Streams applications do not start processing a task until its partitions are assigned and its local state stores have been restored from their changelog topics. The bootstrap gate applies the same principle to pg_trickle’s scheduler.

  4. Flink Checkpoint Barriers — Flink uses barriers in the data stream to coordinate exactly-once processing across operators. While more granular than bootstrap gating, the principle of “don’t process until all inputs are at a known state” is the same.

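  5. Materialize Frontiers — Materialize tracks two frontiers per collection. The since frontier marks how far the collection may be compacted. The upper frontier marks the times for which its data is complete. A source whose upper frontier has not advanced has not yet produced complete data, and queries and downstream dataflows wait until their inputs’ upper frontiers advance past the times they need before producing results.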


11. Relationship to Other Plans

| Plan | Relationship |
|---|---|
| PLAN_WATERMARK_GATING.md | Complementary. Bootstrap gating handles one-time initial readiness. Watermark gating handles ongoing temporal alignment. Phase 2 subsumes bootstrap into watermarks via “NULL watermark = gated.” |
| PLAN_DIAMOND_DEPENDENCY_CONSISTENCY.md | Orthogonal. Diamond consistency governs structural refresh atomicity. Bootstrap gating governs whether refresh occurs at all. |
| PLAN_CROSS_SOURCE_SNAPSHOT_CONSISTENCY.md | Orthogonal. Snapshot consistency ensures PG-level coherence. Bootstrap gating ensures external data completeness. |
| PLAN_FUSE.md | Complementary. Fuse halts refresh on anomalous change volume. Bootstrap gating halts refresh on incomplete source population. Both are preconditions evaluated before refresh. |
| PLAN_TRANSACTIONAL_IVM.md | Bootstrap gating does not apply to IMMEDIATE mode STs (maintained synchronously in the DML transaction). Users should defer IMMEDIATE mode activation until after bootstrap. |