# dbt-pgtrickle

A dbt package that integrates pg_trickle stream tables into your dbt project via a custom `stream_table` materialization.

No custom Python adapter required — works with the standard dbt-postgres adapter. Just Jinja SQL macros that call pg_trickle's SQL API.

## Prerequisites

| Requirement | Minimum Version |
| --- | --- |
| dbt Core | ≥ 1.9 |
| dbt-postgres adapter | Matching dbt Core version |
| PostgreSQL | 18.x |
| pg_trickle extension | ≥ 0.1.0 (`CREATE EXTENSION pg_trickle;`) |
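To verify the extension prerequisite on an existing database, you can query the standard PostgreSQL `pg_extension` catalog (a minimal sketch; the minimum version comes from the table above):

```sql
-- Check that pg_trickle is installed and inspect its version
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'pg_trickle';

-- No row returned? Install it first:
-- CREATE EXTENSION pg_trickle;
```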

## Installation

Add to your `packages.yml`:

```yaml
packages:
  - git: "https://github.com/<org>/pg-trickle.git"
    revision: v0.1.0
    subdirectory: "dbt-pgtrickle"
```

Then run:

```shell
dbt deps
```

## Quick Start

Create a model with `materialized='stream_table'`:

```sql
-- models/marts/order_totals.sql
{{
  config(
    materialized='stream_table',
    schedule='5m',
    refresh_mode='DIFFERENTIAL'
  )
}}

SELECT
    customer_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count
FROM {{ source('raw', 'orders') }}
GROUP BY customer_id
```

```shell
dbt run --select order_totals   # Creates the stream table
dbt test --select order_totals  # Tests work normally (it's a real table)
```

## Configuration Reference

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| `materialized` | string | (required) | Must be `'stream_table'` |
| `schedule` | string/null | `'1m'` | Refresh schedule (e.g., `'5m'`, `'1h'`, cron). `null` for pg_trickle's CALCULATED schedule. |
| `refresh_mode` | string | `'DIFFERENTIAL'` | `'FULL'` or `'DIFFERENTIAL'` |
| `initialize` | bool | `true` | Populate on creation |
| `status` | string/null | `null` | `'ACTIVE'` or `'PAUSED'`. When set, applies on subsequent runs via `alter_stream_table()`. |
| `stream_table_name` | string | model name | Override stream table name |
| `stream_table_schema` | string | target schema | Override schema |
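As an illustration, the options above can be combined in a single model config. This sketch uses only the keys documented in the table; the model and column names are hypothetical:

```sql
-- models/marts/daily_revenue.sql (hypothetical model)
{{
  config(
    materialized='stream_table',
    schedule=none,                         -- null: use pg_trickle's CALCULATED schedule
    refresh_mode='FULL',
    initialize=false,                      -- create empty; populate on first refresh
    stream_table_name='daily_revenue_st',  -- override the default (model name)
    stream_table_schema='analytics'        -- override the target schema
  )
}}

SELECT order_date, SUM(amount) AS revenue
FROM {{ source('raw', 'orders') }}
GROUP BY order_date
```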

### Project-level defaults

```yaml
# dbt_project.yml
models:
  my_project:
    marts:
      +materialized: stream_table
      +schedule: '5m'
      +refresh_mode: DIFFERENTIAL
```

## Operations

### `pgtrickle_refresh` — Manual refresh

```shell
dbt run-operation pgtrickle_refresh --args '{"model_name": "order_totals"}'
```

### `refresh_all_stream_tables` — Refresh all in dependency order

Refreshes all dbt-managed stream tables in topological (dependency) order. Upstream tables are refreshed before downstream ones. Designed for CI pipelines: run after `dbt run` and before `dbt test` to ensure all data is current.

```shell
# Refresh all dbt-managed stream tables
dbt run-operation refresh_all_stream_tables

# Refresh only stream tables in a specific schema
dbt run-operation refresh_all_stream_tables --args '{"schema": "analytics"}'
```

### `drop_all_stream_tables` — Drop dbt-managed stream tables

Drops only stream tables defined as dbt models (safe in shared environments):

```shell
dbt run-operation drop_all_stream_tables
```

### `drop_all_stream_tables_force` — Drop ALL stream tables

Drops everything from the pg_trickle catalog, including non-dbt stream tables:

```shell
dbt run-operation drop_all_stream_tables_force
```

### `pgtrickle_check_cdc_health` — CDC pipeline health

```shell
dbt run-operation pgtrickle_check_cdc_health
```

Raises an error (non-zero exit) if any CDC source is unhealthy.

## Freshness Monitoring

Native dbt source freshness is not supported (the `last_refresh_at` column lives in the catalog, not on the stream table). Use the `pgtrickle_check_freshness` run-operation instead:

```shell
# Check all active stream tables (defaults: warn=600s, error=1800s)
dbt run-operation pgtrickle_check_freshness

# Custom thresholds
dbt run-operation pgtrickle_check_freshness \
  --args '{model_name: order_totals, warn_seconds: 300, error_seconds: 900}'
```

Exits non-zero when any stream table exceeds the error threshold — safe for CI.
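For example, a CI job might chain the operations above in the order the package recommends: build, refresh, check freshness, then test. The workflow below is a hypothetical GitHub Actions sketch; the job name, runner, and database setup steps are assumptions, while the dbt commands are from this README:

```yaml
# .github/workflows/ci.yml (hypothetical sketch; assumes dbt and a
# pg_trickle-enabled PostgreSQL are already available to the job)
jobs:
  dbt:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: dbt deps
      - run: dbt run                                        # create/update stream tables
      - run: dbt run-operation refresh_all_stream_tables    # refresh in dependency order
      - run: dbt run-operation pgtrickle_check_freshness    # exits non-zero when stale
      - run: dbt test
```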

## Useful dbt Commands

```shell
# List all stream table models
dbt ls --select config.materialized:stream_table

# Full refresh (drop + recreate)
dbt run --select order_totals --full-refresh

# Build models + tests in DAG order
dbt build --select order_totals
```

Note: `dbt build` runs stream table models early in the DAG. If downstream models depend on a stream table with `initialize: false`, the table may not be populated yet.

## Testing

Stream tables are standard PostgreSQL heap tables — all dbt tests work normally:

```yaml
models:
  - name: order_totals
    columns:
      - name: customer_id
        tests:
          - not_null
          - unique
```

### Stream Table Health Test

Use the built-in `stream_table_healthy` generic test to fail your dbt test suite when a stream table is stale, erroring, or paused:

```yaml
models:
  - name: order_totals
    tests:
      - dbt_pgtrickle.stream_table_healthy:
          warn_seconds: 300  # fail if stale for more than 5 minutes
```

The test queries `pgtrickle.pg_stat_stream_tables` and returns rows for any unhealthy condition. An empty result set means the stream table is healthy.

### Stream Table Status Macro

For more programmatic control, use the `pgtrickle_stream_table_status()` macro directly in custom tests or run-operations:

```jinja
{%- set st = dbt_pgtrickle.pgtrickle_stream_table_status('order_totals', warn_seconds=300) -%}
{# st.status is one of: 'healthy', 'stale', 'erroring', 'paused', 'not_found' #}
{# st.staleness_seconds, st.consecutive_errors, st.total_refreshes, etc. #}
```
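As a sketch, the status macro could be wrapped in a custom run-operation. The macro name `assert_fresh` and its wiring are hypothetical; only `pgtrickle_stream_table_status()` and its fields come from this package (`exceptions.raise_compiler_error` and `log` are standard dbt Jinja context functions):

```jinja
{# macros/assert_fresh.sql — hypothetical helper, not part of dbt-pgtrickle #}
{% macro assert_fresh(model_name, warn_seconds=300) %}
  {% set st = dbt_pgtrickle.pgtrickle_stream_table_status(model_name, warn_seconds=warn_seconds) %}
  {% if st.status == 'not_found' %}
    {{ exceptions.raise_compiler_error(model_name ~ " is not a known stream table") }}
  {% elif st.status != 'healthy' %}
    {{ exceptions.raise_compiler_error(model_name ~ " is " ~ st.status ~
       " (stale for " ~ st.staleness_seconds ~ "s)") }}
  {% endif %}
  {{ log(model_name ~ " is healthy (" ~ st.total_refreshes ~ " refreshes)", info=True) }}
{% endmacro %}
```

Invoke it like any run-operation: `dbt run-operation assert_fresh --args '{"model_name": "order_totals"}'`.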

### `__pgt_row_id` Column

pg_trickle adds an internal `__pgt_row_id` column to stream tables for row identity tracking. This column:

- Appears in `SELECT *` and `dbt docs generate`
- Does not affect `dbt test` unless you check column counts
- Can be documented to reduce confusion:

```yaml
columns:
  - name: __pgt_row_id
    description: "Internal pg_trickle row identity hash. Ignore this column."
```
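To keep the column out of downstream models, one option is `dbt_utils.star` with its `except` argument. This assumes the dbt_utils package is installed; the downstream model name is hypothetical:

```sql
-- models/marts/order_totals_clean.sql (hypothetical downstream model)
-- Selects every column from order_totals except the internal row id
SELECT
    {{ dbt_utils.star(from=ref('order_totals'), except=['__pgt_row_id']) }}
FROM {{ ref('order_totals') }}
```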

## Limitations

| Limitation | Workaround |
| --- | --- |
| No in-place query alteration | Materialization auto-drops and recreates when the query changes |
| `__pgt_row_id` visible | Document it; exclude it in downstream `SELECT`s |
| No native dbt source freshness | Use the `pgtrickle_check_freshness` run-operation |
| No dbt snapshot support | Snapshot the stream table as a regular table |
| Query change detection is whitespace-sensitive | dbt compiles deterministically; unnecessary recreations are safe |
| PostgreSQL 18 required | Extension requirement |
| Version tags shared with the pg_trickle extension | Pin to a specific git revision |

## Contributing

See AGENTS.md for development guidelines and the implementation plan for design rationale.

### Running tests locally

The quickest way (requires Docker and dbt installed):

```shell
# Full run — builds Docker image, starts container, runs tests, cleans up
just test-dbt

# Fast run — reuses existing Docker image (run after first build)
just test-dbt-fast
```

Or use the script directly with options:

```shell
cd dbt-pgtrickle/integration_tests/scripts

# Default: builds image, runs tests with dbt 1.9, cleans up
./run_dbt_tests.sh

# Skip image rebuild (faster iteration)
./run_dbt_tests.sh --skip-build

# Keep the container running after tests (for debugging)
./run_dbt_tests.sh --skip-build --keep-container

# Use a custom port (avoids conflicts with local PostgreSQL)
PGPORT=25432 ./run_dbt_tests.sh
```

### Manual testing against an existing pg_trickle instance

If you already have PostgreSQL 18 + pg_trickle running locally:

```shell
export PGHOST=localhost PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=postgres
cd dbt-pgtrickle/integration_tests
dbt deps
dbt seed
dbt run
./scripts/wait_for_populated.sh order_totals 30
dbt test
dbt run-operation drop_all_stream_tables
```

## License

Apache 2.0 — see LICENSE.