diff --git a/core/nhcore.cabal b/core/nhcore.cabal index 6781ea21..646bcbf9 100644 --- a/core/nhcore.cabal +++ b/core/nhcore.cabal @@ -266,6 +266,7 @@ library Service.QueryObjectStore Service.QueryObjectStore.Core Service.QueryObjectStore.InMemory + Service.QueryObjectStore.Postgres Service.Response Service.SnapshotCache Service.SnapshotCache.Core @@ -286,6 +287,7 @@ library Service.Transport Service.Transport.Web Service.Transport.Web.BuiltinSchemas + Service.Transport.Web.Readiness Service.Transport.Web.SwaggerUI Service.Transport.Internal Service.Transport.Cli @@ -606,6 +608,10 @@ test-suite nhcore-test-service Service.Command.CanAccess.PermissionFixture Service.Transport.Web.CommandAuthSpec Service.CommandExecutor.AuditLoggingSpec + Service.QueryObjectStore.PostgresSpec + Service.Query.Subscriber.ReadinessSpec + Service.Transport.Web.ReadinessSpec + Service.Application.ReadinessBuilderSpec type: exitcode-stdio-1.0 hs-source-dirs: test-service, test diff --git a/core/service/Service/Application.hs b/core/service/Service/Application.hs index 43f7c1a8..e1d039f5 100644 --- a/core/service/Service/Application.hs +++ b/core/service/Service/Application.hs @@ -46,6 +46,9 @@ module Service.Application ( withHealthCheck, withoutHealthCheck, withDispatcherConfig, + useQueryObjectStore, + useReadinessEndpoint, + withoutReadinessEndpoint, -- * Health Check Re-export Web.HealthCheckConfig (..), @@ -2270,3 +2273,29 @@ formatOAuth2ValidationError providerName provider oauthError = do [fmt|Provider '#{providerName}' unexpected InvalidPkceVerifier during validation: #{errMsg}|] InvalidRedirectUri errMsg -> [fmt|Provider '#{providerName}' unexpected InvalidRedirectUri during validation: #{errMsg}|] + + +-- | Wire a persistent query object store backend into the application. +-- +-- Stub — not implemented. +useQueryObjectStore :: + forall config. + (QueryObjectStoreConfig config) => + config -> + Application -> + Application +useQueryObjectStore _ _ = panic "not implemented: Application.useQueryObjectStore" + + +-- | Enable the /ready HTTP endpoint (on by default, shown for discoverability). +-- +-- Stub — not implemented. +useReadinessEndpoint :: Application -> Application +useReadinessEndpoint _ = panic "not implemented: Application.useReadinessEndpoint" + + +-- | Disable the /ready endpoint entirely. +-- +-- Stub — not implemented. +withoutReadinessEndpoint :: Application -> Application +withoutReadinessEndpoint _ = panic "not implemented: Application.withoutReadinessEndpoint" diff --git a/core/service/Service/Query/Subscriber.hs b/core/service/Service/Query/Subscriber.hs index f65b2d40..6b4d629b 100644 --- a/core/service/Service/Query/Subscriber.hs +++ b/core/service/Service/Query/Subscriber.hs @@ -1,9 +1,17 @@ module Service.Query.Subscriber ( QuerySubscriber (..), + Readiness (..), + RebuildOptions (..), + QueryRebuildError (..), new, start, stop, rebuildAll, + rebuildFrom, + rebuildAllAsync, + readinessOf, + readinessOfQuery, + rebuildOptionsDefault, ) where import Basics @@ -143,6 +151,94 @@ processEventHandler subscriber rawEvent = do Nothing -> pass +-- | Readiness state of a query rebuild. +data Readiness + = Rebuilding + | Ready + | Failed Text + deriving (Eq, Show, Generic) + + +-- | Options controlling a per-query rebuild. +data RebuildOptions = RebuildOptions + { chunkSize :: Int + -- ^ Events per fetch (default: 1000). + , timeout :: Int + -- ^ Per-query rebuild timeout in seconds (default: 300). + , logProgress :: Bool + -- ^ Emit a log message after each chunk (default: True). + , deleteStaleHashFirst :: Bool + -- ^ Delete rows with mismatched query_hash before replaying (default: True). + } + deriving (Eq, Show) + + +-- | Errors produced during a query rebuild. +data QueryRebuildError + = RebuildTimeout Text + -- ^ Rebuild took longer than the configured timeout. + | UpdaterException Text + -- ^ QueryUpdater returned Err during replay. + | HashMismatchReplay Text + -- ^ Hash mismatch forced a replay, but the replay itself failed. + | CheckpointFetchFailed Text + -- ^ Could not read the resume position from the object store. + | EventStoreFailed Text + -- ^ EventStore.readFrom returned Err. + deriving (Eq, Show, Generic) + + +-- | Default rebuild options. +rebuildOptionsDefault :: RebuildOptions +rebuildOptionsDefault = RebuildOptions + { chunkSize = 1000 + , timeout = 300 + , logProgress = True + , deleteStaleHashFirst = True + } + + +-- | Resumable per-query rebuild from a given StreamPosition. +-- +-- Stub — not implemented. +rebuildFrom + :: QuerySubscriber + -> Text + -> StreamPosition + -> RebuildOptions + -> Task QueryRebuildError Unit +rebuildFrom _ _ _ _ = panic "not implemented: Service.Query.Subscriber.rebuildFrom" + + +-- | Spawn async rebuild for all registered queries, updating readiness states. +-- +-- Stub — not implemented. +rebuildAllAsync + :: QuerySubscriber + -> RebuildOptions + -> Task QueryRebuildError Unit +rebuildAllAsync _ _ = panic "not implemented: Service.Query.Subscriber.rebuildAllAsync" + + +-- | Fetch the aggregate readiness state of all registered queries. +-- +-- Stub — not implemented. +readinessOf + :: QuerySubscriber + -> Task Text Readiness +readinessOf _ = panic "not implemented: Service.Query.Subscriber.readinessOf" + + +-- | Fetch the readiness state for a specific named query. +-- +-- Stub — not implemented. +readinessOfQuery + :: QuerySubscriber + -> Text + -> Task Text (Maybe Readiness) +readinessOfQuery _ _ = panic "not implemented: Service.Query.Subscriber.readinessOfQuery" + + -- | Process a single raw event through all relevant query updaters. processEvent :: QuerySubscriber -> Event Json.Value -> Task Text Unit processEvent subscriber rawEvent = do diff --git a/core/service/Service/QueryObjectStore/Postgres.hs b/core/service/Service/QueryObjectStore/Postgres.hs new file mode 100644 index 00000000..25543af5 --- /dev/null +++ b/core/service/Service/QueryObjectStore/Postgres.hs @@ -0,0 +1,62 @@ +module Service.QueryObjectStore.Postgres ( + PostgresQueryObjectStoreConfig (..), + QueryObjectStoreError (..), + createQueryObjectStore, +) where + +import Basics +import Json qualified +import Service.QueryObjectStore.Core (QueryObjectStore (..)) +import Service.QueryObjectStore.Core qualified as Core +import Task (Task) +import Task qualified +import Text (Text) +import ToText (toText) + + +-- | Errors produced by the Postgres-backed QueryObjectStore. +data QueryObjectStoreError + = ConnectionFailed Text + -- ^ Unable to acquire Postgres connection. + | StatementFailed Text + -- ^ Hasql statement execution failed. + | DecodingFailed Text + -- ^ Hasql result decoder failed. + deriving (Eq, Show, Generic) + + +-- | Configuration for the Postgres-backed QueryObjectStore. +data PostgresQueryObjectStoreConfig = PostgresQueryObjectStoreConfig + { host :: Text + , databaseName :: Text + , user :: Text + , password :: Text + , port :: Int + } + deriving (Eq, Show) + + +instance Core.QueryObjectStoreConfig PostgresQueryObjectStoreConfig where + createQueryObjectStore config = + newFromConfig config + |> Task.mapError toText + + +-- | Create a Postgres-backed QueryObjectStore from the given config. +-- +-- This is the public API used by tests. Delegates to the internal stub. +createQueryObjectStore + :: forall query. + (Json.FromJSON query, Json.ToJSON query) + => PostgresQueryObjectStoreConfig + -> Task QueryObjectStoreError (QueryObjectStore query) +createQueryObjectStore config = newFromConfig config + + +-- | Internal stub. Throws a sentinel so that every test fails against this stub. +newFromConfig + :: forall query. + (Json.FromJSON query, Json.ToJSON query) + => PostgresQueryObjectStoreConfig + -> Task QueryObjectStoreError (QueryObjectStore query) +newFromConfig _ = panic "not implemented: Service.QueryObjectStore.Postgres.createQueryObjectStore" diff --git a/core/service/Service/Transport/Web/Readiness.hs b/core/service/Service/Transport/Web/Readiness.hs new file mode 100644 index 00000000..ad48bb7b --- /dev/null +++ b/core/service/Service/Transport/Web/Readiness.hs @@ -0,0 +1,33 @@ +module Service.Transport.Web.Readiness ( + ReadinessConfig (..), + handleReadinessRequest, + handleQueryReadinessRequest, +) where + +import Basics +import Task (Task) +import Text (Text) + + +-- | Configuration for the /ready endpoint. +data ReadinessConfig = ReadinessConfig + { readinessPath :: Text + -- ^ URL path for the readiness endpoint (default: "ready"). + , includeQueryStatus :: Bool + -- ^ Whether /ready includes per-query lag and names (default: True). + } + deriving (Eq, Show) + + +-- | Handle GET /ready. +-- +-- Stub — not implemented. +handleReadinessRequest :: Task Text Unit +handleReadinessRequest = panic "not implemented: Service.Transport.Web.Readiness.handleReadinessRequest" + + +-- | Handle GET /queries/{name} readiness degradation. +-- +-- Stub — not implemented. +handleQueryReadinessRequest :: Task Text Unit +handleQueryReadinessRequest = panic "not implemented: Service.Transport.Web.Readiness.handleQueryReadinessRequest" diff --git a/core/test-service/Main.hs b/core/test-service/Main.hs index a05b5e5b..27fabe41 100644 --- a/core/test-service/Main.hs +++ b/core/test-service/Main.hs @@ -44,6 +44,10 @@ import Service.Command.AuthSpec qualified import Service.Command.CanAccessSpec qualified import Service.Transport.Web.CommandAuthSpec qualified import Service.CommandExecutor.AuditLoggingSpec qualified +import Service.QueryObjectStore.PostgresSpec qualified +import Service.Query.Subscriber.ReadinessSpec qualified +import Service.Transport.Web.ReadinessSpec qualified +import Service.Application.ReadinessBuilderSpec qualified import Test.Hspec qualified as Hspec @@ -91,3 +95,7 @@ main = Hspec.hspec do Hspec.describe "Service.Command.CanAccess" Service.Command.CanAccessSpec.spec Hspec.describe "Service.Transport.Web.CommandAuth" Service.Transport.Web.CommandAuthSpec.spec Hspec.describe "Service.CommandExecutor.AuditLogging" Service.CommandExecutor.AuditLoggingSpec.spec + Hspec.describe "Service.QueryObjectStore.Postgres" Service.QueryObjectStore.PostgresSpec.spec + Hspec.describe "Service.Query.Subscriber.Readiness" Service.Query.Subscriber.ReadinessSpec.spec + Hspec.describe "Service.Transport.Web.Readiness" Service.Transport.Web.ReadinessSpec.spec + Hspec.describe "Service.Application.ReadinessBuilder" Service.Application.ReadinessBuilderSpec.spec diff --git a/core/test-service/Service/Application/ReadinessBuilderSpec.hs b/core/test-service/Service/Application/ReadinessBuilderSpec.hs new file mode 100644 index 00000000..a5708459 --- /dev/null +++ b/core/test-service/Service/Application/ReadinessBuilderSpec.hs @@ -0,0 +1,77 @@ +module Service.Application.ReadinessBuilderSpec where + +import Core +import Service.Application qualified as Application +import Service.QueryObjectStore.Postgres ( + PostgresQueryObjectStoreConfig (..), + ) +import Test + + +-- | Minimal Postgres config for builder tests. +testPostgresConfig :: PostgresQueryObjectStoreConfig +testPostgresConfig = + PostgresQueryObjectStoreConfig + { host = "localhost" + , databaseName = "neohaskell" + , user = "neohaskell" + , password = "neohaskell" + , port = 5432 + } + + +spec :: Spec Unit +spec = do + describe "useQueryObjectStore" do + it "wires the QueryObjectStore into the subscriber and returns updated Application" \_ -> do + let _app = + Application.new + |> Application.useQueryObjectStore testPostgresConfig + -- Stub: useQueryObjectStore throws error "not implemented", so this panics. + -- The test must fail, not pass. + fail "useQueryObjectStore wiring: not implemented — stub must fail" + + it "allows chaining with other builder methods (withQuery, withEventStore, etc.)" \_ -> do + -- Builder chain: + -- Application.new |> useQueryObjectStore config |> ... + -- Stub panics before the chain completes. + fail "useQueryObjectStore chaining: not implemented — stub must fail" + + it "fails at runtime if the QueryObjectStore config is invalid" \_ -> do + let badConfig = testPostgresConfig { host = "unreachable.invalid" } + let _app = + Application.new + |> Application.useQueryObjectStore badConfig + -- Stub: not implemented; must fail. + fail "useQueryObjectStore invalid config: not implemented — stub must fail" + + it "accepts multiple store backends if they implement QueryObjectStoreConfig" \_ -> do + -- Both InMemory (via existing withQueryObjectStore) and Postgres backends + -- must compile and be accepted by the builder. + fail "useQueryObjectStore multiple backends: not implemented — stub must fail" + + describe "useReadinessEndpoint" do + it "enables the /ready HTTP endpoint and returns updated Application" \_ -> do + let _app = + Application.new + |> Application.useReadinessEndpoint + fail "useReadinessEndpoint enables /ready: not implemented — stub must fail" + + it "is enabled by default (omitting the call still activates /ready)" \_ -> do + -- Application.new without explicit useReadinessEndpoint should still have /ready. + fail "useReadinessEndpoint default on: not implemented — stub must fail" + + it "allows chaining with Application.withoutReadinessEndpoint to disable /ready" \_ -> do + let _app = + Application.new + |> Application.useReadinessEndpoint + |> Application.withoutReadinessEndpoint + fail "useReadinessEndpoint then withoutReadinessEndpoint: not implemented — stub must fail" + + it "returns Application suitable for Application.run" \_ -> do + let _app = + Application.new + |> Application.useReadinessEndpoint + -- Would call Application.run but that requires full infrastructure; + -- stub must fail before we get there. + fail "useReadinessEndpoint runnable: not implemented — stub must fail" diff --git a/core/test-service/Service/Query/Subscriber/ReadinessSpec.hs b/core/test-service/Service/Query/Subscriber/ReadinessSpec.hs new file mode 100644 index 00000000..bc64522b --- /dev/null +++ b/core/test-service/Service/Query/Subscriber/ReadinessSpec.hs @@ -0,0 +1,607 @@ +module Service.Query.Subscriber.ReadinessSpec where + +import Core +import Service.Event.StreamPosition (StreamPosition (..)) +import Service.EventStore.InMemory qualified as InMemory +import Service.Query.Registry qualified as Registry +import Service.Query.Subscriber ( + Readiness (..), + RebuildOptions (..), + QueryRebuildError (..), + rebuildOptionsDefault, + ) +import Service.Query.Subscriber qualified as Subscriber +import Service.QueryObjectStore.Core (QueryObjectStore) +import Service.QueryObjectStore.Postgres (PostgresQueryObjectStoreConfig (..), QueryObjectStoreError) +import Service.QueryObjectStore.Postgres qualified as PostgresQOS +import Task qualified +import Test + + +testConfig :: PostgresQueryObjectStoreConfig +testConfig = PostgresQueryObjectStoreConfig + { host = "localhost" + , databaseName = "neohaskell" + , user = "neohaskell" + , password = "neohaskell" + , port = 5432 + } + + +-- | Typed helper to resolve the polymorphic `query` variable. +createTestStore :: Task QueryObjectStoreError (QueryObjectStore Unit) +createTestStore = PostgresQOS.createQueryObjectStore testConfig + + +spec :: Spec Unit +spec = do + describe "rebuildFrom" do + it "replays all events from startPosition to EventStore head and writes to store" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> pass + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "reads events in chunks respecting the configured chunkSize" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + let opts = rebuildOptionsDefault { chunkSize = 1000 } + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) opts + |> Task.asResult + case result of + Ok _ -> fail "rebuildFrom chunk adherence: not implemented — stub must fail" + Err _ -> fail "rebuildFrom chunk adherence: not implemented — stub must fail" + + it "emits progress log message after each chunk completes" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + let opts = rebuildOptionsDefault { logProgress = True } + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) opts + |> Task.asResult + case result of + Ok _ -> fail "rebuildFrom progress log: not implemented — stub must fail" + Err _ -> fail "rebuildFrom progress log: not implemented — stub must fail" + + it "fails with RebuildTimeout if rebuild exceeds the configured timeout duration" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + let opts = rebuildOptionsDefault { timeout = 1 } + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) opts + |> Task.asResult + case result of + Err (RebuildTimeout _) -> pass + Ok _ -> fail "Expected RebuildTimeout but got Ok" + Err other -> fail [fmt|Expected RebuildTimeout but got: #{toText (show other)}|] + + it "fails with UpdaterException if the QueryUpdater returns Err" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Err (UpdaterException _) -> pass + Ok _ -> fail "Expected UpdaterException but got Ok" + Err other -> fail [fmt|Expected UpdaterException but got: #{toText (show other)}|] + + it "deletes rows with mismatched query_hash before replaying" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildFrom hash mismatch delete: not implemented — stub must fail" + Err _ -> fail "rebuildFrom hash mismatch delete: not implemented — stub must fail" + + it "fails with HashMismatchReplay if hash-mismatch deletion succeeds but replay fails" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Err (HashMismatchReplay _) -> pass + Ok _ -> fail "Expected HashMismatchReplay but got Ok" + Err other -> fail [fmt|Expected HashMismatchReplay but got: #{toText (show other)}|] + + it "does not log progress if logProgress=False" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + let opts = rebuildOptionsDefault { logProgress = False } + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) opts + |> Task.asResult + case result of + Ok _ -> fail "rebuildFrom no-progress-log: not implemented — stub must fail" + Err _ -> fail "rebuildFrom no-progress-log: not implemented — stub must fail" + + it "fails with CheckpointFetchFailed if query_object_store is unavailable on startup" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Err (CheckpointFetchFailed _) -> pass + Ok _ -> fail "Expected CheckpointFetchFailed but got Ok" + Err other -> fail [fmt|Expected CheckpointFetchFailed but got: #{toText (show other)}|] + + it "handles chunk boundaries without tearing state" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildFrom chunk-boundary tearing H6: not implemented — stub must fail" + Err _ -> fail "rebuildFrom chunk-boundary tearing H6: not implemented — stub must fail" + + it "resumes from checkpoint when startPosition > 0" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 500) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildFrom resume: not implemented — stub must fail" + Err _ -> fail "rebuildFrom resume: not implemented — stub must fail" + + it "fails with EventStoreFailed if EventStore.readFrom returns Err" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "test-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Err (EventStoreFailed _) -> pass + Ok _ -> fail "Expected EventStoreFailed but got Ok" + Err other -> fail [fmt|Expected EventStoreFailed but got: #{toText (show other)}|] + + describe "rebuildAllAsync" do + it "spawns async tasks for all registered queries and returns when all complete successfully" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> pass + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "sets per-query readiness to Rebuilding at start and Ready on completion" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildAllAsync readiness transitions: not implemented — stub must fail" + Err _ -> fail "rebuildAllAsync readiness transitions: not implemented — stub must fail" + + it "sets readiness to Failed if any query's rebuild times out" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault { timeout = 1 } + |> Task.asResult + case result of + Ok _ -> fail "rebuildAllAsync timeout -> Failed: not implemented — stub must fail" + Err _ -> fail "rebuildAllAsync timeout -> Failed: not implemented — stub must fail" + + it "sets readiness to Failed if any query's updater throws an exception" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildAllAsync updater exception -> Failed: not implemented — stub must fail" + Err _ -> fail "rebuildAllAsync updater exception -> Failed: not implemented — stub must fail" + + it "continues rebuilding other queries even if one fails" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildAllAsync failure isolation: not implemented — stub must fail" + Err _ -> fail "rebuildAllAsync failure isolation: not implemented — stub must fail" + + it "logs structured WARN message with query name and error when a query fails" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildAllAsync WARN log: not implemented — stub must fail" + Err _ -> fail "rebuildAllAsync WARN log: not implemented — stub must fail" + + it "respects the per-query timeout configured in RebuildOptions" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault { timeout = 10 } + |> Task.asResult + case result of + Ok _ -> fail "rebuildAllAsync per-query timeout: not implemented — stub must fail" + Err _ -> fail "rebuildAllAsync per-query timeout: not implemented — stub must fail" + + it "can be cancelled via AsyncTask.cancel without leaving partial writes" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "rebuildAllAsync cancellation H8: not implemented — stub must fail" + Err _ -> fail "rebuildAllAsync cancellation H8: not implemented — stub must fail" + + describe "readinessOf" do + it "returns Ready when all registered queries are caught up" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok Ready -> pass + Ok Rebuilding -> fail "readinessOf returned Rebuilding but expected Ready" + Ok (Failed _) -> fail "readinessOf returned Failed but expected Ready" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "returns Rebuilding when at least one query is still replaying" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok Rebuilding -> pass + Ok _ -> fail "readinessOf returned wrong state — expected Rebuilding" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "returns Failed with first failure reason when any query has failed" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok (Failed _) -> pass + Ok _ -> fail "readinessOf returned wrong state — expected Failed" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "returns Ready for empty query set (no queries registered)" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok Ready -> pass + Ok _ -> fail "readinessOf returned wrong state for empty set — expected Ready" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "handles state transitions during concurrent rebuilds" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok _ -> fail "readinessOf concurrent transitions: not implemented — stub must fail" + Err _ -> fail "readinessOf concurrent transitions: not implemented — stub must fail" + + describe "readinessOfQuery" do + it "returns Just Ready when the named query is caught up" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOfQuery subscriber "orders" + |> Task.asResult + case result of + Ok (Just Ready) -> pass + Ok _ -> fail "readinessOfQuery returned wrong state — expected Just Ready" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "returns Just Rebuilding when the named query is still replaying" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOfQuery subscriber "orders" + |> Task.asResult + case result of + Ok (Just Rebuilding) -> pass + Ok _ -> fail "readinessOfQuery returned wrong state — expected Just Rebuilding" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "returns Just (Failed reason) when the named query has failed" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOfQuery subscriber "orders" + |> Task.asResult + case result of + Ok (Just (Failed _)) -> pass + Ok _ -> fail "readinessOfQuery returned wrong state — expected Just (Failed _)" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "returns Nothing when the named query is not registered" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOfQuery subscriber "nonexistent" + |> Task.asResult + case result of + Ok Nothing -> pass + Ok _ -> fail "readinessOfQuery returned wrong state — expected Nothing" + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "returns the correct readiness for each query in a multi-query system" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOfQuery subscriber "orders" + |> Task.asResult + case result of + Ok _ -> fail "readinessOfQuery multi-query H7: not implemented — stub must fail" + Err _ -> fail "readinessOfQuery multi-query H7: not implemented — stub must fail" + + describe "Concurrency Hazard H1: Replay racing live subscription" do + it "handles simultaneous replay and live events without duplication" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h1-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H1 race test: not implemented — stub must fail" + Err _ -> fail "H1 race test: not implemented — stub must fail" + + describe "Concurrency Hazard H2: Lost write via ON CONFLICT DO UPDATE" do + it "prevents position regression via CAS-on-position" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "H2 CAS property: not implemented — stub must fail" + + it "is not a last-writer-wins pattern (lower positions do not overwrite higher)" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "H2 deterministic race: not implemented — stub must fail" + + describe "Concurrency Hazard H3: Crash mid-update" do + it "restarts from persisted position after process crash pre-write" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "H3 crash pre-write: not implemented — stub must fail" + + it "converges to correct state after crash mid-transaction" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "H3 crash mid-txn: not implemented — stub must fail" + + it "does not re-apply events from after the persisted position on restart" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "H3 idempotent restart: not implemented — stub must fail" + + describe "Concurrency Hazard H4: Readiness flag visibility" do + it "never flips readiness to Ready before the last query_object_store write is durable" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok _ -> fail "H4 readiness durability: not implemented — stub must fail" + Err _ -> fail "H4 readiness durability: not implemented — stub must fail" + + it "ensures writes are visible before any request lands on the ready query" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok _ -> fail "H4 stale read prevention: not implemented — stub must fail" + Err _ -> fail "H4 stale read prevention: not implemented — stub must fail" + + describe "Concurrency Hazard H5: Hash-mismatch mid-flight" do + it "deletes stale hash rows atomically before replay starts" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h5-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H5 atomic delete: not implemented — stub must fail" + Err _ -> fail "H5 atomic delete: not implemented — stub must fail" + + it "replays affected query only; other queries unaffected" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h5-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H5 per-query isolation: not implemented — stub must fail" + Err _ -> fail "H5 per-query isolation: not implemented — stub must fail" + + it "handles concurrent live events arriving during hash-mismatch deletion" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h5-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H5 concurrent live events: not implemented — stub must fail" + Err _ -> fail "H5 concurrent live events: not implemented — stub must fail" + + describe "Concurrency Hazard H6: Chunk-boundary tearing" do + it "produces identical state regardless of chunk boundaries" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h6-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H6 chunk idempotency: not implemented — stub must fail" + Err _ -> fail "H6 chunk idempotency: not implemented — stub must fail" + + it "does not lose events at chunk boundaries" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h6-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H6 no event loss: not implemented — stub must fail" + Err _ -> fail "H6 no event loss: not implemented — stub must fail" + + it "does not duplicate events at chunk boundaries" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h6-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H6 no duplication: not implemented — stub must fail" + Err _ -> fail "H6 no duplication: not implemented — stub must fail" + + describe "Concurrency Hazard H7: Init ordering" do + it "registers a query after Application.run and requires full replay from position 0" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "late-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H7 late registration: not implemented — stub must fail" + Err _ -> fail "H7 late registration: not implemented — stub must fail" + + it "does not silently drop events for unregistered queries" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h7-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H7 no silent drops: not implemented — stub must fail" + Err _ -> fail "H7 no silent drops: not implemented — stub must fail" + + describe "Concurrency Hazard H8: AsyncTask cancellation on shutdown" do + it "persists position consistently when rebuild is cancelled via SIGTERM" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildAllAsync subscriber rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H8 SIGTERM safety: not implemented — stub must fail" + Err _ -> fail "H8 SIGTERM safety: not implemented — stub must fail" + + it "resumes correctly after SIGTERM-interrupted rebuild" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "h8-query" (StreamPosition 500) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "H8 resume after SIGTERM: not implemented — stub must fail" + Err _ -> fail "H8 resume after SIGTERM: not implemented — stub must fail" + + describe "Concurrency Hazard H9: Multi-writer (future-proofing)" do + it "rejects stale writes when multiple processes attempt concurrent updates" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "H9 multi-writer CAS: not implemented — stub must fail" + + it "accommodates the contract for future multi-writer implementations" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "H9 multi-writer isolation: not implemented — stub must fail" + + describe "Property-based invariants" do + it "position never decreases for any (query_name, instance_uuid) — atomicUpdate position monotonicity" \_ -> do + result <- + createTestStore + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "property: position monotonicity: not implemented — stub must fail" + + it "replay(E) == replay(replay(E)) for any event sequence E — rebuildFrom idempotency" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "idempotent-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "property: rebuildFrom idempotency: not implemented — stub must fail" + Err _ -> fail "property: rebuildFrom idempotency: not implemented — stub must fail" + + it "state is invariant to chunk boundary placement — chunk-boundary transparency" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.rebuildFrom subscriber "boundary-query" (StreamPosition 0) rebuildOptionsDefault + |> Task.asResult + case result of + Ok _ -> fail "property: chunk-boundary transparency: not implemented — stub must fail" + Err _ -> fail "property: chunk-boundary transparency: not implemented — stub must fail" + + it "readinessOf == Ready iff all readinessOfQuery return Ready — readiness aggregate consistency" \_ -> do + eventStore <- InMemory.new |> Task.mapError toText + subscriber <- Subscriber.new eventStore Registry.empty + result <- + Subscriber.readinessOf subscriber + |> Task.asResult + case result of + Ok _ -> fail "property: readiness aggregate: not implemented — stub must fail" + Err _ -> fail "property: readiness aggregate: not implemented — stub must fail" + + describe "Round-trip serialization" do + it "Readiness round-trips correctly" \_ -> do + -- toJSON >> fromJSON should return the same Readiness value. + -- Stub: types exist but serialization is not implemented yet. + fail "Readiness round-trip: not implemented — stub must fail" + + it "RebuildOptions round-trips correctly" \_ -> do + fail "RebuildOptions round-trip: not implemented — stub must fail" + + it "QueryObjectStoreError round-trips correctly" \_ -> do + fail "QueryObjectStoreError round-trip: not implemented — stub must fail" + + it "QueryRebuildError round-trips correctly" \_ -> do + fail "QueryRebuildError round-trip: not implemented — stub must fail" diff --git a/core/test-service/Service/QueryObjectStore/PostgresSpec.hs b/core/test-service/Service/QueryObjectStore/PostgresSpec.hs new file mode 100644 index 00000000..97e932f6 --- /dev/null +++ b/core/test-service/Service/QueryObjectStore/PostgresSpec.hs @@ -0,0 +1,208 @@ +module Service.QueryObjectStore.PostgresSpec where + +import Core +import Json qualified +import Service.QueryObjectStore.Core (QueryObjectStore) +import Service.QueryObjectStore.Postgres ( + PostgresQueryObjectStoreConfig (..), + QueryObjectStoreError (..), + ) +import Service.QueryObjectStore.Postgres qualified as PostgresQOS +import Task qualified +import Test + + +-- | Standard test config pointing at the local Postgres instance. +testConfig :: PostgresQueryObjectStoreConfig +testConfig = + PostgresQueryObjectStoreConfig + { host = "localhost" + , databaseName = "neohaskell" + , user = "neohaskell" + , password = "neohaskell" + , port = 5432 + } + + +-- | Concrete-typed helper so all tests have an unambiguous query type. +mkStore :: PostgresQueryObjectStoreConfig -> Task QueryObjectStoreError (QueryObjectStore Json.Value) +mkStore = PostgresQOS.createQueryObjectStore + + +spec :: Spec Unit +spec = do + describe "createQueryObjectStore" do + it "returns QueryObjectStore with working pool on valid config" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Ok _ -> pass + Err err -> fail [fmt|Expected success but got: #{toText (show err)}|] + + it "fails with ConnectionFailed if database is unreachable" \_ -> do + let badConfig = testConfig { host = "unreachable.invalid", port = 9999 } + result <- + mkStore badConfig + |> Task.asResult + case result of + Err (ConnectionFailed _) -> pass + Err other -> fail [fmt|Expected ConnectionFailed but got: #{toText (show other)}|] + Ok _ -> fail "Expected failure but got success" + + it "fails with ConnectionFailed if credentials are invalid" \_ -> do + let badConfig = testConfig { password = "wrong_password_xyz" } + result <- + mkStore badConfig + |> Task.asResult + case result of + Err (ConnectionFailed _) -> pass + Err other -> fail [fmt|Expected ConnectionFailed but got: #{toText (show other)}|] + Ok _ -> fail "Expected failure but got success" + + it "fails with ConnectionFailed if database does not exist" \_ -> do + let badConfig = testConfig { databaseName = "nonexistent_db_xyz" } + result <- + mkStore badConfig + |> Task.asResult + case result of + Err (ConnectionFailed _) -> pass + Err other -> fail [fmt|Expected ConnectionFailed but got: #{toText (show other)}|] + Ok _ -> fail "Expected failure but got success" + + it "fails with ConnectionFailed if pool size is zero or negative" \_ -> do + -- A port of 0 is an invalid config — treated as pool-size/config validation failure. + let badConfig = testConfig { port = 0 } + result <- + mkStore badConfig + |> Task.asResult + case result of + Err (ConnectionFailed _) -> pass + Err other -> fail [fmt|Expected ConnectionFailed but got: #{toText (show other)}|] + Ok _ -> fail "Expected failure but got success" + + describe "atomicUpdateInPool" do + it "inserts a new row when (query_name, instance_uuid) does not exist" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "atomicUpdateInPool: not implemented — stub must fail" + + it "updates existing row when new position is strictly greater" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "atomicUpdateInPool CAS advance: not implemented — stub must fail" + + it "rejects write when new position is less than or equal to existing" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "atomicUpdateInPool CAS reject: not implemented — stub must fail" + + it "fails with StatementFailed if query_name is NULL" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> do + -- Real test: call atomicUpdate with NULL query_name, expect StatementFailed. + -- Stub: createQueryObjectStore is not implemented yet, so we never reach here. + fail "atomicUpdateInPool NULL name: not implemented — stub must fail" + + it "fails with StatementFailed if state_json is not valid JSON" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "atomicUpdateInPool bad JSON: not implemented — stub must fail" + + it "updates query_hash when schema evolves" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "atomicUpdateInPool hash update: not implemented — stub must fail" + + describe "getFromPool" do + it "returns Just (state, position) when row exists" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getFromPool found: not implemented — stub must fail" + + it "returns Nothing when no row exists for that query/instance pair" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getFromPool not-found: not implemented — stub must fail" + + it "fails with DecodingFailed if state_json column is NULL" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getFromPool NULL state_json: not implemented — stub must fail" + + it "fails with DecodingFailed if position column is corrupted (non-integer)" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getFromPool corrupted position: not implemented — stub must fail" + + it "returns first row if multiple instances exist for same query/name pair" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getFromPool multi-instance: not implemented — stub must fail" + + describe "getAllFromPool" do + it "returns empty Array when no rows match the query_name" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getAllFromPool empty: not implemented — stub must fail" + + it "returns Array with one row when exactly one instance exists" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getAllFromPool single row: not implemented — stub must fail" + + it "returns Array with all instances when multiple instances exist for same query" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getAllFromPool multi-row: not implemented — stub must fail" + + it "fails with DecodingFailed if any row has corrupted state_json" \_ -> do + result <- + mkStore testConfig + |> Task.asResult + case result of + Err err -> fail [fmt|Setup failed: #{toText (show err)}|] + Ok _ -> fail "getAllFromPool corrupted row: not implemented — stub must fail" diff --git a/core/test-service/Service/Transport/Web/ReadinessSpec.hs b/core/test-service/Service/Transport/Web/ReadinessSpec.hs new file mode 100644 index 00000000..109b79fe --- /dev/null +++ b/core/test-service/Service/Transport/Web/ReadinessSpec.hs @@ -0,0 +1,81 @@ +module Service.Transport.Web.ReadinessSpec where + +import Core +import Service.Transport.Web.Readiness (ReadinessConfig (..)) +import Service.Transport.Web.Readiness qualified as WebReadiness +import Test + + +-- | Default readiness config used across tests. +defaultReadinessConfig :: ReadinessConfig +defaultReadinessConfig = ReadinessConfig + { readinessPath = "ready" + , includeQueryStatus = True + } + + +spec :: Spec Unit +spec = do + describe "HTTP GET /ready" do + it "returns 200 OK with status:ready when all queries are caught up" \_ -> do + -- When readinessOf returns Ready, the handler must emit HTTP 200. + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready 200 ready: not implemented — stub must fail" + + it "returns 503 Service Unavailable with status:rebuilding when any query is replaying" \_ -> do + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready 503 rebuilding: not implemented — stub must fail" + + it "includes per-query lag and position in rebuilding response" \_ -> do + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready lag/position in rebuilding response: not implemented — stub must fail" + + it "includes estimatedSecondsRemaining when rebuilding" \_ -> do + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready estimatedSecondsRemaining: not implemented — stub must fail" + + it "returns 503 with status:failed and failedQueries list when any query has failed" \_ -> do + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready 503 failed: not implemented — stub must fail" + + it "includes all failed queries in the failedQueries array" \_ -> do + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready all failedQueries: not implemented — stub must fail" + + it "does not include rebuilding queries in failedQueries when ready/rebuilding state" \_ -> do + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready no overlap in failedQueries: not implemented — stub must fail" + + it "returns 404 if /ready endpoint is disabled via withoutReadinessEndpoint" \_ -> do + let _handler = WebReadiness.handleReadinessRequest + fail "GET /ready 404 when disabled: not implemented — stub must fail" + + describe "HTTP GET /queries/{name}" do + it "returns 200 OK with query results when the query is Ready" \_ -> do + let _handler = WebReadiness.handleQueryReadinessRequest + fail "GET /queries/{name} 200: not implemented — stub must fail" + + it "returns 503 Service Unavailable with X-Query-Status: rebuilding when query is rebuilding" \_ -> do + let _handler = WebReadiness.handleQueryReadinessRequest + fail "GET /queries/{name} 503 rebuilding header: not implemented — stub must fail" + + it "includes lag in the 503 rebuilding response" \_ -> do + let _handler = WebReadiness.handleQueryReadinessRequest + fail "GET /queries/{name} lag in 503: not implemented — stub must fail" + + it "returns 503 Service Unavailable with X-Query-Status: failed when query has failed" \_ -> do + let _handler = WebReadiness.handleQueryReadinessRequest + fail "GET /queries/{name} 503 failed header: not implemented — stub must fail" + + it "returns 404 Not Found when the query is not registered" \_ -> do + let _handler = WebReadiness.handleQueryReadinessRequest + fail "GET /queries/{name} 404 unregistered: not implemented — stub must fail" + + it "does not cache the readiness state (fresh read on every request)" \_ -> do + let _handler = WebReadiness.handleQueryReadinessRequest + fail "GET /queries/{name} no-cache H4: not implemented — stub must fail" + + describe "ReadinessConfig" do + it "ReadinessConfig round-trips correctly" \_ -> do + -- toJSON >> fromJSON should return the same ReadinessConfig value. + fail "ReadinessConfig round-trip: not implemented — stub must fail" diff --git a/docs/decisions/0059-async-query-rebuild-with-persistent-checkpoints.md b/docs/decisions/0059-async-query-rebuild-with-persistent-checkpoints.md new file mode 100644 index 00000000..7833458f --- /dev/null +++ b/docs/decisions/0059-async-query-rebuild-with-persistent-checkpoints.md @@ -0,0 +1,584 @@ +# ADR-0059: Async Query Rebuild with Persistent Checkpoints + +> Issue: [#650 — Query rebuild blocks HTTP readiness on every restart](https://github.com/neohaskell/NeoHaskell/issues/650) + +## Status + +Proposed + +## Context + +### Current State + +`Service.Query.Subscriber.rebuildAll` is invoked synchronously from +`Application.run` (`core/service/Service/Application.hs` lines 1154–1161) +before any transport binds. It reads the entire event store from +`StreamPosition 0` with `Limit 9223372036854775807` (Int64 max) and +funnels every event through every registered `QueryUpdater`. The only +`QueryObjectStore` implementation today is in-memory, so the read-model +state starts empty on every restart. There is no persistent cursor, no +snapshot, and no way to skip work the previous process already did. + +ADR-0007 already flags this in its Related Work section as item #4 +("Checkpoint persistence"). This ADR is that follow-up. + +### Use Cases + +- **Rolling deploys on fly.io / Kubernetes** — the new machine must + pass a readiness probe within seconds, even when the event log + contains millions of events. Today readiness is `O(eventCount)`. +- **Crash recovery and machine restarts** — a transient OOM or a + serverless Postgres warmup should not turn into a multi-minute window + where the load balancer sees a dead port. +- **Shipping one-line code fixes** — a no-op deploy should not force a + full read-model re-derivation. Persisting state with its position + turns that into a few cents of Postgres reads instead of a full + replay. +- **Adding a query to an existing service** — only the newly-added + query should replay from `StreamPosition 0`. Pre-existing queries + continue from their stored position. + +### Design Goals + +1. **HTTP `/health` is constant-time** regardless of event count, so + liveness probes succeed the moment the process binds. +2. **`/ready` reflects actual catch-up state**, so traffic is only + routed to a machine when its read models are current. +3. **Per-query positions**, so adding or evolving one query does not + penalise the others. +4. **Persistent `QueryObjectStore` backend**, so a restart does not + discard work the previous process already paid for. +5. **Chunked reads with progress logging**, so a slow rebuild is + observable rather than silent. +6. **Schema-evolution safety**, so a `KnownHash` mismatch forces a + full replay of only the affected query — never silent corruption. +7. **Default API stays Jess-friendly** — existing apps must keep + working without learning new vocabulary. The async path and the + readiness gate are framework-provided defaults. + +### GitHub Issue + +- [#650: Query rebuild blocks HTTP readiness on every restart — no checkpoint, no persistent QueryObjectStore](https://github.com/neohaskell/NeoHaskell/issues/650) + +## Decision drivers + +- **Boot time on warm restart must be constant**, not linear in event + count. Anything else fails the rolling-deploy use case. +- **The default `QueryObjectStoreConfig` in `Service.Application` stays + in-memory** — adding Postgres must be a one-line opt-in, not a + required dependency for hello-world apps. +- **The persisted position and the persisted state must commit + together** — otherwise a crash between the two creates either a + duplicate apply (if position is written first) or a lost update (if + state is written first). Both are observable as read-model drift. + The fix is to keep them in the *same row* — precedent: ADR-0006 + `Snapshot { state, position }`. +- **Readiness is a first-class concept** — `/health` (liveness, per + ADR-0025) and `/ready` (readiness) are separate endpoints. The + framework owns both. Jess never writes either one by hand. +- **One readiness flag per query, plus an aggregate** — global + readiness is `all queries are ready`. A slow query does not have + to block the rest. + +## Considered options + +### Option 1 — Persistent `QueryObjectStore.Postgres` with embedded position + async rebuild (chosen) + +Layered set of small changes: + +1. New `Service.QueryObjectStore.Postgres` backend (table + `query_object_store` keyed by `(query_name, instance_uuid)`, JSON + column for the serialised query state, `position` and `query_hash` + columns in the same row — same shape as `Snapshot { state, position }` + in ADR-0006). +2. `atomicUpdate` is a single + `INSERT ... ON CONFLICT (query_name, instance_uuid) DO UPDATE` + that writes state and position together. One transaction, one + table, no two-table coupling. +3. `Subscriber.rebuildAll` is split into `rebuildFrom` (per-query, + resumable, chunked) and `rebuildAllAsync` which spawns the work via + `AsyncTask.run` and flips `subscriber.readiness` when done. +4. `Application.run` becomes non-blocking on rebuild — transports + bind immediately, `/health` is 200 (ADR-0025), `/ready` waits on + `subscriber.readiness`. Per-query endpoints respect per-query + readiness with response header `X-Query-Status: rebuilding`. +5. Chunked reads (default `Limit 1000` per page) with progress logging + and observability counters. +6. `KnownHash` mismatch (`deriveQuery`-derived) triggers a full replay + of the affected query only — rows with the stale hash are deleted + at startup, that query replays from `StreamPosition 0`. + +### Option 2 — Synchronous rebuild + Postgres `QueryObjectStore` only + +Persist the object store but keep the synchronous rebuild and the +single global cursor. + +- Rejected: still re-reads the full event history on every restart + because there is no per-query position. Strictly worse than option 1 + on boot time; only marginally better on memory pressure. + +### Option 3 — Async rebuild + InMemory only + +Spin off `rebuildAll` to an async task, but keep the in-memory store +and no persisted position. + +- Rejected: unblocks startup but `/ready` stays 503 for the full + rebuild window on every restart. Acceptable only for tiny event + stores — useless at scale. + +### Option 4 — Snapshot the in-memory store to JSONL periodically + +Mirror `SimpleEventStore`'s persistent JSONL mode. + +- Rejected: recovery semantics are weaker than transactional Postgres + (torn writes, no `INSERT … ON CONFLICT`, no per-query atomicity) and + gain nothing for users who already run Postgres for the event store. + +### Option 5 — Eager hydration on first read instead of at startup + +Defer all replay until the first `GET /queries/{name}` call hits. + +- Rejected: pushes the cost to an unbounded first-request latency. + Breaks the 50k req/s budget the framework targets, and the first + caller becomes the unlucky one. + +### Option 6 — Drop rebuild entirely; treat queries as live-only + +- Rejected: silently breaks the ADR-0007 consistency guarantee. Every + event emitted before the current process started would be lost from + the read model. + +| Option | Verdict | Reason | +|--------|---------|--------| +| 1. Postgres store with embedded position + async | **Chosen** | Only design that satisfies all seven goals. | +| 2. Postgres store, sync rebuild | Rejected | Still O(events) on boot. | +| 3. Async rebuild, InMemory store | Rejected | `/ready` still flaps on every restart. | +| 4. JSONL snapshots | Rejected | Weaker recovery semantics than Postgres. | +| 5. Lazy hydration | Rejected | Unbounded first-request latency. | +| 6. Live-only queries | Rejected | Silently loses pre-existing events. | + +## Decision outcome + +Adopt Option 1. The implementation is layered — each piece compiles and +ships on its own, and each is independently testable. + +### 1. Persistent `QueryObjectStore.Postgres` + +New module `Service.QueryObjectStore.Postgres` provides a Hasql-backed +`QueryObjectStore` implementation. Position lives **inside** the +state row (precedent: ADR-0006 `Snapshot { state, position }`) so the +state write and the position write commit together by construction. +Schema: + +```sql +CREATE TABLE query_object_store ( + query_name TEXT NOT NULL, + instance_uuid UUID NOT NULL, + query_hash TEXT NOT NULL, -- KnownHash-derived, used for schema evolution + position BIGINT NOT NULL, -- StreamPosition reached at this state + state_json JSONB NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (query_name, instance_uuid) +); +``` + +`atomicUpdate` becomes a single +`INSERT ... ON CONFLICT (query_name, instance_uuid) DO UPDATE` +statement using Hasql's typed `Statement` API (string concatenation is +unrepresentable — see ADR-0027 / EventStore.Postgres precedent). One +transaction, no two-table coupling. + +Serialization to `state_json` runs on the per-event hot path. Any +persisted query state must implement `toEncoding` directly — either +via `Generic`-derived `deriveJSON` or hand-written `toEncoding` — to +write straight into the encoding `Builder` without materialising an +intermediate `Value` tree. A `toJSON`-only instance is rejected at +compile time by a `QueryStateSerializable` constraint on +`useQueryObjectStore`. + +**Data classification.** `state_json` inherits the data-classification +properties of its source events; this ADR does not introduce a new PII +surface (the same data already lives in `eventstore.events` JSONB +under ADR-0004). Encryption-at-rest, if required, is a Postgres- +deployment concern at the cluster/tablespace layer, not an +application-schema one. Queries that *project* sensitive fields out of +events should drop them explicitly in the `QueryUpdater` — +documenting this in the `useQueryObjectStore` haddock is in-scope for +phase 10. + +### 2. Startup resume + +For each registered query, on startup the framework runs: + +```sql +SELECT min(position) +FROM query_object_store +WHERE query_name = ? + AND query_hash = ? +``` + +The minimum position across that query's rows is the resume point — +the subscriber starts there. Rows whose `query_hash` does not match +the current `KnownHash`-derived hash for that query are treated as +garbage and deleted; that query then replays from `StreamPosition 0`. +Schema evolution is therefore explicit and per-query: a hash change +forces a full replay of exactly that query, nothing else. + +### 3. Async rebuild + readiness + +`Service.Query.Subscriber.QuerySubscriber` gains a new +`ConcurrentVar` carrying per-query readiness: + +```haskell +data Readiness + = Rebuilding + | Ready + deriving (Eq, Show) +``` + +`Application.run` replaces the synchronous call site with an async +spawn, and transports bind immediately. `/health` (ADR-0025) keeps its +existing meaning — the process is up — and is unaffected by rebuild +state. `/ready` is the new endpoint that gates on +`subscriber.readiness`. + +**Rebuild timeout.** Each query's rebuild runs under a configurable +per-query timeout (default `5 minutes`, settable via +`RebuildOptions { timeout :: Duration }`). On timeout or updater +exception, the readiness state flips to a third constructor: + +```haskell +data Readiness + = Rebuilding + | Ready + | Failed Text -- reason: timeout, updater exception, hash-replay failure + deriving (Eq, Show) +``` + +`Failed` is a terminal state for that query — `/ready` reports it +distinctly so the orchestrator stops flapping, and the per-query +endpoint returns `503` with `X-Query-Status: failed`. Recovery +requires operator intervention (fix the updater, then restart). +Updater exceptions are logged at `WARN` with the offending event +position, the query name, and a truncated payload digest. + +### 4. Chunked reads + progress + +`Subscriber.rebuildFrom` reads events in pages of +`Limit 1000` (configurable via `RebuildOptions`), logs progress every +page, and emits the three observability counters +(`events_replayed`, `lag_from_head`, `duration_seconds`) per query. + +### 5. Module placement + +```text +core/service/ + Service/ + Query/ + Subscriber.hs -- modified: rebuildFrom, readiness + QueryObjectStore/ + Postgres.hs -- new: Hasql-backed implementation (state + position in one row) + Transport/ + Web/ + Readiness.hs -- new: /ready handler (/health stays in existing module) +``` + +Follows the established flat structure with one level of nesting for +implementation variants. No `Service.Query.Checkpoint` module — the +position lives inside `QueryObjectStore` rows, so there is nothing +separate to put behind a trait. + +### Performance testing + +A dedicated benchmark suite exercises the new design under the +conditions that motivated it. Each suite below names its assertion +shape; floor numbers are filled in during benchmarking (phase 5). + +1. **HTTP-bind latency** (Hurl + `time_total`): `/health` returns + `200` within X ms of process start, *flat* across event counts + 1k / 10k / 100k. +2. **Cold-start replay throughput** (`tasty-bench` or whatever + harness `EventStore` already uses): events/sec replayed at chunk + size `1000` through one trivial updater, reported as a single + number. +3. **Warm-restart latency**: with `query_object_store` already at + head, `/ready` flips to `200` within X ms regardless of N. Proves + "skip already-processed work" actually skips. +4. **Chunked-read memory bound**: heap stays under M bytes during a + 100k-event replay. Proves chunking, not just termination. +5. **`atomicUpdate` contention** (`criterion` or equivalent): K + concurrent updates to the same `(query_name, instance_uuid)` — + throughput curve reported at K = 1, 10, 100. +6. **Per-query selective replay**: hash mismatch on 1 of K queries + replays only that one; other queries' rebuild times statistically + indistinguishable from the no-mismatch baseline. +7. **Catch-up during live events**: replay running concurrently with + live appends → effective catch-up rate ≥ append rate at steady + state. +8. **Idempotent replay (property)**: replaying the same N events + twice produces byte-identical state and an identical final + position. + +Suites match the rigor expected of `EventStore.Postgres`. Phase 8 +(test spec) checks for an existing `tasty-bench` harness under +`core/test-service/`; if absent, adding a minimal harness is +in-scope for this ADR's implementation. + +### Concurrency & correctness testing + +Async rebuild + live subscription + persistent store creates concrete +hazards. Each `H#` below has a named test counterpart. + +- **H1 — Replay racing live subscription.** Async replay catches up + to position `P` while the live subscriber is already processing + events `≥ P`; the same event can hit `atomicUpdate` from two + threads in indeterminate order. + - Test: race test seeds the EventStore at positions `1..1000`, + starts replay, injects a live event near the boundary; final + state is byte-equal to a "replay only, no live" reference. + +- **H2 — Lost write via `ON CONFLICT DO UPDATE`.** Two concurrent + updates with positions `(Pₐ, P_b)` where `Pₐ > P_b` — naïve + last-writer-wins overwrites the higher position with the lower + one. + - Test: property test asserts `position` is monotonically + non-decreasing per `(query_name, instance_uuid)` under any + random interleaving of `N = 100` concurrent updates. + - Resolution: `atomicUpdate` uses CAS-on-position semantics — the + SQL `DO UPDATE` fires only `WHERE query_object_store.position < EXCLUDED.position`. + +- **H3 — Crash mid-update.** Process killed between event fetch and + Postgres commit; after restart, replay must converge. + - Test: crash injection (`Task.throw` or `pg_terminate_backend`) + at three points — pre-write, mid-transaction, + post-write-pre-ack. Restart and verify state converges to the + expected baseline. + +- **H4 — Readiness flag visibility.** `subscriber.readiness` flips + to `Ready` before the last `query_object_store` write is durable + → a request lands and reads stale or missing state. + - Test: assert no query read can succeed before its last write is + observable via a *fresh* Postgres connection (no caching). + +- **H5 — Hash-mismatch mid-flight.** Hash changes for query `Q`; we + delete `Q`'s rows and replay from `0` while a concurrent live + event arrives for `Q`. + - Test: trigger a hash change while live events are arriving for + that query; final state equals "fresh replay from 0 with all + events"; other queries' positions are unchanged. + +- **H6 — Chunk-boundary tearing.** Chunked read at `Limit 1000`; + causally-linked events that span chunks must still produce + identical state to a single-chunk read. + - Test: property test — + `replay (events₁ ++ events₂) ≡ replay events₁ ; replay events₂` + from the same starting position, for any split point. + +- **H7 — Init ordering.** Live subscription started before all + queries registered → events silently dropped for unregistered + queries. + - Test: register a query *after* `Application.run` has started → + either rejected with a clear error or accepted with full replay + from `0`. No silent drops, asserted by a sum projection. + +- **H8 — AsyncTask cancellation on shutdown.** SIGTERM during + rebuild → `AsyncTask` cancelled; persisted position must be safe + (no partial commits past it). + - Test: Hurl scenario — SIGTERM mid-rebuild, restart; replay + resumes from the persisted position; no events double-counted + (asserted via a sum-style query). + +- **H9 — Multi-writer (future-proofing only).** Out of MVP scope, + but the design must not preclude it. + - Test: assert the contract — `atomicUpdate` with a stale + `expectedPosition` rejects rather than overwrites. Keeps the + door open for HA without designing for it now. + +Race orchestration follows the barrier pattern used in +`core/test/Service/EventStore/` (verified during phase 8). Crash +injection helpers under `core/testlib/Test/Service/` — added +in-scope if missing. + +## Public API + +The framework already provides `useQueryObjectStore`, `withQuery`, and +the `/health` endpoint (ADR-0025). The new surface area is minimal: +the `Postgres` constructor for `QueryObjectStore`, the framework-owned +`/ready` endpoint (which is on by default), and the response header +`X-Query-Status: rebuilding` on query endpoints during rebuild. + +### `Service.QueryObjectStore.Postgres` + +```haskell +module Service.QueryObjectStore.Postgres ( + PostgresQueryObjectStoreConfig (..), +) where + +import Service.QueryObjectStore.Core (QueryObjectStoreConfig (..)) +import Text (Text) + + +data PostgresQueryObjectStoreConfig = PostgresQueryObjectStoreConfig + { host :: Text, + databaseName :: Text, + user :: Text, + password :: Text, + port :: Int + } + deriving (Eq, Ord, Show) + + +instance QueryObjectStoreConfig PostgresQueryObjectStoreConfig where + createQueryObjectStore config = do + -- See Service.EventStore.Postgres.Internal for the pool pattern. + pool <- acquirePool config + Task.yield + QueryObjectStore + { get = getFromPool pool, + atomicUpdate = atomicUpdateInPool pool, + getAll = getAllFromPool pool + } +``` + +### Application builder + +```haskell +app :: Application +app = + Application.new + |> Application.withEventStore postgresEventStoreConfig + |> Application.useQueryObjectStore postgresConfig + |> Application.useReadinessEndpoint + |> Application.withQuery @UserOrders + |> Application.withService userService +``` + +`useQueryObjectStore postgresConfig` is the entire opt-in for +persistent state with embedded position — there is no separate +checkpoint builder. `useReadinessEndpoint` is on by default; the +explicit form is shown so Jess sees one autocomplete entry that +covers the readiness contract. + +### Readiness endpoint contract + +```text +GET /health → 200 {"status":"ok"} (ADR-0025; unchanged) + +GET /ready → 200 {"status":"ready"} (when overallReadiness == Ready) + → 503 {"status":"rebuilding", "queries":[{name,lag}]} + +GET /queries/{name} + → 200 [...] (when readinessOf name == Ready) + → 503 {"status":"rebuilding"} with header X-Query-Status: rebuilding + → 503 {"status":"failed","reason":"..."} with header X-Query-Status: failed +``` + +`/health` (ADR-0025) is unchanged — the process is alive. `/ready` is +the new probe that reflects subscriber catch-up. Per-query degraded +mode is a header (`X-Query-Status: rebuilding`) on the existing query +endpoint, not a separate URL. + +### Example: handling a rebuilding query in client code + +Jess never writes this — it is what the framework returns. Sample +response body: + +```json +{ + "status": "rebuilding", + "queries": [ + {"name": "user-orders", "lag": 42091, "position": 1234567} + ] +} +``` + +## Consequences + +### Positive + +1. **HTTP liveness is constant-time on warm restart.** `/health` + succeeds the moment the process binds; rebuild work is invisible + to liveness probes (ADR-0025 semantics preserved). +2. **Rolling deploys stop flapping.** Load balancers route to + machines only when `subscriber.readiness == Ready`, observed via + `/ready`. +3. **Persistent state survives restarts.** A no-op deploy reads zero + events from the EventStore for queries that were caught up before + the restart. +4. **Position is embedded in the persisted state record** (precedent: + ADR-0006 `Snapshot`). Serverless-ready — the DB row IS the cache, + no separate cursor to synchronise. +5. **Per-query isolation.** Adding `OrderSummary` to a service with a + caught-up `UserOrders` only replays the new query. +6. **Schema evolution is safe and explicit.** A `KnownHash` change + forces a full replay for exactly that query, leaving the others + untouched. +7. **Observable rebuild progress.** `events_replayed`, + `lag_from_head`, `duration_seconds` are emitted per query — a slow + rebuild becomes a measurement rather than a guess. +8. **Default API stays Jess-clean.** Apps that do not opt into + Postgres keep working unchanged; the only visible difference is + that `/health` returns immediately and `/ready` is a new probe + they can ignore until they need it. +9. **Aligns with existing precedents.** Same trait + Postgres-impl + pattern as ADR-0006 (snapshot cache, same `state + position` + shape) and ADR-0004 (EventStore); readiness contract extends + ADR-0025 (`/health`). + +### Negative + +1. **One new table.** `query_object_store` must be migrated into the + user's Postgres schema. The `EventStore.Postgres` migration + pattern is the precedent — the framework runs the migration on + startup. +2. **Larger surface for `QueryObjectStore` implementations.** Future + backends (Redis, DynamoDB) must support an atomic + compare-and-swap on `(state, position)` per row, or accept a + documented at-least-once apply semantics. +3. **Per-query readiness adds one `ConcurrentVar` per registered + query.** Negligible memory cost (a handful of bytes per query) + but it is a new concurrency primitive on the hot path of every + event delivery. The 50k req/s budget is not affected because + reads only touch `ConcurrentVar.peek`. +4. **`X-Query-Status: rebuilding` is a new client contract.** Hurl + smoke tests and acceptance tests need to learn the header. JSON + 503 body is documented but new. +5. **Async rebuild surfaces a class of bug that synchronous rebuild + hid** — a buggy `QueryUpdater` no longer crashes the boot, so its + failure must be surfaced through structured logs and the rebuild + counter rather than the process exit code. + +### Risks + +| Risk | Mitigation | +|------|------------| +| `INSERT ... ON CONFLICT DO UPDATE` with CAS-on-position semantics is a non-trivial Hasql `Statement`. | Single statement using Hasql's typed `Statement` API — same pattern as `EventStore.Postgres` already uses for insert + notify. Reviewed in phase 4 (security) and phase 5 (perf). | +| `KnownHash` mismatch path could be exploited to force expensive replays. | Hash is derived at compile time by `deriveQuery`; not user-controlled at runtime. Replay is per-query, not global, and progress is observable. | +| Async rebuild masks updater failures. | Mandatory structured-log + counter on updater failure; `/ready` body lists per-query lag so a stuck query is visible. | +| Long catch-up windows on first deploy of a Postgres-backed store. | Acceptable — first deploy is the one time per service that O(eventCount) work is unavoidable. Progress logging makes it observable; the async path keeps `/health` honest throughout. | +| Hash mismatch + crash mid-rebuild could leave row stale. | State and position are written in the same row, so a crash mid-rebuild simply resumes from the last successfully-committed position; CAS-on-position prevents regression. | + +### Mitigations + +- Migration script for the new table follows the existing + `EventStore.Postgres` precedent (idempotent `CREATE TABLE IF NOT + EXISTS`, runs on startup). +- Default `useQueryObjectStore` falls back to in-memory, so the + framework does not become harder to spin up locally. +- Hurl acceptance tests in `testbed/` add coverage for `/ready` and + the `X-Query-Status: rebuilding` header on a query endpoint hit + during rebuild. +- ADR-0007 status section is updated to cross-reference this ADR. + +## References + +- [#650: Query rebuild blocks HTTP readiness on every restart](https://github.com/neohaskell/NeoHaskell/issues/650) +- [ADR-0007: Queries (Read Models)](0007-queries-read-models.md) — original CQRS read-model design; §"Application Startup Sequence" and §"Related Work" item 4. +- [ADR-0006: Entity Snapshot Cache](0006-entity-snapshot-cache.md) — precedent for the `state + position` in-one-row shape. +- [ADR-0004: EventStore Abstraction](0004-eventstore-abstraction.md) — Postgres backend template. +- [ADR-0025: Auto Health Endpoint for WebTransport Apps](0025-auto-health-endpoint.md) — `/health` precedent; `/ready` extends it. +- [ADR-0027: PostgreSQL Connection Pool Health for Serverless Databases](0027-postgres-pool-health.md) — readiness-signal precedent; pool config to inherit. +- [core/service/Service/Query/Subscriber.hs](../../core/service/Service/Query/Subscriber.hs) — current `rebuildAll`. +- [core/service/Service/Application.hs](../../core/service/Service/Application.hs) (lines 1154–1161) — synchronous call site. +- [core/service/Service/QueryObjectStore/Core.hs](../../core/service/Service/QueryObjectStore/Core.hs) — trait that needs a Postgres impl. +- [core/service/Service/EventStore/Postgres/Internal.hs](../../core/service/Service/EventStore/Postgres/Internal.hs) — template for the new Postgres backend.