PLAN: Circular References in the Stream Table Dependency Graph

Status: Parts 1–5 implemented (v0.6.0 foundation + v0.7.0 scheduler integration)

Objective

Enable stream tables to form mutual dependencies (cycles) in the dependency graph, where stream table A references stream table B and vice versa. The system must refresh such cycles deterministically by iterating to a fixed point, analogous to how recursive CTEs work within a single query.


Background

Current Behavior

The dependency graph is enforced as a strict DAG. When create_stream_table() is called, check_for_cycles() in src/api.rs builds the full graph, adds the proposed edges, and runs Kahn’s algorithm. If a cycle is detected, the creation is rejected with PgTrickleError::CycleDetected.

The scheduler in src/scheduler.rs relies on topological_order() (Kahn’s BFS) to determine refresh order. This algorithm is undefined for graphs with cycles — it simply excludes nodes that are part of cycles.

Precedent: Recursive CTEs

The extension already solves the same fundamental problem — circular data flow — at the query level. WITH RECURSIVE CTEs use:

  1. Semi-naive evaluation: Propagate only new rows through the recursive term until no new rows are produced (fixed point).
  2. Delete-and-Rederive (DRed): Handle mixed INSERT/DELETE changes by over-deleting, then rederiving to restore rows with alternative derivations.

Both strategies converge to a fixed point for monotone queries (queries where adding input rows can only add output rows, never remove them). Non-monotone queries (containing negation, EXCEPT, aggregates) may not converge.

Theoretical Foundation

The theory comes from Datalog stratification and DBSP (Dynamic Batch Stream Processing):

  • Monotone cycles always converge: JOINs, UNIONs, projections, filters without negation.
  • Non-monotone cycles may oscillate: EXCEPT, NOT IN, anti-joins, aggregates (COUNT can decrease when rows are deleted).
  • Stratification partitions a program into layers where cycles only occur within monotone strata, and non-monotone dependencies only flow between strata (always downward, never in a cycle).

Design

Core Concept: Strongly Connected Components (SCCs)

Replace the strict DAG requirement with a condensation graph: decompose the dependency graph into its strongly connected components (SCCs) using Tarjan’s algorithm. Each SCC becomes a single “super-node” in the condensation, which is always a DAG.

Before:                    Condensation:
  A ──→ B                    [A, B] ──→ C
  B ──→ A
  B ──→ C

SCC₁ = {A, B}  (cycle)
SCC₂ = {C}     (singleton)

The scheduler processes SCCs in topological order of the condensation graph. Singleton SCCs (no cycle) are refreshed exactly as today. Multi-node SCCs are iterated to fixed point.

Fixed-Point Iteration for an SCC

For an SCC containing stream tables {ST₁, ST₂, ..., STₙ}:

iteration = 0
loop:
    total_changes = 0
    for each STᵢ in SCC (any order):
        (inserted, deleted) = refresh(STᵢ)
        total_changes += inserted + deleted
    iteration += 1
    if total_changes == 0:
        break  // fixed point reached
    if iteration >= max_iterations:
        mark all STᵢ as ERROR
        log warning "SCC did not converge after {max_iterations} iterations"
        break

Each refresh produces a delta (rows inserted + deleted). When a full pass over all SCC members produces zero changes, the system has converged.

Monotonicity Analysis (Static Safety Check)

At create_stream_table() time, if the proposed stream table would create a cycle, analyze whether the cycle is monotone-safe:

Safe operators (monotone — adding input rows can only add output rows): - Scan, Filter, Project, InnerJoin, LeftJoin - UnionAll, Union, Distinct - CteScan, RecursiveCte

Unsafe operators (non-monotone — adding input rows may remove output rows): - Aggregate (COUNT/SUM can decrease on DELETE) - Except (adding rows to the right branch removes output) - Window (rank can change, causing rows to appear/disappear)

For cycles containing only safe operators, convergence is guaranteed (in at most N iterations where N is the longest simple path in the SCC). For cycles containing unsafe operators, the system should either:

  1. Reject — refuse to create the cycle (safest default)
  2. Warn and allow — with explicit user opt-in via a parameter like ALLOW CIRCULAR

Convergence Guarantee

For monotone cycles, the theory guarantees convergence because: - Each iteration can only add rows (monotonicity) - The result set is bounded (finite number of possible rows from finite input tables) - Therefore, the sequence of result sets forms a monotonically increasing chain bounded above, which must reach a fixed point in finite steps

The maximum number of iterations is bounded by the output cardinality, but in practice convergence happens in much fewer iterations (typically 2-5 for transitive closure patterns).


Implementation

Part 1 — Tarjan’s Algorithm for SCC Decomposition

Replace or augment Kahn’s algorithm with Tarjan’s algorithm to decompose the graph into SCCs.

New Method: StDag::compute_sccs()

/// A strongly connected component of the dependency graph.
pub struct Scc {
    /// Node IDs in this SCC.
    pub nodes: Vec<NodeId>,
    /// True if this SCC has more than one node (i.e., contains a cycle).
    pub is_cyclic: bool,
}

/// Compute SCCs using Tarjan's algorithm (1972).
///
/// Returns SCCs in reverse topological order of the condensation graph
/// (upstream SCCs first).
pub fn compute_sccs(&self) -> Vec<Scc> { ... }

Tarjan’s algorithm runs in O(V + E) time, same as Kahn’s.

New Method: StDag::condensation_order()

Replaces topological_order() for the scheduler:

/// Return stream table nodes in refresh order, with SCCs grouped.
///
/// Singleton SCCs (no cycle) are returned as-is. Multi-node SCCs are
/// returned as groups that must be iterated to fixed point.
pub fn condensation_order(&self) -> Result<Vec<Scc>, PgTrickleError> {
    let sccs = self.compute_sccs();
    // SCCs are already in reverse topological order from Tarjan's
    Ok(sccs)
}

Files to Change

File Change
src/dag.rs Add Scc struct, compute_sccs(), condensation_order()
src/dag.rs (tests) Unit tests for SCC decomposition

Unit Tests

#[test]
fn test_scc_no_cycles() {
    // A → B → C: three singleton SCCs
}

#[test]
fn test_scc_simple_cycle() {
    // A → B → A: one SCC {A, B}
}

#[test]
fn test_scc_mixed() {
    // {A, B} cycle → C singleton → {D, E} cycle
    // condensation order: [A,B], [C], [D,E]
}

#[test]
fn test_scc_self_loop() {
    // A → A: one SCC {A} (is_cyclic = true)
}

#[test]
fn test_condensation_order_is_topological() {
    // verify upstream SCCs appear before downstream SCCs
}

Estimated Effort

Medium — ~120 lines for Tarjan’s algorithm + ~80 lines of tests.


Part 2 — Monotonicity Checker

Add a static analysis pass that determines whether an OpTree is monotone (safe for cyclic fixed-point iteration).

New Function: check_monotonicity()

/// Check if an OpTree is monotone (safe for cyclic dependencies).
///
/// Returns `Ok(())` if all operators are monotone, or `Err` with the
/// first non-monotone operator found.
pub fn check_monotonicity(tree: &OpTree) -> Result<(), PgTrickleError> {
    match tree {
        OpTree::Scan { .. } => Ok(()),
        OpTree::Filter { child, .. } => check_monotonicity(child),
        OpTree::Project { child, .. } => check_monotonicity(child),
        OpTree::InnerJoin { left, right, .. } => {
            check_monotonicity(left)?;
            check_monotonicity(right)
        }
        OpTree::LeftJoin { left, right, .. } => {
            check_monotonicity(left)?;
            check_monotonicity(right)
        }
        OpTree::Distinct { child } => check_monotonicity(child),
        OpTree::UnionAll { children } => {
            for c in children { check_monotonicity(c)?; }
            Ok(())
        }
        OpTree::Aggregate { .. } => Err(PgTrickleError::UnsupportedOperator(
            "Aggregate is not monotone — cannot be part of a cyclic dependency".into()
        )),
        OpTree::Except { .. } => Err(PgTrickleError::UnsupportedOperator(
            "EXCEPT is not monotone — cannot be part of a cyclic dependency".into()
        )),
        OpTree::Window { .. } => Err(PgTrickleError::UnsupportedOperator(
            "Window functions are not monotone — cannot be part of a cyclic dependency".into()
        )),
        // Recurse into subqueries, CTEs, etc.
        OpTree::Subquery { child, .. } => check_monotonicity(child),
        OpTree::CteScan { .. } => Ok(()),
        OpTree::RecursiveCte { base, recursive, .. } => {
            check_monotonicity(base)?;
            check_monotonicity(recursive)
        }
        OpTree::RecursiveSelfRef { .. } => Ok(()),
    }
}

Integration with create_stream_table()

When check_for_cycles() detects a cycle, instead of immediately returning CycleDetected, it should:

  1. Parse the defining queries of all stream tables in the proposed cycle
  2. Run check_monotonicity() on each
  3. If all are monotone → allow the cycle (store a flag in the catalog)
  4. If any is non-monotone → return CycleDetected with an enhanced error message explaining which operator breaks monotonicity

Files to Change

File Change
src/dvm/parser.rs Add check_monotonicity() function
src/api.rs Update check_for_cycles() to allow monotone cycles
src/dvm/parser.rs (tests) Unit tests for monotonicity checker

Unit Tests

#[test]
fn test_scan_filter_project_is_monotone() { }

#[test]
fn test_join_is_monotone() { }

#[test]
fn test_aggregate_is_not_monotone() { }

#[test]
fn test_except_is_not_monotone() { }

#[test]
fn test_window_is_not_monotone() { }

#[test]
fn test_nested_non_monotone_detected() {
    // Filter { child: Aggregate { ... } } → non-monotone
}

Estimated Effort

Low — ~60 lines for the checker + ~40 lines of tests.


Part 3 — Catalog Changes

Track which stream tables participate in cyclic SCCs so the scheduler and refresh logic know to use fixed-point iteration.

Schema Change

ALTER TABLE pgtrickle.pgt_stream_tables
    ADD COLUMN scc_id INT;
  • scc_id IS NULL — stream table is in a singleton SCC (no cycle)
  • scc_id = N — stream table is part of cyclic SCC N (all members share the same value)

When a create_stream_table() creates a cycle: 1. Assign a new scc_id to all members of the cycle 2. Store it in the pgt_stream_tables catalog

When drop_stream_table() removes a member and breaks the cycle: 1. Recompute SCCs 2. Clear scc_id for any members no longer in a cycle

New Refresh History Column

ALTER TABLE pgtrickle.pgt_refresh_history
    ADD COLUMN fixpoint_iteration INT;

Records which iteration of the fixed-point loop produced this refresh. NULL for non-cyclic refreshes. Useful for debugging convergence.

Files to Change

File Change
src/lib.rs Add scc_id column to pgt_stream_tables DDL, fixpoint_iteration to pgt_refresh_history
src/catalog.rs Add scc_id to StreamTableMeta, fixpoint_iteration to RefreshRecord
tests/common/mod.rs Mirror DDL changes
docs/SQL_REFERENCE.md Document new columns

Estimated Effort

Low — ~40 lines across DDL + struct changes.


Part 4 — GUC Configuration

Add configuration variables for fixed-point iteration.

// Maximum iterations before declaring non-convergence
pub static PGS_MAX_FIXPOINT_ITERATIONS: GucSetting<i32> = GucSetting::<i32>::new(100);

// Whether to allow cyclic dependencies (opt-in safety gate)
pub static PGS_ALLOW_CIRCULAR: GucSetting<bool> = GucSetting::<bool>::new(false);
GUC Default Description
pg_trickle.max_fixpoint_iterations 100 Maximum iterations per SCC before declaring non-convergence and marking members as ERROR
pg_trickle.allow_circular false Master switch: must be true to create cyclic dependencies. When false, cycle detection rejects as today

Files to Change

File Change
src/config.rs Add two new GUCs
docs/CONFIGURATION.md Document new GUCs

Estimated Effort

Low — ~30 lines.


Part 5 — Scheduler: Fixed-Point Iteration

Modify the scheduler’s refresh loop to handle multi-node SCCs.

Current Flow

DAG → topological_order() → for each ST: check schedule + refresh

New Flow

DAG → condensation_order() → for each SCC:
    if singleton:
        check schedule + refresh (unchanged)
    if cyclic:
        if any member needs refresh:
            iterate_to_fixpoint(SCC)

iterate_to_fixpoint() Implementation

fn iterate_to_fixpoint(
    scc: &Scc,
    dag: &StDag,
    retry_states: &mut HashMap<i64, RetryState>,
    retry_policy: &RetryPolicy,
    now_ms: u64,
) {
    let max_iter = config::pg_trickle_max_fixpoint_iterations();

    for iteration in 0..max_iter {
        let mut total_changes: i64 = 0;

        for node_id in &scc.nodes {
            let pgt_id = match node_id {
                NodeId::StreamTable(id) => *id,
                _ => continue,
            };

            let st = match load_st_by_id(pgt_id) {
                Some(st) => st,
                None => continue,
            };

            if st.status != StStatus::Active {
                continue;
            }

            let has_changes = check_upstream_changes(&st);
            let action = refresh::determine_refresh_action(&st, has_changes);

            match action {
                RefreshAction::NoData if iteration > 0 => {
                    // This member has converged — no changes from upstream
                }
                _ => {
                    let result = execute_scheduled_refresh_with_iteration(&st, action, iteration);
                    match result {
                        RefreshOutcome::Success(inserted, deleted) => {
                            total_changes += inserted + deleted;
                        }
                        RefreshOutcome::RetryableFailure | RefreshOutcome::PermanentFailure => {
                            // Abort the entire SCC iteration — can't converge
                            // if a member fails
                            log!(
                                "pg_trickle: SCC fixpoint aborted — {}.{} failed",
                                st.pgt_schema, st.pgt_name,
                            );
                            return;
                        }
                    }
                }
            }
        }

        if total_changes == 0 {
            log!(
                "pg_trickle: SCC converged after {} iteration(s)",
                iteration + 1,
            );
            return;
        }
    }

    // Non-convergence: mark all SCC members as ERROR
    warning!(
        "pg_trickle: SCC did not converge after {} iterations — marking members as ERROR",
        max_iter,
    );
    for node_id in &scc.nodes {
        if let NodeId::StreamTable(id) = node_id {
            if let Err(e) = StreamTableMeta::set_status(*id, "ERROR") {
                log!("pg_trickle: failed to set ERROR status for ST {}: {}", id, e);
            }
        }
    }
}

Convergence Detection

The key insight is that execute_differential_refresh() already returns (rows_inserted, rows_deleted). When both are zero for all members in a pass, the SCC has converged. No new mechanism is needed — the existing return values provide natural convergence detection.

For FULL refresh mode within an SCC, convergence detection requires comparing the new result against the current storage, which the FULL refresh path doesn’t currently do (it truncates and re-inserts). Two options:

  1. Prohibit FULL mode in cyclic SCCs — only DIFFERENTIAL mode is meaningful for fixed-point iteration. FULL mode would re-insert all rows every iteration, never converging.
  2. Use a hash comparison — compute a hash of the FULL result before and after. This adds overhead but enables convergence detection without differential tracking.

Recommendation: Require DIFFERENTIAL mode for all members of a cyclic SCC. This is natural since fixed-point iteration is inherently an incremental process.

Schedule Semantics for SCCs

When any member of an SCC exceeds its schedule, the entire SCC is refreshed (iterated to fixed point). The effective schedule for the SCC is MIN(schedule) across all members. This follows naturally from the semantics: if any member is stale, all members might be stale due to the mutual dependency.

Files to Change

File Change
src/scheduler.rs Replace topological_order() with condensation_order(), add iterate_to_fixpoint()
src/scheduler.rs (tests) Unit tests for fixed-point convergence
src/refresh.rs Return change counts more prominently; validate DIFFERENTIAL mode for SCC members

Estimated Effort

High — ~200 lines for scheduler changes + ~100 lines of tests.


Part 6 — Validation at Creation Time ✅ Done

Implemented in CYC-6 commit. check_for_cycles() and check_for_cycles_alter() now conditionally allow monotone cycles via validate_cycle_allowed(). SCC IDs are assigned/recomputed by assign_scc_ids_from_dag() on create, alter, and drop.

Modify check_for_cycles() in src/api.rs to conditionally allow cycles.

New Flow

fn check_for_cycles(source_relids: &[(Oid, String)]) -> Result<(), PgTrickleError> {
    // ... existing DAG build ...

    match dag.detect_cycles() {
        Ok(()) => Ok(()),  // No cycle — always allowed
        Err(PgTrickleError::CycleDetected(nodes)) => {
            // Cycle detected — check if allowed
            if !config::pg_trickle_allow_circular() {
                return Err(PgTrickleError::CycleDetected(nodes));
            }

            // Check monotonicity of all STs in the cycle
            for node_name in &nodes {
                let meta = StreamTableMeta::get_by_name(node_name)?;
                let parse_result = parse_defining_query(&meta.defining_query)?;
                check_monotonicity(&parse_result.tree)?;
            }

            // All members are monotone — allow the cycle
            // Verify all members use DIFFERENTIAL mode
            for node_name in &nodes {
                let meta = StreamTableMeta::get_by_name(node_name)?;
                if meta.refresh_mode != RefreshMode::Differential {
                    return Err(PgTrickleError::InvalidArgument(format!(
                        "stream table '{}' must use DIFFERENTIAL refresh mode \
                         to participate in a circular dependency",
                        node_name,
                    )));
                }
            }

            Ok(())
        }
        Err(e) => Err(e),
    }
}

Files to Change

File Change
src/api.rs Update check_for_cycles() to conditionally allow monotone cycles
src/api.rs Assign scc_id to cycle members after successful creation
src/api.rs Update drop_stream_table() to recompute SCCs and clear scc_id

Estimated Effort

Medium — ~100 lines.


Part 7 — Monitoring and Observability ✅ Done

Implemented in CYC-7 commit. pg_stat_stream_tables view now includes scc_id and last_fixpoint_iterations. pgt_status() returns scc_id. New pgt_scc_status() function exposes SCC topology and convergence metrics.

Surface cycle and convergence information in the monitoring infrastructure.

Update pg_stat_stream_tables View

Add columns: - scc_id INT — SCC identifier (NULL if not in a cycle) - last_fixpoint_iterations INT — number of iterations in the last fixed-point run

Update pgt_status()

Add column: - scc_id INT — which SCC this ST belongs to (NULL for non-cyclic)

New Monitoring Function: pgtrickle.pgt_scc_status()

pgtrickle.pgt_scc_status() → SETOF record(
    scc_id              INT,
    member_count        INT,
    members             TEXT[],
    last_iterations     INT,
    last_converged_at   TIMESTAMPTZ,
    is_monotone         BOOL
)

Files to Change

File Change
src/lib.rs Update pg_stat_stream_tables view
src/api.rs Update pgt_status(), add pgt_scc_status()
docs/SQL_REFERENCE.md Document new columns and function
src/monitor.rs Update Prometheus metrics if applicable

Estimated Effort

Medium — ~80 lines.


Part 8 — Documentation and E2E Tests ✅ Done

Implemented in CYC-8 commit. New tests/e2e_circular_tests.rs with 6 test scenarios covering all circular dependency paths. README limitations table updated.

E2E Tests

-- Test 1: Simple cycle with monotone queries (transitive closure)
CREATE TABLE edges (src INT, dst INT);
INSERT INTO edges VALUES (1,2), (2,3);

SET pg_trickle.allow_circular = true;

-- reach_a finds paths through reach_b
SELECT pgtrickle.create_stream_table('reach_a',
    'SELECT DISTINCT e.src, rb.dst
     FROM edges e
     INNER JOIN reach_b rb ON e.dst = rb.src',
    '1m', 'DIFFERENTIAL');

-- reach_b starts from edges and extends through reach_a
SELECT pgtrickle.create_stream_table('reach_b',
    'SELECT DISTINCT e.src, e.dst FROM edges e
     UNION ALL
     SELECT ra.src, e.dst
     FROM reach_a ra
     INNER JOIN edges e ON ra.dst = e.src',
    '1m', 'DIFFERENTIAL');

-- After refresh cycle converges: both contain transitive closure
-- reach_a: (1,3), (2,3) — or similar depending on exact split
-- reach_b: (1,2), (2,3), (1,3)

-- Test 2: Non-monotone cycle rejected
-- (aggregate in cycle member)

-- Test 3: Convergence within max_iterations

-- Test 4: Non-convergence hits max_iterations → ERROR status

-- Test 5: Drop member breaks cycle → scc_id cleared

-- Test 6: allow_circular=false rejects cycles (default behavior preserved)

Documentation

  • Add “Circular Dependencies” section to docs/ARCHITECTURE.md
  • Add pg_trickle.max_fixpoint_iterations and pg_trickle.allow_circular to docs/CONFIGURATION.md
  • Add pgt_scc_status() to docs/SQL_REFERENCE.md
  • Add note to README.md Limitations table: Circular dependencies | ✅ Supported | Monotone queries only; opt-in via pg_trickle.allow_circular

Files to Change

File Change
tests/e2e_circular_tests.rs New file — E2E tests for circular dependencies
docs/ARCHITECTURE.md Add circular dependency section
docs/CONFIGURATION.md Document new GUCs
docs/SQL_REFERENCE.md Document pgt_scc_status() and new columns
README.md Update Limitations table

Estimated Effort

Medium — ~150 lines of tests + ~100 lines of documentation.


Execution Order

Part Description Depends On Complexity
1 Tarjan’s SCC algorithm Medium
2 Monotonicity checker Low
3 Catalog changes Low
4 GUC configuration Low
5 Scheduler fixed-point iteration Parts 1, 3, 4 High
6 Creation-time validation Parts 1, 2, 3, 4 Medium
7 Monitoring and observability Parts 3, 5 Medium
8 Documentation and E2E tests All above Medium

Recommended order: Parts 1-4 (independent, can be done in parallel) → Part 5 → Part 6 → Part 7 → Part 8

Total estimated effort: ~1000 lines of code + ~350 lines of tests + documentation.


Risk Assessment

Risk Severity Mitigation
Non-convergence causes resource exhaustion High Hard cap via max_fixpoint_iterations GUC; ERROR status on non-convergence
Users accidentally create cycles Medium allow_circular = false by default; explicit opt-in required
Non-monotone cycle slips past checker Medium Conservative checker — reject anything with Aggregate, Except, Window in cycle
Performance regression for non-cyclic graphs Low Tarjan’s is O(V+E) like Kahn’s; singleton SCCs follow the existing fast path
FULL mode in cycles never converges Medium Reject FULL mode for cycle members at creation time
SCC recomputation on drop is expensive Low Only recompute when dropping a member of a known SCC (scc_id IS NOT NULL)

Alternatives Considered

Alternative 1: Rewrite as a single WITH RECURSIVE query

Instead of supporting inter-table cycles, automatically rewrite mutually dependent stream tables into a single stream table with a WITH RECURSIVE CTE. This avoids the complexity of fixed-point iteration in the scheduler.

Rejected because: This changes the user’s desired topology. Users may want separate stream tables for independent scheduling, permissions, or operational isolation. It also requires complex query rewriting to merge two defining queries into one recursive CTE.

Alternative 2: Allow cycles but only with FULL refresh

Refresh all members of a cycle using FULL mode, comparing old vs. new results to detect convergence. This avoids the requirement for DIFFERENTIAL mode.

Rejected because: FULL refresh of N tables × M iterations is extremely expensive. The entire point of cycles is to propagate incremental changes, which requires DIFFERENTIAL mode.

Alternative 3: Async fixed-point with separate worker

Spawn a dedicated background worker per cyclic SCC that iterates independently of the main scheduler.

Deferred: This is a valid optimization for large systems with many SCCs, but adds significant complexity (worker lifecycle management, inter-worker coordination). The initial implementation should use the existing scheduler with synchronous iteration.


Implementation Status

Part Description Status
1 Tarjan’s SCC algorithm ✅ Done (v0.6.0)
2 Monotonicity checker ✅ Done (v0.6.0)
3 Catalog changes ✅ Done (v0.6.0)
4 GUC configuration ✅ Done (v0.6.0)
5 Scheduler fixed-point iteration ✅ Done (v0.7.0)
6 Creation-time validation ❌ Not started
7 Monitoring and observability ❌ Not started
8 Documentation and E2E tests ❌ Not started