Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ jobs:
PGPASSWORD=pgque_test psql -h localhost -U postgres -d pgque_test \
-v ON_ERROR_STOP=1 -f sql/pgque.sql

- name: Run two-session receive lock harness
run: |
PGQUE_TEST_DSN=postgresql://postgres:pgque_test@localhost:5432/pgque_test \
tests/two_session_receive_lock.sh

- name: Run regression tests
run: |
PGPASSWORD=pgque_test psql -h localhost -U postgres -d pgque_test \
Expand Down
39 changes: 39 additions & 0 deletions build/transform.sh
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,45 @@ END {

echo "PASS: get_batch_cursor SECURITY header injected (extra_where is trusted SQL)"

# Serialize concurrent receive()/next_batch_custom() calls for the same
# (queue, consumer) cursor. Upstream PgQ allowed two sessions to read
# sub_batch = NULL concurrently and allocate distinct batch IDs; the later
# UPDATE overwrote the first batch. Locking the subscription row makes the
# second session re-read the committed active batch and return idempotently.
#
# The inline Python below rewrites NEXT_BATCH_FILE in place with two exact
# text substitutions, each applied to the first occurrence only:
#   1. the subscription lookup's terminating ';' becomes 'for update of s;'
#   2. an explanatory comment is inserted right after the function's 'begin'
# Both anchors are matched literally (whitespace included). If either anchor
# is missing, Python raises SystemExit with a message, so an upstream layout
# change is reported instead of silently producing an unpatched function.
# NOTE(review): the PASS echo at the end is only trustworthy if this script
# aborts on a failed command (e.g. `set -e`) -- TODO confirm.
NEXT_BATCH_FILE="${OUTPUT_DIR}/functions/pgque.next_batch.sql"
# The heredoc delimiter is quoted ('PYPATCH') so the shell passes the Python
# source through verbatim, without expanding $ or backticks inside it.
python3 - "${NEXT_BATCH_FILE}" <<'PYPATCH'
from pathlib import Path
import sys
p = Path(sys.argv[1])
s = p.read_text()
old = """ and s.sub_queue = q.queue_id
and s.sub_consumer = c.co_id;
"""
new = """ and s.sub_queue = q.queue_id
and s.sub_consumer = c.co_id
for update of s;
"""
if old not in s:
raise SystemExit('next_batch_custom subscription lookup not found')
s = s.replace(old, new, 1)
marker = """begin
select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
"""
comment = """begin
-- PgQue transformation: serialize same-consumer receive()/next_batch_custom()
-- calls by locking the subscription cursor row before reading sub_batch.
-- The second session blocks here, then re-reads the active batch_id and
-- returns idempotently instead of allocating a second batch (#97/#125).
select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
"""
if marker not in s:
raise SystemExit('next_batch_custom begin marker not found')
s = s.replace(marker, comment, 1)
p.write_text(s)
PYPATCH

echo "PASS: next_batch_custom locks subscription row for concurrent receive"

# -- Assembly: build sql/pgque.sql ------------------------------------

echo ""
Expand Down
2 changes: 2 additions & 0 deletions docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ Grant: `pgque_reader`. Source: `sql/pgque-api/receive.sql`.
select * from pgque.receive('orders', 'processor', 100);
```

**Single-worker-per-consumer contract.** Each consumer name is a single cursor that advances through ticks in sequence. Only one worker should call `receive()` for a given `(queue, consumer)` pair at a time. Concurrent calls for the same consumer are serialised internally via a row-level lock on the subscription cursor: the second caller blocks until the first caller's transaction ends. If that transaction committed, the second caller observes the already-open batch and returns it unchanged; if it rolled back, the second caller opens the batch itself. This describes the default `READ COMMITTED` isolation level; under `REPEATABLE READ` or `SERIALIZABLE` the blocked caller can instead fail with a serialization error and must retry. Running two workers under the same consumer name does not provide parallelism — it provides redundancy at best (both workers receive the same batch) and is unsupported. Use distinct consumer names for parallel workers on the same queue; they each get an independent cursor over all events (fan-out).

**Batch-ownership caveat.** `max_return` limits the number of rows returned to the caller, but `ack(batch_id)` advances the consumer cursor past the entire underlying batch. If `max_return < ticker_max_count`, calling `ack()` after a partial receive will drop the unreturned rows from the consumer's perspective. Either consume the full batch before acking, or use `max_return >= ticker_max_count` for safe pagination.

#### `pgque.ack(batch_id bigint) → integer`
Expand Down
7 changes: 6 additions & 1 deletion sql/pgque-api/cooperative_consumers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ declare
cons_id integer;
sub_role text;
begin
-- Serialize same-consumer legacy receive()/next_batch_custom() calls by
-- locking the subscription cursor row in the same SELECT that reads
-- sub_batch. This keeps the cooperative override's non-coop path aligned
-- with the transformed PgQ base function, which build/transform.sh patches
-- with the same FOR UPDATE clause (#97/#125).
select
s.sub_queue,
s.sub_consumer,
Expand Down Expand Up @@ -283,7 +287,8 @@ begin
and t2.tick_id = s.sub_next_tick
where
q.queue_name = i_queue_name
and c.co_name = i_consumer_name;
and c.co_name = i_consumer_name
for update of s;
if not found then
errmsg := 'Not subscriber to queue: '
|| coalesce(i_queue_name, 'NULL')
Expand Down
8 changes: 8 additions & 0 deletions sql/pgque-api/receive.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ begin
end $$;

-- pgque.receive() -- wraps next_batch + get_batch_events
--
-- Single-worker-per-consumer contract:
-- Each (queue, consumer) pair is a single cursor. Concurrent calls for
-- the same consumer are serialised by FOR UPDATE inside
-- pgque.next_batch_custom() (pgque.sql). The second caller blocks until
-- the first caller's transaction commits, then observes the open batch_id
-- and returns it without opening a new one. This holds under READ
-- COMMITTED; under REPEATABLE READ or SERIALIZABLE the blocked call can
-- instead fail with a serialization error. Two workers under the same
-- consumer name do not get parallelism; use distinct consumer names for
-- fan-out.
create or replace function pgque.receive(
i_queue text, i_consumer text, i_max_return int default 100)
returns setof pgque.message as $$
Expand Down
22 changes: 20 additions & 2 deletions sql/pgque-tle.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,10 @@ declare
sub_id integer;
cons_id integer;
begin
-- PgQue transformation: serialize same-consumer receive()/next_batch_custom()
-- calls by locking the subscription cursor row before reading sub_batch.
-- The second session blocks here, then re-reads the active batch_id and
-- returns idempotently instead of allocating a second batch (#97/#125).
select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
t1.tick_id, t1.tick_time, t1.tick_event_seq,
t2.tick_id, t2.tick_time, t2.tick_event_seq
Expand All @@ -2197,7 +2201,8 @@ begin
where q.queue_name = i_queue_name
and c.co_name = i_consumer_name
and s.sub_queue = q.queue_id
and s.sub_consumer = c.co_id;
and s.sub_consumer = c.co_id
for update of s;
if not found then
errmsg := 'Not subscriber to queue: '
|| coalesce(i_queue_name, 'NULL')
Expand Down Expand Up @@ -5427,6 +5432,14 @@ begin
end $$;

-- pgque.receive() -- wraps next_batch + get_batch_events
--
-- Single-worker-per-consumer contract:
-- Each (queue, consumer) pair is a single cursor. Concurrent calls for
-- the same consumer are serialised by FOR UPDATE inside
-- pgque.next_batch_custom() (pgque.sql). The second caller blocks until
-- the first commits, then observes the open batch_id and returns it
-- without opening a new one. Two workers under the same consumer name
-- do not get parallelism; use distinct consumer names for fan-out.
create or replace function pgque.receive(
i_queue text, i_consumer text, i_max_return int default 100)
returns setof pgque.message as $$
Expand Down Expand Up @@ -5807,6 +5820,10 @@ declare
cons_id integer;
sub_role text;
begin
-- Serialize same-consumer legacy receive()/next_batch_custom() calls by
-- locking the subscription cursor row before reading sub_batch. This keeps
-- the cooperative override's non-coop path aligned with the transformed
-- PgQ base function above (#97/#125).
select
s.sub_queue,
s.sub_consumer,
Expand Down Expand Up @@ -5845,7 +5862,8 @@ begin
and t2.tick_id = s.sub_next_tick
where
q.queue_name = i_queue_name
and c.co_name = i_consumer_name;
and c.co_name = i_consumer_name
for update of s;
if not found then
errmsg := 'Not subscriber to queue: '
|| coalesce(i_queue_name, 'NULL')
Expand Down
22 changes: 20 additions & 2 deletions sql/pgque.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,10 @@ declare
sub_id integer;
cons_id integer;
begin
-- PgQue transformation: serialize same-consumer receive()/next_batch_custom()
-- calls by locking the subscription cursor row before reading sub_batch.
-- The second session blocks here, then re-reads the active batch_id and
-- returns idempotently instead of allocating a second batch (#97/#125).
select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
t1.tick_id, t1.tick_time, t1.tick_event_seq,
t2.tick_id, t2.tick_time, t2.tick_event_seq
Expand All @@ -2109,7 +2113,8 @@ begin
where q.queue_name = i_queue_name
and c.co_name = i_consumer_name
and s.sub_queue = q.queue_id
and s.sub_consumer = c.co_id;
and s.sub_consumer = c.co_id
for update of s;
if not found then
errmsg := 'Not subscriber to queue: '
|| coalesce(i_queue_name, 'NULL')
Expand Down Expand Up @@ -5339,6 +5344,14 @@ begin
end $$;

-- pgque.receive() -- wraps next_batch + get_batch_events
--
-- Single-worker-per-consumer contract:
-- Each (queue, consumer) pair is a single cursor. Concurrent calls for
-- the same consumer are serialised by FOR UPDATE inside
-- pgque.next_batch_custom() (pgque.sql). The second caller blocks until
-- the first commits, then observes the open batch_id and returns it
-- without opening a new one. Two workers under the same consumer name
-- do not get parallelism; use distinct consumer names for fan-out.
create or replace function pgque.receive(
i_queue text, i_consumer text, i_max_return int default 100)
returns setof pgque.message as $$
Expand Down Expand Up @@ -5719,6 +5732,10 @@ declare
cons_id integer;
sub_role text;
begin
-- Serialize same-consumer legacy receive()/next_batch_custom() calls by
-- locking the subscription cursor row before reading sub_batch. This keeps
-- the cooperative override's non-coop path aligned with the transformed
-- PgQ base function above (#97/#125).
select
s.sub_queue,
s.sub_consumer,
Expand Down Expand Up @@ -5757,7 +5774,8 @@ begin
and t2.tick_id = s.sub_next_tick
where
q.queue_name = i_queue_name
and c.co_name = i_consumer_name;
and c.co_name = i_consumer_name
for update of s;
if not found then
errmsg := 'Not subscriber to queue: '
|| coalesce(i_queue_name, 'NULL')
Expand Down
Loading
Loading