1. Another queuing system?

What makes Qgres unique is that it is simple.

What makes most queuing systems complex is they allow for concurrent publishers and subscribers on a single queue. That creates all kinds of headaches and race conditions.

Qgres simplifies things by forcing per-queue serialization of either publishing or consuming. That makes it simple to either track subscribers (if publishing is serialized), or to allow multiple publishers (if only one consumer can remove from the queue at once).

Obviously, there are significant drawbacks to this design. Qgres is not meant to solve every queuing need. If you need something more sophisticated in Postgres, I suggest you check out PgQ; it’s much more sophisticated … and more complicated ;).

2. Installation

The easiest way to install Qgres is to use the PGXN client.

To load Qgres into a database named "database_name":

pgxn load -d database_name qgres

Alternatively, you can use pgxn to install the extension on the server, and then lead it using CREATE EXTENSION:

pgxn install qgres
psql
CREATE EXTENSION qgres;

If you have this repository checked out, you can use GNU make:

gmake
gmake test
sudo gmake install

You will need to use CREATE EXTENSION to load the extension into your database.

3. Queue Types

It is very important that you choose the right type of queue for your needs, especially since once you define a queue you can’t change it’s type.

Note
Queue choice is a bit more complicated than whether you think you need simultaneous publishers or consumers.

3.1. Serial Publisher

The key attribute of Serial Publisher (or SP) queues is that queue entries happen in a guaranteed order. Entries are tragged with an sequence number that is guaranteed to always increase one at a time per item. This is perfect for ensuring that you always know the exact order of events in the queue, regardless of things like clock skew (and trust me, timestamps are a horrible way to try and do this). This guaranteed monotonic ordering makes it extremely simple to find a range of events (using int8range), or to do things like find every tenth event (using % 10). You can also have as many subscribers as you want; Qgres will track what entries have or have not been seen by each subscriber.

The downside is that only one transaction can publish an entry in a queue at a time. If another transaction has called "Publish"() before you do, your "Publish"() will block until the other transaction finishes. This restriction is PER QUEUE.

INFO: It wouldn’t be too hard to support "partitioned queues", where items being published to a single "logical queue" are broken down further into multiple partitions. Each partition would, in effect, be it’s own queue.

3.2. Serial Remover

Serial Remover (or SR) queues are the opposite of SP queues: there is no ordering to entries, because everyone can add() into a queue at the same time. This allows for a very high rate of entry creation.

There are two downsides to SR queues. One is that only a single transaction may consume from a SC queue at a time. If another transaction has called "Remove"() before you do, your "Consume"() will block until the other transaction finishes. This restriction is PER QUEUE.

The other restriction is that there is absolutely no guarantee of what order consumers will see entries in. They can be consumed in literally any order.

What about "Job/Batch queues"?

The challenge with a queue that’s meant to hold workitems for later processing by automated batches is that typically you want multiple consumers, but with each item being seen by only one consumer. That’s relatively easy to do in Postgres 9.5 and newer, thanks to SELECT ... FOR UPDATE SKIP LOCKED, so this will probably be supported in the future.

A note on syntax conventions

To facilitate usage awareness (or context), Qgres provides different functions for each Queue Type. For example, you "Publish"() to SP queues, but you simply add() into SR queues. Using this distinctive syntax makes it easier to remember when you’re coding what type of queue you’re using.

4. Queue entries

Queue entries can contain text, binary (bytea), and/or JSONB data. The queue storage table has all 3 fields, so any combination may be used on any given entry.

Note
In the future I hope to add support for custom types.

5. Performance

Qgres is not meant to be a high-performance queuing system. Currently it simply stores all entries in a single table, so if you push it too hard you’ll end up with perforance problems. How much performance you can get will depend on hardware, the rate of add() and consume() ("Publish"() and "Remove"() tend to be self-limiting), the number of unseen entries in all the queues, and whether the database has any long-running transactions (because those interfere with vacuum). Decent hardware might be able to do hundreds of entries per second (and a much higher burst, at least on SR queues), but thousands per second is probably asking for trouble.

Note
There are some storage tricks that can be added in the future to improve performance.

6. LISTEN/NOTIFY

Not currently supported, but probably in the future.

7. Command Reference

7.1. Roles

There are 3 roles defined for queue management and usage. These should be granted to other roles to allow them to interact with queues.

qgres__queue_manage

Grants EXECUTE on queue management functions

qgres__queue_insert

Grants EXECUTE on event creation functions

qgres__queue_delete

Grants EXECUTE on event consumption functions

7.2. Queue Management Commands

7.2.1. queue__create(queue_name, queue_type) RETURNS queue_id int

Creates a new queue. Returned ID is guaranteed immutable for the life of the queue. Queue manager role only.

queue_name

Name for the queue, case-insensitive.

queue_type

Type of queue, must be one of Serial Publisher, SP, Serial Remover, or SR.

7.2.2. queue__drop(queue_name, [force])

Drops a queue. Queue manager role only.

queue_name

name of queue

queue_id

id of queue

[force]

if true, drop the queue even if it still has entries

7.2.3. queue__get(queue_name) RETURNS queue

7.2.4. queue__get(queue_id) RETURNS queue

7.2.5. queue__get_id(queue_name) RETURNS queue_id

Returns all information about a queue (or only the queue_id for queue__get_id()). Throws an error if the queue doesn’t exist.

See also: VIEW queue.

7.3. Serial Provider Commands

These commands only work on SP queues.

7.3.1. consumer__register(queue_name, consumer_name)

Registers a new consumer. SP queues track what entries have been seen on a consumer-by-consumer basis. Entries will not be removed until seen by all registered consumers, so don’t leave consumers un-attended! Queue delete role only.

queue_name

Name of queue, case-insensitive.

consumer_name

Name for consumer, case-insensitive.

Note
Unlike queues, there doesn’t seem to be much need for renaming consumers, or providing an immutable ID. Drop me a line if you have a use case for it.

7.3.2. consumer__drop(queue_name, consumer_name)

Drops a consumer. Queue delete role only.

7.3.3. "Publish"({queue_name|queue_id}, [bytea], [jsonb], [text]) RETURNS sequence_number

7.3.4. "Publish"({queue_name|queue_id}, {bytea|jsonb|text}) RETURNS sequence_number

Creates a new entry in the queue. Returns the (bigint) sequence number for that entry in the queue, which is guaranteed to be unique, strictly increasing, and gapless within a single transaction.

Both versions accept queue_name OR queue_id.

The first version accepts any combination of bytea, jsonb or text, all of which are optional (the default value for each is NULL).

The second version accepts a single input value, determined by input type.

Queue insert role only.

Warning
If you pass in data that is of type "unknown" results are not guaranteed. If you’re not coming from a pre-defined field or variable, you should cast your input.

TODO: Allow a queue to specify what fields may or may not be used.

7.3.5. consume({queue_name|queue_id}, consumer_name, [limit]) RETURNS entry(sequence_number, bytea, jsonb, text)

Consumes entries from a queue. This has no effect on other consumers. Queue delete role only.

{queue_name|queue_id}

Queue name (case-insensitive) or queue ID

consumer_name

Consumer name (case-insensitive)

[limit]

If specified, consume() will return no more than limit rows

INFO: Unless this is called in a SERIALIZE or REPEATABLE READ transaction, it’s possible for the function to return no entries on one invocation, but return new entries on a subsequent call as other transactions commit. Entries will never be skipped over, though.

Note
This function also handles queue cleanup, by deleting any entries that have been seen by all other consumers.

7.4. Serial Remover Commands

These commands only work on SR queues.

7.4.1. add({queue_name|queue_id}, [bytea], [jsonb], [text]) RETURNS void

7.4.2. add({queue_name|queue_id}, {bytea|jsonb|text}) RETURNS void

Creates a new entry in the queue. Because there is no guaranteed ordering of SR queues no sequence number is returned. Queue insert role only.

Both versions accept queue_name OR queue_id.

The first version accepts any combination of bytea, jsonb or text, all of which are optional (the default value for each is NULL).

The second version accepts a single input value, determined by input type.

Warning
If you pass in data that is of type "unknown" results are not guaranteed. If you’re not coming from a pre-defined field or variable, you should cast your input.

TODO: Allow a queue to specify what fields may or may not be used.

7.4.3. "Remove"({queue_name|queue_id}, [limit]) RETURNS entry(bytea, jsonb, text)

Removes and returns entries from the queue. There is no guaranteed ordering. Under some conditions you might get entries back in the order in which they were inserted, but that should never be counted on. This is especially likely to happen if there are very few entries in the queue, such as when you’re testing. Queue delete role only.

Warning
There is no guaranteed blocking between multiple callers of this function. Theoretically, if Postgres happens to chose different plans separate backends could execute at the same time. A given queue entry is guaranteed to only be seen once though.
{queue_name|queue_id}

Queue name (case-insensitive) or queue ID

[limit]

If specified, "Remove"() will return multiple entries at once.