-
Notifications
You must be signed in to change notification settings - Fork 0
Revert "feat: implement distributed scheduler using Redis sorted sets" #72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revert "feat: implement distributed scheduler using Redis sorted sets" #72
Conversation
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings. WalkthroughThis PR replaces the custom Redis-backed distributed scheduler with a cron-based scheduler using the robfig/cron library. Pattern parsing logic, scheduler management code, and associated tests are removed, while queue structure is refactored to use cron scheduling with updated job readiness logic. Changes
Sequence DiagramsequenceDiagram
participant Job as Job
participant Queue as Queue
participant Cron as Cron Scheduler
participant Redis as Redis
Note over Job,Cron: New Cron-Based Scheduling Flow
Job->>Queue: ScheduleJob(jobId, pattern)
Queue->>Cron: Add job entry with pattern
Cron->>Cron: Evaluate pattern (async)
loop Every tick
Cron->>Cron: Check if pattern matches current time
alt Pattern matched
Cron->>Queue: Trigger job execution
Queue->>Redis: Mark job as ready
Queue->>Job: Process
end
end
Job->>Queue: IsReady()
alt Queue has scheduler
Queue->>Job: return true
else No scheduler
Queue->>Job: return status == waiting || active
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@job.go`:
- Around line 112-120: IsReady currently returns true whenever
job.queue.scheduler != nil, which causes jobs with terminal statuses (e.g.,
CompletedStatus, FailedStatus) to be treated as ready and re-run; update
Job.IsReady to also check job.Status and return false for finished statuses (at
minimum CompletedStatus and FailedStatus) when job.queue.scheduler is set, or
alternatively modify the cron-triggered Run() path to explicitly skip jobs with
those terminal statuses (refer to Job.IsReady, Job.Status, CompletedStatus,
FailedStatus, queue.scheduler, Run(), and the queue's q.jobs handling) so
finished jobs are not re-processed if RemoveOnComplete/RemoveOnFail is disabled.
In `@queue.go`:
- Around line 191-199: Process registers a cron job every time it's called,
causing duplicate entries because q.scheduler.AddFunc(q.cronPattern, func() {
q.Run() }) is invoked repeatedly; modify Queue.Process to guard against
re-registering by tracking whether the cron job was already added (e.g., add a
boolean field like cronRegistered or schedulerStarted on the Queue struct) and
only call AddFunc and q.scheduler.Start() when that flag is false, setting the
flag to true after successful AddFunc; ensure the guard checks q.scheduler !=
nil and still assigns q.jobFnc each call so jobFnc can be updated without
duplicating the cron registration.
- Around line 420-429: Pause currently only flips q.running but leaves the cron
scheduler running and there's no shutdown path; update Pause to call the cron
scheduler's stop method (e.g., q.scheduler.Stop()) to halt scheduled goroutines
and avoid repeated Run() invocations, and update Resume to start the scheduler
(e.g., q.scheduler.Start()) only when it is not already running before setting
q.running = true and calling q.Run(); additionally add a public Close or Stop
method on Queue (e.g., func (q *Queue) Close()) that sets q.running = false,
stops the scheduler (q.scheduler.Stop()) and performs any other cleanup needed
to avoid resource leaks and goroutine retention so the queue can be cleanly shut
down.
| // IsReady returns true if the job is ready to be processed. If the job uses a | ||
| // scheduler, it will always be ready. Otherwise, the job is ready if it is | ||
| // waiting or active. | ||
| func (job *Job) IsReady() bool { | ||
| return job.Status == WaitStatus || job.Status == ActiveStatus | ||
| if job.queue.scheduler == nil { | ||
| return job.Status == WaitStatus || job.Status == ActiveStatus | ||
| } | ||
| return true | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential issue: Finished jobs incorrectly marked as ready when scheduler is active.
When a scheduler is configured, IsReady() unconditionally returns true, including for jobs with CompletedStatus or FailedStatus. If RemoveOnComplete or RemoveOnFail is disabled, these finished jobs remain in q.jobs and will be re-processed on subsequent cron triggers.
Consider filtering out finished jobs:
Proposed fix
func (job *Job) IsReady() bool {
if job.queue.scheduler == nil {
return job.Status == WaitStatus || job.Status == ActiveStatus
}
- return true
+ return job.Status == WaitStatus || job.Status == ActiveStatus
}Alternatively, ensure the cron-triggered Run() path explicitly skips finished jobs if this behavior is intentional.
🤖 Prompt for AI Agents
In `@job.go` around lines 112 - 120, IsReady currently returns true whenever
job.queue.scheduler != nil, which causes jobs with terminal statuses (e.g.,
CompletedStatus, FailedStatus) to be treated as ready and re-run; update
Job.IsReady to also check job.Status and return false for finished statuses (at
minimum CompletedStatus and FailedStatus) when job.queue.scheduler is set, or
alternatively modify the cron-triggered Run() path to explicitly skip jobs with
those terminal statuses (refer to Job.IsReady, Job.Status, CompletedStatus,
FailedStatus, queue.scheduler, Run(), and the queue's q.jobs handling) so
finished jobs are not re-processed if RemoveOnComplete/RemoveOnFail is disabled.
| func (q *Queue) Process(jobFnc JobFnc) { | ||
| q.jobFnc = jobFnc | ||
| if q.scheduler != nil { | ||
| _, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }) | ||
| if err != nil { | ||
| q.formatLog(LoggerError, "failed to add job: %v", err) | ||
| } | ||
| q.scheduler.Start() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling Process() multiple times will register duplicate cron jobs.
Each call to Process() adds a new cron entry via AddFunc(). If called more than once, Run() will be triggered multiple times per cron interval, causing duplicate job processing.
Proposed fix: Guard against duplicate registration
func (q *Queue) Process(jobFnc JobFnc) {
q.jobFnc = jobFnc
if q.scheduler != nil {
+ // Prevent duplicate registration if Process is called multiple times
+ q.scheduler.Stop()
+ q.scheduler = cron.New()
_, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() })
if err != nil {
q.formatLog(LoggerError, "failed to add job: %v", err)
+ return
}
q.scheduler.Start()
}
}Alternatively, track whether the scheduler has been initialized with a boolean flag.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (q *Queue) Process(jobFnc JobFnc) { | |
| q.jobFnc = jobFnc | |
| if q.scheduler != nil { | |
| _, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }) | |
| if err != nil { | |
| q.formatLog(LoggerError, "failed to add job: %v", err) | |
| } | |
| q.scheduler.Start() | |
| } | |
| func (q *Queue) Process(jobFnc JobFnc) { | |
| q.jobFnc = jobFnc | |
| if q.scheduler != nil { | |
| // Prevent duplicate registration if Process is called multiple times | |
| q.scheduler.Stop() | |
| q.scheduler = cron.New() | |
| _, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }) | |
| if err != nil { | |
| q.formatLog(LoggerError, "failed to add job: %v", err) | |
| return | |
| } | |
| q.scheduler.Start() | |
| } | |
| } |
🤖 Prompt for AI Agents
In `@queue.go` around lines 191 - 199, Process registers a cron job every time
it's called, causing duplicate entries because
q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }) is invoked repeatedly;
modify Queue.Process to guard against re-registering by tracking whether the
cron job was already added (e.g., add a boolean field like cronRegistered or
schedulerStarted on the Queue struct) and only call AddFunc and
q.scheduler.Start() when that flag is false, setting the flag to true after
successful AddFunc; ensure the guard checks q.scheduler != nil and still assigns
q.jobFnc each call so jobFnc can be updated without duplicating the cron
registration.
| func (q *Queue) Pause() { | ||
| q.running = false | ||
| q.stopScheduler() | ||
| } | ||
|
|
||
| // Resume resumes the queue from a paused state. When resumed, the queue will | ||
| // accept new jobs and run any jobs in the queue. The scheduler is also restarted | ||
| // if it was previously configured. | ||
| // accept new jobs and run any jobs in the queue. | ||
| func (q *Queue) Resume() { | ||
| q.running = true | ||
| if q.config.Pattern != "" { | ||
| interval := q.config.ScheduleInterval | ||
| if interval == 0 { | ||
| // Try to parse the pattern to get interval | ||
| parsedInterval, err := parsePattern(q.config.Pattern) | ||
| if err != nil { | ||
| // Log warning and fall back to default | ||
| q.formatLog(LoggerWarn, "Failed to parse pattern '%s': %v, using default 5s interval", q.config.Pattern, err) | ||
| interval = 5 * time.Second | ||
| } else { | ||
| interval = parsedInterval | ||
| } | ||
| } | ||
| q.startScheduler(interval) | ||
| } | ||
| q.Run() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pause() does not stop the cron scheduler, and there's no cleanup mechanism.
Two issues:
-
Pause doesn't stop scheduler: When
Pause()is called, the cron scheduler continues firing, causing repeatedRun()calls that immediately return after attempting mutex locks and logging warnings. This is wasteful and generates log noise. -
Resource leak: The
*cron.Cronscheduler spawns goroutines but is never stopped. There's noClose()orStop()method exposed to shut down the scheduler gracefully.
Proposed fix: Add scheduler control in Pause/Resume and a Close method
func (q *Queue) Pause() {
q.running = false
+ if q.scheduler != nil {
+ q.scheduler.Stop()
+ }
}
func (q *Queue) Resume() {
q.running = true
+ if q.scheduler != nil {
+ q.scheduler.Start()
+ }
q.Run()
}
+
+// Close stops the scheduler and releases resources.
+func (q *Queue) Close() {
+ q.running = false
+ if q.scheduler != nil {
+ ctx := q.scheduler.Stop()
+ <-ctx.Done() // Wait for running jobs to complete
+ }
+}🤖 Prompt for AI Agents
In `@queue.go` around lines 420 - 429, Pause currently only flips q.running but
leaves the cron scheduler running and there's no shutdown path; update Pause to
call the cron scheduler's stop method (e.g., q.scheduler.Stop()) to halt
scheduled goroutines and avoid repeated Run() invocations, and update Resume to
start the scheduler (e.g., q.scheduler.Start()) only when it is not already
running before setting q.running = true and calling q.Run(); additionally add a
public Close or Stop method on Queue (e.g., func (q *Queue) Close()) that sets
q.running = false, stops the scheduler (q.scheduler.Stop()) and performs any
other cleanup needed to avoid resource leaks and goroutine retention so the
queue can be cleanly shut down.
Reverts #71