Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (10)
✅ Files skipped from review due to trivial changes (2)
🚧 Files skipped from review as they are similar to previous changes (4)
📝 WalkthroughWalkthroughAdds an in-memory cache layer to etcd-backed workflow queue stores, exposes cache lifecycle and error channels through the backend and worker, serializes task polling with mutexes, updates tests to start/stop caches, and introduces Makefile flags to control test caching behavior. Changes
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
6fa02cb to
8739ec8
Compare
296cb36 to
9ea4d4d
Compare
8739ec8 to
798404c
Compare
9ea4d4d to
dd0345d
Compare
798404c to
31df024
Compare
dd0345d to
0213f8c
Compare
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/internal/workflows/worker.go (1)
56-72:⚠️ Potential issue | 🟠 MajorStop backend caches when
worker.Startfails.
StartCachesis bound to the parentctx, but the error path only cancelschildCtx. Ifw.worker.Start(childCtx)fails, the cache watchers keep running until some later cleanup happens.Suggested fix
func (w *Worker) Start(ctx context.Context) error { if w.cancel != nil { return fmt.Errorf("workflows worker already started") } if err := w.be.StartCaches(ctx); err != nil { - return err + return fmt.Errorf("failed to start backend caches: %w", err) } w.logger.Debug().Msg("starting workflows worker") @@ if err := w.worker.Start(childCtx); err != nil { cancel() + w.be.StopCaches() return fmt.Errorf("failed to start worker: %w", err) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/workflows/worker.go` around lines 56 - 72, In Worker.Start, ensure backend caches started by w.be.StartCaches(ctx) are stopped if w.worker.Start(childCtx) fails: after calling cancel() on childCtx, call the corresponding cache shutdown method (e.g., w.be.StopCaches(ctx) or StopCaches with the same parent context) and handle/log any error before returning the wrapped error from w.worker.Start; update the error path in Worker.Start to invoke this cache stop so cache watchers don't keep running when w.worker.Start fails.
🧹 Nitpick comments (1)
server/internal/workflows/backend/etcd/activity_queue_item/store.go (1)
27-32: Make the unstarted-cache state fail explicitly.
Storeis now unusable untilStartCacheruns, but every method below dereferencess.cachedirectly. One missed init call turns into a nil-pointer panic instead of a clear startup error. Consider constructing the cache inNewStoreand separating only the watch lifecycle, or returning a sentinel error when the cache was never started.Also applies to: 54-72, 74-103
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/workflows/backend/etcd/activity_queue_item/store.go` around lines 27 - 32, Store's methods currently dereference s.cache and will panic if StartCache wasn't called; make the unstarted-cache state fail explicitly by either initializing the cache in NewStore or adding a nil-check that returns a sentinel error. Add a package-level var ErrCacheNotStarted error and update NewStore/constructor to create storage.Cache[*Value] (so StartCache only begins watching) or, if you prefer lazy start, add a helper method ensureCacheStarted() used by all public methods (Get/Put/Delete/whatever methods referencing s.cache) that returns ErrCacheNotStarted when s.cache == nil; update all methods that currently use s.cache directly to call ensureCacheStarted() first and return the sentinel error instead of dereferencing a nil pointer. Ensure StartCache sets up the watch lifecycle but does not replace the invariant that s.cache is non-nil (or documents the sentinel error path).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@Makefile`:
- Around line 95-104: The CI target test-ci still runs the workflows backend
package set without the flaky-test exclusion; update the test-ci make target to
mirror test-workflows-backend by adding the same
--packages='./server/internal/workflows/backend/etcd/...' invocation and the --
-tags=workflows_backend_test and
-skip=Test_EtcdBackendE2E/AutoExpiration/StartsWorkflowAndRemoves flags (i.e.,
apply the same -skip used in test-workflows-backend) so CI runs with the flaky
test quarantined just like the specialized target.
In `@server/internal/workflows/backend/etcd/store.go`:
- Around line 52-58: The StartCaches method can leave WorkflowQueueItem running
if ActivityQueueItem.StartCache(ctx) fails; modify StartCaches so that after
WorkflowQueueItem.StartCache succeeds but before returning error from a failing
ActivityQueueItem.StartCache call you explicitly stop/rollback the workflow
cache (call the corresponding cleanup method such as WorkflowQueueItem.StopCache
or WorkflowQueueItem.CloseCache with ctx) and handle any error from that
cleanup, then return an error that wraps the original Activity start error (and
optionally the cleanup error) so the store is not left half-initialized; update
references in StartCaches to use WorkflowQueueItem.StartCache,
ActivityQueueItem.StartCache and the WorkflowQueueItem cleanup method.
In `@server/internal/workflows/backend/etcd/workflow_queue_item/store.go`:
- Around line 61-79: Don't assign s.cache until the cache has successfully
started: create the cache into a local variable (e.g., c :=
storage.NewCache(...)), call c.Start(ctx) and if Start returns nil then set
s.cache = c; on Start error call c.Stop() (if needed) and return the error so
s.cache remains nil. Also update StopCache to clear the field after stopping
(call s.cache.Stop() then set s.cache = nil) so future StartCache won't hit the
fast path while the cache is stopped; touch the Store.StartCache,
Store.StopCache and s.cache usage accordingly.
---
Outside diff comments:
In `@server/internal/workflows/worker.go`:
- Around line 56-72: In Worker.Start, ensure backend caches started by
w.be.StartCaches(ctx) are stopped if w.worker.Start(childCtx) fails: after
calling cancel() on childCtx, call the corresponding cache shutdown method
(e.g., w.be.StopCaches(ctx) or StopCaches with the same parent context) and
handle/log any error before returning the wrapped error from w.worker.Start;
update the error path in Worker.Start to invoke this cache stop so cache
watchers don't keep running when w.worker.Start fails.
---
Nitpick comments:
In `@server/internal/workflows/backend/etcd/activity_queue_item/store.go`:
- Around line 27-32: Store's methods currently dereference s.cache and will
panic if StartCache wasn't called; make the unstarted-cache state fail
explicitly by either initializing the cache in NewStore or adding a nil-check
that returns a sentinel error. Add a package-level var ErrCacheNotStarted error
and update NewStore/constructor to create storage.Cache[*Value] (so StartCache
only begins watching) or, if you prefer lazy start, add a helper method
ensureCacheStarted() used by all public methods (Get/Put/Delete/whatever methods
referencing s.cache) that returns ErrCacheNotStarted when s.cache == nil; update
all methods that currently use s.cache directly to call ensureCacheStarted()
first and return the sentinel error instead of dereferencing a nil pointer.
Ensure StartCache sets up the watch lifecycle but does not replace the invariant
that s.cache is non-nil (or documents the sentinel error path).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0e8b9165-a7eb-4e05-ab88-0a0f16bc52ab
📒 Files selected for processing (9)
Makefileserver/internal/app/app.goserver/internal/workflows/backend/etcd/activity_queue_item/store.goserver/internal/workflows/backend/etcd/etcd.goserver/internal/workflows/backend/etcd/etcd_test.goserver/internal/workflows/backend/etcd/store.goserver/internal/workflows/backend/etcd/workflow_queue_item/store.goserver/internal/workflows/provide.goserver/internal/workflows/worker.go
0213f8c to
0b869cb
Compare
31df024 to
c86bdfe
Compare
0b869cb to
41795bd
Compare
c86bdfe to
7a8a522
Compare
41795bd to
ae0307c
Compare
7a8a522 to
ceead79
Compare
ae0307c to
471d763
Compare
Up to standards ✅🟢 Issues
|
| Metric | Results |
|---|---|
| Complexity | 0 |
| Duplication | -6 |
TIP This summary will be updated as you push new changes. Give us feedback
ceead79 to
7dcf042
Compare
471d763 to
91a1384
Compare
7dcf042 to
4e6e1e5
Compare
91a1384 to
f7f8114
Compare
Updates the workflows backend to use the write-through cache for workflow and activity queue items. This eliminates a frequent, expensive range query, and enables us to reduce the queue polling frequency to the default 200ms without overloading Etcd. We were previously performing these range queries every 500ms, but the queries themselves could take up to 1.5s - even when the range was empty.
f7f8114 to
7120397
Compare
|
Caution Review failedAn error occurred during the review process. Please try again later. 📝 WalkthroughWalkthroughAdds an in-memory cache layer to etcd-backed workflow queue stores, exposes cache lifecycle and error channels through the backend and worker, serializes task polling with mutexes, updates tests to start/stop caches, and introduces Makefile flags to control test caching behavior. Changes
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary
Updates the workflows backend to use the write-through cache for workflow and activity queue items. This eliminates a frequent, expensive range query and enables us to reduce the queue polling frequency to the default 200ms without overloading Etcd. We were previously performing these range queries every 500ms, but the queries themselves could take up to 1.5s - even when the range was empty.
Testing
{ "profiling_enabled": true, "client_addresses": ["127.0.0.1"], "etcd_server": { "log_level": "warn" } }Notes for Reviewers
I disabled one of the workflow backend tests and included an explanation in the
Makefile. This test passes most of the time, but that occasional failure is annoying.