# AGENTS.md — Development Guidelines for dbt-pg-trickle
## Project Overview

dbt package providing a `stream_table` custom materialization for the
pg-trickle PostgreSQL extension. Pure Jinja SQL macros — no Python adapter
code. Works with stock dbt-postgres.

- Language: Jinja2 SQL (dbt macros)
- Framework: dbt Core ≥ 1.9, ≤ 1.10, dbt-postgres adapter
- Target database: PostgreSQL 18 with the pg_trickle extension
- Package name: `dbt_pg_trickle`
- License: Apache 2.0

Key docs: plans/PLAN_DBT_MACRO.md · README.md · CHANGELOG.md
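From a model author's point of view, using the materialization looks like any other dbt model; a minimal sketch (the model name, query, and column names are hypothetical, not part of this package):

```sql
-- models/daily_totals.sql (hypothetical example model)
{{
  config(
    materialized='stream_table',
    schedule='1m',
    refresh_mode='DIFFERENTIAL'
  )
}}

select order_date, sum(amount) as total
from {{ ref('orders') }}
group by order_date
```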
## Directory Layout

```text
macros/
├── materializations/
│   └── stream_table.sql                    # Core custom materialization — entry point for dbt run
├── adapters/
│   ├── create_stream_table.sql             # Wraps pgtrickle.create_stream_table()
│   ├── create_or_replace_stream_table.sql  # Wraps pgtrickle.create_or_replace_stream_table() (≥ 0.6.0)
│   ├── alter_stream_table.sql              # Wraps pgtrickle.alter_stream_table()
│   ├── drop_stream_table.sql               # Wraps pgtrickle.drop_stream_table()
│   └── refresh_stream_table.sql            # Wraps pgtrickle.refresh_stream_table()
├── hooks/
│   └── source_freshness.sql                # Source freshness via pg_stat_stream_tables
├── operations/
│   ├── refresh.sql                         # Manual refresh run-operation (single table)
│   ├── refresh_all.sql                     # Refresh all stream tables in dependency order
│   ├── drop_all.sql                        # Drop all stream tables run-operation
│   └── check_cdc_health.sql                # CDC health check run-operation
└── utils/
    ├── stream_table_exists.sql             # Check catalog for stream table existence
    ├── get_stream_table_info.sql           # Read stream table metadata from catalog
    ├── stream_table_status.sql             # Health status (healthy/stale/erroring/paused)
    └── has_create_or_replace.sql           # Detect pg_trickle ≥ 0.6.0
tests/
└── generic/
    └── stream_table_healthy.sql            # Generic dbt test for stream table health
integration_tests/                          # Standalone dbt project for testing
├── dbt_project.yml
├── profiles.yml                            # Postgres → localhost:5432
├── packages.yml                            # Installs parent via local: ../
├── models/                                 # Test stream table models
├── seeds/                                  # CSV fixtures
└── tests/                                  # Data tests / assertions
```
## Workflow — Always Do This

After any macro change, run the integration test suite:

```bash
# The just command handles Docker, Python venv, and dbt installation:
just test-dbt-fast   # fast (reuses existing E2E Docker image)
just test-dbt        # full (rebuilds Docker image from source)
```

The test runner (`integration_tests/scripts/run_dbt_tests.sh`) automatically:

- Creates a dedicated `.venv-dbt` virtual environment using Python 3.13
  (dbt-core 1.10 requires Python ≤ 3.13; the mashumaro dependency is incompatible with 3.14)
- Installs `dbt-core~=1.10` and `dbt-postgres~=1.10` into that venv
- Adds the Homebrew-keg PostgreSQL `bin/` to `PATH` so `psql` is available
To run dbt commands manually after `just test-dbt-fast` (while the container is
still up), activate the venv first:

```bash
source .venv-dbt/bin/activate
cd dbt-pgtrickle/integration_tests
dbt deps && dbt run && dbt test
```

After changes to the materialization logic, also test the full-refresh and alter paths:

```bash
dbt run --full-refresh   # Test drop/recreate
sleep 5
dbt test
```
Before committing, verify:

1. All integration tests pass
2. No Jinja compilation errors (`dbt compile` succeeds)
3. Macro naming is consistent with the conventions below
## Jinja SQL Conventions

### Naming

- Macros: `pgtrickle_<action>` prefix (e.g., `pgtrickle_create_stream_table`)
- Materialization: `stream_table` (registered via `{% materialization stream_table, adapter='postgres' %}`)
- Config keys: snake_case (e.g., `stream_table_name`, `refresh_mode`)
- Files: one macro per file; filename matches the macro name (without the `pgtrickle_` prefix for adapters)
### Quoting and Safety

- Always use `dbt.string_literal()` to quote user-provided string values passed to SQL
- Never concatenate raw user input into SQL strings
- Guard `run_query()` calls with `{% if execute %}` to prevent parse-time execution
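Combined, those rules look like this in practice — a sketch of an existence-check macro (the catalog query and its `stream_table_name` column are assumptions for illustration, not the package's actual implementation):

```sql
{% macro pgtrickle_stream_table_exists(relation) %}
  {# Guard: run_query() must not fire during parse #}
  {% if execute %}
    {# string_literal() quotes the user-provided value — never concatenate raw input #}
    {% set query %}
      select 1
      from pgtrickle.pgt_stream_tables
      where stream_table_name = {{ dbt.string_literal(relation.identifier) }}
    {% endset %}
    {% set result = run_query(query) %}
    {{ return(result and result.rows | length > 0) }}
  {% endif %}
  {{ return(false) }}
{% endmacro %}
```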
### Whitespace Control

- Use `{%- ... -%}` (trim) on variable assignments: `{%- set foo = config.get('bar') -%}`
- Use `{{ ... }}` (no trim) on output expressions that produce SQL
- Put `{# comments #}` on separate lines for readability
### Error Handling

- Use `exceptions.raise_compiler_error()` for fatal errors (missing config, invalid values)
- Use `log("pg-trickle: ...", info=true)` for informational messages
- Check `run_query()` results before accessing `.rows`:

  ```sql
  {% set result = run_query(query) %}
  {% if result and result.rows | length > 0 %}
    {{ return(result.rows[0]) }}
  {% else %}
    {{ return(none) }}
  {% endif %}
  ```

- Return `none` (not an empty dict) when a stream table is not found
## Materialization Structure

The `stream_table` materialization must handle exactly four cases:

1. Full refresh + exists: drop, then create
2. Does not exist: create
3. Exists + query changed: drop, then create
4. Exists + query unchanged: alter (schedule/mode only) or no-op

Always call `adapter.cache_new()` after creating a new relation.
Always call `run_hooks(pre_hooks)` / `run_hooks(post_hooks)`.
Always return `{'relations': [target_relation]}`.
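The four cases and the required calls can be sketched as a condensed skeleton. The helper macro names follow the naming conventions above, but the body (in particular the `existing.query != sql` change check) is illustrative, not the actual implementation:

```sql
{% materialization stream_table, adapter='postgres' %}
  {%- set target_relation = this -%}
  {%- set existing = pgtrickle_get_stream_table_info(target_relation) -%}

  {{ run_hooks(pre_hooks) }}

  {% if should_full_refresh() and existing is not none %}
    {# Case 1: full refresh + exists — drop, then create #}
    {{ pgtrickle_drop_stream_table(target_relation) }}
    {{ pgtrickle_create_stream_table(target_relation, sql) }}
  {% elif existing is none %}
    {# Case 2: does not exist — create #}
    {{ pgtrickle_create_stream_table(target_relation, sql) }}
  {% elif existing.query != sql %}
    {# Case 3: query changed — ALTER cannot change the query, so drop/recreate #}
    {{ pgtrickle_drop_stream_table(target_relation) }}
    {{ pgtrickle_create_stream_table(target_relation, sql) }}
  {% else %}
    {# Case 4: query unchanged — alter schedule/mode only (or no-op) #}
    {{ pgtrickle_alter_stream_table(target_relation) }}
  {% endif %}

  {{ adapter.cache_new(target_relation) }}
  {{ run_hooks(post_hooks) }}

  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
```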
## DDL Transaction Safety (CRITICAL)

Never use `run_query()` for DDL calls to `pgtrickle.*` functions.
dbt wraps each model's main statement in an implicit `BEGIN … ROLLBACK` on the
model's connection. `run_query()` reuses the same connection, so the DDL is
silently rolled back when the main transaction ends — even when the model
reports OK.

Instead, use `{% call statement(..., auto_begin=False, fetch_result=False) %}`
with an explicit `BEGIN; … COMMIT;` embedded in the SQL:

```sql
{% call statement('pgtrickle_create', auto_begin=False, fetch_result=False) %}
  BEGIN;
  SELECT pgtrickle.create_stream_table(...);
  COMMIT;
{% endcall %}
```

This guarantees the DDL commits unconditionally, regardless of dbt's connection state.

Rule: every `dbt-pgtrickle/macros/adapters/*.sql` macro that mutates the
database (create, drop, alter, refresh) must use this pattern.
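Applied to a full adapter macro, the pattern might look like this — a sketch of a drop wrapper (the macro body, including how the relation name is rendered, is illustrative):

```sql
{% macro pgtrickle_drop_stream_table(relation) %}
  {# auto_begin=False keeps this statement off dbt's implicit model transaction #}
  {% call statement('pgtrickle_drop', auto_begin=False, fetch_result=False) %}
    BEGIN;
    SELECT pgtrickle.drop_stream_table({{ dbt.string_literal(relation | string) }});
    COMMIT;
  {% endcall %}
  {{ log("pg-trickle: dropped stream table " ~ relation, info=true) }}
{% endmacro %}
```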
## Config Defaults

| Key | Default | Valid Values |
|---|---|---|
| `schedule` | `'1m'` | pg-trickle duration: `30s`, `1m`, `5m`, `1h`, etc. |
| `refresh_mode` | `'DIFFERENTIAL'` | `'DIFFERENTIAL'`, `'FULL'` |
| `initialize` | `true` | `true`, `false` |
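Inside the materialization, these defaults are typically read with `config.get()`; a sketch under that assumption (the validation message is illustrative):

```sql
{%- set schedule = config.get('schedule', default='1m') -%}
{%- set refresh_mode = config.get('refresh_mode', default='DIFFERENTIAL') | upper -%}
{%- set initialize = config.get('initialize', default=true) -%}

{# Fail fast on invalid values, per the error-handling conventions above #}
{% if refresh_mode not in ['DIFFERENTIAL', 'FULL'] %}
  {{ exceptions.raise_compiler_error("pg-trickle: invalid refresh_mode: " ~ refresh_mode) }}
{% endif %}
```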
## pg-trickle SQL API

All functions live in the `pgtrickle` schema. The extension must be loaded via
`shared_preload_libraries = 'pg_trickle'`.

| Function | Purpose |
|---|---|
| `pgtrickle.create_stream_table(name, query, schedule, refresh_mode, initialize)` | Create a new stream table |
| `pgtrickle.alter_stream_table(name, schedule, refresh_mode, status)` | Alter config (cannot change query) |
| `pgtrickle.drop_stream_table(name)` | Drop stream table and all resources |
| `pgtrickle.refresh_stream_table(name)` | Synchronous manual refresh |

Catalog: `pgtrickle.pgt_stream_tables` (metadata), `pgtrickle.pg_stat_stream_tables` (monitoring view)
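For reference when debugging from psql, calling the API directly might look like this (the stream table name and defining query are hypothetical; argument order follows the signatures in the table above):

```sql
-- Create a stream table refreshed every minute
SELECT pgtrickle.create_stream_table(
  'analytics.daily_totals',
  $$ SELECT order_date, sum(amount) AS total FROM orders GROUP BY order_date $$,
  '1m',
  'DIFFERENTIAL',
  true
);

-- Force a synchronous refresh, then drop it
SELECT pgtrickle.refresh_stream_table('analytics.daily_totals');
SELECT pgtrickle.drop_stream_table('analytics.daily_totals');
```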
## Testing

### Test tiers
| Tier | What | Command |
|---|---|---|
| Compile check | Jinja syntax, macro resolution | dbt compile |
| Integration tests | Full lifecycle against real PG + pg-trickle | cd integration_tests && dbt seed && dbt run && dbt test |
| Full-refresh path | Drop/recreate behavior | dbt run --full-refresh && dbt test |
| Operations test | Manual refresh, drop-all | dbt run-operation refresh --args '{model_name: ...}' |
### Test infrastructure

- Integration tests require PostgreSQL 18 with the pg-trickle extension
- Build the Docker image: `docker build -t pg-trickle-e2e:latest -f tests/Dockerfile.e2e .` (from the pg-trickle repo)
- Start: `docker run -d -e POSTGRES_PASSWORD=postgres -p 5432:5432 pg-trickle-e2e:latest`
- Create extension: `psql -U postgres -c "CREATE EXTENSION pg_trickle;"`
### Test naming

- Data tests: `assert_<what_is_tested>.sql` (e.g., `assert_totals_correct.sql`)
- Schema tests: defined in `schema.yml` alongside model definitions
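Following that naming scheme, a data test is a query that returns rows on failure; a sketch (the file, model, and column names are hypothetical):

```sql
-- integration_tests/tests/assert_totals_correct.sql (hypothetical example)
-- Fails if any stream-table total disagrees with a direct recomputation from the source
select st.order_date
from {{ ref('daily_totals') }} st
join (
    select order_date, sum(amount) as expected_total
    from {{ ref('orders') }}
    group by order_date
) src using (order_date)
where st.total is distinct from src.expected_total
```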
## Do's and Don'ts

### Do

- Prefix all macros with `pgtrickle_` (except the materialization itself)
- Use `dbt.string_literal()` for all string parameters to pg-trickle functions
- Guard all `run_query()` calls with `{% if execute %}`
- Log actions with `{{ log("pg-trickle: ...", info=true) }}`
- Handle the "stream table not found" case gracefully (return `none`)
- Test both create and full-refresh paths
- Keep macros focused — one pg-trickle function per adapter macro
### Don't

- Don't use `adapter.dispatch()` — this package is PostgreSQL-only
- Don't call pg-trickle functions outside of macro wrappers
- Don't use `run_query()` without an `{% if execute %}` guard
- Don't modify the `__pgt_row_id` column — it's managed by pg-trickle
- Don't assume `ALTER` can change the defining query — it can't; drop and recreate instead
- Don't use `println` or `print` — use `log()` for dbt-compatible output
- Don't add Python code — this is a macro-only package (Option A)
## Commit Messages

Follow conventional commits:

```text
feat: add stream_table materialization
fix: handle schema-qualified names in existence check
test: add full-refresh integration test
docs: update config reference in README
chore: update dbt version constraint
```
## Code Review Checklist

- [ ] Macro names follow the `pgtrickle_` convention
- [ ] All string params use `dbt.string_literal()`
- [ ] All `run_query()` calls guarded by `{% if execute %}`
- [ ] Materialization handles all four lifecycle cases
- [ ] Integration tests pass (create, test, full-refresh, test)
- [ ] Log messages are informative and prefixed with `pg-trickle:`
- [ ] No raw SQL concatenation of user input
- [ ] CHANGELOG.md updated