DAG Topology & Refresh Propagation Performance

Status: Analysis
Date: 2026-03-24
Related: ARCHITECTURE.md · CONFIGURATION.md · REPORT_PARALLELIZATION.md · STATUS_PERFORMANCE.md


1. Overview

This report analyzes how DAG topology affects end-to-end change propagation latency in pg_trickle, that is, the time from a row being written to the most upstream source table until it is visible in the most downstream stream table. The analysis covers three refresh modes (sequential CALCULATED, scheduled, and parallel), five representative acyclic topologies plus cyclic SCCs, and configuration tuning guidance.

Key Variables

| Symbol | Meaning | Default |
|---|---|---|
| $N$ | Total number of stream tables | |
| $D$ | DAG depth (longest path from source to leaf) | |
| $W$ | Maximum DAG width (STs at the widest level) | |
| $T_r$ | Per-ST refresh wall-clock time | 5–200 ms |
| $I_s$ | Scheduler interval (scheduler_interval_ms) | 1000 ms |
| $I_p$ | Parallel poll interval (when workers in-flight) | 200 ms |
| $C$ | max_concurrent_refreshes (per-DB) | 4 |
| $C_g$ | max_dynamic_refresh_workers (cluster-wide) | 4 |

2. Refresh Modes

2.1 Sequential CALCULATED (no explicit schedule)

When stream tables have no explicit SCHEDULE, the scheduler uses upstream change detection as the refresh trigger. The topological iteration order guarantees upstream STs are refreshed before downstream STs within the same tick. A key property of the implementation:

“CALCULATED STs: refresh whenever upstream sources have pending changes. The topological iteration order guarantees upstream STs are refreshed first within a tick, so their change buffers are already populated by the time we check here.”
src/scheduler.rs, check_schedule()

After refreshing ST₁, the scheduler updates data_timestamp via SPI. When ST₂ is evaluated next (same tick, same transaction), has_stream_table_source_changes() compares data_timestamp values and sees the write from earlier in the same transaction. The change cascades through the entire DAG in a single tick, regardless of depth.
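The single-tick cascade can be modelled in a few lines of Python. This is an illustrative sketch only, not the scheduler's code: `refresh_tick` and the `deps` mapping are hypothetical names, and a set of changed nodes stands in for the data_timestamp comparison.

```python
from graphlib import TopologicalSorter

def refresh_tick(deps, changed_sources):
    """Model one scheduler tick over a DAG of stream tables.

    deps maps each stream table to the set of relations it reads from
    (base tables or other stream tables). Hypothetical structure.
    """
    changed = set(changed_sources)
    refreshed = []
    # static_order() yields every node after all of its predecessors.
    for node in TopologicalSorter(deps).static_order():
        if node in deps and changed & set(deps[node]):
            refreshed.append(node)
            # Stand-in for advancing data_timestamp: downstream STs
            # visited later in this same loop see the change.
            changed.add(node)
    return refreshed

chain = {"st1": {"source"}, "st2": {"st1"}, "st3": {"st2"}}
print(refresh_tick(chain, {"source"}))  # ['st1', 'st2', 'st3']
```

Because every node is visited after its upstreams, one pass is enough for a change at the source to reach the last ST in the chain.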

Propagation latency:

$$L_{\text{calc}} = I_s + N \times T_r$$

The scheduler interval is the wait for the tick to begin; once inside the tick, all $N$ STs are refreshed sequentially in topological order.
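As a quick check of the formula, the following sketch evaluates it for the 500-ST case used later in this report. The function name is hypothetical, and $I_s$ is treated as the worst-case wait for the next tick.

```python
def calc_latency_ms(n_sts, t_r_ms, i_s_ms=1000):
    """Worst-case L_calc = I_s + N * T_r, all in milliseconds."""
    return i_s_ms + n_sts * t_r_ms

print(calc_latency_ms(500, 10))   # 6000
print(calc_latency_ms(500, 100))  # 51000
```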

When an upstream ST’s data_timestamp has advanced, the downstream ST receives RefreshAction::Full rather than differential, because there is no CDC change buffer for ST-to-ST propagation — the scheduler detects staleness via data_timestamp comparison only (see Section 6, insight 4).

2.2 Scheduled Mode (explicit SCHEDULE)

With explicit schedules (e.g., SCHEDULE '1s'), check_schedule() compares last_refresh_at against the schedule interval. A downstream ST’s schedule timer is independent of its upstream’s refresh — it won’t fire until its own interval expires. Each tick advances at most one depth level of the DAG.

Propagation latency:

$$L_{\text{sched}} = D \times \max(I_s, S)$$

where $S$ is the schedule interval. Since the scheduler only wakes every $I_s$ milliseconds, the floor is the scheduler interval even if the schedule is shorter (e.g., SCHEDULE '100ms' still propagates at $I_s = 1\text{s}$ per hop).
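A minimal sketch of this formula (hypothetical function name) shows how the scheduler interval acts as a floor: a 100 ms schedule propagates no faster than a 1 s schedule when $I_s = 1000$ ms.

```python
def sched_latency_ms(depth, schedule_ms, i_s_ms=1000):
    """L_sched = D * max(I_s, S), in milliseconds."""
    return depth * max(i_s_ms, schedule_ms)

print(sched_latency_ms(depth=10, schedule_ms=1000))   # 10000
print(sched_latency_ms(depth=10, schedule_ms=100))    # 10000
print(sched_latency_ms(depth=500, schedule_ms=1000))  # 500000
```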

2.3 Parallel Mode (parallel_refresh_mode = 'on')

The ExecutionUnitDag identifies independent execution units that can run concurrently. The coordinator dispatches up to $C$ workers per database, capped at $C_g$ cluster-wide. Each worker is a separate PostgreSQL background worker process with its own SPI connection — spawned on demand, not pooled.

Between dispatch rounds, the scheduler sleeps for $I_p = 200\text{ms}$ (the short poll interval used when workers are in-flight). Workers run as separate background processes concurrently with this latch wait — $T_r$ is absorbed into the poll period when $T_r \leq I_p$. When $T_r > I_p$, the coordinator polls multiple times per batch; per-batch time ≈ $\lceil T_r / I_p \rceil \times I_p$.

Each round dispatches workers for all ready execution units (those whose upstream dependencies have completed in the same tick via Step 1→Step 3 of the dispatch loop).

Propagation latency for a single level of width $W_l$:

$$L_{\text{level}} = \left\lceil \frac{W_l}{C_{\text{eff}}} \right\rceil \times \max(I_p, T_r)$$

where $C_{\text{eff}} = \min(C, C_g - \text{workers\_used\_by\_other\_dbs})$.

Total propagation latency:

$$L_{\text{parallel}} = \sum_{l=1}^{D} \left\lceil \frac{W_l}{C_{\text{eff}}} \right\rceil \times \max(I_p, T_r)$$

For the common case $T_r \leq I_p = 200\text{ms}$, this simplifies to $B \times I_p$ where $B = \sum_l \lceil W_l / C_{\text{eff}} \rceil$ is the total number of dispatch rounds.
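The parallel-mode formulas can be evaluated with a short sketch like the one below. All function names are hypothetical, and the model is the $\max(I_p, T_r)$ per-round cost given above, not a measurement. Note that with the default $C_g = 4$, reaching $C_{\text{eff}} = 16$ requires raising both caps.

```python
def effective_concurrency(c, c_g, used_by_other_dbs=0):
    """C_eff = min(C, C_g - workers used by other databases)."""
    return min(c, c_g - used_by_other_dbs)

def dispatch_rounds(level_widths, c_eff):
    """B = sum over levels of ceil(W_l / C_eff)."""
    if c_eff < 1:
        raise ValueError("no worker slots available")
    return sum(-(-w // c_eff) for w in level_widths)  # integer ceiling

def parallel_latency_ms(level_widths, c_eff, t_r_ms, i_p_ms=200):
    """L_parallel = B * max(I_p, T_r), in milliseconds."""
    return dispatch_rounds(level_widths, c_eff) * max(i_p_ms, t_r_ms)

wide = [50] * 10  # 10 levels of 50 STs each
c_eff = effective_concurrency(c=16, c_g=16)
print(dispatch_rounds(wide, c_eff))           # 40
print(parallel_latency_ms(wide, c_eff, 100))  # 8000
print(parallel_latency_ms(wide, 4, 100))      # 26000
```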


3. Topology Analysis

3.1 Linear Chain (depth = N, width = 1)

source → ST₁ → ST₂ → ST₃ → ... → ST_N

The worst case for propagation depth. Every ST depends on the previous one — zero parallelism is possible.

| Mode | Latency (N=500) |
|---|---|
| CALCULATED | $1\text{s} + 500 \times T_r$ |
| SCHEDULE '1s' | $500 \times 1\text{s} = 500\text{s} \approx 8.3\text{min}$ |
| Parallel | $500 \times I_p \approx 100\text{s}$ (worse than sequential) |

Recommendation: Use CALCULATED mode (sequential). Parallel mode adds ~200 ms of poll overhead per hop with no concurrency benefit, turning a ~6-second job (at $T_r = 10\text{ms}$) into a ~100-second crawl. Do not enable parallel_refresh_mode for linear chains.

3.2 Wide DAG (depth = D, width ≈ N/D)

source → [ST₁ .. ST₅₀] → [ST₅₁ .. ST₁₀₀] → ... → [ST₄₅₁ .. ST₅₀₀]
         level 1           level 2                   level 10

N=500, D=10, ~50 STs per level. Siblings at the same level are independent.

| Mode | Latency |
|---|---|
| CALCULATED | $1\text{s} + 500 \times T_r$ (same as linear; all 500 run sequentially) |
| SCHEDULE '1s' | $10 \times 1\text{s} = 10\text{s}$ |
| Parallel (C=4) | $10 \times 13 \times I_p \approx 26\text{s}$ |
| Parallel (C=16) | $10 \times 4 \times I_p \approx 8\text{s}$ |

At $T_r = 100\text{ms}$:

| Mode | Latency |
|---|---|
| CALCULATED | $\approx 51\text{s}$ |
| Parallel (C=16) | $10 \times 4 \times 200\text{ms} \approx 8\text{s}$ |

Recommendation: Parallel mode with $C = 16$ delivers significant speedup over sequential for expensive refreshes. The cross-over point is where CALCULATED cost ($I_s + N \times T_r$) equals parallel cost ($B \times I_p$); for N=500, D=10, C=16 this is $T_r \approx 15\text{ms}$. For cheaper refreshes, the 200 ms poll overhead dominates and sequential CALCULATED mode is faster.
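The cross-over point follows from setting the two cost models equal and solving for $T_r$. The sketch below does that arithmetic under the assumption that $T_r \leq I_p$, so each dispatch round costs exactly $I_p$; the function name is hypothetical.

```python
def crossover_t_r_ms(n_sts, rounds, i_s_ms=1000, i_p_ms=200):
    """Solve I_s + N * T_r = B * I_p for T_r (valid while T_r <= I_p)."""
    return (rounds * i_p_ms - i_s_ms) / n_sts

# N=500, D=10, C=16: B = 10 levels * ceil(50 / 16) = 40 rounds.
print(crossover_t_r_ms(n_sts=500, rounds=40))  # 14.0
```

The exact solution is 14 ms, which rounds to the ~15 ms threshold quoted above.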

3.3 Fan-Out Tree (depth = D, exponentially widening)

source → ST₁ → [ST₂, ST₃] → [ST₄, ST₅, ST₆, ST₇] → ...

Common when a single source table feeds a master ST that branches into domain-specific materialized views at each level.

| Level | STs at Level | Cumulative |
|---|---|---|
| 1 | 1 | 1 |
| 2 | 2 | 3 |
| 3 | 4 | 7 |
| $l$ | $2^{l-1}$ | $2^l - 1$ |

Characteristics:

  • Early levels are bottlenecked by single-ST dependencies (parallel mode has little impact).
  • Deeper levels benefit strongly from parallelism as width grows exponentially.
  • CALCULATED mode handles the entire tree in one tick regardless.

Recommendation: Parallel mode helps significantly at the leaves. Set $C$ to match the expected leaf-level width, but keep in mind that the narrow upper levels cannot benefit. A fan-out tree of depth 9 and 511 STs has 256 STs at the leaf level — $C = 16..32$ is practical.
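To see where the dispatch rounds go in a binary fan-out tree, the sketch below lists the per-level widths and the rounds each level needs at $C_{\text{eff}} = 16$. The helper names are hypothetical, and the latency figure assumes $T_r \leq I_p = 200$ ms.

```python
def fan_out_widths(depth):
    """Width of each level in a binary fan-out tree: 2^(l-1) for level l."""
    return [2 ** (level - 1) for level in range(1, depth + 1)]

def rounds_per_level(widths, c_eff):
    """ceil(W_l / C_eff) for every level, using integer arithmetic."""
    return [-(-w // c_eff) for w in widths]

widths = fan_out_widths(9)
print(widths)       # [1, 2, 4, 8, 16, 32, 64, 128, 256]
print(sum(widths))  # 511

rounds = rounds_per_level(widths, 16)
print(rounds)             # [1, 1, 1, 1, 1, 2, 4, 8, 16]
print(sum(rounds) * 200)  # 7000 (ms)
```

The first five levels each cost a full round despite holding only 31 STs in total, while the leaf level alone accounts for 16 of the 35 rounds.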

3.4 Diamond / Convergence Pattern

source → ST_A → ST_C
source → ST_B ↗

Multiple upstream STs feed into a shared downstream ST. pg_trickle groups diamond patterns into consistency groups and (when diamond_consistency = 'atomic') refreshes all members within a single sub-transaction to prevent inconsistent intermediate states.

Consistency implications:

  • Atomic diamond groups add SAVEPOINT overhead (~1–2 ms per group).
  • All members of the group must be refreshed together, so the refresh time is the sum of all members, not the max.
  • In parallel mode, atomic groups are dispatched as a single execution unit, with no intra-group parallelism.

Recommendation: Diamonds with many members (e.g., 10+ STs in one consistency group) should be checked for necessity — often a simpler query structure eliminates the diamond. When diamonds are unavoidable, ensure the per-ST refresh time is low to keep the atomic group refresh fast.
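The difference between "sum" and "max" can be made concrete with a small sketch. The function name and the member timings are illustrative assumptions; the 2 ms overhead is the upper end of the SAVEPOINT estimate above, and `max(members)` is only a lower bound for fully independent refreshes with one worker per member.

```python
def atomic_group_ms(member_refresh_ms, savepoint_overhead_ms=2):
    """Atomic group cost: members run back to back in one sub-transaction."""
    return sum(member_refresh_ms) + savepoint_overhead_ms

members = [5, 40, 200]  # illustrative per-ST refresh times in ms
print(atomic_group_ms(members))  # 247
print(max(members))              # 200
```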

3.5 Mixed Topology (Real-World)

Real deployments typically combine all of the above:

orders ──→ orders_daily ──→ orders_summary
        ├→ orders_by_region ─┤
        └→ orders_by_product ─┘──→ exec_dashboard
products ──→ product_stats ──────→ exec_dashboard

Characteristics:

  • Heterogeneous depth: some paths are 2 hops, others are 4+.
  • Diamond convergence at aggregation points (e.g., exec_dashboard).
  • Mix of cheap STs (simple filters, ~5 ms) and expensive STs (multi-join aggregates, ~200 ms).

Recommendation: Use tiered scheduling (pg_trickle.tiered_scheduling = true) to assign fast-changing operational STs to the hot tier (1× schedule) and slow-changing analytical STs to warm (2×) or economy (10×). This reduces wasted work without a complex DAG restructuring.

3.6 Cyclic SCCs (circular dependencies)

When pg_trickle.allow_circular = true, strongly connected components are handled via fixed-point iteration — the scheduler refreshes all SCC members repeatedly until convergence (zero net row-count delta) or the maximum iteration limit (default 10).

Propagation latency:

$$L_{\text{scc}} = K \times |SCC| \times T_r$$

where $K$ is the number of iterations to convergence (typically 2–4 for well-structured circular dependencies, up to 10 for pathological cases).

Recommendation: Circular dependencies are expensive. Each iteration performs FULL refreshes of all SCC members. Keep SCCs small (2–3 members) and ensure they converge quickly. Use pgtrickle.refresh_stream_table() with max_iterations to test convergence offline before enabling in production.
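The fixed-point loop and its cost can be sketched as follows. This is a simplified model, not the scheduler's implementation: `iterate_to_fixpoint` is a hypothetical name, and `refresh_all_members` is assumed to be a callable that refreshes every SCC member once and returns the net row-count delta.

```python
def scc_latency_ms(iterations, scc_size, t_r_ms):
    """L_scc = K * |SCC| * T_r, in milliseconds."""
    return iterations * scc_size * t_r_ms

def iterate_to_fixpoint(refresh_all_members, max_iterations=10):
    """Repeat full-SCC refreshes until the net delta is zero or the cap is hit.

    Returns (iterations_used, converged).
    """
    for iteration in range(1, max_iterations + 1):
        if refresh_all_members() == 0:
            return iteration, True
    return max_iterations, False

deltas = iter([12, 3, 0])  # illustrative net row-count deltas per iteration
k, converged = iterate_to_fixpoint(lambda: next(deltas))
print(k, converged)               # 3 True
print(scc_latency_ms(k, 3, 100))  # 900
```

A three-member SCC that converges on the third pass therefore costs about 900 ms at $T_r = 100$ ms, nine refreshes for one logical update.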


4. Configuration Tuning Guide

4.1 max_concurrent_refreshes (per-database)

| DAG Shape | Recommended Value | Rationale |
|---|---|---|
| Linear chain | N/A (parallel_refresh_mode = 'off') | No parallelism possible |
| Width ≤ 10 | 8 | Clears each level in 1–2 batches |
| Width ~50 | 16 | Clears each level in ~4 batches |
| Width ~100+ | 24–32 | Diminishing returns beyond level width |

4.2 max_dynamic_refresh_workers (cluster-wide)

| Deployment | Recommended Value | Rationale |
|---|---|---|
| Single database | Match max_concurrent_refreshes | No contention from other DBs |
| 2–3 databases | 1.5× per-DB cap (e.g., 24) | Allow fair sharing |
| Many databases | 32–64 | Prevent starvation; per-DB caps divide the budget |

4.3 max_worker_processes (PostgreSQL)

Each dynamic refresh worker consumes one max_worker_processes slot. Budget:

$$\texttt{max\_worker\_processes} \geq 1_{\text{launcher}} + N_{\text{db}} + C_g + \text{autovac} + \text{parallel\_query} + \text{other}$$

| Target $C_g$ | Minimum max_worker_processes |
|---|---|
| 4 (default) | 16 |
| 8 | 20 |
| 16 | 26 |
| 32 | 42 |

4.4 scheduler_interval_ms

The scheduler interval is the floor for single-hop propagation latency in scheduled mode. Reducing it below 1000 ms increases tick frequency but also increases CPU overhead from DAG evaluation, schedule checks, and SPI queries.

| Setting | Use Case |
|---|---|
| 1000 ms (default) | General purpose |
| 500 ms | Low-latency requirements with < 100 STs |
| 200 ms | Sub-second propagation for small DAGs (< 20 STs) |

Warning: Below 200 ms, the scheduler’s own overhead (DAG rebuild check, per-ST schedule evaluation, frontier comparison) may consume a significant fraction of the tick, especially with 100+ STs.

4.5 Poll Interval & Dispatch Overhead

In parallel mode, the scheduler polls for worker completion every 200 ms (min(scheduler_interval_ms, 200)). Workers run concurrently during this wait. The “wasted wait” — fraction of the poll period spent idle after workers have already completed — is:

$$\text{wasted wait} = \frac{I_p - T_r}{I_p} \quad (T_r \leq I_p)$$

When $T_r > I_p$, workers outlast one poll cycle; overhead becomes $(\lceil T_r / I_p \rceil \times I_p - T_r)\,/\,(\lceil T_r / I_p \rceil \times I_p)$.

| $T_r$ | Wasted Wait | Assessment |
|---|---|---|
| 10 ms | 95% | Poll overhead dominates; sequential is faster at this scale |
| 50 ms | 75% | Marginal benefit; near break-even for N=500 workloads |
| 100 ms | 50% | Parallelism pays off for wide DAGs |
| 200 ms | ~0% | Maximum efficiency; workers fill the entire poll window |
| 300 ms | 25% | $T_r > I_p$ regime; coordinator takes two polls per batch |
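Both wasted-wait expressions reduce to one calculation: round $T_r$ up to a whole number of poll periods and take the idle share of that window. The sketch below uses a hypothetical function name and assumes $I_p = 200$ ms.

```python
from math import ceil

def wasted_wait_fraction(t_r_ms, i_p_ms=200):
    """Idle share of the poll window(s) that a batch of workers occupies."""
    polls = max(1, ceil(t_r_ms / i_p_ms))  # whole poll periods needed
    window_ms = polls * i_p_ms
    return (window_ms - t_r_ms) / window_ms

for t_r in (10, 50, 100, 200, 300):
    print(t_r, f"{wasted_wait_fraction(t_r):.0%}")
# prints 95%, 75%, 50%, 0%, 25% for the five values
```

At 300 ms the batch spans two polls (400 ms), of which 100 ms is idle, giving 25%.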

Rule of thumb: Enable parallel mode when $N \times T_r$ (sequential cost) substantially exceeds $B \times I_p$ (parallel cost), where $B = \sum \lceil W_l / C \rceil$ is the total dispatch rounds. For N=500, D=10, C=16 the threshold is $T_r \approx 15\text{ms}$. The DAG must also have meaningful width (≥ 4 independent STs per level) to benefit.


5. Performance Projections

5.1 Linear Chain (N=500, D=500, W=1)

| Mode | $T_r = 10\text{ms}$ | $T_r = 100\text{ms}$ |
|---|---|---|
| CALCULATED | 6 s | 51 s |
| SCHEDULE '1s' | 500 s (8.3 min) | 500 s (8.3 min) |
| Parallel (C=4) | ~100 s | ~100 s |
| Parallel (C=16) | ~100 s | ~100 s |

5.2 Wide DAG (N=500, D=10, W≈50)

| Mode | $T_r = 10\text{ms}$ | $T_r = 100\text{ms}$ |
|---|---|---|
| CALCULATED | 6 s | 51 s |
| SCHEDULE '1s' | 10 s | 10 s |
| Parallel (C=4) | ~26 s | ~26 s |
| Parallel (C=16) | ~8 s | ~8 s |
| Parallel (C=32) | ~4 s | ~4 s |

5.3 Fan-Out Tree (N=511, D=9, binary)

| Mode | $T_r = 10\text{ms}$ | $T_r = 100\text{ms}$ |
|---|---|---|
| CALCULATED | 6.1 s | 52 s |
| Parallel (C=16) | ~7 s | ~7 s |

The narrow upper levels (1–4 STs) are sequential bottlenecks; the wide lower levels (64–256 STs) benefit from parallelism.

5.4 Summary: Optimal Mode by Topology

| Topology | Best Mode | Why |
|---|---|---|
| Linear chain | CALCULATED (sequential) | Zero parallelism; poll overhead hurts |
| Wide DAG, cheap refresh | CALCULATED | Poll overhead outweighs parallelism |
| Wide DAG, expensive refresh | Parallel, C=16 | 4× speedup at the widest levels |
| Fan-out tree | Parallel or CALCULATED | Depends on leaf-level refresh cost |
| Small DAG (< 20 STs) | CALCULATED | Not worth the complexity |
| Mixed production | Parallel + tiered scheduling | Balance latency vs. resource usage |

6. Key Architectural Insights

  1. CALCULATED mode cascades the full DAG in one tick. Because stream-table-sourced STs use data_timestamp comparison (not CDC buffers) and the scheduler processes STs in topological order within a single transaction, the entire DAG converges in one pass. Depth is irrelevant for latency; only total ST count matters.

  2. Scheduled mode makes depth the bottleneck. Each depth level requires a separate tick to propagate. Reducing DAG depth (by restructuring queries to reference source tables directly rather than intermediate STs) is the most effective optimization.

  3. Parallel mode trades per-hop overhead for width throughput. The 200 ms poll interval creates a fixed cost per dispatch round. This is profitable only when the per-ST refresh time is comparable to or exceeds the poll interval, and when there are enough independent STs to fill the worker budget.

  4. Stream-table sources force FULL refresh (when ST-upstream changes are detected). When an upstream stream table’s data_timestamp advances, the downstream ST is forced to RefreshAction::Full because there is no CDC change buffer for ST-to-ST propagation — a DIFFERENTIAL refresh would be a no-op with nothing to merge. Critically, this is conditional: if an ST has mixed sources (both a base table and an upstream ST) and only the base table has changes, has_stream_table_changes is false and the normal determine_refresh_action() path runs, which can return Differential. The FULL override triggers only when has_changes && has_stream_table_changes are both true. Each FULL refresh reads from the already-materialized upstream ST table — it does not re-execute upstream queries — so work does not compound with depth.

  5. Worker processes are ephemeral. Each parallel refresh worker is a separate OS process with its own SPI connection — spawned, used, and terminated. There is no connection pool. High values of max_concurrent_refreshes increase process churn and connection setup overhead. The practical ceiling is ~32 for most deployments.

  6. Diamond consistency groups serialize their members. Atomic diamond groups wrap all member refreshes in a sub-transaction. The group’s refresh time is the sum of its members, not the max. In parallel mode, the entire group is dispatched as one unit — it occupies one worker slot but takes longer than a singleton unit.
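The conditional FULL override described in insight 4 can be summarized as a small decision function. This is a sketch of the stated rule only, not the code in src/scheduler.rs: `choose_action` and the `NO_OP` member are hypothetical, and `normal_action` stands in for whatever determine_refresh_action() would return.

```python
from enum import Enum

class RefreshAction(Enum):
    NO_OP = "no_op"  # hypothetical placeholder for "nothing to refresh"
    DIFFERENTIAL = "differential"
    FULL = "full"

def choose_action(has_changes, has_stream_table_changes,
                  normal_action=RefreshAction.DIFFERENTIAL):
    """Apply the FULL override only when both flags are true."""
    if not has_changes:
        return RefreshAction.NO_OP
    if has_stream_table_changes:
        # Upstream ST advanced: no CDC buffer exists, so force FULL.
        return RefreshAction.FULL
    # Only base-table changes: fall through to the normal decision.
    return normal_action

print(choose_action(True, True))   # RefreshAction.FULL
print(choose_action(True, False))  # RefreshAction.DIFFERENTIAL
```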


7. Recommendations

For low-latency propagation (< 5s end-to-end)

  • Keep DAG depth ≤ 5.
  • Use CALCULATED mode (no explicit schedule).
  • Optimize per-ST refresh time via differential mode for base-table sources.
  • Avoid stream-table-to-stream-table chains longer than 3 hops (each hop is a FULL refresh).

For high-throughput wide DAGs (100+ STs)

  • Enable parallel_refresh_mode = 'on'.
  • Set max_concurrent_refreshes to half the widest DAG level, capped at available CPU cores.
  • Set max_dynamic_refresh_workers equal to max_concurrent_refreshes for single-database deployments.
  • Ensure max_worker_processes has sufficient headroom.

For mixed production workloads

  • Enable tiered scheduling to reduce unnecessary refresh cycles on low-priority STs.
  • Use explicit SCHEDULE only for STs that require guaranteed freshness bounds — leave others as CALCULATED for fastest propagation.
  • Monitor per-ST refresh duration via pgtrickle.pgt_stream_tables and tune the parallelism budget based on observed $T_r$ values.

When to restructure the DAG

  • Linear chains > 10 deep: Consider flattening by having downstream STs query source tables directly (with appropriate WHERE clauses) instead of chaining through intermediate STs.
  • Diamonds with 5+ members: Simplify the query structure to reduce the consistency group size, or accept eventual consistency (diamond_consistency = 'eventual') to allow independent refresh.
  • Circular dependencies: Keep SCCs to 2–3 members maximum. Each fixpoint iteration costs $|SCC| \times T_r$, and fast convergence is not guaranteed.