
feat(aggregator): backfill admin route + auth-gated admin surface#979

Merged
ascorbic merged 8 commits into main from feat/aggregator-backfill on May 10, 2026

Conversation

@ascorbic
Collaborator

What does this PR do?

Cold-start backfill primitive for the aggregator + auth gate on the existing /_admin/* routes. Fourth slice on the same workstream as #971 / #972 / #975.

Operator hits POST /_admin/backfill with { "dids": ["did:plc:...", ...] }, the worker resolves each DID's PDS, calls com.atproto.repo.listRecords for every NSID in WANTED_COLLECTIONS, and batches the results onto the existing Records Queue. The consumer's verification + write + idempotency machinery handles the rest — same code path as live Jetstream, distinguished only by trigger.

Both /_admin/start and /_admin/backfill now require Authorization: Bearer <ADMIN_TOKEN>. The token is declared via the new secrets.required config property in wrangler.jsonc, which gives:

  • typed env.ADMIN_TOKEN: string from wrangler types
  • deploy-time validation that the secret is set
  • production set via wrangler secret put ADMIN_TOKEN
  • tests bind a stub via miniflare in vitest.config.ts

Three commits — the foundational PR plus two rounds of adversarial-review fixes:

  1. feat(aggregator): backfill admin route + auth-gated admin surface — the substantive piece. backfill.ts with backfillDids / backfillDid / backfillCollection and an explicit parseRkeyFromUri validator. Admin auth via requireAdminAuth(request, env) doing a constant-time compare. runBackfill constructs a fresh DidResolver per invocation (with the production D1-backed cache), iterates DIDs serially, batches enqueues per page, and logs per-DID progress so an operator watching wrangler tail can see where a long backfill is up to. isPlainObject extracted from three duplicate copies into apps/aggregator/src/utils.ts. .dev.vars.example added for new-contributor onboarding. Round-1 adversarial-review findings folded in: max-page guard; cursor-equality check; DID list size cap; per-page records cap; sendBatch instead of sequential sends; 404 treated as "no records" only on the first page; lax DID validation tightened; duplicate-DID dedup; jetstreamRecord no longer set on backfill jobs (the consumer's DLQ payload would otherwise mislabel listRecords data as Jetstream-supplied); stricter rkey validation against the atproto rkey grammar.

  2. fix(aggregator): round 2 review fixes for backfill + admin auth — second review pass found 1 HIGH (sendBatch cap mismatch) + 2 MEDIUM (Bearer case-sensitivity, unbounded enqueue cost) + 5 LOW. Fixes covered: MAX_RECORDS_PER_PAGE lowered from 200 → 100 to match Cloudflare Queues' sendBatch cap (the test stub accepted any size, hiding the production divergence); RFC 6750 §2.1 case-insensitive Bearer scheme; new MAX_TOTAL_ENQUEUE = 50_000 per-invocation cap shared via an EnqueueBudget object so a leaked admin token can't pump arbitrary queue cost; /_admin/start now requires POST (was accepting any method, contradicting its own documented usage); per-collection enqueued counter computed via before - budget.remaining delta so mid-pagination throws don't undercount; tightened wrangler.jsonc secrets comment about per-named-environment inheritance.

  3. fix(aggregator): round 3 review fixes for backfill — third review pass found 1 HIGH (no listRecords timeout) + 2 MEDIUM + 2 LOW + 1 NIT. Fixes covered: AbortController + 15s default timeout on the listRecords fetch (matching pds-verify.ts's pattern; a hung PDS would otherwise have starved every DID after it in the serial loop); EnqueueLimitReached now appends a sentinel error so partial-DID logs don't look like clean small publishers; budget-cap test asserts exact count (was slack <= 30, would have accepted off-by-one regressions); whitespace-only ADMIN_TOKEN now rejected (trim().length === 0 fails closed); summary log distinguishes didsRequested vs didsProcessed so the early-abort case is visible without correlating against an earlier warn line; the unreachable defense-in-depth check that would have silently dropped records on misuse replaced with a module-load static guard.

A round-4 review hasn't run; the round-3 fixes are minimal and the prior two rounds were each substantial. Convergence trajectory: round 1 → 12 bugs, round 2 → 8, round 3 → 6 (only one a behavior bug). Happy to spin another if you want belt-and-braces.

Closes #

Type of change

Checklist

AI-generated code disclosure

  • This PR includes AI-generated code — model/tool: Claude Opus 4.7 (1M context)

Test output

@emdash-cms/aggregator         Test Files  7 passed (7)   Tests 136 passed (136)
[rest of workspace unchanged]

Notes for review

Pre-deploy operator workflow. The secrets.required declaration means wrangler deploy will refuse to ship without an ADMIN_TOKEN. Operator workflow is:

wrangler secret put ADMIN_TOKEN              # paste a long random string
wrangler deploy
curl -X POST -H "Authorization: Bearer $ADMIN_TOKEN" https://api.emdashcms.com/_admin/start
curl -X POST -H "Authorization: Bearer $ADMIN_TOKEN" -H "Content-Type: application/json" \
  -d '{"dids":["did:plc:..."]}' https://api.emdashcms.com/_admin/backfill

.dev.vars.example shows the local-dev shape for new contributors.

Why no Constellation-driven discovery (no-explicit-DIDs path). Documented in the plan and in code comments: live discovery is Jetstream's job — the consumer writes known_publishers opportunistically on any record event for a new DID. Backfill exists for the cold-start gap (publishers who published before the aggregator was listening) and for operator-triggered recovery after a known outage. Constellation is a follow-up if/when periodic re-discovery becomes valuable; v1 doesn't need it.

Why no reconciliation cron. Earlier drafts of the plan called for a 6-hour listRecords-based reconciliation pass; removed after stress-testing the actual cases. Backfill (operator-triggered) covers the same diff-and-tombstone work for any DID list. If we discover production reasons for periodic automation later, the cost is ~30 lines + a last_reconciliation_at cursor in ingest_state — reversible cheaply. Plan §"Why no reconciliation cron" has the full rationale.

Slice 1 remaining. After this merges: read API (5 aggregator XRPC endpoints + cached com.atproto.sync.getRecord) + wiring EMDASH_REGISTRY_URL at deploy. That's it for Slice 1.

ascorbic added 3 commits May 10, 2026 16:37
Cold-start discovery primitive. Operator hits POST /_admin/backfill with
a JSON list of DIDs; the worker resolves each DID's PDS endpoint, calls
com.atproto.repo.listRecords for every NSID in WANTED_COLLECTIONS, and
batches the results onto the existing Records Queue. The consumer's
verification + write + idempotency machinery handles the rest — same
code path as live Jetstream, distinguished only by trigger.

Live discovery is still Jetstream's job; this exists for the cold-start
gap (publishers who published before the aggregator was listening) and
for operator-initiated recovery after a known outage. There is no
periodic scheduler — see plan §"Why no reconciliation cron".

ADMIN AUTH

Both admin routes (/_admin/start and /_admin/backfill) now require
`Authorization: Bearer <ADMIN_TOKEN>`. Wrangler's `secrets.required`
declares the token in config, so:
- `wrangler types` generates a typed `env.ADMIN_TOKEN: string`
- `wrangler deploy` refuses to ship if the secret isn't set
- Production sets via `wrangler secret put ADMIN_TOKEN`
- Tests bind a stub via vitest miniflare bindings (test-admin-token)
Constant-time compare to avoid leaking length/prefix via timing.
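A minimal sketch of that gate, reusing the PR's names (requireAdminAuth, timingSafeEqual) but otherwise illustrative; the case-insensitive scheme check and the whitespace fail-closed behavior reflect the later review-fix rounds described below:

```typescript
// Illustrative sketch of the admin auth gate; signatures are assumptions.

// Constant-time compare, as in the PR's original hand-rolled version:
// XOR every char pair so timing doesn't depend on where strings differ.
function timingSafeEqual(a: string, b: string): boolean {
  if (a.length !== b.length) return false;
  let diff = 0;
  for (let i = 0; i < a.length; i++) {
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i);
  }
  return diff === 0;
}

// Returns a Response to short-circuit with, or null when authorized.
function requireAdminAuth(
  request: { headers: { get(name: string): string | null } },
  env: { ADMIN_TOKEN?: string },
): Response | null {
  const expected = env.ADMIN_TOKEN;
  // Fail closed on missing or whitespace-only secrets (round-3 fix).
  if (!expected || expected.trim().length === 0) {
    return new Response("admin surface misconfigured", { status: 503 });
  }
  const header = request.headers.get("authorization") ?? "";
  // RFC 6750 §2.1: the auth scheme is case-insensitive (round-2 fix).
  if (!header.toLowerCase().startsWith("bearer ")) {
    return new Response("unauthorized", { status: 401 });
  }
  const token = header.slice("bearer ".length);
  if (!timingSafeEqual(token, expected)) {
    return new Response("unauthorized", { status: 401 });
  }
  return null;
}
```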

Backfill specifically is what makes this auth gate non-optional:
unauthenticated, did:web resolution turns the worker into an
attacker-controlled HTTP fetcher (caller picks both the DID-doc URL
and the resulting PDS endpoint). Same gate on /_admin/start for
symmetry — anyone with the token can do operational things.

DEFENSES IN BACKFILL

Adversarial review (round 1) found bugs that needed fixing before
shipping. All addressed:

- BLOCKER: pagination loop had no max-page guard. A PDS that echoes
  the same cursor pinned the worker indefinitely. Now capped at
  MAX_PAGES_PER_COLLECTION (1000) plus a cursor-equality check that
  catches the common case in 2 iterations.
- MAJOR: dids[] and per-page records had no size cap. Both capped now
  (MAX_BACKFILL_DIDS=1000, MAX_RECORDS_PER_PAGE=200) so a single POST
  can't flood the queue.
- MAJOR: per-record sequential queue.send() couldn't possibly finish
  a 4000-record DID inside the waitUntil budget. Switched to
  sendBatch (one batch per page).
- MAJOR: runBackfill summary log only fired at the end; if waitUntil
  exhausted mid-loop the operator saw nothing. Added per-DID logs
  via onDidComplete callback + try/catch wrapping the whole runner
  so any throw surfaces as "[aggregator] backfill aborted".
- MINOR: 404 mid-pagination silently truncated; now distinguished
  from first-page-404 (which legitimately means "no records of this
  collection") and surfaced as a partial-failure error.
- MINOR: parseBackfillBody only checked startsWith("did:"). Now
  validates against DID_PATTERN and dedupes the list via Set.
- MINOR: parseRkeyFromUri accepted any non-empty string after the
  expected slashes. Now validates against the atproto rkey grammar
  ([A-Za-z0-9._:~-]{1,512}) so injection-shaped rkeys are skipped.
- MINOR: backfill jobs were setting jetstreamRecord, which the
  consumer's DLQ payload field would have mislabeled as
  Jetstream-supplied data. Dropped the field on backfill jobs.
- NIT: isPlainObject was duplicated 3x; extracted to src/utils.ts.
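The pagination guards from the BLOCKER fix above can be sketched like this; fetchPage is a hypothetical stand-in for the listRecords call, and only MAX_PAGES_PER_COLLECTION comes from the PR:

```typescript
// Illustrative pagination loop with the two guards described above.
const MAX_PAGES_PER_COLLECTION = 1000;

type Page = { records: unknown[]; cursor?: string };

async function paginateAll(
  fetchPage: (cursor?: string) => Promise<Page>,
): Promise<unknown[]> {
  const out: unknown[] = [];
  let cursor: string | undefined;
  for (let page = 0; page < MAX_PAGES_PER_COLLECTION; page++) {
    const res = await fetchPage(cursor);
    out.push(...res.records);
    if (!res.cursor) return out; // no further pages
    // Cursor-equality check: a PDS echoing the same cursor would
    // otherwise pin the loop until the page cap; this catches the
    // common case on the second iteration.
    if (res.cursor === cursor) {
      throw new Error("PDS returned a non-advancing cursor");
    }
    cursor = res.cursor;
  }
  throw new Error("exceeded MAX_PAGES_PER_COLLECTION");
}
```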

128 tests pass (was 112). Typecheck clean. Zero lint diagnostics.

The remaining adversarial-review items I decided to skip:
- Three near-duplicate MapDidDocCache test stubs across 3 files —
  flagged as NIT, deferred to its own refactor PR.
- Constellation-driven discovery (no-explicit-DIDs path). Documented
  in the plan as intentional Slice 1 scope: explicit DIDs is the
  primary path; Constellation is a follow-up if needed.
Round 2 found a HIGH (sendBatch cap mismatch), 2 MEDIUM (Bearer
case-sensitivity, unbounded enqueue cost), and several LOWs. All
addressed.

HIGH

- MAX_RECORDS_PER_PAGE was 200 (PAGE_SIZE * 2) but Cloudflare Queues'
  sendBatch hard-caps at 100. A real PDS that loosely interpreted the
  ?limit= query and returned 101+ records would have made our
  sendBatch throw, dropping the entire collection's records via the
  per-collection try/catch. The in-memory CapturingQueue test stub
  accepted any size, so the test suite didn't catch the production
  divergence. Lowered the cap to 100 so a compliant page maps 1:1 to
  one batch send; oversize pages throw with a clear "exceeding
  per-page cap" message rather than the queue swallowing them. Added
  a defense-in-depth check at the sendBatch site too.

MEDIUM

- Bearer scheme was matched case-sensitively. RFC 6750 §2.1 makes
  the auth scheme case-insensitive; `curl -H "authorization: bearer
  ..."` with lowercase scheme would have got a 401 with a valid
  token. Now lowercase-compare the prefix.

- Unbounded enqueue cost from a single authenticated POST. Worst
  case theoretical 400M queue messages per request (1000 DIDs × 4
  collections × 1000 pages × 100 records = ~$160 in queue billing).
  Added MAX_TOTAL_ENQUEUE = 50_000 as a per-invocation cap, shared
  across DIDs via a budget object. Reaching the cap throws
  EnqueueLimitReached; the outer loop catches and stops processing
  further DIDs, logs a warning so operators see partial completion.
  Bounds blast radius if ADMIN_TOKEN ever leaks.
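A sketch of that shared budget; MAX_TOTAL_ENQUEUE and EnqueueLimitReached are the PR's names, while the take() helper and exact shape are illustrative:

```typescript
// Per-invocation enqueue budget shared across all DIDs in one POST.
const MAX_TOTAL_ENQUEUE = 50_000;

class EnqueueLimitReached extends Error {}

class EnqueueBudget {
  constructor(public remaining: number = MAX_TOTAL_ENQUEUE) {}

  // Reserve up to `want` slots for one page's sendBatch. Throws once
  // the budget is exhausted so the outer DID loop can stop and log
  // partial completion instead of pumping unbounded queue cost.
  take(want: number): number {
    if (this.remaining <= 0) throw new EnqueueLimitReached();
    const granted = Math.min(want, this.remaining);
    this.remaining -= granted;
    return granted;
  }
}
```

Per page the caller would grant `budget.take(records.length)` slots, send exactly that many, and let the next `take` throw once the cap is hit.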

LOW

- `/_admin/start` accepted any HTTP method, contradicting its own
  documented POST usage. Now requires POST (returns 405 with
  `allow: POST` on other methods), matching `/_admin/backfill`.

- Per-collection enqueued counter undercounted on mid-pagination
  throws. Switched from `enqueued += await backfillCollection(...)`
  (which loses the partial count when the await rejects) to
  tracking via the shared budget object's `before - after` delta,
  computed in both the success and the error paths.

- wrangler.jsonc `secrets.required` comment overstated the
  enforcement guarantees. Updated to be accurate about the
  `wrangler dev` warning + `wrangler deploy` block + the
  per-named-environment inheritance gotcha. Documents the runtime
  503 fail-closed behavior so misconfigured deploys can't leave
  admin routes unauth-passable.

- Added .dev.vars.example for new contributors. Local devs who
  don't create .dev.vars otherwise hit a confusing 503 on admin
  routes; example file makes the requirement obvious.

NEW TESTS

- Enqueue budget cap: budget of 30 stops the loop after 30 records
  enqueued.
- POST-only on /_admin/start: GET returns 405; POST + valid token
  returns 204.
- Case-insensitive Bearer: lowercase scheme accepted.

131 tests pass (was 128). Typecheck clean. Zero lint diagnostics in
apps/aggregator.

NOT FIXED

- worker-configuration.d.ts is committed (round-2 LOW). Repo-wide
  convention; left for a separate Discussion rather than changed
  here.
Round 3 found 1 HIGH (no listRecords timeout), 2 MEDIUM
(EnqueueLimitReached observability + slack budget-cap test), 2 LOW
(whitespace-only ADMIN_TOKEN, log discrepancy on early-abort), 1 NIT
(unreachable defense-in-depth check that drops records when triggered).
All addressed.

HIGH

- listRecords fetch had no timeout. A hung publisher PDS that accepted
  the connection but stalled the body would have blocked
  `await fetchImpl(...)` until workerd's overall sub-request budget
  exhausted, starving every DID after it in the serial loop. pds-verify
  uses an AbortController for the same kind of upstream call;
  inconsistent. Now wraps the listRecords fetch in an
  AbortController + 15s default timeout (override via
  `BackfillDeps.listRecordsTimeoutMs` for tests). On AbortError, throws
  with cause attached so per-collection error log includes the
  underlying signal.

MEDIUM

- EnqueueLimitReached returned without recording an error in
  `BackfillDidResult.errors`. Operator's per-DID log showed
  `enqueued: 47, errors: []` for a half-processed DID — looked like
  a successful 47-record publisher. Now appends a sentinel error
  ("enqueue cap reached mid-DID at collection X — partial backfill,
  re-run to complete") so partial completion is visible. Also added
  triggerDid to the warn log in `backfillDids` so operators can see
  WHICH DID drained the budget.

- Budget-cap test asserted `<= 30`, which would silently accept
  off-by-one regressions in the inner break (capping at 29 or 0
  would still pass). The exact answer is 30 (budget=30 + page of
  50 → inner break at iter 31, sendBatch sends 30, post-send throw
  fires). Now asserts `toBe(30)`. Also asserts the sentinel error
  appears.

LOW

- Whitespace-only ADMIN_TOKEN was accepted. `if (!expected)` catches
  "" and undefined but accepts "   " (operator pastes a blank line
  into `wrangler secret put`). Now `expected.trim().length === 0`
  also fails closed.

- `runBackfill` summary log conflated didCount (requested) with
  didsProcessed (actual). When the cap fires and the loop breaks
  early, the log said `didCount: 1000, totalEnqueued: 50000` even
  if only 200 DIDs ran. Now logs both `didsRequested` and
  `didsProcessed` separately so the early-abort case is visible
  in the summary line without correlating against an earlier warn.

NIT

- Defense-in-depth `if (messages.length > QUEUE_SEND_BATCH_CAP)`
  inside the page loop was unreachable given
  `MAX_RECORDS_PER_PAGE === QUEUE_SEND_BATCH_CAP === 100`, AND if
  it ever fired (future bump of MAX_RECORDS_PER_PAGE), it would
  drop the entire page silently into errors[]. Replaced with a
  module-load static guard that throws at boot if the invariant
  is violated — better failure mode (boot crash vs silent data
  loss) and the runtime check disappears.
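The boot-time guard amounts to a few lines; constant names follow the PR, but the helper function is illustrative (the real check is inline at module scope):

```typescript
// Module-load static guard: a future bump of MAX_RECORDS_PER_PAGE past
// the sendBatch cap crashes the worker at boot instead of silently
// dropping oversized pages into errors[] at runtime.
const MAX_RECORDS_PER_PAGE = 100;
const QUEUE_SEND_BATCH_CAP = 100; // Cloudflare Queues sendBatch limit

function assertBatchInvariant(perPage: number, batchCap: number): void {
  if (perPage > batchCap) {
    throw new Error(
      `MAX_RECORDS_PER_PAGE (${perPage}) must not exceed sendBatch cap (${batchCap})`,
    );
  }
}

// Runs once when the module is evaluated.
assertBatchInvariant(MAX_RECORDS_PER_PAGE, QUEUE_SEND_BATCH_CAP);
```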

NEW TESTS

- listRecords timeout: hung-fetch stub never resolves, abort fires
  at 25ms, error message matches "timed out after 25ms".
- Case-insensitive Bearer regression guards: canonical "Bearer",
  uppercase "BEARER", non-Bearer "Basic" rejected, empty token
  after prefix rejected.

136 tests pass (was 131). Typecheck clean. Zero lint diagnostics in
apps/aggregator.

CONVERGENCE NOTE

Round 1 → 12 bugs. Round 2 → 1 HIGH + 2 MED + 5 LOW = 8 bugs.
Round 3 → 1 HIGH + 2 MED + 2 LOW + 1 NIT = 6 findings, only the
HIGH was a behavior bug; rest were quality / observability /
test-strength. Reviewer assessed convergence is real but not
complete; only the HIGH would have blocked. The slice has now had
three rounds with diminishing severity and no new bugs introduced
by prior fix rounds.
Copilot AI review requested due to automatic review settings May 10, 2026 16:25
@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 10, 2026

Deploying with Cloudflare Workers

Status                     Name         Latest Commit  Updated (UTC)
✅ Deployment successful!   emdash-i18n  6e2e24a        May 10 2026, 06:13 PM

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 10, 2026

Deploying with Cloudflare Workers

Status                     Name                     Latest Commit  Updated (UTC)
✅ Deployment successful!   emdash-perf-coordinator  6e2e24a        May 10 2026, 06:13 PM

@changeset-bot

changeset-bot Bot commented May 10, 2026

⚠️ No Changeset found

Latest commit: 6e2e24a

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 10, 2026

Deploying with Cloudflare Workers

Status                     Name  Latest Commit  Updated (UTC)
✅ Deployment successful!   docs  6e2e24a        May 10 2026, 06:14 PM

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 10, 2026

Deploying with Cloudflare Workers

Status                     Name               Latest Commit  Updated (UTC)
✅ Deployment successful!   emdash-demo-cache  6e2e24a        May 10 2026, 06:14 PM

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 10, 2026

Deploying with Cloudflare Workers

Status                     Name               Latest Commit  Updated (UTC)
✅ Deployment successful!   emdash-playground  6e2e24a        May 10 2026, 06:14 PM

@github-actions
Contributor

Scope check

This PR changes 1,481 lines across 9 files. Large PRs are harder to review and more likely to be closed without review.

If this scope is intentional, no action needed. A maintainer will review it. If not, please consider splitting this into smaller PRs.

See CONTRIBUTING.md for contribution guidelines.

@github-actions
Contributor

PR template validation failed

Please fix the following issues by editing your PR description:

See CONTRIBUTING.md for the full contribution policy.

@pkg-pr-new

pkg-pr-new Bot commented May 10, 2026


@emdash-cms/admin

npm i https://pkg.pr.new/@emdash-cms/admin@979

@emdash-cms/auth

npm i https://pkg.pr.new/@emdash-cms/auth@979

@emdash-cms/blocks

npm i https://pkg.pr.new/@emdash-cms/blocks@979

@emdash-cms/cloudflare

npm i https://pkg.pr.new/@emdash-cms/cloudflare@979

emdash

npm i https://pkg.pr.new/emdash@979

create-emdash

npm i https://pkg.pr.new/create-emdash@979

@emdash-cms/gutenberg-to-portable-text

npm i https://pkg.pr.new/@emdash-cms/gutenberg-to-portable-text@979

@emdash-cms/x402

npm i https://pkg.pr.new/@emdash-cms/x402@979

@emdash-cms/plugin-ai-moderation

npm i https://pkg.pr.new/@emdash-cms/plugin-ai-moderation@979

@emdash-cms/plugin-atproto

npm i https://pkg.pr.new/@emdash-cms/plugin-atproto@979

@emdash-cms/plugin-audit-log

npm i https://pkg.pr.new/@emdash-cms/plugin-audit-log@979

@emdash-cms/plugin-color

npm i https://pkg.pr.new/@emdash-cms/plugin-color@979

@emdash-cms/plugin-embeds

npm i https://pkg.pr.new/@emdash-cms/plugin-embeds@979

@emdash-cms/plugin-forms

npm i https://pkg.pr.new/@emdash-cms/plugin-forms@979

@emdash-cms/plugin-webhook-notifier

npm i https://pkg.pr.new/@emdash-cms/plugin-webhook-notifier@979

commit: 6e2e24a

Contributor

Copilot AI left a comment


Pull request overview

Adds an operator-triggered cold-start backfill primitive to the aggregator Worker and gates the existing admin surface behind a Bearer token secret, aligning deployment/runtime config, production wiring, and workers-pool tests.

Changes:

  • Introduces POST /_admin/backfill to resolve DIDs → PDS, listRecords across WANTED_COLLECTIONS, and enqueue RecordsJobs with pagination + defensive caps.
  • Auth-gates /_admin/start and /_admin/backfill via Authorization: Bearer <ADMIN_TOKEN> and adds secrets.required + test bindings for the secret.
  • Extracts shared isPlainObject helper and adds comprehensive backfill/admin-route test coverage.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
apps/aggregator/wrangler.jsonc Declares ADMIN_TOKEN as a required secret for deploy-time validation/typegen.
apps/aggregator/worker-configuration.d.ts Updates generated Env typings to include ADMIN_TOKEN.
apps/aggregator/vitest.config.ts Binds a stub ADMIN_TOKEN in Miniflare for workers-pool tests.
apps/aggregator/test/backfill.test.ts Adds backfill pagination/defense tests + admin auth/method/input validation tests.
apps/aggregator/src/utils.ts Adds shared isPlainObject helper.
apps/aggregator/src/records-consumer.ts Reuses shared isPlainObject instead of an inline duplicate.
apps/aggregator/src/index.ts Adds auth-gated /_admin/start + new /_admin/backfill route and production runBackfill wiring.
apps/aggregator/src/backfill.ts Implements backfill core (DID loop, collection pagination, enqueue budgeting, rkey validation, timeouts).
apps/aggregator/.dev.vars.example Documents local dev ADMIN_TOKEN shape for contributors.


Comment thread apps/aggregator/src/index.ts Outdated
Comment on lines +61 to +68
* Constant-time string equality. Use over `===` for any compare against a
* secret to avoid leaking length/prefix information through timing channels.
*/
function timingSafeEqual(a: string, b: string): boolean {
  if (a.length !== b.length) return false;
  let diff = 0;
  for (let i = 0; i < a.length; i++) {
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i);
Comment on lines +295 to +303
const body: unknown = await response.json();
const records = extractListRecordsBody(body);
if (records.length > MAX_RECORDS_PER_PAGE) {
  throw new Error(
    `PDS returned ${records.length} records, exceeding per-page cap of ${MAX_RECORDS_PER_PAGE}`,
  );
}
cursor = extractCursor(body);

Comment thread apps/aggregator/src/backfill.ts Outdated
Comment on lines +363 to +375
function parseRkeyFromUri(uri: string, expectedCollection: string): string | null {
  const expectedPrefix = `at://`;
  if (!uri.startsWith(expectedPrefix)) return null;
  const tail = uri.slice(expectedPrefix.length);
  const slash1 = tail.indexOf("/");
  if (slash1 < 0) return null;
  const slash2 = tail.indexOf("/", slash1 + 1);
  if (slash2 < 0) return null;
  const collection = tail.slice(slash1 + 1, slash2);
  if (collection !== expectedCollection) return null;
  const rkey = tail.slice(slash2 + 1);
  if (!RKEY_PATTERN.test(rkey)) return null;
  return rkey;
ascorbic added 2 commits May 10, 2026 18:28
…tUntil cap

The previous backfill ran one giant serial loop inside the POST handler's
`ctx.waitUntil`, which Cloudflare hard-caps at 30 seconds wall-clock.
Realistically that limited a single backfill POST to ~15-25 DIDs before
in-flight work was cancelled, regardless of per-DID work being well-bounded.

Restructured into a dedicated `BACKFILL_QUEUE` of `{did, collection}` jobs.
The POST handler now just discovers DIDs (via relay or explicit list) and
fans pairs onto the queue via `sendBatch` — fast and trivially fits in
waitUntil. A new per-pair consumer (`backfill-consumer.ts`) does the
listRecords pagination + records-queue fan-out work asynchronously, with
queue retry/backoff/concurrency for free.

Cap changes for the new fan-out threat model:
- `MAX_BACKFILL_DIDS` lowered 1000 → 100 (each submitted DID becomes
  `WANTED_COLLECTIONS.length` consumer invocations, so the explicit-list
  ceiling needs to be much lower than the old serial-loop cap).
- New `MAX_DISCOVERED_DIDS = 1000` defends discovery against a runaway
  relay flooding the queue.
- Dropped the global `MAX_TOTAL_ENQUEUE` budget — it doesn't translate
  across consumer invocations. Per-pair work is still bounded by
  `MAX_PAGES_PER_COLLECTION` × `MAX_RECORDS_PER_PAGE`.
- Parallelize `enqueueBackfillJobs` sendBatch calls. Serial awaits over
  ~40 batches (worst case from `MAX_DISCOVERED_DIDS` × `WANTED_COLLECTIONS`)
  ate noticeably into the POST handler's 30s waitUntil budget on top of
  discovery's own fetches.
- Drop backfill consumer `max_batch_size` from 5 → 1. Sequential
  processing inside a batch could chain `LIST_RECORDS_TIMEOUT_MS = 15s`
  per page × multiple messages and blow the consumer wall-clock budget.
  At backfill rates (operator-triggered, not steady-state), throughput
  doesn't matter; the simpler invariant does.
- Add `drainBackfillDeadLetterBatch` consumer for the backfill DLQ.
  Mirrors the records-DLQ pattern (log loud + ack) so messages don't
  silently accumulate. No D1 forensics row — the operator's recovery
  action is "re-trigger backfill for the affected DID", which only
  needs the (did, collection) pair from the log line.
- Fix stale comment in the backfill route handler that referenced the
  deleted `backfillDids` orchestrator.
- Update `MAX_BACKFILL_DIDS` comment to acknowledge that
  `MAX_DISCOVERED_DIDS` is the actually-larger ceiling — the explicit
  path is the tighter "operator types it themselves" bucket by design.
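The fan-out step above can be sketched as follows; buildBackfillJobs, the chunk helper, and the collection NSIDs are illustrative, while the batch cap of 100 and parallel sendBatch calls come from the commit:

```typescript
// Illustrative fan-out: (did × collection) pairs chunked into
// sendBatch calls of at most 100 messages, shipped in parallel.
const QUEUE_SEND_BATCH_CAP = 100;

type BackfillJob = { did: string; collection: string };

function buildBackfillJobs(dids: string[], collections: string[]): BackfillJob[] {
  return dids.flatMap((did) =>
    collections.map((collection) => ({ did, collection })),
  );
}

function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Parallel sendBatch calls keep the POST handler well inside its
// waitUntil budget; serial awaits over ~40 batches were too slow.
async function enqueueBackfillJobs(
  queue: { sendBatch(msgs: BackfillJob[]): Promise<void> },
  jobs: BackfillJob[],
): Promise<void> {
  await Promise.all(
    chunk(jobs, QUEUE_SEND_BATCH_CAP).map((batch) => queue.sendBatch(batch)),
  );
}
```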
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.

Comment thread apps/aggregator/src/index.ts Outdated
Comment on lines +77 to +84
* Constant-time string equality. Use over `===` for any compare against a
* secret to avoid leaking length/prefix information through timing channels.
*/
function timingSafeEqual(a: string, b: string): boolean {
  if (a.length !== b.length) return false;
  let diff = 0;
  for (let i = 0; i < a.length; i++) {
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i);
Comment thread apps/aggregator/src/backfill.ts Outdated
Comment on lines +403 to +405
if (!isPlainObject(body)) return [];
const records = body["records"];
if (!Array.isArray(records)) return [];
Comment thread apps/aggregator/src/backfill.ts Outdated
Comment on lines +425 to +434
* and validate it against the atproto rkey grammar. Returns null if any
* step fails; callers treat that as "this isn't a record we recognise"
* and skip the entry without aborting the page.
*/
function parseRkeyFromUri(uri: string, expectedCollection: string): string | null {
  const expectedPrefix = `at://`;
  if (!uri.startsWith(expectedPrefix)) return null;
  const tail = uri.slice(expectedPrefix.length);
  const slash1 = tail.indexOf("/");
  if (slash1 < 0) return null;
Comment on lines +20 to +24
* The DLQ is intentionally not auto-drained today — backfill is operator-
* triggered, so DLQ inspection is part of the operator's workflow when a
* backfill POST shows partial completion in `wrangler tail`. A drain
* consumer can land later when we have a clear ack policy (probably:
* write a row to D1 and ack, like the records-DLQ drain).
ascorbic added 3 commits May 10, 2026 18:48
Three Copilot review comments on PR #979 — all pointing at hand-rolled
code that should be using library functions:

- Swap hand-rolled `timingSafeEqual` (XOR loop) for workerd's audited
  `crypto.subtle.timingSafeEqual`. Update the comment to be honest about
  the residual length-leak (the primitive returns false on length
  mismatch, so the strict zero-leak guarantee only holds against
  same-length inputs — acceptable here because ADMIN_TOKEN's length is
  fixed at deploy time and any realistic length-via-timing attack
  requires more requests than other defences would tolerate).

- Swap hand-rolled `parseRkeyFromUri` for `parseCanonicalResourceUri`
  from `@atcute/lexicons/syntax`. Library validates the DID, NSID, and
  rkey grammars in one call. Adds an explicit DID-equality check so a
  buggy/malicious PDS that returns records under another publisher's
  DID is dropped at source rather than churning dead-letters in the
  records consumer.

- Make `extractListRecordsBody`/`extractCursor` throw on malformed
  bodies instead of silently returning `[]`/`undefined`. A 200 response
  with the wrong shape is upstream breakage and the operator-triggered
  integrity path should surface that loudly. Throws propagate out via
  the queue consumer's retry path.

Also adds tests for the new strict validation paths and the
DID-mismatch rejection.
`@atcute/lexicons/syntax` exports the canonical `isDid` validator.
Two files in this package were each carrying their own copy of
`/^did:[a-z]+:[A-Za-z0-9._%:-]+$/` — looser than the library
(which is length-bounded 7-2048 and forbids trailing `:` or `%`)
and not type-narrowing.

Replaces both:
- `did-resolver.ts`: drops the local regex + `isDid` wrapper, imports
  the library version.
- `index.ts` (backfill admin route): drops the local regex, calls the
  library directly in `parseBackfillBody`.

Existing tests cover the validation behavior; library is at least as
strict as the prior pattern so nothing legitimate gets newly rejected.
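A sketch of the request-body validation and dedup on the backfill route; the real code calls the library's isDid from `@atcute/lexicons/syntax`, so localIsDid below is a simplified stand-in (with the 7-2048 length bound and no trailing `:`/`%`, per the commit) to keep the sketch self-contained, and MAX_BACKFILL_DIDS reflects the post-fan-out cap:

```typescript
// Illustrative parseBackfillBody: dedup via Set, size cap, per-entry
// DID validation (round-1 fixes). localIsDid stands in for the
// library's isDid validator.
const MAX_BACKFILL_DIDS = 100;

function localIsDid(value: string): boolean {
  if (value.length < 7 || value.length > 2048) return false;
  // Method prefix, then a non-empty body not ending in ':' or '%'.
  return /^did:[a-z]+:[A-Za-z0-9._%:-]*[A-Za-z0-9._-]$/.test(value);
}

function parseBackfillBody(body: unknown): string[] {
  if (typeof body !== "object" || body === null) {
    throw new Error("expected a JSON object body");
  }
  const dids = (body as Record<string, unknown>)["dids"];
  if (!Array.isArray(dids)) throw new Error("expected dids: string[]");
  const unique = [...new Set(dids)];
  if (unique.length === 0 || unique.length > MAX_BACKFILL_DIDS) {
    throw new Error(`dids must contain 1-${MAX_BACKFILL_DIDS} entries`);
  }
  for (const did of unique) {
    if (typeof did !== "string" || !localIsDid(did)) {
      throw new Error(`invalid DID: ${String(did)}`);
    }
  }
  return unique as string[];
}
```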
- `pds-verify.isAtprotoDid` now composes with library `isDid` from
  `@atcute/lexicons/syntax` before checking the method prefix. The
  previous bare `startsWith` check accepted strings like `did:plc:`
  (empty body) — defensive in practice (the resolver upstream rejects
  malformed DIDs first), but the library call carries weight at
  verification time as a second-line check.
- `backfill-consumer.ts` module docstring still claimed the backfill
  DLQ "is intentionally not auto-drained today" even though the
  drain consumer (`drainBackfillDeadLetterBatch`) was added to this
  file in the previous review-fix round. Updated to describe what's
  actually shipped.
@ascorbic ascorbic merged commit a44fb7f into main May 10, 2026
35 of 36 checks passed
@ascorbic ascorbic deleted the feat/aggregator-backfill branch May 10, 2026 18:25
