feat: use cache for workflow queues #312

Merged
jason-lynch merged 1 commit into main from feat/cache-workflow-queues on Mar 31, 2026

@jason-lynch
Member

Summary

Updates the workflows backend to use the write-through cache for workflow and activity queue items. This eliminates a frequent, expensive range query and lets us shorten the queue polling interval to the default 200ms without overloading Etcd. We were previously performing these range queries every 500ms, but the queries themselves could take up to 1.5s, even when the range was empty.
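
For readers unfamiliar with the pattern, here is a minimal sketch of a write-through cache in Go. It is not the code in server/internal/storage/cache.go: the `KV` interface, `mapKV`, `WriteThroughCache`, and `ListPrefix` are hypothetical names chosen for illustration, and the sketch leaves out any synchronization with changes made by other processes. It only shows why prefix listings stop costing a range query once every write is mirrored in memory.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// KV is a hypothetical stand-in for the backing store (Etcd in this PR).
type KV interface {
	Put(key, value string) error
	Delete(key string) error
}

// mapKV is an in-memory KV used only to make the sketch runnable.
type mapKV struct {
	data   map[string]string
	writes int // counts calls that reach the backing store
}

func (m *mapKV) Put(key, value string) error {
	m.data[key] = value
	m.writes++
	return nil
}

func (m *mapKV) Delete(key string) error {
	delete(m.data, key)
	m.writes++
	return nil
}

// WriteThroughCache applies each write to the backing store first and then
// mirrors it in memory, so reads never have to touch the backing store.
type WriteThroughCache struct {
	mu    sync.RWMutex
	store KV
	items map[string]string
}

func NewWriteThroughCache(store KV) *WriteThroughCache {
	return &WriteThroughCache{store: store, items: map[string]string{}}
}

func (c *WriteThroughCache) Put(key, value string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if err := c.store.Put(key, value); err != nil {
		return err // memory stays untouched when the store rejects the write
	}
	c.items[key] = value
	return nil
}

func (c *WriteThroughCache) Delete(key string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if err := c.store.Delete(key); err != nil {
		return err
	}
	delete(c.items, key)
	return nil
}

// ListPrefix plays the role of the range query, but scans memory only.
func (c *WriteThroughCache) ListPrefix(prefix string) []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	var keys []string
	for k := range c.items {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	kv := &mapKV{data: map[string]string{}}
	cache := NewWriteThroughCache(kv)
	_ = cache.Put("/queue/workflow/1", "a")
	_ = cache.Put("/queue/activity/1", "b")
	fmt.Println(cache.ListPrefix("/queue/workflow/"), kv.writes)
}
```

In this sketch, polling more often only adds in-memory scans; the backing store sees traffic on writes alone.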

Testing

  • Database operations and overall E2E runtime should be ~30% faster.
  • If you enable warn logging for Etcd, you should no longer see the workflow and activity range queries reported by the slow query logger:
{
  "profiling_enabled": true,
  "client_addresses": ["127.0.0.1"],
  "etcd_server": {
    "log_level": "warn"
  }
}

Notes for Reviewers

I disabled one of the workflow backend tests and included an explanation in the Makefile. This test passes most of the time, but that occasional failure is annoying.

@coderabbitai

coderabbitai bot commented Mar 20, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a505e40d-dfbf-4615-bf33-eda4c8123499

📥 Commits

Reviewing files that changed from the base of the PR and between 0213f8c and 7120397.

📒 Files selected for processing (10)
  • Makefile
  • server/internal/app/app.go
  • server/internal/storage/cache.go
  • server/internal/workflows/backend/etcd/activity_queue_item/store.go
  • server/internal/workflows/backend/etcd/etcd.go
  • server/internal/workflows/backend/etcd/etcd_test.go
  • server/internal/workflows/backend/etcd/store.go
  • server/internal/workflows/backend/etcd/workflow_queue_item/store.go
  • server/internal/workflows/provide.go
  • server/internal/workflows/worker.go
✅ Files skipped from review due to trivial changes (2)
  • server/internal/app/app.go
  • server/internal/workflows/provide.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • server/internal/workflows/backend/etcd/etcd_test.go
  • server/internal/workflows/worker.go
  • server/internal/workflows/backend/etcd/activity_queue_item/store.go
  • server/internal/workflows/backend/etcd/workflow_queue_item/store.go

📝 Walkthrough

Adds an in-memory cache layer to etcd-backed workflow queue stores, exposes cache lifecycle and error channels through the backend and worker, serializes task polling with mutexes, updates tests to start/stop caches, and introduces Makefile flags to control test caching behavior.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Build Configuration | Makefile | Added TEST_DISABLE_CACHE ?= 0, test_disable_cache derivation, workflows_backend_skip skip flag; updated test, test-etcd-lifecycle, test-workflows-backend, and test-ci targets to pass conditional -count=1 and skip flags. |
| Store cache implementations | server/internal/workflows/backend/etcd/activity_queue_item/store.go, server/internal/workflows/backend/etcd/workflow_queue_item/store.go | Added internal cache storage.Cache[*Value], lifecycle methods StartCache/StopCache, PropagateErrors, and rewired all Get/Create/Update/Delete operations to use the cache instead of direct etcd ops. |
| Backend store & orchestration | server/internal/workflows/backend/etcd/store.go, server/internal/workflows/backend/etcd/etcd.go | Added errCh in store, StartCaches/StopCaches on store and backend, exposed Error() <-chan error, and introduced workflowMu/activityMu to serialize GetWorkflowTask/GetActivityTask. |
| Worker, DI, and app wiring | server/internal/workflows/worker.go, server/internal/workflows/provide.go, server/internal/app/app.go | Worker now depends on concrete *etcd.Backend, starts/stops backend caches in Start/Shutdown, exposes backend Error(), DI provides *etcd.Backend, and app registers worker as an error producer. |
| Tests | server/internal/workflows/backend/etcd/etcd_test.go | Tests now call backend.StartCaches(...) and register backend.StopCaches via t.Cleanup, failing on startup errors. test-ci adds a targeted skip for a flaky workflow backend test. |
| Cache internals | server/internal/storage/cache.go | Stop() now acquires c.mu while resetting internal state to avoid concurrent access during shutdown. |

Poem

🐇 In burrows of keys and gentle locks,
I tuck cache carrots in tiny flocks.
Workers wake, their channels sing,
Errors travel on my spring.
Hop—start, stop—woven threads align.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Description check | ❓ Inconclusive | The description covers Summary, Testing, and Notes for Reviewers sections. However, it is missing the Changes and Checklist sections required by the template. | Add a bulleted Changes section listing high-level modifications, and complete the Checklist section indicating which items were addressed (tests, documentation, issue linking, changelog, breaking changes). |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly summarizes the main change: introducing cache usage for workflow queues, which is the core feature across all file modifications. |


jason-lynch force-pushed the feat/write-through-cache branch from 6fa02cb to 8739ec8 on March 20, 2026 21:36
jason-lynch force-pushed the feat/cache-workflow-queues branch from 296cb36 to 9ea4d4d on March 20, 2026 21:36
jason-lynch force-pushed the feat/write-through-cache branch from 8739ec8 to 798404c on March 20, 2026 21:40
jason-lynch force-pushed the feat/cache-workflow-queues branch from 9ea4d4d to dd0345d on March 20, 2026 21:40
jason-lynch force-pushed the feat/write-through-cache branch from 798404c to 31df024 on March 24, 2026 00:16
jason-lynch force-pushed the feat/cache-workflow-queues branch from dd0345d to 0213f8c on March 24, 2026 00:16
@jason-lynch
Member Author

@coderabbitai review

@coderabbitai

coderabbitai bot commented Mar 24, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
server/internal/workflows/worker.go (1)

56-72: ⚠️ Potential issue | 🟠 Major

Stop backend caches when worker.Start fails.

StartCaches is bound to the parent ctx, but the error path only cancels childCtx. If w.worker.Start(childCtx) fails, the cache watchers keep running until some later cleanup happens.

Suggested fix
 func (w *Worker) Start(ctx context.Context) error {
 	if w.cancel != nil {
 		return fmt.Errorf("workflows worker already started")
 	}
 	if err := w.be.StartCaches(ctx); err != nil {
-		return err
+		return fmt.Errorf("failed to start backend caches: %w", err)
 	}
 
 	w.logger.Debug().Msg("starting workflows worker")
@@
 	if err := w.worker.Start(childCtx); err != nil {
 		cancel()
+		w.be.StopCaches()
 		return fmt.Errorf("failed to start worker: %w", err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/workflows/worker.go` around lines 56 - 72, In Worker.Start,
ensure backend caches started by w.be.StartCaches(ctx) are stopped if
w.worker.Start(childCtx) fails: after calling cancel() on childCtx, call the
corresponding cache shutdown method (e.g., w.be.StopCaches(ctx) or StopCaches
with the same parent context) and handle/log any error before returning the
wrapped error from w.worker.Start; update the error path in Worker.Start to
invoke this cache stop so cache watchers don't keep running when w.worker.Start
fails.
🧹 Nitpick comments (1)
server/internal/workflows/backend/etcd/activity_queue_item/store.go (1)

27-32: Make the unstarted-cache state fail explicitly.

Store is now unusable until StartCache runs, but every method below dereferences s.cache directly. One missed init call turns into a nil-pointer panic instead of a clear startup error. Consider constructing the cache in NewStore and separating only the watch lifecycle, or returning a sentinel error when the cache was never started.

Also applies to: 54-72, 74-103

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/workflows/backend/etcd/activity_queue_item/store.go` around
lines 27 - 32, Store's methods currently dereference s.cache and will panic if
StartCache wasn't called; make the unstarted-cache state fail explicitly by
either initializing the cache in NewStore or adding a nil-check that returns a
sentinel error. Add a package-level var ErrCacheNotStarted error and update
NewStore/constructor to create storage.Cache[*Value] (so StartCache only begins
watching) or, if you prefer lazy start, add a helper method ensureCacheStarted()
used by all public methods (Get/Put/Delete/whatever methods referencing s.cache)
that returns ErrCacheNotStarted when s.cache == nil; update all methods that
currently use s.cache directly to call ensureCacheStarted() first and return the
sentinel error instead of dereferencing a nil pointer. Ensure StartCache sets up
the watch lifecycle but does not replace the invariant that s.cache is non-nil
(or documents the sentinel error path).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@Makefile`:
- Around line 95-104: The CI target test-ci still runs the workflows backend
package set without the flaky-test exclusion; update the test-ci make target to
mirror test-workflows-backend by adding the same
--packages='./server/internal/workflows/backend/etcd/...' invocation and the --
-tags=workflows_backend_test and
-skip=Test_EtcdBackendE2E/AutoExpiration/StartsWorkflowAndRemoves flags (i.e.,
apply the same -skip used in test-workflows-backend) so CI runs with the flaky
test quarantined just like the specialized target.

In `@server/internal/workflows/backend/etcd/store.go`:
- Around line 52-58: The StartCaches method can leave WorkflowQueueItem running
if ActivityQueueItem.StartCache(ctx) fails; modify StartCaches so that after
WorkflowQueueItem.StartCache succeeds but before returning error from a failing
ActivityQueueItem.StartCache call you explicitly stop/rollback the workflow
cache (call the corresponding cleanup method such as WorkflowQueueItem.StopCache
or WorkflowQueueItem.CloseCache with ctx) and handle any error from that
cleanup, then return an error that wraps the original Activity start error (and
optionally the cleanup error) so the store is not left half-initialized; update
references in StartCaches to use WorkflowQueueItem.StartCache,
ActivityQueueItem.StartCache and the WorkflowQueueItem cleanup method.

In `@server/internal/workflows/backend/etcd/workflow_queue_item/store.go`:
- Around line 61-79: Don't assign s.cache until the cache has successfully
started: create the cache into a local variable (e.g., c :=
storage.NewCache(...)), call c.Start(ctx) and if Start returns nil then set
s.cache = c; on Start error call c.Stop() (if needed) and return the error so
s.cache remains nil. Also update StopCache to clear the field after stopping
(call s.cache.Stop() then set s.cache = nil) so future StartCache won't hit the
fast path while the cache is stopped; touch the Store.StartCache,
Store.StopCache and s.cache usage accordingly.
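
The rollback requested for StartCaches can be sketched as follows. The types here are hypothetical stand-ins (`cacheStore` and its `failing` flag exist only to make the example runnable), and the real StartCache and StopCache signatures may differ, for example by taking a context.

```go
package main

import "fmt"

// cacheStore is a hypothetical stand-in for the two queue item stores.
type cacheStore struct {
	name    string
	failing bool // forces StartCache to fail, for demonstration only
	running bool
}

func (c *cacheStore) StartCache() error {
	if c.failing {
		return fmt.Errorf("start %s cache: simulated failure", c.name)
	}
	c.running = true
	return nil
}

func (c *cacheStore) StopCache() { c.running = false }

// Store groups the caches the way the comment describes them.
type Store struct {
	WorkflowQueueItem *cacheStore
	ActivityQueueItem *cacheStore
}

// StartCaches starts each cache in order. If a later cache fails to start,
// the caches that already started are stopped in reverse order so the store
// is never left half-initialized.
func (s *Store) StartCaches() error {
	started := make([]*cacheStore, 0, 2)
	for _, c := range []*cacheStore{s.WorkflowQueueItem, s.ActivityQueueItem} {
		if err := c.StartCache(); err != nil {
			for i := len(started) - 1; i >= 0; i-- {
				started[i].StopCache()
			}
			return fmt.Errorf("failed to start caches: %w", err)
		}
		started = append(started, c)
	}
	return nil
}

func main() {
	s := &Store{
		WorkflowQueueItem: &cacheStore{name: "workflow"},
		ActivityQueueItem: &cacheStore{name: "activity", failing: true},
	}
	err := s.StartCaches()
	fmt.Println(err != nil, s.WorkflowQueueItem.running)
}
```

The same shape applies to the single-store case in the last comment: build and start the cache through a local variable, and publish it to the struct field only once the start has succeeded.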


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0e8b9165-a7eb-4e05-ab88-0a0f16bc52ab

📥 Commits

Reviewing files that changed from the base of the PR and between 31df024 and 0213f8c.

📒 Files selected for processing (9)
  • Makefile
  • server/internal/app/app.go
  • server/internal/workflows/backend/etcd/activity_queue_item/store.go
  • server/internal/workflows/backend/etcd/etcd.go
  • server/internal/workflows/backend/etcd/etcd_test.go
  • server/internal/workflows/backend/etcd/store.go
  • server/internal/workflows/backend/etcd/workflow_queue_item/store.go
  • server/internal/workflows/provide.go
  • server/internal/workflows/worker.go

jason-lynch force-pushed the feat/cache-workflow-queues branch from 0213f8c to 0b869cb on March 24, 2026 19:10
jason-lynch force-pushed the feat/write-through-cache branch from 31df024 to c86bdfe on March 25, 2026 00:16
jason-lynch force-pushed the feat/cache-workflow-queues branch from 0b869cb to 41795bd on March 25, 2026 00:16
jason-lynch force-pushed the feat/write-through-cache branch from c86bdfe to 7a8a522 on March 25, 2026 13:16
jason-lynch force-pushed the feat/cache-workflow-queues branch from 41795bd to ae0307c on March 25, 2026 13:16
jason-lynch requested a review from tsivaprasad on March 30, 2026 15:37
jason-lynch force-pushed the feat/write-through-cache branch from 7a8a522 to ceead79 on March 31, 2026 17:26
jason-lynch force-pushed the feat/cache-workflow-queues branch from ae0307c to 471d763 on March 31, 2026 17:26
@codacy-production

codacy-production bot commented Mar 31, 2026

Up to standards ✅

🟢 Issues: 0 new issues

🟢 Metrics: 0 complexity, -6 duplication

| Metric | Results |
| --- | --- |
| Complexity | 0 |
| Duplication | -6 |

jason-lynch force-pushed the feat/write-through-cache branch from ceead79 to 7dcf042 on March 31, 2026 18:07
jason-lynch force-pushed the feat/cache-workflow-queues branch from 471d763 to 91a1384 on March 31, 2026 18:07
jason-lynch force-pushed the feat/write-through-cache branch from 7dcf042 to 4e6e1e5 on March 31, 2026 18:48
jason-lynch force-pushed the feat/cache-workflow-queues branch from 91a1384 to f7f8114 on March 31, 2026 18:48
Base automatically changed from feat/write-through-cache to main on March 31, 2026 19:34
Updates the workflows backend to use the write-through cache for
workflow and activity queue items. This eliminates a frequent, expensive
range query, and lets us shorten the queue polling interval to the
default 200ms without overloading Etcd. We were previously performing
these range queries every 500ms, but the queries themselves could take
up to 1.5s, even when the range was empty.
jason-lynch force-pushed the feat/cache-workflow-queues branch from f7f8114 to 7120397 on March 31, 2026 19:38
@coderabbitai

coderabbitai bot commented Mar 31, 2026

Caution

Review failed

An error occurred during the review process. Please try again later.


jason-lynch merged commit 70a9aa4 into main on Mar 31, 2026 (3 checks passed)
jason-lynch deleted the feat/cache-workflow-queues branch on March 31, 2026 20:09