.#654
Conversation
Closes #650 ## What Adds per-query checkpoint persistence and an async rebuild path so query read models survive restarts without replaying the full event history. The HTTP server now binds before query rebuild completes — `/health` returns 200 immediately, `/ready` returns 200 after rebuild finishes. ### Changes - **`Service.Query.Subscriber`** — adds `rebuildFrom` with chunked read loop (1000 events per page), per-query checkpoint routing via `needsProcessing`, and a `ready :: ConcurrentVar Bool` field. The existing `rebuildAll` is preserved for backward compatibility. - **`Service.QueryObjectStore.Postgres`** — new Postgres-backed `QueryObjectStore` using hasql (stubbed — full Postgres implementation tracked in #651). - **`Service.Query.Checkpoint`** — new `CheckpointStore` trait with `getPositions`, `setPosition`, and `getMinPosition` (stubbed). - **Tests** — 53 new pending test cases across `PostgresSpec`, `CheckpointSpec`, and `SubscriberCheckpointSpec` covering rebuild, readiness, checkpoint CRUD, and per-query routing. ### ADR See [ADR-0059](docs/decisions/0059-query-rebuild-async-checkpoint.md) for the full design: persistent QueryObjectStore, per-query checkpoints, async rebuild, chunked reads, readiness endpoints, and observability. ### Future work - [#651: Periodic QueryObjectStore snapshots](#651) - [#652: Configurable TTL on checkpoints](#652) - [#653: Graceful rebuild cancellation on shutdown](#653) ### Hlint ``` No hints ```
WalkthroughThis PR implements persistent, chunked, checkpointed query rebuild: Postgres and in-memory CheckpointStore implementations, Postgres QueryObjectStore, checkpoint-aware Subscriber.rebuildFrom with a ready flag, async startup of rebuilds, tests, cabal exports, ADR-0059, and an e2e readiness test. ChangesCheckpoint-based Rebuild Infrastructure
Sequence Diagram(s) sequenceDiagram
participant Application
participant QuerySubscriber
participant CheckpointStore
participant EventStore
participant QueryObjectStore
Application->>QuerySubscriber: Async run rebuildFrom(startPos, CheckpointStore)
QuerySubscriber->>CheckpointStore: getPositions()
QuerySubscriber->>EventStore: readForward(from startPos, chunkSize)
EventStore->>QuerySubscriber: return chunk(events)
QuerySubscriber->>QueryObjectStore: updateQuery(instanceId, event) for needed queries
QuerySubscriber->>CheckpointStore: setPosition(queryName, globalPosition) per-chunk
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@core/service/Service/Query/Checkpoint.hs`:
- Around line 29-36: The exported CheckpointStore is currently wired to panic
for getPositions, setPosition, and getMinPosition in the new function; replace
those latent panics with proper Task failures or a failed constructor so errors
surface as Task errors instead of crashing. In the new function, construct
CheckpointStore with implementations of getPositions, setPosition, and
getMinPosition that return Task.fail (or convert a Left error into Task via
Task.fromEither) carrying a descriptive error type/message (e.g.,
"CheckpointNotImplemented" or include connectionString), or change new to return
an immediate Task failure when the store cannot be created; reference the new
function and the CheckpointStore fields getPositions, setPosition, and
getMinPosition when making this change.
In `@core/service/Service/Query/Subscriber.hs`:
- Around line 129-137: rebuildFrom currently seeds rebuildChunkLoop with
Map.empty, which prevents using persisted per-query checkpoints; update
rebuildFrom (for QuerySubscriber) to load/hydrate the persisted checkpoint map
(per-query checkpoint state) before calling rebuildChunkLoop and pass that map
instead of Map.empty, and ensure rebuildChunkLoop still falls back to
defaultCheckpoint/StreamPosition (constructed from rawFromPosition) for any
queries missing in the hydrated map so resume semantics work per-query.
- Around line 131-137: subscriber.ready is set to False before calling
rebuildChunkLoop but only reset to True in the successful eventsReplayed == 0
branch, leaving readiness stuck False on any failure; modify the control flow so
ready is always reset on every exit (success, early-return, or exception). Wrap
the call to rebuildChunkLoop (or add a finalizer inside rebuildChunkLoop) with a
guaranteed cleanup using the Task/IO finalizer/bracket/`finally` equivalent so
that ConcurrentVar.modify (\_ -> True) is invoked no matter how the task
terminates; reference the symbols subscriber.ready, ConcurrentVar.modify,
rebuildChunkLoop and the eventsReplayed branch to locate where to add the
guaranteed reset.
- Around line 239-249: The Err branch in processUpdatersForEventAtIndex
currently logs and ignores updater failures (using updaterName and Err errText)
which allows readiness to be set later; change the error handling so failures
are recorded and prevent marking the query as ready: when Err errText occurs,
log the error (include errText and updaterName) and do NOT add updaterName to
successfulQueryNames, instead append or propagate a failure marker (e.g., a
failedQueryNames set or return a Result/Either from
processUpdatersForEventAtIndex). Then update the readiness check (the code that
inspects successfulQueryNames vs queryNamesNeedingProcessing, referenced around
the earlier readiness-marking block and lines ~187-193) to only set ready=true
when all required query names are present in successfulQueryNames and there are
no recorded failures. Ensure callers of processUpdatersForEventAtIndex handle
the new failure indicator so readiness is not reported when any updater failed.
In `@core/test-service/Service/Query/CheckpointSpec.hs`:
- Around line 11-75: The spec file currently leaves all tests pending; replace
each pending "not implemented" with real Hspec assertions that exercise the
Checkpoint API functions getPositions, setPosition, getMinPosition, and
needsProcessing: create a test Postgres fixture or in-memory DB setup, seed
checkpoints/events as described in each example (e.g., insert checkpoints at
positions 5/10/15, set then overwrite positions, test Int64 max, simulate lost
connection for error cases), call the corresponding function (getPositions,
setPosition, getMinPosition, needsProcessing) and assert expected results (Maps
containing entries, Just/Nothing, specific query names, deterministic outputs),
and include negative/error assertions for connection failures; ensure each it
block becomes executable and deterministic so the spec validates checkpoint
behavior before merging.
- Line 4: Replace the unqualified "import Core" with a qualified or explicit
import per repo rules: change to a qualified import such as "import qualified
Core as Core" (or list exactly the needed symbols with "import Core (..)" if
only specific names are used) and update any unqualified references in
Query.CheckpointSpec.hs to use the module qualifier (e.g., Core.someFunction) so
no unqualified Core identifiers remain; ensure the module compiles and adjust
any conflicting local names accordingly.
In `@core/test-service/Service/Query/SubscriberCheckpointSpec.hs`:
- Around line 11-53: The spec file contains many pending tests for rebuildFrom
and subscriber.ready: replace each pending "not implemented" with concrete Hspec
tests that invoke Service.Query.Subscriber.rebuildFrom and inspect
Service.Query.Subscriber.ready using controlled mocks/fakes; for each example
create a fake EventStore (returning empty stream, specific positions, chunks, or
errors) and stubbed QueryUpdater(s) (normal, throwing, logging) to assert
processed event counts, checkpoint advances, logged errors, idempotency (call
rebuildFrom at P then P+1 and assert at most one event processed), and readiness
transitions (ready True/False during and after rebuild, stays False on crash).
Reference the functions Subscriber.rebuildFrom and subscriber.ready and use the
existing test helpers/mocks in this module to simulate Task failures, head
position, and chunk sizes; ensure assertions check return values, checkpoints,
and that logging occurred where expected.
In `@core/test-service/Service/QueryObjectStore/PostgresSpec.hs`:
- Around line 11-66: The spec currently leaves every test pending; implement
executable tests in core/test-service/Service/QueryObjectStore/PostgresSpec.hs
replacing each pending with assertions that exercise the actual QueryObjectStore
Postgres implementation: write tests for get (cases "returns Just query when
instance exists", "returns Nothing when instance does not exist", "deserializes
unicode JSON correctly", and "returns StorageError on connection failure") by
creating a real test DB or using the test harness/fixture, inserting fixtures,
and asserting lookup/deserialize behaviors and proper StorageError when the
connection/pool is closed or mocked to fail; implement atomicUpdate tests for
"updates existing instance", "creates new instance when Nothing → Just",
"deletes instance via Just → Nothing", "returns SerializationError on invalid
JSON" and "serializes concurrent updates without lost updates" by calling the
atomicUpdate function, asserting returned and persisted states, simulating
invalid JSON to trigger SerializationError, and using concurrent threads
(forkIO/async) to perform parallel atomicUpdate calls and assert no lost
updates; implement delete tests ("deletes existing instance", "idempotent delete
of nonexistent instance", "returns StorageError on connection failure",
"recreate after delete restores the instance") by invoking delete and verifying
idempotency and recreation; implement getAll tests ("returns Array with 3
instances", "returns empty Array when no instances stored", "returns Array with
10000 instances", "returns StorageError on connection failure") by populating
the store with the required counts (use batching for 10k), verifying returned
arrays and handling of connection failures; ensure each test uses proper
setup/teardown (clean DB, transactions or isolated test schema), uses the real
functions get, atomicUpdate, delete, getAll, and asserts concrete results rather
than pending.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 373b4a98-274e-4a9d-a1cd-d408e4fe3b9a
📒 Files selected for processing (9)
core/nhcore.cabalcore/service/Service/Query/Checkpoint.hscore/service/Service/Query/Subscriber.hscore/service/Service/QueryObjectStore/Postgres.hscore/test-service/Main.hscore/test-service/Service/Query/CheckpointSpec.hscore/test-service/Service/Query/SubscriberCheckpointSpec.hscore/test-service/Service/QueryObjectStore/PostgresSpec.hsdocs/decisions/0059-query-rebuild-async-checkpoint.md
| {-# OPTIONS_GHC -Wno-unused-imports #-} | ||
| module Service.Query.CheckpointSpec where | ||
|
|
||
| import Core |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Use qualified imports instead of import Core.
This import style violates the repo’s Haskell import rule for core/** modules; switch to explicit/qualified imports.
As per coding guidelines, "Use qualified imports and module-qualified names; never use unqualified imports except for explicit imports like import Foo (Foo)."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/test-service/Service/Query/CheckpointSpec.hs` at line 4, Replace the
unqualified "import Core" with a qualified or explicit import per repo rules:
change to a qualified import such as "import qualified Core as Core" (or list
exactly the needed symbols with "import Core (..)" if only specific names are
used) and update any unqualified references in Query.CheckpointSpec.hs to use
the module qualifier (e.g., Core.someFunction) so no unqualified Core
identifiers remain; ensure the module compiles and adjust any conflicting local
names accordingly.
No stubs — all CheckpointStore, QueryObjectStore.Postgres, and Subscriber operations are real implementations. - Postgres CheckpointStore with query_checkpoint table (get/set/min) - InMemory CheckpointStore for testing - Postgres QueryObjectStore with atomic transactional upsert - rebuildFrom hydrates persisted checkpoints, saves per-chunk - rebuildAll + rebuildFrom manage ready flag via Task.finally - Failed updater errors logged, checkpoint not advanced - Application runs rebuild via AsyncTask.run (transports bind immediately) - 26 new test cases: CheckpointSpec, SubscriberCheckpointSpec, PostgresSpec - E2E readiness hurl test in testbed Closes #650
There was a problem hiding this comment.
Actionable comments posted: 11
♻️ Duplicate comments (1)
core/service/Service/Query/Subscriber.hs (1)
218-230:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftFailed updaters do not advance checkpoints—but readiness may still declare victory prematurely.
I note that failed updaters are excluded from
successfulQueryNames(lines 222-229), so their checkpoints won't advance and they'll retry in subsequent chunks. This is sound retry logic.However, if a particular updater fails persistently across all chunks, the final
ready=Trueis still set once no events remain. The system then reports readiness while that query projection may be incomplete or inconsistent.Consider tracking persistently failing query names and either:
- Preventing
ready=Trueif any query failed in the final chunk, or- Logging a warning that lists queries that never caught up.
This aligns with the past review comment that readiness should not be reported when projections are incomplete.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@core/service/Service/Query/Subscriber.hs` around lines 218 - 230, Failed updaters are never added to successfulQueryNames so checkpoints don't advance, but readiness can still be set to true; modify the processing flow (rooted at processUpdatersForEventAtIndex and the variable successfulQueryNames) to thread a persistentFailure set/list through the recursion: on Err errText for updaterName add updaterName to persistentFailures (in addition to skipping successfulQueryNames), and when the final readiness decision is computed, check persistentFailures—if non-empty either set ready=False or emit a clear warning listing persistentFailures so readiness is not declared successful for incomplete projections. Ensure the new persistentFailures is updated wherever processUpdatersForEventAtIndex advances and consulted by the readiness determination logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@core/service/Service/Application.hs`:
- Around line 1157-1164: The AsyncTask.run block calling Subscriber.rebuildAll
and Subscriber.start currently swallows exceptions; change it to use the same
pattern as runWithAsync: wrap the async action in a try/catch (or equivalent)
that logs any exception via the application's logger and ensures failure does
not silently flip subscriber.ready to True (i.e., only set ready on successful
completion), so replace the plain AsyncTask.run { Subscriber.rebuildAll
subscriber; Subscriber.start subscriber } with a runWithAsync-style wrapper that
logs errors and preserves correct ready state.
In `@core/service/Service/Query/Checkpoint.hs`:
- Around line 139-144: The decoder for the MIN aggregate is currently using
Decoders.nonNullable which will fail when the table is empty; update the decoder
in getMinPositionImpl to use a nullable decoder (remove Decoders.nonNullable and
use Decoders.nullable or the appropriate nullable int8 decoder) so SQL NULL maps
to Nothing, and adjust the result handling so that a decoded Nothing yields
Task.yield Nothing and a Just pos yields Task.yield (Just (StreamPosition pos));
refer to the variables/constructs decoder, Decoders.rowMaybe, Decoders.column,
Decoders.int8, runPool, Session.statement and StreamPosition to locate and
update the code.
In `@core/service/Service/Query/Subscriber.hs`:
- Around line 97-98: The code creates defaultCheckpoint = StreamPosition
(rawFromPosition - 1) which can produce a negative StreamPosition when
fromPosition is StreamPosition 0; update Subscriber.rebuildFrom to guard/clamp
that subtraction: compute rawDefault = max 0 (rawFromPosition - 1) and construct
defaultCheckpoint via the smart constructor (or handle its Maybe) so you never
persist a negative position, and also add a defensive check in
CheckpointStore.setPosition to ignore/normalize negative inputs (use
StreamPosition.new or clamp to 0) to ensure persistence and comparisons are
never corrupted.
In `@core/service/Service/QueryObjectStore/Postgres.hs`:
- Line 176: The code is ignoring the composite primary key (query_name,
instance_id); update getImpl, atomicUpdateImpl, the upsert logic, and
deleteStatement to always include and filter by query_name derived from the
query type (e.g., via NameOf query) instead of only instance_id or hardcoding
'default'. Concretely: change the SQL in getImpl ("SELECT instance_id,
payload::text FROM queryobjectstore WHERE instance_id = $1") and
atomicUpdateImpl lock SQL to add "AND query_name = $N" and pass NameOf query as
the additional parameter, update the upsert to use the actual query_name value
instead of 'default', and make deleteStatement delete WHERE query_name = <NameOf
query> AND instance_id = <id>; ensure parameter ordering and binding in
functions getImpl, atomicUpdateImpl, upsert, and deleteStatement are adjusted
accordingly.
- Around line 240-248: The current reduce over rows silently drops failures from
Json.decodeText; update the reducer (the Array.reduce that processes rows |>
Array.fromLinkedList) to capture decode failures instead of swallowing them:
either log the decode error (include Json.decodeText error, row.rowPayload and
an identifier like row.rowId) inside the Err branch using your logging facility,
or change the function return to a Result-like shape (e.g., { successes: Array,
errors: Array } or Task (Result ...)) where the reducer appends Ok decodes to
successes and Err details to errors; ensure the final Task.yield returns the new
structure (or the logged-and-continued successes) so operators can detect
partial failures.
- Around line 214-219: The decode failure for existing rows is being
swallowed—when Json.decodeText fails you should not set currentValue to Nothing;
instead propagate a SerializationError back into the Hasql session so
atomicUpdate semantics hold. In Postgres.hs replace the Err _ -> Nothing branch
(around maybeRow / currentValue) with logic that constructs and returns/throws a
SerializationError (consistent with the module's Hasql error propagation
pattern) so updateFn never receives a spurious Nothing for corrupted rows;
reference Json.decodeText, currentValue, maybeRow, updateFn and
SerializationError when applying the change.
- Around line 150-154: The initializeTable routine currently always truncates
the queryobjectstore via truncateSql/truncateStmt and runPool
(Session.statement), which destroys persisted data; remove the unconditional
TRUNCATE or gate it behind an explicit reset flag parameter (e.g., add a Bool
resetOnStartup argument to initializeTable or read a config/reset function) and
only execute the runPool (Session.statement) truncateStmt when that flag is
true; ensure any callers of initializeTable are updated to pass the flag or
default to false so production startups do not erase the table.
In `@core/test-service/Service/Query/CheckpointSpec.hs`:
- Around line 126-128: Replace the no-op test body with a concrete assertion:
construct a query/state with an empty checkpoint list and call the module
function that decides where to resume (e.g., determineStartingCheckpoint /
computeStartPoint / getLatestCheckpoint in this test module); assert it
indicates "start from beginning" (e.g., returns Nothing or a StartFromBeginning
value) instead of a checkpoint. Update the test in the it "E3: No
checkpoints..." block to build the empty-checkpoint input, invoke that decision
function, and assert the expected start-from-beginning result.
In `@core/test-service/Service/Query/SubscriberCheckpointSpec.hs`:
- Around line 70-77: After calling Subscriber.rebuildFrom subscriber
checkpointStore (StreamPosition 0), replace the trailing pass with an assertion
that verifies no events were replayed; for example, assert subscriber is still
in a ready state or that the processed count remains zero by calling the
appropriate accessor on subscriber (e.g., Subscriber.ready or
Subscriber.processedCount) and comparing to True or 0 respectively so the test
fails if any events were processed. Ensure the assertion uses the existing
subscriber and checkpointStore values created via Subscriber.new and
CheckpointInMemory.new.
- Around line 44-45: The test titles in SubscriberCheckpointSpec (the string
literals passed to each it ...) are misleading and don't match the actual start
positions used by the test inputs; update each it description so it accurately
reflects the numeric start/checkpoint values used in the test body (e.g. change
"Start from position 100 with checkpoints — only events at >= 101 processed" to
the actual start value used (P) and the expected processed range (>= P+1), and
likewise fix the other mismatched it descriptions in the same spec so names
reflect the real scenarios executed by the code).
In `@core/test/Service/ApplicationSpec.hs`:
- Line 163: The test "rebuilds queries asynchronously after startup" in
ApplicationSpec.hs uses a fixed 100ms threadDelay (and similar sleeps at the
later location) which is fragile; replace the fixed sleep with a bounded-polling
or wait-with-timeout approach that repeatedly checks the observable condition
(e.g. the query-rebuild result or a flag/state exposed by the in-memory store)
until success or a short timeout elapses, or refactor the code under test to
emit a synchronization primitive (MVar/TMVar/STM/Async) and wait on that
instead; update both the test at "rebuilds queries asynchronously after startup"
and the other occurrence (lines ~191-193) to use the polling-with-timeout or
explicit notification rather than a fixed threadDelay so tests are robust across
slow CI and fast local runs.
---
Duplicate comments:
In `@core/service/Service/Query/Subscriber.hs`:
- Around line 218-230: Failed updaters are never added to successfulQueryNames
so checkpoints don't advance, but readiness can still be set to true; modify the
processing flow (rooted at processUpdatersForEventAtIndex and the variable
successfulQueryNames) to thread a persistentFailure set/list through the
recursion: on Err errText for updaterName add updaterName to persistentFailures
(in addition to skipping successfulQueryNames), and when the final readiness
decision is computed, check persistentFailures—if non-empty either set
ready=False or emit a clear warning listing persistentFailures so readiness is
not declared successful for incomplete projections. Ensure the new
persistentFailures is updated wherever processUpdatersForEventAtIndex advances
and consulted by the readiness determination logic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 4af5e287-d162-41d4-bbbc-ff7d51f36207
📒 Files selected for processing (11)
core/nhcore.cabalcore/service/Service/Application.hscore/service/Service/Query/Checkpoint.hscore/service/Service/Query/Checkpoint/InMemory.hscore/service/Service/Query/Subscriber.hscore/service/Service/QueryObjectStore/Postgres.hscore/test-service/Service/Query/CheckpointSpec.hscore/test-service/Service/Query/SubscriberCheckpointSpec.hscore/test-service/Service/QueryObjectStore/PostgresSpec.hscore/test/Service/ApplicationSpec.hstestbed/tests/readiness.hurl
| -- 7. Start async query rebuild and live subscription | ||
| -- The rebuild runs in the background so transports can bind immediately. | ||
| -- /health returns 200 right away; /ready returns 200 when subscriber.ready is True. | ||
| AsyncTask.run do | ||
| Subscriber.rebuildAll subscriber | ||
| Subscriber.start subscriber | ||
|
|
||
| pass |
There was a problem hiding this comment.
Silent failure in async rebuild—errors vanish into the void, mortal.
The async task spawned on lines 1160-1162 runs rebuildAll and start without any error handling. If either fails, the error is silently lost:
/healthreturns 200 (liveness succeeds)subscriber.readymay be setTrueviaTask.finallyeven on error/readywould then return 200 despite rebuild failure- No logs indicate the failure occurred
Consider wrapping with error logging (similar to runWithAsync on lines 1404-1409) so operators can diagnose startup failures:
🛡️ Suggested improvement
-- 7. Start async query rebuild and live subscription
AsyncTask.run do
- Subscriber.rebuildAll subscriber
- Subscriber.start subscriber
+ result <- do
+ Subscriber.rebuildAll subscriber
+ Subscriber.start subscriber
+ |> Task.asResult
+ case result of
+ Err err -> Log.critical [fmt|Query subscriber startup failed: #{err}|]
+ |> Task.ignoreError
+ Ok _ -> pass🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/service/Service/Application.hs` around lines 1157 - 1164, The
AsyncTask.run block calling Subscriber.rebuildAll and Subscriber.start currently
swallows exceptions; change it to use the same pattern as runWithAsync: wrap the
async action in a try/catch (or equivalent) that logs any exception via the
application's logger and ensures failure does not silently flip subscriber.ready
to True (i.e., only set ready on successful completion), so replace the plain
AsyncTask.run { Subscriber.rebuildAll subscriber; Subscriber.start subscriber }
with a runWithAsync-style wrapper that logs errors and preserves correct ready
state.
| let decoder = Decoders.rowMaybe (Decoders.column (Decoders.nonNullable Decoders.int8)) | ||
| let statement = Statement (Text.toBytes sql |> Bytes.unwrap) encoder decoder True | ||
| result <- runPool pool (Session.statement () statement) | ||
| case result of | ||
| Nothing -> Task.yield Nothing | ||
| Just pos -> Task.yield (Just (StreamPosition pos)) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and print the relevant sections of the file
FILE="core/service/Service/Query/Checkpoint.hs"
if [ ! -f "$FILE" ]; then
echo "Missing file: $FILE" >&2
exit 1
fi
echo "=== Snippet around getMinPositionImpl ==="
nl -ba "$FILE" | sed -n '110,175p'
echo
echo "=== Find signature for getMinPositionImpl / min position functions ==="
rg -n "getMinPositionImpl|MinPosition|last_global_position|query_checkpoint" "$FILE"Repository: neohaskell/NeoHaskell
Length of output: 169
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="core/service/Service/Query/Checkpoint.hs"
echo "=== Snippet around getMinPositionImpl (lines 120-175) ==="
cat -n "$FILE" | sed -n '120,175p'
echo
echo "=== Find signature/decoder for getMinPositionImpl ==="
rg -n "getMinPositionImpl|MIN\\(|last_global_position|query_checkpoint" "$FILE"Repository: neohaskell/NeoHaskell
Length of output: 2487
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="core/service/Service/Query/Checkpoint.hs"
echo "=== Lines 1-120 (pool helpers + initializeTable) ==="
cat -n "$FILE" | sed -n '1,120p'
echo
echo "=== Find definition of runPool ==="
rg -n "runPool\\b" "$FILE"
echo
echo "=== Lines around runPool definition ==="
START=$(rg -n "runPool\\b" "$FILE" | head -n1 | cut -d: -f1)
# print a small window; avoid assuming too much
END=$((START+60))
cat -n "$FILE" | sed -n "${START},${END}p"Repository: neohaskell/NeoHaskell
Length of output: 8661
🌐 Web query:
Hasql Decoders.rowMaybe semantics when decoding NULL column with Decoders.nonNullable; MIN(...) returning NULL behavior
💡 Result:
Yes—Hasql’s rowMaybe and nonNullable semantics are about (1) whether a resultset row exists vs not, and (2) whether individual column values are allowed to be SQL NULL. They are independent concerns. 1) Hasql.Decoders.rowMaybe semantics Hasql.Decoders.rowMaybe has type Row a -> Result (Maybe a), and is documented as “Maybe one row or none.” [1] So: - If the query returns zero rows, rowMaybe yields Nothing. - If the query returns exactly one row, rowMaybe yields Just decodedRow. - It does not mean “a single row whose columns may be NULL”; it means the presence/absence of a row in the resultset. [1] 2) Hasql.Decoders.nonNullable semantics Hasql.Decoders.nonNullable wraps a value decoder and documents: “Specify that a decoder produces a non-nullable value.” [2] Additionally, Hasql has nullable/nonNullable combinators for column nullability: column:: NullableOrNot Value a -> Row a, and nullable:: decoder a -> NullableOrNot decoder (Maybe a)... / nonNullable:: decoder a -> NullableOrNot decoder a... [1] So if you use nonNullable for a column decoder, and the database returns SQL NULL for that column, Hasql will treat that as an invalid “parse more columns than there are” / “UnexpectedNull” type decoding error (the source code defines a RowError constructor for “Appears on the attempt to parse a NULL as some value.”) [3]. 3) When you decode MIN(...) / MAX(...) over an empty set In SQL, MIN/MAX over an empty table produce NULL values (your example shows NULL NULL when the table has no rows). A Haskell decoder cannot “recover” from the fact the database returned NULL: it must either - decode those NULLs as nullable (using nullable / Maybe in your row decoder), or - expect non-NULL and fail with an UnexpectedNull error when the NULLs arrive. [2][3] 4) Practical implication for your specific question (“MIN(...) returning NULL behavior”) - The database will return NULL for MIN(FROM) and MAX(TILL) when there are no input rows. - If you decode that output using rowMaybe, you must still consider that rowMaybe only accounts for whether there is a result row. Aggregate queries like SELECT min(...), max(...) typically return exactly one row even when the input set is empty, so rowMaybe will usually still be Just (...), but the decoded columns may be NULL. - If you decode those columns with nonNullable, the NULL columns will cause a decoding failure due to the attempt to parse a NULL as some value (UnexpectedNull). [2][3] 5) What to do instead Option A: decode MIN/MAX columns as nullable (use nullable / Maybe types for those columns). Option B: if you truly want “no result when input is empty”, modify SQL to return zero rows when the source is empty (e.g., with HAVING COUNT(*) = 0/<>0 or similar), so that rowMaybe can produce Nothing. (The Hasql docs themselves define rowMaybe strictly as “Maybe one row or none”.) [1] If you tell me your exact Hasql query/decoder types (and which Hasql version you’re using), I can suggest the exact decoder expression that matches whether you want Nothing vs Just with NULL fields.
Citations:
- 1: https://hackage.haskell.org/package/hasql-1.6.3/docs/Hasql-Decoders.html
- 2: https://hackage.haskell.org/package/hasql-1.6.4.1/docs/Hasql-Decoders.html
- 3: https://hackage.haskell.org/package/hasql-0.14.0.2/docs/src/Hasql-Query.html
Decode MIN(last_global_position) as nullable in getMinPositionImpl
SELECT MIN(...) returns a single row with a SQL NULL when query_checkpoint is empty; decoding that column with nonNullable can fail instead of producing Nothing. Decode the aggregate column as nullable and map NULL to Nothing.
Proposed fix
- let decoder = Decoders.rowMaybe (Decoders.column (Decoders.nonNullable Decoders.int8))
+ let decoder = Decoders.rowMaybe (Decoders.column (Decoders.nullable Decoders.int8))
let statement = Statement (Text.toBytes sql |> Bytes.unwrap) encoder decoder True
result <- runPool pool (Session.statement () statement)
case result of
Nothing -> Task.yield Nothing
- Just pos -> Task.yield (Just (StreamPosition pos))
+ Just Nothing -> Task.yield Nothing
+ Just (Just pos) -> Task.yield (Just (StreamPosition pos))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let decoder = Decoders.rowMaybe (Decoders.column (Decoders.nonNullable Decoders.int8)) | |
| let statement = Statement (Text.toBytes sql |> Bytes.unwrap) encoder decoder True | |
| result <- runPool pool (Session.statement () statement) | |
| case result of | |
| Nothing -> Task.yield Nothing | |
| Just pos -> Task.yield (Just (StreamPosition pos)) | |
| let decoder = Decoders.rowMaybe (Decoders.column (Decoders.nullable Decoders.int8)) | |
| let statement = Statement (Text.toBytes sql |> Bytes.unwrap) encoder decoder True | |
| result <- runPool pool (Session.statement () statement) | |
| case result of | |
| Nothing -> Task.yield Nothing | |
| Just Nothing -> Task.yield Nothing | |
| Just (Just pos) -> Task.yield (Just (StreamPosition pos)) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/service/Service/Query/Checkpoint.hs` around lines 139 - 144, The decoder
for the MIN aggregate is currently using Decoders.nonNullable which will fail
when the table is empty; update the decoder in getMinPositionImpl to use a
nullable decoder (remove Decoders.nonNullable and use Decoders.nullable or the
appropriate nullable int8 decoder) so SQL NULL maps to Nothing, and adjust the
result handling so that a decoded Nothing yields Task.yield Nothing and a Just
pos yields Task.yield (Just (StreamPosition pos)); refer to the
variables/constructs decoder, Decoders.rowMaybe, Decoders.column, Decoders.int8,
runPool, Session.statement and StreamPosition to locate and update the code.
| let (StreamPosition rawFromPosition) = fromPosition | ||
| let defaultCheckpoint = StreamPosition (rawFromPosition - 1) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check how StreamPosition is defined and whether negative values are handled
ast-grep --pattern 'newtype StreamPosition = StreamPosition $_'
rg -n 'StreamPosition' --type=hs -C2 | head -60Repository: neohaskell/NeoHaskell
Length of output: 6364
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect StreamPosition definition and any smart constructors/guards
echo "---- StreamPosition.hs ----"
sed -n '1,200p' core/service/Service/Event/StreamPosition.hs
# Find defaultCheckpoint usage in Subscriber.hs
echo "---- Subscriber defaultCheckpoint context ----"
rg -n "defaultCheckpoint|rawFromPosition - 1" core/service/Service/Query/Subscriber.hs -n
sed -n '70,130p' core/service/Service/Query/Subscriber.hs
# Find CheckpointStore interface/impl usage around StreamPosition
echo "---- CheckpointStore setPosition/getPositions usage ----"
rg -n "CheckpointStore|getPositions|setPosition" core/service/Service/Query/Subscriber.hs core/service/Service/Query -S
# Locate CheckpointStore implementations likely touching StreamPosition (by name)
echo "---- CheckpointStore implementations ----"
fd -t f CheckpointStore core | head -50
fd -t f '*Checkpoint*' core/service/Service/Query | head -50Repository: neohaskell/NeoHaskell
Length of output: 7709
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "---- StreamPosition.hs ----"
sed -n '1,220p' core/service/Service/Event/StreamPosition.hs
echo "---- Subscriber defaultCheckpoint context ----"
sed -n '70,140p' core/service/Service/Query/Subscriber.hs
rg -n "defaultCheckpoint|rawFromPosition - 1" core/service/Service/Query/Subscriber.hs -n
echo "---- CheckpointStore setPosition/getPositions usage (Query folder) ----"
rg -n "getPositions|setPosition|CheckpointStore" core/service/Service/Query -S
echo "---- Find checkpoint modules/types under core/service/Service ----"
fd -t f '*Checkpoint*' core/service/Service | head -80Repository: neohaskell/NeoHaskell
Length of output: 7547
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "---- StreamPosition.hs ----"
sed -n '1,240p' core/service/Service/Event/StreamPosition.hs
echo "---- Subscriber defaultCheckpoint context ----"
sed -n '70,160p' core/service/Service/Query/Subscriber.hs
echo "---- CheckpointStore setPosition/getPositions usage (Query folder) ----"
rg -n "getPositions|setPosition|CheckpointStore" core/service/Service/Query -S
echo "---- Find files with 'Checkpoint' in name under core/service/Service ----"
find core/service/Service -type f -iname '*checkpoint*' | head -80Repository: neohaskell/NeoHaskell
Length of output: 7955
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "---- StreamPosition.hs ----"
sed -n '1,240p' core/service/Service/Event/StreamPosition.hs
echo "---- Subscriber defaultCheckpoint context ----"
sed -n '70,160p' core/service/Service/Query/Subscriber.hs
echo "---- CheckpointStore setPosition/getPositions usage (Query folder) ----"
rg -n "getPositions|setPosition|CheckpointStore" core/service/Service/Query -S
echo "---- Find files with 'Checkpoint' in name under core/service/Service ----"
find core/service/Service -type f -iname '*checkpoint*' | head -80Repository: neohaskell/NeoHaskell
Length of output: 7955
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "---- StreamPosition.hs ----"
sed -n '1,240p' core/service/Service/Event/StreamPosition.hs
echo "---- Subscriber defaultCheckpoint context ----"
sed -n '70,160p' core/service/Service/Query/Subscriber.hs
echo "---- CheckpointStore setPosition/getPositions usage (Query folder) ----"
rg -n "getPositions|setPosition|CheckpointStore" core/service/Service/Query -S
echo "---- Find files with 'Checkpoint' in name under core/service/Service ----"
find core/service/Service -type f -iname '*checkpoint*' | head -80Repository: neohaskell/NeoHaskell
Length of output: 7955
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "---- StreamPosition.hs ----"
sed -n '1,240p' core/service/Service/Event/StreamPosition.hs
echo "---- Subscriber defaultCheckpoint context ----"
sed -n '70,160p' core/service/Service/Query/Subscriber.hs
echo "---- CheckpointStore setPosition/getPositions usage (Query folder) ----"
rg -n "getPositions|setPosition|CheckpointStore" core/service/Service/Query -S
echo "---- Find files with 'Checkpoint' in name under core/service/Service ----"
find core/service/Service -type f -iname '*checkpoint*' | head -120Repository: neohaskell/NeoHaskell
Length of output: 7955
Guard defaultCheckpoint when fromPosition is StreamPosition 0
StreamPosition is newtype StreamPosition = StreamPosition Int64 with a smart constructor new :: Int64 -> Maybe StreamPosition that rejects negatives, but Subscriber.rebuildFrom computes defaultCheckpoint = StreamPosition (rawFromPosition - 1)—so when fromPosition is StreamPosition 0, it produces StreamPosition (-1) by using the raw constructor. Ensure CheckpointStore.setPosition/persistence path safely handles -1 (or clamp/avoid persisting -1) so checkpoint comparisons and storage cannot be corrupted.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/service/Service/Query/Subscriber.hs` around lines 97 - 98, The code
creates defaultCheckpoint = StreamPosition (rawFromPosition - 1) which can
produce a negative StreamPosition when fromPosition is StreamPosition 0; update
Subscriber.rebuildFrom to guard/clamp that subtraction: compute rawDefault = max
0 (rawFromPosition - 1) and construct defaultCheckpoint via the smart
constructor (or handle its Maybe) so you never persist a negative position, and
also add a defensive check in CheckpointStore.setPosition to ignore/normalize
negative inputs (use StreamPosition.new or clamp to 0) to ensure persistence and
comparisons are never corrupted.
| -- Truncate leftover data from previous runs (safe on empty table). | ||
| -- In production, the table is empty on first start and rebuild repopulates. | ||
| let truncateSql = "TRUNCATE queryobjectstore" | ||
| let truncateStmt = Statement (Text.toBytes truncateSql |> Bytes.unwrap) Encoders.noParams Decoders.noResult True | ||
| runPool pool (Session.statement () truncateStmt) |
There was a problem hiding this comment.
Hear My Decree: TRUNCATE on startup shall obliterate all persistence.
The initializeTable function truncates the queryobjectstore table on every startup. This annihilates any persisted query state from previous runs, entirely defeating the purpose of this "persistent" store. The comment claims it is "safe on empty table" yet it executes unconditionally.
Remove the truncate or gate it behind an explicit reset/test flag.
🔥 Proposed fix: remove truncation
runPool pool (Session.statement () statement)
- -- Truncate leftover data from previous runs (safe on empty table).
- -- In production, the table is empty on first start and rebuild repopulates.
- let truncateSql = "TRUNCATE queryobjectstore"
- let truncateStmt = Statement (Text.toBytes truncateSql |> Bytes.unwrap) Encoders.noParams Decoders.noResult True
- runPool pool (Session.statement () truncateStmt)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| -- Truncate leftover data from previous runs (safe on empty table). | |
| -- In production, the table is empty on first start and rebuild repopulates. | |
| let truncateSql = "TRUNCATE queryobjectstore" | |
| let truncateStmt = Statement (Text.toBytes truncateSql |> Bytes.unwrap) Encoders.noParams Decoders.noResult True | |
| runPool pool (Session.statement () truncateStmt) | |
| runPool pool (Session.statement () statement) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/service/Service/QueryObjectStore/Postgres.hs` around lines 150 - 154,
The initializeTable routine currently always truncates the queryobjectstore via
truncateSql/truncateStmt and runPool (Session.statement), which destroys
persisted data; remove the unconditional TRUNCATE or gate it behind an explicit
reset flag parameter (e.g., add a Bool resetOnStartup argument to
initializeTable or read a config/reset function) and only execute the runPool
(Session.statement) truncateStmt when that flag is true; ensure any callers of
initializeTable are updated to pass the flag or default to false so production
startups do not erase the table.
| -- | Get a query instance by its ID. | ||
| getImpl :: forall query. (Json.FromJSON query) => HasqlPool.Pool -> Uuid -> Task Error (Maybe query) | ||
| getImpl pool queryId = do | ||
| let sql = "SELECT instance_id, payload::text FROM queryobjectstore WHERE instance_id = $1" |
There was a problem hiding this comment.
The schema's composite key is forsaken.
The schema declares PRIMARY KEY (query_name, instance_id), yet:
getImpl(Line 176) selects only byinstance_idatomicUpdateImpl(Line 192) locks only byinstance_id- The upsert (Line 197) hardcodes
'default'forquery_name deleteStatement(Line 204) deletes only byinstance_id
If multiple query types share instance IDs, operations shall corrupt or return incorrect data. Derive query_name from the query type (perhaps via NameOf query) and filter accordingly.
Also applies to: 192-197, 204-207
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/service/Service/QueryObjectStore/Postgres.hs` at line 176, The code is
ignoring the composite primary key (query_name, instance_id); update getImpl,
atomicUpdateImpl, the upsert logic, and deleteStatement to always include and
filter by query_name derived from the query type (e.g., via NameOf query)
instead of only instance_id or hardcoding 'default'. Concretely: change the SQL
in getImpl ("SELECT instance_id, payload::text FROM queryobjectstore WHERE
instance_id = $1") and atomicUpdateImpl lock SQL to add "AND query_name = $N"
and pass NameOf query as the additional parameter, update the upsert to use the
actual query_name value instead of 'default', and make deleteStatement delete
WHERE query_name = <NameOf query> AND instance_id = <id>; ensure parameter
ordering and binding in functions getImpl, atomicUpdateImpl, upsert, and
deleteStatement are adjusted accordingly.
| rows | ||
| |> Array.fromLinkedList | ||
| |> Array.reduce | ||
| (\row acc -> case Json.decodeText row.rowPayload of | ||
| Ok val -> acc |> Array.push val | ||
| Err _ -> acc | ||
| ) | ||
| Array.empty | ||
| |> Task.yield |
There was a problem hiding this comment.
Rows that fail to decode vanish without trace.
When Json.decodeText fails (Lines 243-245), the row is silently dropped from results. This could mask widespread data corruption. Consider logging decode failures or returning a partial-failure indicator so operators may diagnose issues.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/service/Service/QueryObjectStore/Postgres.hs` around lines 240 - 248,
The current reduce over rows silently drops failures from Json.decodeText;
update the reducer (the Array.reduce that processes rows |>
Array.fromLinkedList) to capture decode failures instead of swallowing them:
either log the decode error (include Json.decodeText error, row.rowPayload and
an identifier like row.rowId) inside the Err branch using your logging facility,
or change the function return to a Result-like shape (e.g., { successes: Array,
errors: Array } or Task (Result ...)) where the reducer appends Ok decodes to
successes and Err details to errors; ensure the final Task.yield returns the new
structure (or the logged-and-continued successes) so operators can detect
partial failures.
| it "E3: No checkpoints for a query means it needs processing from the beginning" \_ -> do | ||
| -- Covered by subscriber integration tests | ||
| pass |
There was a problem hiding this comment.
Replace the no-op test with a real assertion.
Line 128 uses pass, so this case never validates the claimed behavior.
Suggested fix
- it "E3: No checkpoints for a query means it needs processing from the beginning" \_ -> do
- -- Covered by subscriber integration tests
- pass
+ it "E3: No checkpoints for a query means it needs processing from the beginning" \_ -> do
+ let eventPosition = StreamPosition 5 :: StreamPosition
+ let checkpoint = Nothing :: Maybe StreamPosition
+ (case checkpoint of
+ Nothing -> True
+ Just cp -> cp < eventPosition) |> shouldBe True🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/test-service/Service/Query/CheckpointSpec.hs` around lines 126 - 128,
Replace the no-op test body with a concrete assertion: construct a query/state
with an empty checkpoint list and call the module function that decides where to
resume (e.g., determineStartingCheckpoint / computeStartPoint /
getLatestCheckpoint in this test module); assert it indicates "start from
beginning" (e.g., returns Nothing or a StartFromBeginning value) instead of a
checkpoint. Update the test in the it "E3: No checkpoints..." block to build the
empty-checkpoint input, invoke that decision function, and assert the expected
start-from-beginning result.
| it "Start from position 100 with checkpoints — only events at >= 101 processed" \_ -> do | ||
| eventStore <- InMemory.new |> Task.mapError toText |
There was a problem hiding this comment.
Align test names with actual scenarios.
The titles describe different setups than what the code executes (Line 44 says position 100/checkpoints; Line 110 says P then P+1). Rename to match the real inputs so failures are diagnosable.
Also applies to: 64-68, 110-111, 129-136
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/test-service/Service/Query/SubscriberCheckpointSpec.hs` around lines 44
- 45, The test titles in SubscriberCheckpointSpec (the string literals passed to
each it ...) are misleading and don't match the actual start positions used by
the test inputs; update each it description so it accurately reflects the
numeric start/checkpoint values used in the test body (e.g. change "Start from
position 100 with checkpoints — only events at >= 101 processed" to the actual
start value used (P) and the expected processed range (>= P+1), and likewise fix
the other mismatched it descriptions in the same spec so names reflect the real
scenarios executed by the code).
| it "EventStore returns empty stream — returns with 0 events replayed" \_ -> do | ||
| eventStore <- InMemory.new |> Task.mapError toText | ||
| checkpointStore <- CheckpointInMemory.new | ||
| let registry = Registry.empty | ||
| subscriber <- Subscriber.new eventStore registry | ||
|
|
||
| Subscriber.rebuildFrom subscriber checkpointStore (StreamPosition 0) | ||
| pass |
There was a problem hiding this comment.
Assert observable behavior in the empty-stream case.
This test currently ends with pass; add at least one assertion (e.g., ready state or processed count remains zero) so it guards regressions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/test-service/Service/Query/SubscriberCheckpointSpec.hs` around lines 70
- 77, After calling Subscriber.rebuildFrom subscriber checkpointStore
(StreamPosition 0), replace the trailing pass with an assertion that verifies no
events were replayed; for example, assert subscriber is still in a ready state
or that the processed count remains zero by calling the appropriate accessor on
subscriber (e.g., Subscriber.ready or Subscriber.processedCount) and comparing
to True or 0 respectively so the test fails if any events were processed. Ensure
the assertion uses the existing subscriber and checkpointStore values created
via Subscriber.new and CheckpointInMemory.new.
| Application.runWith eventStore app | ||
|
|
||
| it "rebuilds queries before starting services" \_ -> do | ||
| it "rebuilds queries asynchronously after startup" \_ -> do |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Thy timing-based wait is acceptable, though fragile in the face of slow CI.
The 100ms sleep waits for async rebuild completion. While functional, fixed sleeps can cause flakiness on slow runners or waste time on fast ones. This is acceptable for a unit test with an in-memory store, but be vigilant if flakiness manifests.
Also applies to: 191-193
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core/test/Service/ApplicationSpec.hs` at line 163, The test "rebuilds queries
asynchronously after startup" in ApplicationSpec.hs uses a fixed 100ms
threadDelay (and similar sleeps at the later location) which is fragile; replace
the fixed sleep with a bounded-polling or wait-with-timeout approach that
repeatedly checks the observable condition (e.g. the query-rebuild result or a
flag/state exposed by the in-memory store) until success or a short timeout
elapses, or refactor the code under test to emit a synchronization primitive
(MVar/TMVar/STM/Async) and wait on that instead; update both the test at
"rebuilds queries asynchronously after startup" and the other occurrence (lines
~191-193) to use the polling-with-timeout or explicit notification rather than a
fixed threadDelay so tests are robust across slow CI and fast local runs.
.