Conversation

Caution: Review failed. Pull request was closed or merged during review.

📝 Walkthrough
Added a scheduled YouTube comments sync job, a centralized YouTube token refresh helper with in-flight deduplication and Redis caching, three BullMQ workers with an aggregator and lifecycle wiring, worker-side configs/models/utils, server startup changes to start the cron and adjust the shutdown flow, and a new
Changes
Sequence Diagram(s)
sequenceDiagram
participant Cron as Cron Scheduler
participant Server as Server
participant DB as Database
participant Cache as Redis
participant YouTube as YouTube API
participant Worker as BullMQ Workers
Cron->>Server: trigger sync (*/30 * * * *)
Server->>DB: query channels
DB-->>Server: channels[]
loop per channel
Server->>Cache: get cached access token (userId)
alt token valid
Cache-->>Server: accessToken
else
Server->>DB: load user.refreshToken
DB-->>Server: refreshToken
Server->>YouTube: refresh OAuth token
YouTube-->>Server: new accessToken
Server->>Cache: store token + expiry
end
Server->>DB: list videos for channel
DB-->>Server: videos[]
loop batches (size 5)
par for each video
Server->>YouTube: fetch comments for videoId
YouTube-->>Server: comments[]
Server->>DB: upsert comments
Server->>Worker: enqueue classify/generate jobs
end
end
end
Server->>Server: log completion
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing Touches
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 11
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@server/server.js`:
- Around line 29-55: The shutdown sequence currently closes BullMQ workers and
the database before calling server.close(), which allows new incoming
connections to be accepted after dependencies are torn down; change the order in
the SIGTERM/SIGINT handler so you first call server.close() (the promisified
close already used), await it to stop accepting new connections, then
dynamically import "../worker/tasks/index.js" and await
Promise.all(workers.map(w => w.close())), and finally call disconnectdb();
preserve the existing try/catch logging around each step (server.close, workers
close, disconnectdb) to surface errors using the same handlers.
- Around line 8-13: The dynamic import of the worker tasks
(import("../worker/tasks/index.js")) is not awaited inside the connectdb()
then-callback, so the process may call app.listen() before workers are
initialized; change the connectdb() continuation to an async function and await
the import and startSyncCronJob (or at least await the import and handle/rethrow
errors) so startup fails fast on worker bootstrap errors and app.listen() only
runs after workers are loaded. Ensure you reference the connectdb() promise
chain, the import("../worker/tasks/index.js") call, and startSyncCronJob() when
making these changes so errors propagate and prevent starting the HTTP server if
worker initialization fails.
In `@server/src/jobs/syncComments.job.js`:
- Around line 31-35: The Video model's channelId field is declared unique which
prevents multiple Video documents per channel and breaks the batching logic in
the sync job; remove the unique: true constraint from the channelId schema
definition in the Video model (keep unique: true on videoId) so the Video model
allows many videos per channel, then run your migrations/DB update to drop the
unique index on channelId so Video.find({ channelId: ... }) in the syncComments
job returns all channel videos as intended.
- Around line 65-70: The cron task is currently fire-and-forget and may overlap;
modify startSyncCronJob so it captures and returns the ScheduledTask from
cron.schedule(), make the scheduled callback async and await
syncAllChannelsGlobally(), and pass { noOverlap: true } (or the equivalent
option) to cron.schedule so runs are skipped while a previous run is running;
return the task from startSyncCronJob so the caller (e.g. code that uses
startSyncCronJob and may call task.stop()) can manage shutdown.
In `@server/src/utils/youtubeToken.helper.js`:
- Around line 17-79: getValidYoutubeToken currently only flags reAuthNeeded for
missing user.refreshToken, causing transient Redis/Mongo/OAuth network errors to
be misinterpreted as auth revocations; update the function so only genuine OAuth
re-auth errors (e.g., OAuth refresh failures that indicate
invalid_grant/token_revoked/invalid_client as identified from
oauth2Client.refreshAccessToken rejection details like error.code or
error.response?.data?.error) are tagged with err.reAuthNeeded = true, while
other failures from redis.get/set, User.findById, or network/timeouts propagate
without reAuthNeeded; specifically, wrap calls to redis.get, User.findById, and
oauth2Client.refreshAccessToken with targeted try/catch blocks, set reAuthNeeded
only when the caught error matches OAuth permanent-auth failure signatures in
the refreshPromise executor (and rethrow), and leave other errors untouched so
callers can distinguish transient dependency failures.
- Around line 75-76: The detached .finally() on refreshPromise creates an
unobserved rejection; replace it by awaiting the refresh flow inside a
try/finally so the cleanup always runs without creating a new discarded promise.
Locate the code that creates refreshPromise and the
ongoingRefreshPromises.delete(userId) call and wrap the await/use of
refreshPromise in a try { await refreshPromise } finally {
ongoingRefreshPromises.delete(userId) } (or if refreshPromise is returned to
callers, perform the promise chain inside an async IIFE or function using
try/finally) so the delete executes reliably and no unhandled rejection is
produced.
In `@worker/tasks/generate.worker.js`:
- Around line 23-27: The code silently uses a provided personaId even when the
Persona record or its systemPrompt is missing, so update the logic around
Persona.findById and personaDescription to avoid persisting an invalid persona
reference: after calling Persona.findById(personaId) (in the generate worker),
if no persona is returned or persona.systemPrompt is empty, clear or nullify
personaId (and set personaDescription = ''), and optionally log a warning; only
set personaDescription from persona.systemPrompt when the field exists. This
ensures personaId is not kept when the actual persona context is absent.
- Around line 60-69: The catch block currently finds any Reply by commentId and
marks it failed, which can overwrite unrelated replies; instead, track the
specific Reply instance created by this job (e.g., store the created reply's _id
or the reply object returned when calling reply.save()) and in the catch only
update that document (use Reply.findById(createdReplyId) or check reply._id)
before setting status = 'failed' and saving; if no local createdReplyId exists,
do not touch other Reply documents. Ensure references to Reply.findOne,
reply.save(), job.id and commentId are replaced by a lookup/update that uses the
saved reply's _id.
In `@worker/tasks/index.js`:
- Around line 13-15: The failed-event handler uses job.id directly which can
throw because BullMQ v5 may pass job as undefined; update the
worker.on('failed', (job, err) => ...) handler to guard the job object when
building the log message (e.g., derive an id string with job?.id ?? 'unknown' or
similar) and use that safe id when calling logger.error so the listener never
dereferences a null/undefined job.
In `@worker/tasks/postReply.worker.js`:
- Around line 68-79: After a successful YouTube post, the post-success DB
updates (reply.save() and Comment.findByIdAndUpdate()) must be made idempotent
and retriable so failures don't leave the Reply stuck in "publishing"; replace
the plain reply.save() with an atomic findOneAndUpdate on Reply (filter {_id:
reply._id, status: 'publishing'}) that sets ytReplyId, status:'published' and
publishedAt in one operation, and perform Comment.findByIdAndUpdate(..., { $inc:
{ replyCount: 1 } }) with a small retry/backoff loop; if retries exhaust,
enqueue a durable compensation/recovery job (or mark a separate recovery flag on
Reply) so the work can be reclaimed—refer to reply.save(),
Comment.findByIdAndUpdate(), and the claim/status check that uses status { $nin:
['published','publishing'] } to ensure the change is idempotent and recoverable.
- Around line 58-66: The insert can succeed on YouTube but our job may fail
before we mark it succeeded, causing retries to duplicate replies; make the
operation idempotent by (1) marking the DB row to 'publishing' as soon as the
job starts (use the same place where the job claims the reply) and storing a
youtubeReplyId field, (2) before calling youtube.comments.insert(...) check
comment.youtubeReplyId (or equivalent) and skip the insert if a reply id already
exists, (3) after a successful insert set youtubePostSucceeded = true and
persist the returned reply id into comment.youtubeReplyId in the same try block,
and (4) in the catch/finally logic (where you currently reset to 'failed') first
check comment.youtubeReplyId and only set to 'failed' if no youtubeReplyId was
recorded; reference youtube.comments.insert, comment.ytCommentId,
youtubePostSucceeded and the catch/reset-to-failed block to locate where to add
these guards.
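The idempotency rules from the postReply comments above can be condensed into two small decision helpers. This is a hedged sketch, not the worker's actual code; the field names (`ytReplyId`, `status`) follow the review text, and the helper names are hypothetical.

```javascript
// Hypothetical decision helpers mirroring the review's idempotency rules
// for the postReply worker.

// Decide what to do before calling youtube.comments.insert().
function nextPublishAction(reply) {
  if (reply.ytReplyId) return "skip-insert";     // remote post already recorded
  if (reply.status !== "publishing") return "not-claimed"; // job never claimed the row
  return "insert";
}

// On error, never downgrade a reply whose remote post already succeeded.
function statusAfterError(reply) {
  return reply.ytReplyId ? "published" : "failed";
}

console.log(nextPublishAction({ status: "publishing" }));                 // "insert"
console.log(nextPublishAction({ status: "publishing", ytReplyId: "x" })); // "skip-insert"
console.log(statusAfterError({ ytReplyId: "x" }));                        // "published"
```

The point of separating these from the I/O is that the retry-safety argument becomes a pair of pure functions you can unit-test without touching YouTube or Mongo.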
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 856b0c59-3c02-41c5-8538-16095b8ae31e
⛔ Files ignored due to path filters (1)
server/package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (13)
branches.txt
remote.txt
server/package.json
server/server.js
server/src/jobs/syncComments.job.js
server/src/middleware/youtubeToken.middleware.js
server/src/models/Reply.models.js
server/src/utils/youtubeToken.helper.js
worker/tasks/__init__.py
worker/tasks/classify.worker.js
worker/tasks/generate.worker.js
worker/tasks/index.js
worker/tasks/postReply.worker.js
connectdb()
  .then(() => {
    // Initialize BullMQ workers at startup from the new tasks directory
    import("../worker/tasks/index.js");

    startSyncCronJob();
🧩 Analysis chain
Repository: ashutosh2652/ReplyPilot
🏁 Script executed:
find . -name "server.js" -type f | head -20
🏁 Script executed:
cat -n server/server.js
🏁 Script executed:
cat -n worker/tasks/index.js 2>/dev/null || echo "File not found or checking structure"
🏁 Script executed:
ls -la worker/ 2>/dev/null || echo "Worker directory structure"
🏁 Script executed:
rg "workers" server/src/app.js -A 2 -B 2
🏁 Script executed:
rg "SIGTERM\|SIGINT" --type js | head -20
🏁 Script executed:
cat -n server/src/config/db.js | head -30
Await worker bootstrap before calling app.listen().
import() returns a promise and rejects if module loading or evaluation fails. Line 11 doesn't await it, so startup proceeds even if worker initialization fails. The HTTP server (line 18) will open and accept requests to queue jobs, but no BullMQ workers will be running to process them. Make the callback async and await the import so startup fails fast if worker bootstrap fails. (developer.mozilla.org)
Fail-fast startup change
connectdb()
- .then(() => {
+ .then(async () => {
// Initialize BullMQ workers at startup from the new tasks directory
- import("../worker/tasks/index.js");
+ await import("../worker/tasks/index.js");
startSyncCronJob();
  app.on("error", (error) => {
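The ordering requirement in this comment can be sketched synchronously: each startup step is a plain function, and a throw from the worker-bootstrap step must prevent the HTTP listener from ever starting. This is a hedged illustration with hypothetical stand-in names, not the project's actual `server.js`; the real fix awaits `import("../worker/tasks/index.js")` inside the `connectdb()` continuation.

```javascript
// Synchronous sketch of fail-fast startup ordering. Each step is a thunk;
// the first throw aborts the sequence, so "http" is never reached when
// worker bootstrap fails.
function startup({ connectdb, loadWorkers, startCron, listen }) {
  const completed = [];
  try {
    connectdb();   completed.push("db");
    loadWorkers(); completed.push("workers"); // a throw here aborts startup
    startCron();   completed.push("cron");
    listen();      completed.push("http");
  } catch (err) {
    return { completed, error: err.message };
  }
  return { completed, error: null };
}

const ok = () => {};
const boom = () => { throw new Error("worker bootstrap failed"); };

// Failing worker bootstrap: "http" never appears in completed.
console.log(startup({ connectdb: ok, loadWorkers: boom, startCron: ok, listen: ok }));
// Healthy path: all four steps complete in order.
console.log(startup({ connectdb: ok, loadWorkers: ok, startCron: ok, listen: ok }));
```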
  ["SIGTERM", "SIGINT"].forEach((sig) =>
    process.on(sig, async () => {
-     console.info(`Caught ${sig} dranning...`);
-     await disconnectdb();
-     server.close(() => process.exit(0));
+     console.info(`Caught ${sig}, draining...`);
+
+     // 1. Await the dynamic import so we hold a reference to the workers array
+     try {
+       const { default: workers } = await import("../worker/tasks/index.js");
+       await Promise.all(workers.map((w) => w.close()));
+       console.info("BullMQ workers closed.");
+     } catch (err) {
+       console.error("Error closing BullMQ workers:", err);
+     }
+
+     // 2. Only after workers are shut down, disconnect the database
+     try {
+       await disconnectdb();
+       console.info("Database disconnected.");
+     } catch (err) {
+       console.error("Error disconnecting database:", err);
+     }
+
+     // 3. Close the HTTP server (promisified so we can await it)
+     await new Promise((resolve, reject) => {
+       server.close((err) => (err ? reject(err) : resolve()));
+     }).catch((err) => console.error("Error closing HTTP server:", err));
+
+     process.exit(0);
🧩 Analysis chain
🏁 Script executed:
find . -name "server.js" -path "*/server/*" | head -5
🏁 Script executed:
cat -n server/server.js | head -80
🌐 Web query:
Node.js server.close() behavior graceful shutdown in-flight requests
💡 Result:
Node.js http.Server.close([callback]) stops the server from accepting new TCP connections and ensures the 'close' event is emitted once all connections have ended. It allows in-flight HTTP requests (those actively sending a request or awaiting a response) to complete, while idle keep-alive connections are closed more aggressively in recent versions. Key behavior:
- No new TCP connections are accepted after close is called.
- Existing connections with active HTTP requests complete normally.
- Idle keep-alive connections (no active request): in Node.js v19+, server.close() automatically closes them via an internal call to closeIdleConnections for faster shutdown. This new behavior was accidentally backported to some v18.x releases and reverted in v18.20+ to match pre-v19 behavior.
- The callback (if provided) fires after all tracked HTTP connections end and the server emits 'close'.
- server.close() waits asynchronously for connections to drain; pair it with a timeout plus process.exit(1) in graceful shutdown handlers to avoid hanging.
For a complete graceful shutdown (e.g., on SIGTERM):
1. Call server.close().
2. Optionally call server.closeIdleConnections() (v18.2+) to force-close idle keep-alives.
3. Set a timeout to force process.exit(1).
4. Close other resources (DB, etc.) after the server's 'close' event.
Example:
const http = require('http');
const server = http.createServer((req, res) => { res.end('ok'); });
server.listen(3000);
process.on('SIGTERM', () => {
  console.log('Shutting down gracefully');
  server.close((err) => {
    if (err) { console.error(err); process.exit(1); }
    console.log('Server closed');
    process.exit(0);
  });
  // Optional: force-close idle keep-alive connections
  if (typeof server.closeIdleConnections === 'function') {
    server.closeIdleConnections();
  }
  // Force kill after 30s
  setTimeout(() => {
    console.error('Could not close connections in time, forcing shut down');
    process.exit(1);
  }, 30000);
});
As of 2026 (Node v25+), this behavior holds; check the exact version for v18 nuances.
Citations:
- 1: https://nodejs.org/api/http.html
- 2: https://nodejs.org/docs/latest/api/http.html
- 3: http.server.close() closes idle connections in v20 and v21 nodejs/node#53939
- 4: http: added closeAllConnections and closeIdleConnections to http.server nodejs/node#42812
- 5: http://nodejs.org/docs/latest/api/http.html
- 6: https://nodejs.org/docs/latest-v18.x/api/http.html
- 7: http.server.close() closes idle connections since 18.19.0 nodejs/node#51677
Reorder graceful shutdown to stop accepting connections before tearing down dependencies.
Node.js server.close() immediately stops accepting new TCP connections while allowing in-flight requests to complete. The current sequence (lines 34-48) closes workers and MongoDB before calling server.close(), meaning new connections can still be accepted during this window but will fail because their dependencies are gone. Reverse the order: call server.close() first (line 51-53), then close workers, then disconnect the database. This ensures new connections are rejected immediately while the server shuts down cleanly.
// Find all videos for this specific channel
const videos = await Video.find({ channelId: channel.channelId }).lean();
if (!videos || videos.length === 0) continue;

const BATCH_SIZE = 5; // Process 5 videos concurrently so we don't totally smash the API or RAM
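The batching step described here comes down to splitting the channel's videos into chunks of `BATCH_SIZE` and processing each chunk with `Promise.all`. A minimal sketch of the chunking (a hypothetical helper, not the job's actual code):

```javascript
// Split an array into consecutive batches of at most `size` items.
// The real sync job would run Promise.all over each batch in turn, so at
// most `size` YouTube requests are in flight at once.
function chunk(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

const videoIds = Array.from({ length: 12 }, (_, i) => `vid-${i}`);
const batches = chunk(videoIds, 5);
console.log(batches.map((b) => b.length)); // batch sizes: 5, 5, 2
```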
🧩 Analysis chain
🏁 Script executed:
# First, check if the Video model file exists and examine the channelId field definition
find . -name "Video.models.js" -o -name "Video*.js" | grep -i model
🏁 Script executed:
# Read the Video model to verify the channelId unique constraint
cat server/src/models/Video.models.js
🏁 Script executed:
# Also examine the syncComments.job.js file around lines 31-35 for context
head -50 server/src/jobs/syncComments.job.js | tail -30
This query conflicts with the current Video uniqueness constraint.
server/src/models/Video.models.js declares channelId with unique: true, which means each channel can have at most one video document in the database. The job's batching logic at lines 31–35 assumes multiple videos per channel and will fail to sync a complete channel library. Remove the unique: true constraint from channelId (keep it on videoId, which is already unique).
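The index change this comment asks for can be sketched as a plain field-options object in the Mongoose style (a hedged sketch of the intended layout, not the project's actual schema file): `videoId` keeps its global uniqueness, while `channelId` drops `unique: true` in favor of a regular index so one channel can own many Video documents.

```javascript
// Corrected index layout for the Video schema, expressed as a plain object
// of Mongoose-style field options (illustrative only).
const videoSchemaFields = {
  videoId:   { type: String, required: true, unique: true },  // one doc per video
  channelId: { type: String, required: true, index: true },   // was: unique: true
};

// Sanity check: channelId no longer carries a uniqueness constraint.
console.log(videoSchemaFields.channelId.unique === undefined); // true
```

Note the review's point that the schema edit alone is not enough: the existing unique index on `channelId` must also be dropped in the database, or inserts of a second video per channel will still fail.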
const startSyncCronJob = () => {
  cron.schedule("*/30 * * * *", () => {
    syncAllChannelsGlobally();
  });
  logger.info("Cron Job Initialized: Synchronizing YouTube Comments every 30 minutes.");
};
🧩 Analysis chain
🏁 Script executed:
cat -n server/src/jobs/syncComments.job.js | head -80
🏁 Script executed:
rg -A 10 "const syncAllChannelsGlobally" server/src/jobs/
🏁 Script executed:
rg -n "startSyncCronJob" server/
🏁 Script executed:
cat -n server/server.js | grep -A 20 -B 5 "shutdown\|graceful\|close\|stop" | head -60
Return a managed cron task and await the sync run.
node-cron.schedule() returns a ScheduledTask, which your code discards. The callback on line 66 is not async, so line 67 drops the promise from syncAllChannelsGlobally() (which is async). This allows slow syncs to overlap, and since the task is never captured or returned, server/server.js has no way to stop it during graceful shutdown. Make the callback async, await the sync, enable noOverlap to skip runs while the previous sync is still executing, and return the task to the caller so shutdown can clean it up.
Managed cron-task shape
const startSyncCronJob = () => {
- cron.schedule("*/30 * * * *", () => {
- syncAllChannelsGlobally();
- });
+ return cron.schedule(
+ "*/30 * * * *",
+ async () => {
+ await syncAllChannelsGlobally();
+ },
+ { noOverlap: true, name: "youtube-comment-sync" }
+ );
logger.info("Cron Job Initialized: Synchronizing YouTube Comments every 30 minutes.");
};
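The no-overlap behavior requested here can also be expressed as a small manual guard, which is useful when the installed node-cron version predates the `noOverlap` option. This is a hedged sketch with hypothetical names, not the job's actual code: an in-flight flag makes a tick return `"skipped"` while the previous run's promise is still pending.

```javascript
// Wrap an async run function so overlapping invocations are skipped:
// while one run is pending, further calls return "skipped" immediately.
function makeNoOverlap(run) {
  let running = false;
  return () => {
    if (running) return "skipped";
    running = true;
    return Promise.resolve(run()).finally(() => { running = false; });
  };
}

// Demo: a run that never settles keeps the guard busy, so the second
// tick is skipped synchronously.
const guarded = makeNoOverlap(() => new Promise(() => {}));
const first = guarded();        // pending promise for the in-flight run
console.log(guarded());         // "skipped"
console.log(typeof first.then); // "function"
```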
export async function getValidYoutubeToken(userId) {
  if (!userId) {
    throw new Error('UserId is required to fetch youtube token');
  }

  const cachedToken = await redis.get(keys.ytAccessToken(userId));
  const expiryStr = await redis.get(keys.ytTokenExpiry(userId));
  const expiryAt = expiryStr ? Number(expiryStr) : 0;
  const bufferMs = 5 * 60 * 1000; // 5 minutes buffer

  // 1. If valid in Redis, return it immediately
  if (cachedToken && Date.now() < expiryAt - bufferMs) {
    return cachedToken;
  }

  // 2. Fetch user to get long-lived refresh token
  const user = await User.findById(userId).select("+refreshToken").lean();
  if (!user?.refreshToken) {
    const err = new Error('Youtube not connected - Please reauthenticate');
    err.reAuthNeeded = true;
    throw err;
  }

  // 3. Check if another request is ALREADY refreshing the token right now
  if (ongoingRefreshPromises.has(userId)) {
    logger.debug(`Request for user ${userId} is waiting for an ongoing refresh to finish...`);
    return await ongoingRefreshPromises.get(userId);
  }

  const oauth2Client = new google.auth.OAuth2(
    env.GOOGLE_CLIENT_ID,
    env.GOOGLE_CLIENT_SECRET
  );
  oauth2Client.setCredentials({ refresh_token: user.refreshToken });

  // 4. We are Request #1 (the leader). Register the IOU note BEFORE the async
  // work begins so any concurrent caller that reaches the has() check above
  // immediately receives this same promise (zero-window race condition fix).
  const refreshPromise = new Promise((resolve, reject) => {
    oauth2Client.refreshAccessToken()
      .then(async ({ credentials }) => {
        const newAccessToken = credentials.access_token;
        const newExpiresAt = credentials.expiry_date ?? Date.now() + 60 * 60 * 1000;
        const ttlSeconds = Math.floor((newExpiresAt - Date.now()) / 1000);

        await redis.set(keys.ytAccessToken(userId), newAccessToken, { EX: ttlSeconds });
        await redis.set(keys.ytTokenExpiry(userId), String(newExpiresAt), { EX: ttlSeconds });

        logger.debug('Access Token refreshed for user:', userId);
        resolve(newAccessToken);
      })
      .catch(reject);
  });

  // Register synchronously — new Promise() construction is synchronous, so this
  // line runs before any microtask from the executor above can be scheduled.
  ongoingRefreshPromises.set(userId, refreshPromise);

  // 5. Clean up on both settle paths so the map never leaks a stale entry.
  refreshPromise.finally(() => ongoingRefreshPromises.delete(userId));

  return await refreshPromise;
}
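The cache-validity check at the top of this helper (step 1) can be pulled out as a pure function so the 5-minute cutoff is easy to reason about and test in isolation. A hedged sketch, with a hypothetical function name rather than the helper's actual API:

```javascript
// A cached token is "fresh" only if it still has more than bufferMs of
// life left at the moment of the check; otherwise the caller should refresh.
function isTokenFresh(expiryAtMs, nowMs, bufferMs = 5 * 60 * 1000) {
  return nowMs < expiryAtMs - bufferMs;
}

const now = 1_000_000;
console.log(isTokenFresh(now + 6 * 60 * 1000, now)); // true  (6 minutes left)
console.log(isTokenFresh(now + 4 * 60 * 1000, now)); // false (inside the buffer)
```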
Differentiate re-auth errors from dependency failures.
Line 35 is the only path that marks an error as reAuthNeeded. Every Redis, MongoDB, and OAuth refresh failure elsewhere in this helper bubbles out as a generic exception, but server/src/middleware/youtubeToken.middleware.js:19-30 turns generic failures into 401 + re-auth and server/src/jobs/syncComments.job.js:23-29 logs them as auth failures and skips the channel. A transient dependency outage will therefore look like revoked YouTube access and suppress scheduled syncs. Normalize the helper's error types here so callers reserve re-auth flows for actual auth problems.
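The error triage this comment asks for reduces to one classification step: only permanent OAuth failure codes should set `reAuthNeeded`, while transient Redis/Mongo/network errors pass through untouched. A hedged sketch of that step; the rejection shape (`error.response?.data?.error`) is an assumption based on typical google-auth-library rejections, and the helper name is hypothetical.

```javascript
// OAuth error codes that indicate the refresh token is permanently dead
// (revoked grant or misconfigured client), per the review's examples.
const PERMANENT_OAUTH_ERRORS = new Set(["invalid_grant", "invalid_client"]);

// Tag only permanent OAuth failures with reAuthNeeded; leave everything
// else (timeouts, Redis/Mongo outages) untouched so callers can tell
// "re-connect YouTube" apart from "dependency is down".
function tagReAuthIfPermanent(err) {
  const code = err.response?.data?.error;
  if (PERMANENT_OAUTH_ERRORS.has(code)) err.reAuthNeeded = true;
  return err;
}

const revoked = Object.assign(new Error("refresh failed"), {
  response: { data: { error: "invalid_grant" } },
});
console.log(tagReAuthIfPermanent(revoked).reAuthNeeded);                // true
console.log(tagReAuthIfPermanent(new Error("ETIMEDOUT")).reAuthNeeded); // undefined
```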
if (personaId) {
  const persona = await Persona.findById(personaId);
  if (persona) {
    personaDescription = persona.systemPrompt || persona.description || '';
  }
Don't silently drop persona context.
Line 26 falls back to persona.description, but server/src/models/Persona.models.js:29-30 only defines systemPrompt. A missing persona or missing prompt will generate a generic reply while still persisting the supplied personaId.
Possible fix
let personaDescription = '';
if (personaId) {
const persona = await Persona.findById(personaId);
- if (persona) {
- personaDescription = persona.systemPrompt || persona.description || '';
- }
+ if (!persona) {
+ throw new Error(`Persona not found for id: ${personaId}`);
+ }
+ if (!persona.systemPrompt) {
+ throw new Error(`Persona ${personaId} has no systemPrompt`);
+ }
+ personaDescription = persona.systemPrompt;
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (personaId) {
  const persona = await Persona.findById(personaId);
  if (!persona) {
    throw new Error(`Persona not found for id: ${personaId}`);
  }
  if (!persona.systemPrompt) {
    throw new Error(`Persona ${personaId} has no systemPrompt`);
  }
  personaDescription = persona.systemPrompt;
}
worker.on('failed', (job, err) => {
  logger.error(`[Worker ${worker.name}] Job ${job.id} failed with error: ${err.message}`);
});
🧩 Analysis chain
🏁 Script executed:
# First, let's check if the file exists and read the relevant lines
cd /tmp/repo 2>/dev/null || echo "Not in repo yet"
# Find the worker/tasks/index.js file
find . -name "index.js" -path "*/worker/tasks/*" 2>/dev/null | head -5
🏁 Script executed:
# Check the structure and find the file
git ls-files | grep -E "worker.*tasks.*index.js"
🏁 Script executed:
# List files to understand the structure
ls -la worker/tasks/ 2>/dev/null || echo "Directory not found, searching..."
git ls-files "worker/tasks/*"
🏁 Script executed:
# Check package.json for BullMQ version
cat package.json | grep -A2 -B2 "bullmq\|bull" || echo "Checking for BullMQ dependency..."
🏁 Script executed:
# Search for any mention of BullMQ in the codebase
rg "bullmq|BullMQ" --type json --type js -i -l 2>/dev/null | head -10
🏁 Script executed:
# Read the worker/tasks/index.js file
cat -n worker/tasks/index.js
🏁 Script executed:
# Check the BullMQ version in server/package.json
cat server/package.json | grep -A3 -B3 "bullmq"
🏁 Script executed:
# Check if there are node_modules or type definitions available
find . -name "*.d.ts" -path "*bullmq*" 2>/dev/null | head -5
🏁 Script executed:
# Let's check the BullMQ version to confirm v5 API
cat server/package.json | grep -E "bullmq|version"
🌐 Web query:
BullMQ v5 worker failed event listener signature documentation
💡 Result:
The BullMQ v5 worker 'failed' event listener signature, as defined in the WorkerListener interface, is:

failed: (job: Job, error: Error, prev: string) => void

This event is triggered when a job throws an exception during processing. The job parameter may be undefined if a stalled job reaches the stalled limit and is deleted due to the removeOnFail option. Usage example:

worker.on('failed', (job: Job | undefined, error: Error, prev: string) => {
  console.log(`Job ${job?.id} failed with error: ${error.message}, previous state: ${prev}`);
});
Citations:
- 1: https://api.docs.bullmq.io/interfaces/v5.WorkerListener.html
- 2: https://api.docs.bullmq.io/interfaces/v3.WorkerListener.html
- 3: https://docs.bullmq.io/guide/workers
Guard job parameter against undefined in failed listener.
BullMQ v5's worker failed event signature is (job: Job | undefined, error, prev). The job parameter is undefined when stalled jobs are deleted, so accessing job.id directly will throw. Use job?.id ?? 'unknown' instead.
Defensive logging fix
worker.on('failed', (job, err) => {
- logger.error(`[Worker ${worker.name}] Job ${job.id} failed with error: ${err.message}`);
+ logger.error(`[Worker ${worker.name}] Job ${job?.id ?? 'unknown'} failed with error: ${err.message}`);
});

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
worker.on('failed', (job, err) => {
  logger.error(`[Worker ${worker.name}] Job ${job?.id ?? 'unknown'} failed with error: ${err.message}`);
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@worker/tasks/index.js` around lines 13 - 15, The failed-event handler uses
job.id directly which can throw because BullMQ v5 may pass job as undefined;
update the worker.on('failed', (job, err) => ...) handler to guard the job
object when building the log message (e.g., derive an id string with job?.id ??
'unknown' or similar) and use that safe id when calling logger.error so the
listener never dereferences a null/undefined job.
worker/tasks/postReply.worker.js
Outdated
youtubePostSucceeded = true;
const ytReplyId = response.data.id;

// 6. Update the Reply document to 'published'
reply.ytReplyId = ytReplyId;
reply.status = 'published';
reply.publishedAt = new Date();
await reply.save();

// 7. Atomic increment of the Comment's reply count
await Comment.findByIdAndUpdate(comment._id, { $inc: { replyCount: 1 } });
🧩 Analysis chain
🏁 Script executed:
wc -l worker/tasks/postReply.worker.js

Repository: ashutosh2652/ReplyPilot
Length of output: 103
🏁 Script executed:
cat -n worker/tasks/postReply.worker.js | head -120

Repository: ashutosh2652/ReplyPilot
Length of output: 5011
Post-success database failures are not recoverable through the retry loop.
If reply.save() (line 75) fails after a successful YouTube post, the reply remains stuck in 'publishing' status. On retry, the atomic claim condition at line 21 (status: { $nin: ['published', 'publishing'] }) prevents reclaiming the job, causing line 28 to return 'already_handled' and halt further recovery attempts. The same issue applies to Comment.findByIdAndUpdate() at line 78 — if it fails, the replyCount stays stale with no recovery path. The code acknowledges this gap at line 97 ("Manual intervention may be needed") but provides no automatic fix.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@worker/tasks/postReply.worker.js` around lines 68 - 79, After a successful
YouTube post, the post-success DB updates (reply.save() and
Comment.findByIdAndUpdate()) must be made idempotent and retriable so failures
don't leave the Reply stuck in "publishing"; replace the plain reply.save() with
an atomic findOneAndUpdate on Reply (filter {_id: reply._id, status:
'publishing'}) that sets ytReplyId, status:'published' and publishedAt in one
operation, and perform Comment.findByIdAndUpdate(..., { $inc: { replyCount: 1 }
}) with a small retry/backoff loop; if retries exhaust, enqueue a durable
compensation/recovery job (or mark a separate recovery flag on Reply) so the
work can be reclaimed—refer to reply.save(), Comment.findByIdAndUpdate(), and
the claim/status check that uses status { $nin: ['published','publishing'] } to
ensure the change is idempotent and recoverable.
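The recovery-friendly transition the prompt above describes can be sketched as follows, using a plain in-memory map in place of the Mongoose models; the field names (status, ytReplyId, counted, replyCount) are assumptions drawn from the review, not the project's confirmed schema:

```javascript
// Atomic equivalent of the suggested Reply.findOneAndUpdate(
//   { _id, status: 'publishing' },
//   { $set: { ytReplyId, status: 'published', publishedAt } });
// returns null when the reply is not in a claimable state.
function markPublished(replies, replyId, ytReplyId) {
  const reply = replies.get(replyId);
  if (!reply || reply.status !== 'publishing') return null; // not claimable
  reply.ytReplyId = ytReplyId;
  reply.status = 'published';
  reply.publishedAt = new Date();
  return reply;
}

// Idempotent counter bump: a per-reply `counted` flag guarantees a retried
// job can never double-increment the parent comment's replyCount.
function bumpReplyCount(comments, replies, replyId) {
  const reply = replies.get(replyId);
  if (!reply || reply.counted) return false; // unknown or already counted
  reply.counted = true;
  const comment = comments.get(reply.commentId);
  comment.replyCount += 1;
  return true;
}
```

In MongoDB the same guard would be a single conditional update (filter on `counted: false`, set the flag and `$inc` in one operation) so the flag and the counter can never diverge.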
Actionable comments posted: 2
♻️ Duplicate comments (3)
worker/tasks/generate.worker.js (1)
24-28: ⚠️ Potential issue | 🟠 Major

Avoid persisting personaId when persona context is unresolved.

If Persona.findById(personaId) is null (or systemPrompt is empty), the code still saves the provided personaId on Reply, which can leave an invalid persona reference while generating a generic reply.

Proposed fix

-  let personaDescription = '';
+  let personaDescription = '';
+  let resolvedPersonaId = null;
   if (personaId) {
     const persona = await Persona.findById(personaId);
-    if (persona) {
-      personaDescription = persona.systemPrompt || persona.description || '';
-    }
+    if (persona?.systemPrompt?.trim()) {
+      personaDescription = persona.systemPrompt;
+      resolvedPersonaId = persona._id;
+    } else {
+      logger.warn(`Generate Job ${job.id}: persona ${personaId} missing or has empty systemPrompt; ignoring persona context`);
+    }
   }
@@
-    personaId: personaId || null,
+    personaId: resolvedPersonaId,

Also applies to: 47-47
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@worker/tasks/generate.worker.js` around lines 24 - 28, The code sets personaId on the Reply even when Persona.findById(personaId) returns null or provides no prompt, leaving invalid references; update generate.worker.js so that after fetching persona via Persona.findById(personaId) you only assign personaId to the Reply (and any persisted object) when persona exists and (persona.systemPrompt || persona.description) is non-empty; otherwise clear or omit personaId and fall back to the generic personaDescription path (this change should be applied to the block that assigns personaDescription and to the similar logic around the later occurrence near the other instance referenced in the review).

worker/tasks/postReply.worker.js (2)
84-91: ⚠️ Potential issue | 🟠 Major

replyCount can still drift permanently after a successful publish.

If Line 88 succeeds and Line 91 fails, the retry path will later return already_handled, so Comment.replyCount never self-heals. Make the counter update recoverable/idempotent, or derive it from replies instead of doing a best-effort increment after the terminal published transition.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@worker/tasks/postReply.worker.js` around lines 84 - 91, The comment warns that if reply.save() succeeds but Comment.findByIdAndUpdate(...) fails the system will never recover the lost replyCount; make the counter update atomic or idempotent: either wrap the reply update and the Comment.$inc in a single MongoDB transaction/session so both persist or both roll back, or add an idempotency flag on Reply (e.g. reply.counted boolean) and change the increment to a conditional update that only increments when counted is false (atomically set counted=true and $inc replyCount in one operation), referencing reply.save(), Comment.findByIdAndUpdate, replyCount, and the published status transition to locate the logic to change.
66-81: ⚠️ Potential issue | 🔴 Critical

An ambiguous comments.insert() attempt is still blindly retried.

After Line 66 has been sent, a transport failure before the response arrives or a Line 81 save failure can leave YouTube having accepted the reply while MongoDB still has no ytReplyId. The public comments.insert docs only describe snippet.textOriginal/snippet.parentId as request data and a comment resource as the success response; they do not expose any request-id or idempotency control, so retrying from this ambiguous state can publish a second public reply. Keep the reply in a reconciliation state once an insert attempt has been made, and verify/recover before issuing another insert. (developers.google.com)

Also applies to: 99-116
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@worker/tasks/postReply.worker.js` around lines 66 - 81, The code blindly retries youtube.comments.insert, which can create duplicate public replies if the insert result is ambiguous; change flow in postReply.worker.js so that before calling youtube.comments.insert you mark the reply in DB as a reconciliation/pending state (e.g., set reply.state = 'pending_youtube' and await reply.save()), then perform the insert; on any transport/error or if saving after insert fails, do not immediately re-insert—instead call youtube.comments.list (use parentId = comment.ytCommentId) to search for an existing reply (match by textOriginal or by author/channel if available) and if found set reply.ytReplyId and reply.state = 'posted' and save; only if no matching existing comment is discovered should you retry the insert with backoff; update handling around youtube.comments.insert, the ytReplyId variable, youtubePostSucceeded flag, and reply.save() to implement this reconciliation logic.
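The verify-before-retry step this prompt calls for can be sketched as a pure decision function over the replies already returned by youtube.comments.list for the parent comment. Matching on the reply text alone is an assumption; a real implementation might also compare the authoring channel:

```javascript
// Given the replies YouTube already has under the parent comment, decide
// whether a locally pending reply was in fact accepted (adopt its id) or
// whether it is safe to attempt comments.insert again.
function reconcileReply(existingReplies, reply) {
  const match = existingReplies.find(
    (r) => r.snippet && r.snippet.textOriginal === reply.text
  );
  if (match) {
    // YouTube accepted the earlier insert; record its id instead of reposting.
    return { action: 'adopt', ytReplyId: match.id };
  }
  // No trace of the reply on YouTube: retrying the insert cannot duplicate it.
  return { action: 'retry_insert' };
}
```

The worker would call this only for replies parked in the reconciliation state, before ever issuing a second insert.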
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@worker/tasks/generate.worker.js`:
- Around line 32-36: The worker is sending personaDescription in the POST
payload to /api/v1/generate but the FastAPI Pydantic schema ReplyInput (and the
generate_reply handler) expects persona, so personaDescription is ignored; fix
by renaming the payload key in the httpClient.post call to persona (replace
personaDescription with persona) so the API receives and uses the intended
persona value, or alternatively update the ReplyInput schema and generate_reply
parameter to accept and map personaDescription to persona if you prefer changing
the API contract.
In `@worker/tasks/postReply.worker.js`:
- Around line 18-25: The current claim query in Reply.findOneAndUpdate allows
rows with status 'publishing' (and other non-publishable states) so two workers
can both claim the same reply; change the predicate to only match explicit
publishable states (e.g., use status: { $in: ['ready_for_publish',
'pending_publish', ...] } or the exact statuses your state machine defines as
publishable) and exclude 'publishing', 'published', 'pending_review', and
'rejected' so only intended rows are claimable; keep the atomic set to status:
'publishing' and implement a separate lease/reconciliation path to detect and
recover stale 'publishing' rows rather than making them generally claimable.
---
Duplicate comments:
In `@worker/tasks/generate.worker.js`:
- Around line 24-28: The code sets personaId on the Reply even when
Persona.findById(personaId) returns null or provides no prompt, leaving invalid
references; update generate.worker.js so that after fetching persona via
Persona.findById(personaId) you only assign personaId to the Reply (and any
persisted object) when persona exists and (persona.systemPrompt ||
persona.description) is non-empty; otherwise clear or omit personaId and fall
back to the generic personaDescription path (this change should be applied to
the block that assigns personaDescription and to the similar logic around the
later occurrence near the other instance referenced in the review).
In `@worker/tasks/postReply.worker.js`:
- Around line 84-91: The comment warns that if reply.save() succeeds but
Comment.findByIdAndUpdate(...) fails the system will never recover the lost
replyCount; make the counter update atomic or idempotent: either wrap the reply
update and the Comment.$inc in a single MongoDB transaction/session so both
persist or both roll back, or add an idempotency flag on Reply (e.g.
reply.counted boolean) and change the increment to a conditional update that
only increments when counted is false (atomically set counted=true and $inc
replyCount in one operation), referencing reply.save(),
Comment.findByIdAndUpdate, replyCount, and the published status transition to
locate the logic to change.
- Around line 66-81: The code blindly retries youtube.comments.insert, which can
create duplicate public replies if the insert result is ambiguous; change flow
in postReply.worker.js so that before calling youtube.comments.insert you mark
the reply in DB as a reconciliation/pending state (e.g., set reply.state =
'pending_youtube' and await reply.save()), then perform the insert; on any
transport/error or if saving after insert fails, do not immediately
re-insert—instead call youtube.comments.list (use parentId =
comment.ytCommentId) to search for an existing reply (match by textOriginal or
by author/channel if available) and if found set reply.ytReplyId and reply.state
= 'posted' and save; only if no matching existing comment is discovered should
you retry the insert with backoff; update handling around
youtube.comments.insert, the ytReplyId variable, youtubePostSucceeded flag, and
reply.save() to implement this reconciliation logic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dcad4c14-e040-4cb6-9007-a8563988497f
📒 Files selected for processing (2)
worker/tasks/generate.worker.js
worker/tasks/postReply.worker.js
const aiResponse = await httpClient.post('/api/v1/generate', {
  text: comment.text,
  tone: tone || 'friendly',
  personaDescription
});
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify the request model/handler for /api/v1/generate and expected field names.
rg -n -C3 --iglob '*.py' '/api/v1/generate|def .*generate|personaDescription|comment|persona|text' ai-service
rg -n -C3 --iglob '*.py' 'BaseModel|Request|comment:|text:|persona:|personaDescription:' ai-service
# Verify worker payload usage in JS side for cross-check.
rg -n -C3 --iglob '*.js' "httpClient\\.post\\('/api/v1/generate'|personaDescription|text: comment\\.text|tone:" worker server

Repository: ashutosh2652/ReplyPilot
Length of output: 11286
Fix field name mismatch: change worker payload personaDescription to persona.
The worker sends personaDescription in the POST request to /api/v1/generate, but the FastAPI schema (ReplyInput) expects persona. This causes Pydantic to ignore the field, and persona defaults to "default", skipping all persona conditioning. Either rename the field in the worker to persona, or update the API schema to accept personaDescription and map it to the generate_reply function parameter.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@worker/tasks/generate.worker.js` around lines 32 - 36, The worker is sending
personaDescription in the POST payload to /api/v1/generate but the FastAPI
Pydantic schema ReplyInput (and the generate_reply handler) expects persona, so
personaDescription is ignored; fix by renaming the payload key in the
httpClient.post call to persona (replace personaDescription with persona) so the
API receives and uses the intended persona value, or alternatively update the
ReplyInput schema and generate_reply parameter to accept and map
personaDescription to persona if you prefer changing the API contract.
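A minimal sketch of the worker-side fix, assuming the field names quoted in the review (renaming the key rather than changing the API contract):

```javascript
// Build the body for POST /api/v1/generate with the key the FastAPI
// schema actually reads (`persona`), instead of the ignored
// `personaDescription` key Pydantic silently drops.
function buildGeneratePayload(comment, tone, personaDescription) {
  return {
    text: comment.text,
    tone: tone || 'friendly',
    persona: personaDescription,
  };
}
```

The call site then becomes `httpClient.post('/api/v1/generate', buildGeneratePayload(comment, tone, personaDescription))`.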
// 1. Atomically "claim" the reply by setting its status to 'publishing'
// This prevents other workers from picking up the same reply simultaneously
// Note: We allow 'publishing' status here to support retries in an idempotent way
const reply = await Reply.findOneAndUpdate(
  { _id: replyId, status: { $ne: 'published' } },
  { $set: { status: 'publishing' } },
  { new: true }
);
$ne: 'published' does not actually claim the reply.
Line 22 still matches rows already in publishing, so two overlapping jobs for the same replyId can both clear this step and race into the YouTube post. Because the Reply state machine also includes pending_review and rejected, this predicate can publish rows that were never meant to leave review. Restrict the claim to explicit publishable states only, and recover stale publishing rows via a lease/reconciliation path instead of making them generally claimable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@worker/tasks/postReply.worker.js` around lines 18 - 25, The current claim
query in Reply.findOneAndUpdate allows rows with status 'publishing' (and other
non-publishable states) so two workers can both claim the same reply; change the
predicate to only match explicit publishable states (e.g., use status: { $in:
['ready_for_publish', 'pending_publish', ...] } or the exact statuses your state
machine defines as publishable) and exclude 'publishing', 'published',
'pending_review', and 'rejected' so only intended rows are claimable; keep the
atomic set to status: 'publishing' and implement a separate lease/reconciliation
path to detect and recover stale 'publishing' rows rather than making them
generally claimable.
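The stricter claim predicate described above can be sketched like this; the state names are assumptions taken from the review's examples, not the project's confirmed state machine:

```javascript
// Only statuses the state machine explicitly marks publishable may be claimed.
// 'publishing', 'published', 'pending_review', and 'rejected' are all excluded.
const CLAIMABLE_STATUSES = ['ready_for_publish', 'pending_publish'];

// Filter for the atomic findOneAndUpdate claim, replacing { $ne: 'published' }.
function buildClaimFilter(replyId) {
  return { _id: replyId, status: { $in: CLAIMABLE_STATUSES } };
}

function isClaimable(status) {
  return CLAIMABLE_STATUSES.includes(status);
}
```

Stale 'publishing' rows would then be recovered by a separate lease/reconciliation sweep rather than by widening this filter.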
Summary by CodeRabbit
New Features
Improvements