feat: implement distributed scheduler using Redis sorted sets #71
Conversation
Ren0503 commented on Jan 17, 2026
- Add scheduler.go with Redis-based distributed scheduling
- Implement ScheduleJob, GetNextJobs, and RemoveJob methods
- Add comprehensive unit tests in scheduler_test.go
- Update queue.go to integrate new scheduler
- Maintain backward compatibility with cron patterns
- Update dependencies in go.mod and go.sum
Summary by CodeRabbit
Walkthrough
Replaces cron-based scheduling with an internal Redis-backed scheduler: removes the cron dependency, adds background polling, exposes ScheduleJob/GetScheduledJobs/RemoveScheduledJob, integrates the scheduler lifecycle into Queue (start/stop on New/Pause/Resume), and adds pattern parsing and tests.
Sequence Diagram
```mermaid
sequenceDiagram
    participant App as "App"
    participant Queue as "Queue"
    participant Scheduler as "Scheduler"
    participant Redis as "Redis"
    App->>Queue: New(... Pattern ...)
    Queue->>Scheduler: startScheduler(interval)
    Scheduler->>Scheduler: ticker loop (every Interval)
    App->>Queue: ScheduleJob(jobId, runAt)
    Queue->>Redis: ZADD scheduled_set runAt jobId
    Redis-->>Queue: OK
    Scheduler->>Redis: ZRANGEBYSCORE scheduled_set -inf now
    Redis-->>Scheduler: [due jobIds]
    Scheduler->>Redis: ZREM scheduled_set jobId
    Scheduler->>Queue: AddJob(jobId)
    Queue->>Redis: queue push ops
    App->>Queue: GetScheduledJobs()
    Queue->>Redis: ZRANGE scheduled_set 0 -1 WITHSCORES
    Redis-->>Queue: [(jobId, score)...]
    Queue-->>App: []ScheduledJobInfo
    App->>Queue: RemoveScheduledJob(jobId)
    Queue->>Redis: ZREM scheduled_set jobId
    Redis-->>Queue: OK
    App->>Queue: Pause()
    Queue->>Scheduler: stopScheduler()
    Scheduler-->>Queue: stopped
```
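The Redis calls in the diagram correspond to a handful of sorted-set operations. A minimal standalone sketch of that mapping, not the PR's actual scheduler.go — the key name, job id, and the go-redis v9 import are assumptions:

```go
package main

import (
	"context"
	"strconv"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	key := "myqueue:scheduled" // hypothetical scheduled-set key

	// ScheduleJob: ZADD scheduled_set runAt jobId
	runAt := time.Now().Add(30 * time.Second)
	rdb.ZAdd(ctx, key, redis.Z{Score: float64(runAt.Unix()), Member: "job-1"})

	// Scheduler tick: ZRANGEBYSCORE scheduled_set -inf now
	now := strconv.FormatInt(time.Now().Unix(), 10)
	due, _ := rdb.ZRangeByScore(ctx, key, &redis.ZRangeBy{Min: "-inf", Max: now}).Result()
	for _, jobId := range due {
		// ZREM acts as the claim step: only the instance that removes the member enqueues it.
		if removed, _ := rdb.ZRem(ctx, key, jobId).Result(); removed == 1 {
			// hand jobId to the queue's normal AddJob path here
		}
	}

	// GetScheduledJobs: ZRANGE scheduled_set 0 -1 WITHSCORES
	_, _ = rdb.ZRangeWithScores(ctx, key, 0, -1).Result()

	// RemoveScheduledJob: ZREM scheduled_set jobId
	_ = rdb.ZRem(ctx, key, "job-1").Err()
}
```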
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Codecov Report: ❌ Patch coverage is …
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@job.go`:
- Around line 112-116: The IsReady method currently returns true for both
WaitStatus and ActiveStatus; remove ActiveStatus from the readiness check so
IsReady only returns true when job.Status == WaitStatus, and update the comment
to state that only waiting jobs are ready (delayed/retries handled elsewhere);
locate the IsReady function and adjust the boolean comparison to reference only
WaitStatus (keep references to WaitStatus and ActiveStatus in the codebase for
clarity but do not include ActiveStatus in IsReady).
In `@queue.go`:
- Around line 430-443: Resume currently calls q.startScheduler unconditionally
which can spawn duplicate scheduler goroutines if Resume is called repeatedly;
update Queue.Resume to check a scheduler-running guard before calling
q.startScheduler (e.g., a boolean like q.schedulerRunning or an existing
q.scheduler handle) and only start the scheduler when that guard is false,
ensuring you set the guard true when startScheduler begins and clear it in
Pause/stop logic; keep existing behavior for q.config.Pattern and q.Run and
ensure access to the guard is protected with the same mutex used for q.running
if needed.
- Around line 100-117: The code treats opt.Pattern as a boolean flag and
duplicates the default interval logic in two places (the initializer and
Resume()), so either remove Pattern in favor of an explicit EnableScheduler bool
or implement cron parsing to compute the interval from Pattern (e.g., support
"@every ..." and cron expressions) and use that value when calling
queue.startScheduler; also extract the duplicated default interval logic into a
helper like computeScheduleInterval(opt) and replace both sites (the init block
referencing opt.Pattern/opt.ScheduleInterval and the Resume() method) to call
that helper and then queue.startScheduler(interval).
In `@scheduler_test.go`:
- Around line 64-95: Test_RemoveScheduledJob creates a schedulerQueue but never
stops it, causing resource leakage; add cleanup to call schedulerQueue.Pause()
when the test finishes (e.g., t.Cleanup(func(){ schedulerQueue.Pause() }) or
defer schedulerQueue.Pause() immediately after creating schedulerQueue) so the
queue started in Test_RemoveScheduledJob is properly paused and cleaned up.
- Around line 13-62: The Test_ScheduleJob leaves the scheduler goroutine and
Redis scheduled keys active; after asserting behavior call
schedulerQueue.Pause() (or the queue's stop method) to stop the scheduler
goroutine and remove/cleanup scheduled keys returned by
schedulerQueue.GetScheduledJobs() (e.g., delete scheduled job entries by their
JobId) so Redis state is cleared between tests; also replace the hardcoded
redis.Options Addr "localhost:6379" with a value read from an environment
variable (e.g., os.Getenv("REDIS_ADDR") with a sensible default) when creating
schedulerQueue to make the test CI-friendly.
In `@scheduler.go`:
- Around line 34-41: stopScheduler may close q.schedulerDone twice if called
multiple times because q.schedulerTicker is not cleared; update stopScheduler to
atomically stop and clear the ticker and ensure the done channel is closed only
once by checking and niling the fields: when q.schedulerTicker != nil, call
q.schedulerTicker.Stop(), set q.schedulerTicker = nil, then
close(q.schedulerDone) and set q.schedulerDone = nil (or otherwise guard closure
with a boolean), and keep the existing log via q.formatLog; reference the
stopScheduler method and the q.schedulerTicker / q.schedulerDone fields when
making the change.
- Around line 10-32: startScheduler can leak a goroutine if called while one is
already running because it overwrites q.schedulerDone and q.schedulerTicker
without signaling/closing the previous ones; before creating a new ticker and
done channel, check existing q.schedulerTicker/q.schedulerDone in startScheduler
and stop/close them (call q.schedulerTicker.Stop() and close the previous
q.schedulerDone or send a signal) to ensure the previous goroutine exits, then
assign the new q.schedulerTicker and q.schedulerDone and start the goroutine
that calls q.processScheduledJobs(); keep q.formatLog usage unchanged to report
the new interval.
- Around line 92-134: processScheduledJobs currently drops job payloads by
calling AddJob with Data: nil; instead persist job data in Redis (e.g., a hash
keyed by a pattern like "job:data:<id>") when scheduling, then in
processScheduledJobs after a successful ZRem of q.schedulerKey fetch the payload
via q.client.HGet (or HGetAll) for that jobId and pass the retrieved data into
q.AddJob so Data is preserved; also replace fmt.Sprintf("%f", now) with
strconv.FormatFloat(now, 'f', -1, 64) when building the score string.
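For the startScheduler/stopScheduler items above (idempotent stop, no leaked goroutine on repeated start), one shape that satisfies both looks roughly like the following. This is an illustrative sketch built on a stripped-down struct, not the PR's actual Queue fields:

```go
package main

import "time"

// scheduler holds just the fields needed to illustrate the guard logic.
type scheduler struct {
	ticker *time.Ticker
	done   chan struct{}
}

// start tears down any previous loop before creating a new one, so repeated
// calls (e.g. Resume without Pause) never leak a goroutine.
func (s *scheduler) start(interval time.Duration, tick func()) {
	s.stop() // idempotent

	s.ticker = time.NewTicker(interval)
	s.done = make(chan struct{})
	ticker, done := s.ticker, s.done // capture locals for the goroutine

	go func() {
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				tick()
			}
		}
	}()
}

// stop stops the ticker and closes done exactly once; fields are nil'd so a
// second call is a no-op rather than a double close.
func (s *scheduler) stop() {
	if s.ticker != nil {
		s.ticker.Stop()
		s.ticker = nil
	}
	if s.done != nil {
		close(s.done)
		s.done = nil
	}
}

func main() {
	s := &scheduler{}
	s.start(time.Second, func() {})
	s.start(time.Second, func() {}) // restart: the previous loop is stopped first
	s.stop()
	s.stop() // second stop is safe
}
```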
🧹 Nitpick comments (1)
scheduler.go (1)
43-56: Consider validating that `runAt` is in the future. Scheduling a job in the past will cause it to be immediately processed on the next scheduler tick, which may be unexpected behavior. Consider adding validation or at least documenting this behavior.
Optional: Add validation for runAt
```diff
 func (q *Queue) ScheduleJob(jobId string, runAt time.Time) error {
+	if runAt.Before(time.Now()) {
+		q.formatLog(LoggerWarn, "Scheduling job %s in the past; will be processed immediately", jobId)
+	}
 	score := float64(runAt.Unix())
```
```diff
 // IsReady returns true if the job is ready to be processed.
 // Jobs are ready if they are waiting or active.
 func (job *Job) IsReady() bool {
-	if job.queue.scheduler == nil {
-		return job.Status == WaitStatus || job.Status == ActiveStatus
-	}
-	return true
+	return job.Status == WaitStatus || job.Status == ActiveStatus
 }
```
🧩 Analysis chain
🏁 Scripts executed against repository tinh-tinh/queue:
```bash
#!/bin/bash
# Find how IsReady is used in the codebase
rg -n -A5 -B2 'IsReady\(\)' job.go
# Also check the status constants to understand their meaning
echo "=== Job Status Constants ==="
rg -n 'Status.*=.*iota|WaitStatus|ActiveStatus|CompletedStatus|FailedStatus' job.go | head -20

# Find all calls to IsReady() across the codebase
rg -n 'IsReady\(\)' --type go -A3 -B3
# Also find any filtering or selection logic related to job status
echo "=== Job selection/filtering logic ==="
rg -n 'Status.*==' --type go -B2 -A2 | head -40

# Get context around IsReady() usage in queue.go
sed -n '210,240p' queue.go
# Also check if there's any special handling for ActiveStatus
echo "=== Context around ActiveStatus handling ==="
sed -n '460,470p' queue.go

# Get the full Run() function to understand execution flow
sed -n '210,280p' queue.go

# Check the entire job.go file to understand state transitions
wc -l job.go
cat -n job.go

# Check if there are any other places where job status is modified
rg -n '\.Status\s*=' --type go -B2 -A1
```
ActiveStatus should not be included in IsReady().
A job in ActiveStatus represents one currently being or previously being processed. Including it in the ready check violates job state semantics and could cause duplicate processing.
The job lifecycle should be: WaitStatus → ActiveStatus (during Process()) → CompletedStatus/FailedStatus/DelayedStatus (final states). Only WaitStatus jobs should be returned from IsReady(). Delayed jobs are handled separately in the Retry() method and should not be included here.
The comment "Jobs are ready if they are waiting or active" is misleading—only waiting jobs are ready for initial processing.
Suggested fix
```diff
 // IsReady returns true if the job is ready to be processed.
-// Jobs are ready if they are waiting or active.
+// Jobs are ready if they are waiting.
 func (job *Job) IsReady() bool {
-	return job.Status == WaitStatus || job.Status == ActiveStatus
+	return job.Status == WaitStatus
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// IsReady returns true if the job is ready to be processed.
// Jobs are ready if they are waiting.
func (job *Job) IsReady() bool {
	return job.Status == WaitStatus
}
```
🤖 Prompt for AI Agents
In `@job.go` around lines 112 - 116, The IsReady method currently returns true for
both WaitStatus and ActiveStatus; remove ActiveStatus from the readiness check
so IsReady only returns true when job.Status == WaitStatus, and update the
comment to state that only waiting jobs are ready (delayed/retries handled
elsewhere); locate the IsReady function and adjust the boolean comparison to
reference only WaitStatus (keep references to WaitStatus and ActiveStatus in the
codebase for clarity but do not include ActiveStatus in IsReady).
```go
	// Initialize scheduler key for distributed scheduling
	queue.schedulerKey = queue.cachedKey + ":scheduled"

	// Start distributed scheduler if Pattern is configured
	if opt.Pattern != "" {
		// TODO: Parse cron pattern to determine interval
		// For now, use default interval from ScheduleInterval option
		interval := opt.ScheduleInterval
		if interval == 0 {
			interval = 5 * time.Second
		}
		queue.startScheduler(interval)
	}

	if opt.Timeout == 0 {
		queue.config.Timeout = 1 * time.Minute
	}
```
🧩 Analysis chain
🏁 Scripts executed against repository tinh-tinh/queue:
```bash
#!/bin/bash
# Check how Pattern is used in the codebase
echo "=== Searching for Pattern usage ==="
rg -n -A 2 -B 2 'Pattern' --type go

# Find and examine the Resume() method around lines 436-439
echo "=== Finding Resume() method ==="
rg -n 'func.*Resume' --type go -A 10

# Look for cron parsing or scheduler initialization
echo "=== Searching for cron-related code ==="
rg -n -i 'cron|parse' --type go | head -20

# Find the queue.go file and check its structure
echo "=== Finding queue.go ==="
fd -name "queue.go" -type f

# Check startScheduler implementation
echo "=== Examining startScheduler implementation ==="
rg -n 'func.*startScheduler' --type go -A 20

# Check if Pattern field is used anywhere except as a boolean check
echo "=== Looking for other Pattern usages ==="
rg -n '\bPattern\b' --type go -B 1 -A 1 | grep -v test

# Check for any cron library imports or usage
echo "=== Checking for cron libraries ==="
rg -n 'import|cron|robfig|cronexpr' --type go | head -30
```
The Pattern option is misleading—cron syntax is not actually parsed, and the scheduler logic is duplicated.
The TODO on lines 105-106 confirms the cron pattern is not being parsed. Despite the documentation claiming Pattern is "the cron pattern to use for scheduling jobs," the implementation ignores the pattern value entirely—it acts only as a boolean flag to enable/disable the scheduler. Users providing patterns like "@every 1s" or "0 * * * *" (as shown in tests) might reasonably expect cron-compatible behavior, but the code uses only ScheduleInterval for polling with no pattern evaluation.
Additionally, the default interval logic (lines 107-110) is duplicated identically in Resume() (lines 436-439). Both check Pattern != "", set a default interval of 5 seconds, and call startScheduler(interval).
Consider either:
- Removing `Pattern` and using an explicit `EnableScheduler bool` option, or
- Implementing actual cron pattern parsing to determine the interval.

Extract the default interval logic to a helper method to eliminate duplication.
🤖 Prompt for AI Agents
In `@queue.go` around lines 100 - 117, The code treats opt.Pattern as a boolean
flag and duplicates the default interval logic in two places (the initializer
and Resume()), so either remove Pattern in favor of an explicit EnableScheduler
bool or implement cron parsing to compute the interval from Pattern (e.g.,
support "@every ..." and cron expressions) and use that value when calling
queue.startScheduler; also extract the duplicated default interval logic into a
helper like computeScheduleInterval(opt) and replace both sites (the init block
referencing opt.Pattern/opt.ScheduleInterval and the Resume() method) to call
that helper and then queue.startScheduler(interval).
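A low-effort way to cover the "@every …" case the prompt mentions is to special-case it with time.ParseDuration before attempting any cron handling. A hedged sketch — the helper name and fallback value are illustrative, not the repo's eventual pattern_parser.go:

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// computeScheduleInterval derives a polling interval from a pattern string.
// "@every <duration>" is handled directly; anything else falls back to a
// default until full cron parsing is implemented.
func computeScheduleInterval(pattern string, fallback time.Duration) time.Duration {
	if after, ok := strings.CutPrefix(pattern, "@every "); ok {
		if d, err := time.ParseDuration(strings.TrimSpace(after)); err == nil && d > 0 {
			return d
		}
	}
	// TODO: parse five-field cron expressions here.
	return fallback
}

func main() {
	fmt.Println(computeScheduleInterval("@every 1s", 5*time.Second)) // 1s
	fmt.Println(computeScheduleInterval("0 * * * *", 5*time.Second)) // 5s fallback
}
```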
```go
// Resume resumes the queue from a paused state. When resumed, the queue will
// accept new jobs and run any jobs in the queue. The scheduler is also restarted
// if it was previously configured.
func (q *Queue) Resume() {
	q.running = true
	if q.config.Pattern != "" {
		interval := q.config.ScheduleInterval
		if interval == 0 {
			interval = 5 * time.Second
		}
		q.startScheduler(interval)
	}
	q.Run()
}
```
Potential issue: calling startScheduler without checking if already running.
If Resume() is called multiple times without Pause() in between, this could start multiple scheduler goroutines. Consider adding a guard to prevent duplicate scheduler starts.
Suggested fix
```diff
 func (q *Queue) Resume() {
 	q.running = true
 	if q.config.Pattern != "" {
+		// Prevent duplicate scheduler starts
+		if q.schedulerTicker != nil {
+			return
+		}
 		interval := q.config.ScheduleInterval
 		if interval == 0 {
 			interval = 5 * time.Second
 		}
 		q.startScheduler(interval)
 	}
 	q.Run()
 }
```
🤖 Prompt for AI Agents
In `@queue.go` around lines 430 - 443, Resume currently calls q.startScheduler
unconditionally which can spawn duplicate scheduler goroutines if Resume is
called repeatedly; update Queue.Resume to check a scheduler-running guard before
calling q.startScheduler (e.g., a boolean like q.schedulerRunning or an existing
q.scheduler handle) and only start the scheduler when that guard is false,
ensuring you set the guard true when startScheduler begins and clear it in
Pause/stop logic; keep existing behavior for q.config.Pattern and q.Run and
ensure access to the guard is protected with the same mutex used for q.running
if needed.
```go
func Test_ScheduleJob(t *testing.T) {
	schedulerQueue := queue.New("scheduler_test", &queue.Options{
		Connect: &redis.Options{
			Addr:     "localhost:6379",
			Password: "",
			DB:       0,
		},
		Workers:          3,
		RetryFailures:    0,
		Pattern:          "@every 1s", // Enable scheduler
		ScheduleInterval: 1 * time.Second,
	})

	// Track processed jobs
	processedJobs := make(map[string]bool)
	var mu sync.Mutex

	schedulerQueue.Process(func(job *queue.Job) {
		job.Process(func() error {
			mu.Lock()
			processedJobs[job.Id] = true
			mu.Unlock()
			return nil
		})
	})

	// Schedule a job to run 2 seconds from now
	runAt := time.Now().Add(2 * time.Second)
	err := schedulerQueue.ScheduleJob("scheduled_job_1", runAt)
	require.Nil(t, err)

	// Verify job is in scheduled set
	scheduledJobs, err := schedulerQueue.GetScheduledJobs()
	require.Nil(t, err)
	require.Equal(t, 1, len(scheduledJobs))
	require.Equal(t, "scheduled_job_1", scheduledJobs[0].JobId)

	// Wait for job to be processed (2s + 1s buffer)
	time.Sleep(3 * time.Second)

	// Verify job was processed
	mu.Lock()
	require.True(t, processedJobs["scheduled_job_1"])
	mu.Unlock()

	// Verify job is no longer in scheduled set
	scheduledJobs, err = schedulerQueue.GetScheduledJobs()
	require.Nil(t, err)
	require.Equal(t, 0, len(scheduledJobs))
}
```
Test cleanup missing: scheduler goroutine and Redis state not cleaned up.
The test doesn't call Pause() to stop the scheduler goroutine, which could leak into subsequent tests. Additionally, the scheduled job keys in Redis are not cleaned up, which could cause test pollution.
Suggested fix
```diff
 	// Verify job is no longer in scheduled set
 	scheduledJobs, err = schedulerQueue.GetScheduledJobs()
 	require.Nil(t, err)
 	require.Equal(t, 0, len(scheduledJobs))
+
+	// Cleanup
+	schedulerQueue.Pause()
 }
```
Consider using an environment variable for the Redis address: hardcoding `localhost:6379` may not work in all CI/CD environments.
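One common shape for that, as a hedged sketch — REDIS_ADDR is a conventional variable name, not something the repo already defines:

```go
package main

import (
	"fmt"
	"os"
)

// redisAddr returns the Redis address for tests, preferring an environment
// variable so CI can point at a service container.
func redisAddr() string {
	if addr := os.Getenv("REDIS_ADDR"); addr != "" {
		return addr
	}
	return "localhost:6379" // sensible local default
}

func main() {
	fmt.Println(redisAddr())
}
```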
🤖 Prompt for AI Agents
In `@scheduler_test.go` around lines 13 - 62, The Test_ScheduleJob leaves the
scheduler goroutine and Redis scheduled keys active; after asserting behavior
call schedulerQueue.Pause() (or the queue's stop method) to stop the scheduler
goroutine and remove/cleanup scheduled keys returned by
schedulerQueue.GetScheduledJobs() (e.g., delete scheduled job entries by their
JobId) so Redis state is cleared between tests; also replace the hardcoded
redis.Options Addr "localhost:6379" with a value read from an environment
variable (e.g., os.Getenv("REDIS_ADDR") with a sensible default) when creating
schedulerQueue to make the test CI-friendly.
```go
func Test_RemoveScheduledJob(t *testing.T) {
	schedulerQueue := queue.New("remove_scheduled_test", &queue.Options{
		Connect: &redis.Options{
			Addr:     "localhost:6379",
			Password: "",
			DB:       0,
		},
		Workers:          3,
		RetryFailures:    0,
		Pattern:          "@every 1s",
		ScheduleInterval: 1 * time.Second,
	})

	// Schedule a job for 5 seconds from now
	runAt := time.Now().Add(5 * time.Second)
	err := schedulerQueue.ScheduleJob("job_to_remove", runAt)
	require.Nil(t, err)

	// Verify job is scheduled
	scheduledJobs, err := schedulerQueue.GetScheduledJobs()
	require.Nil(t, err)
	require.Equal(t, 1, len(scheduledJobs))

	// Remove the scheduled job
	err = schedulerQueue.RemoveScheduledJob("job_to_remove")
	require.Nil(t, err)

	// Verify job is no longer scheduled
	scheduledJobs, err = schedulerQueue.GetScheduledJobs()
	require.Nil(t, err)
	require.Equal(t, 0, len(scheduledJobs))
}
```
Same cleanup issues as Test_ScheduleJob.
This test also lacks scheduler cleanup via Pause(). Consider adding cleanup in a t.Cleanup() callback or defer.
Suggested fix using t.Cleanup
```diff
 func Test_RemoveScheduledJob(t *testing.T) {
 	schedulerQueue := queue.New("remove_scheduled_test", &queue.Options{
 		Connect: &redis.Options{
 			Addr:     "localhost:6379",
 			Password: "",
 			DB:       0,
 		},
 		Workers:          3,
 		RetryFailures:    0,
 		Pattern:          "@every 1s",
 		ScheduleInterval: 1 * time.Second,
 	})
+	t.Cleanup(func() {
+		schedulerQueue.Pause()
+	})
```
🤖 Prompt for AI Agents
In `@scheduler_test.go` around lines 64 - 95, Test_RemoveScheduledJob creates a
schedulerQueue but never stops it, causing resource leakage; add cleanup to call
schedulerQueue.Pause() when the test finishes (e.g., t.Cleanup(func(){
schedulerQueue.Pause() }) or defer schedulerQueue.Pause() immediately after
creating schedulerQueue) so the queue started in Test_RemoveScheduledJob is
properly paused and cleaned up.
```go
// processScheduledJobs checks for jobs ready to run and moves them to the waiting list.
// This method is called periodically by the scheduler loop.
func (q *Queue) processScheduledJobs() {
	now := float64(time.Now().Unix())

	// Find all jobs with score <= current timestamp
	results, err := q.client.ZRangeByScoreWithScores(q.ctx, q.schedulerKey, &redis.ZRangeBy{
		Min: "-inf",
		Max: fmt.Sprintf("%f", now),
	}).Result()

	if err != nil {
		q.formatLog(LoggerError, "Failed to get ready scheduled jobs: %v", err)
		return
	}

	if len(results) == 0 {
		return
	}

	// Process each ready job
	for _, z := range results {
		jobId, ok := z.Member.(string)
		if !ok {
			continue
		}

		// Atomically remove from scheduled set (only one instance will succeed)
		removed, err := q.client.ZRem(q.ctx, q.schedulerKey, jobId).Result()
		if err != nil || removed == 0 {
			// Another instance already processed this job
			continue
		}

		// Add job to the queue
		q.AddJob(AddJobOptions{
			Id:   jobId,
			Data: nil, // Scheduled jobs don't have data in this implementation
		})

		q.formatLog(LoggerInfo, "Moved scheduled job %s to waiting list", jobId)
	}
}
```
Critical: Scheduled jobs lose their data payload.
On line 129, Data: nil means that when a scheduled job is moved to the waiting queue, it has no associated data. This is a significant limitation that makes the scheduler much less useful - users would expect to schedule a job with data and have that data available when processed.
Consider storing job data alongside the job ID in Redis (e.g., using a hash or storing serialized job data as the member).
Would you like me to suggest an implementation that preserves job data using a Redis hash alongside the sorted set?
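One possible shape, sketched with hypothetical key names and a plain string payload rather than the repo's actual job model (go-redis v9 assumed):

```go
package main

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

const (
	scheduledSet  = "myqueue:scheduled"      // sorted set: member=jobId, score=runAt
	scheduledData = "myqueue:scheduled:data" // hash: field=jobId, value=payload
)

// scheduleWithData stores the payload alongside the schedule entry.
func scheduleWithData(ctx context.Context, rdb *redis.Client, jobId, payload string, runAt time.Time) error {
	if err := rdb.HSet(ctx, scheduledData, jobId, payload).Err(); err != nil {
		return err
	}
	return rdb.ZAdd(ctx, scheduledSet, redis.Z{Score: float64(runAt.Unix()), Member: jobId}).Err()
}

// claimDueJob atomically claims a due jobId via ZREM and returns its payload.
func claimDueJob(ctx context.Context, rdb *redis.Client, jobId string) (string, bool, error) {
	removed, err := rdb.ZRem(ctx, scheduledSet, jobId).Result()
	if err != nil || removed == 0 {
		return "", false, err // another instance claimed it, or an error occurred
	}
	payload, err := rdb.HGet(ctx, scheduledData, jobId).Result()
	if err == redis.Nil {
		payload, err = "", nil // scheduled without data
	}
	if err != nil {
		return "", false, err
	}
	_ = rdb.HDel(ctx, scheduledData, jobId).Err() // best-effort cleanup
	return payload, true, nil
}

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	_ = scheduleWithData(ctx, rdb, "job-1", `{"email":"user@example.com"}`, time.Now())
	if payload, ok, _ := claimDueJob(ctx, rdb, "job-1"); ok {
		_ = payload // pass into AddJob's Data field here
	}
}
```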
Minor: Use strconv.FormatFloat instead of fmt.Sprintf for the score.
Minor performance improvement
```diff
 results, err := q.client.ZRangeByScoreWithScores(q.ctx, q.schedulerKey, &redis.ZRangeBy{
 	Min: "-inf",
-	Max: fmt.Sprintf("%f", now),
+	Max: strconv.FormatFloat(now, 'f', -1, 64),
 }).Result()
```
🤖 Prompt for AI Agents
In `@scheduler.go` around lines 92 - 134, processScheduledJobs currently drops job
payloads by calling AddJob with Data: nil; instead persist job data in Redis
(e.g., a hash keyed by a pattern like "job:data:<id>") when scheduling, then in
processScheduledJobs after a successful ZRem of q.schedulerKey fetch the payload
via q.client.HGet (or HGetAll) for that jobId and pass the retrieved data into
q.AddJob so Data is preserved; also replace fmt.Sprintf("%f", now) with
strconv.FormatFloat(now, 'f', -1, 64) when building the score string.
3d69f78 to 8e697c0 — Compare
- Add scheduler.go with Redis-based distributed scheduling
- Implement ScheduleJob, GetNextJobs, and RemoveJob methods
- Add comprehensive unit tests in scheduler_test.go
- Update queue.go to integrate new scheduler
- Maintain backward compatibility with cron patterns

Pattern Parser Enhancements:
- Add pattern_parser.go with support for @every and cron expressions
- Support cron patterns: */N minutes/hours, hourly, daily, weekly, monthly
- Add pattern_parser_test.go with 26 test cases
- Automatic interval calculation from cron expressions

Goroutine Leak Fixes:
- Add schedulerRunning flag to prevent duplicate scheduler instances
- Fix double-close panic with proper nil checks
- Add scheduler_leak_test.go to verify no leaks on pause/resume
- Capture ticker/channel in local variables to prevent race conditions

All tests passing (9 scheduler tests + 26 pattern tests + 4 leak tests)
8e697c0 to e6faade — Compare
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@pattern_parser_test.go`:
- Around line 126-129: The "empty pattern" test case is misleading because
opt.Pattern == "" causes scheduler initialization to be skipped (see queue.go
check if opt.Pattern != ""), so either remove this test case or change it to
assert the "no scheduler started" behavior; locate the test case named "empty
pattern" in pattern_parser_test.go and either delete the table entry or update
its expected outcome to reflect that an empty Pattern means "no scheduler" (or
add a comment clarifying that behavior), and ensure any assertions reference the
scheduler initialization logic (e.g., opt.Pattern and the scheduler start path)
rather than expecting fallback parsing.
In `@scheduler_leak_test.go`:
- Around line 84-96: The test has a data race: the map processedJobs is written
inside the worker callback (q.Process / job.Process) and read from the main test
goroutine; protect concurrent access by replacing processedJobs with a
concurrency-safe structure (e.g., use sync.Map or guard the map with a
sync.Mutex) and update accesses inside the q.Process callback and the
verification read accordingly (references: processedJobs, q.Process,
job.Process); ensure writes use the safe API (Store/Load if sync.Map or
lock/unlock around map writes/reads) so the race is eliminated.
- Around line 117-118: The test currently uses a no-op assertion require.True(t,
true, ...) which always passes; replace it with a real non-panic check by
wrapping repeated q.Pause() calls in require.NotPanics (or require.NotPanicsf)
to assert multiple Pause() invocations do not panic—for example, call
require.NotPanics(t, func(){ q.Pause(); q.Pause(); q.Pause() }) so the test
verifies Pause() behavior instead of a tautology.
♻️ Duplicate comments (3)
queue.go (2)
104-119: Interval resolution logic is duplicated in Resume().
The interval resolution logic (check ScheduleInterval, fall back to parsePattern, fall back to 5s) appears identically in both New() (lines 106-117) and Resume() (lines 443-454). Consider extracting it to a helper method.
Suggested refactor
```go
// resolveScheduleInterval returns the polling interval for the scheduler.
func (q *Queue) resolveScheduleInterval() time.Duration {
	interval := q.config.ScheduleInterval
	if interval == 0 {
		parsedInterval, err := parsePattern(q.config.Pattern)
		if err != nil {
			q.formatLog(LoggerWarn, "Failed to parse pattern '%s': %v, using default 5s interval", q.config.Pattern, err)
			interval = 5 * time.Second
		} else {
			interval = parsedInterval
		}
	}
	return interval
}
```
Then in New() and Resume():
```go
if opt.Pattern != "" {
	queue.startScheduler(queue.resolveScheduleInterval())
}
```
440-458: Verify that startScheduler guards against duplicate starts.
The past review flagged that calling Resume() multiple times could start duplicate scheduler goroutines. The fix should either be in startScheduler() (checking schedulerRunning) or here in Resume().
```bash
#!/bin/bash
# Check if startScheduler has a guard against duplicate starts
rg -n -A 20 'func.*startScheduler' --type go
```
job.go (1)
112-116: ActiveStatus should not be included in IsReady().
Including ActiveStatus in the readiness check violates job state semantics and could cause duplicate processing. A job in ActiveStatus is currently being processed, and only WaitStatus jobs should be considered ready for initial processing.
The job lifecycle should be: WaitStatus → ActiveStatus (during Process()) → CompletedStatus/FailedStatus/DelayedStatus.
Suggested fix
```diff
 // IsReady returns true if the job is ready to be processed.
-// Jobs are ready if they are waiting or active.
+// Jobs are ready if they are waiting.
 func (job *Job) IsReady() bool {
-	return job.Status == WaitStatus || job.Status == ActiveStatus
+	return job.Status == WaitStatus
 }
```
🧹 Nitpick comments (3)
pattern_parser_test.go (2)
102-118: Tests don't verify the actual parsed interval.
The tests only assert that the queue is created successfully (require.NotNil(t, q)), but don't verify that the pattern was parsed to the expected duration. This means the tests could pass even if the parsing logic is incorrect. Consider adding a way to verify the actual interval used, or testing parsePattern directly if it can be exported for testing.
102-118: Missing cleanup for queues with active schedulers.
Each test creates a queue with an active scheduler but doesn't clean up. This can cause goroutine leaks across test runs.
Suggested fix for valid pattern tests
```diff
 t.Run(tt.name, func(t *testing.T) {
 	// Test indirectly through Queue creation
 	q := queue.New("test_pattern_"+tt.name, &queue.Options{
 		Connect: &redis.Options{
 			Addr:     "localhost:6379",
 			Password: "",
 			DB:       0,
 		},
 		Workers:       1,
 		RetryFailures: 0,
 		Pattern:       tt.pattern,
 		// Don't set ScheduleInterval to force pattern parsing
 	})
 	require.NotNil(t, q)
+	t.Cleanup(func() {
+		q.Pause() // Stop the scheduler
+	})
 })
```
pattern_parser.go (1)
95-98: Hourly detection may be overly broad.
The condition hour == "*" && day == "*" && month == "*" && weekday == "*" would match patterns like "5 * * * *" (run at minute 5 of every hour), returning a 1-hour interval. However, it would also match "*/5 * * * *" if the */5 check above fails for some reason.
More importantly, the comment says "Hourly: '0 * * * *' or 'N * * * *'" but the logic doesn't validate the minute field. A pattern like "30 * * * *" would correctly be treated as hourly, but users might expect different behavior. Consider adding a comment clarifying that any non-step minute pattern with all wildcards for other fields is treated as hourly.
```go
{
	name:    "empty pattern",
	pattern: "",
},
```
Empty pattern test doesn't exercise the fallback behavior.
Based on queue.go line 105, when Pattern is empty (""), the scheduler initialization is skipped entirely (if opt.Pattern != "" { ... }). This test case won't trigger the pattern parsing fallback—it simply won't start a scheduler at all.
Consider removing this case or documenting that empty pattern means "no scheduler" rather than "fallback to default."
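If the case is kept, it could assert the "no scheduler" behavior directly. A hedged sketch — the module and go-redis import paths, the test name, and the reliance on Pause() being safe when no scheduler was started are assumptions, not existing repo code:

```go
package queue_test

import (
	"testing"

	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/require"
	"github.com/tinh-tinh/queue"
)

// Hypothetical replacement for the "empty pattern" table entry: assert the
// documented behavior (no scheduler is started) instead of fallback parsing.
func Test_EmptyPatternStartsNoScheduler(t *testing.T) {
	q := queue.New("test_pattern_empty", &queue.Options{
		Connect: &redis.Options{Addr: "localhost:6379"},
		Workers: 1,
		Pattern: "", // scheduler initialization is skipped entirely
	})
	require.NotNil(t, q)
	// With no scheduler goroutine running, Pause() must still be safe to call.
	require.NotPanics(t, func() { q.Pause() })
}
```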
🤖 Prompt for AI Agents
In `@pattern_parser_test.go` around lines 126 - 129, The "empty pattern" test case
is misleading because opt.Pattern == "" causes scheduler initialization to be
skipped (see queue.go check if opt.Pattern != ""), so either remove this test
case or change it to assert the "no scheduler started" behavior; locate the test
case named "empty pattern" in pattern_parser_test.go and either delete the table
entry or update its expected outcome to reflect that an empty Pattern means "no
scheduler" (or add a comment clarifying that behavior), and ensure any
assertions reference the scheduler initialization logic (e.g., opt.Pattern and
the scheduler start path) rather than expecting fallback parsing.
```go
	processedJobs := make(map[string]bool)
	q.Process(func(job *queue.Job) {
		job.Process(func() error {
			processedJobs[job.Id] = true
			return nil
		})
	})

	// Wait for job to be processed
	time.Sleep(3 * time.Second)

	// Verify job was processed
	require.True(t, processedJobs["test_job"], "Job should have been processed after resume")
```
Data race: concurrent map access without synchronization.
The processedJobs map is written from the Process callback (which runs in a worker goroutine) and read in the main test goroutine. This is a data race.
Suggested fix using sync.Map
```diff
 	// Verify scheduler is working by checking if job gets processed
-	processedJobs := make(map[string]bool)
+	var processedJobs sync.Map
 	q.Process(func(job *queue.Job) {
 		job.Process(func() error {
-			processedJobs[job.Id] = true
+			processedJobs.Store(job.Id, true)
 			return nil
 		})
 	})

 	// Wait for job to be processed
 	time.Sleep(3 * time.Second)

 	// Verify job was processed
-	require.True(t, processedJobs["test_job"], "Job should have been processed after resume")
+	_, processed := processedJobs.Load("test_job")
+	require.True(t, processed, "Job should have been processed after resume")
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	var processedJobs sync.Map
	q.Process(func(job *queue.Job) {
		job.Process(func() error {
			processedJobs.Store(job.Id, true)
			return nil
		})
	})

	// Wait for job to be processed
	time.Sleep(3 * time.Second)

	// Verify job was processed
	_, processed := processedJobs.Load("test_job")
	require.True(t, processed, "Job should have been processed after resume")
```
🤖 Prompt for AI Agents
In `@scheduler_leak_test.go` around lines 84 - 96, The test has a data race: the
map processedJobs is written inside the worker callback (q.Process /
job.Process) and read from the main test goroutine; protect concurrent access by
replacing processedJobs with a concurrency-safe structure (e.g., use sync.Map or
guard the map with a sync.Mutex) and update accesses inside the q.Process
callback and the verification read accordingly (references: processedJobs,
q.Process, job.Process); ensure writes use the safe API (Store/Load if sync.Map
or lock/unlock around map writes/reads) so the race is eliminated.
```go
	// Verify no panic occurred
	require.True(t, true, "Multiple Pause calls should not panic")
```
No-op assertion: require.True(t, true, ...) always passes.
This assertion doesn't actually verify anything. If testing that multiple Pause() calls don't panic, use require.NotPanics.
Suggested fix
```diff
-	// Verify no panic occurred
-	require.True(t, true, "Multiple Pause calls should not panic")
+	// If we reach here without panic, the test passes
```
Or wrap the calls in require.NotPanics:
```go
require.NotPanics(t, func() {
	q.Pause()
	q.Pause()
	q.Pause()
})
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	// If we reach here without panic, the test passes
```
🤖 Prompt for AI Agents
In `@scheduler_leak_test.go` around lines 117 - 118, The test currently uses a
no-op assertion require.True(t, true, ...) which always passes; replace it with
a real non-panic check by wrapping repeated q.Pause() calls in require.NotPanics
(or require.NotPanicsf) to assert multiple Pause() invocations do not panic—for
example, call require.NotPanics(t, func(){ q.Pause(); q.Pause(); q.Pause() }) so
the test verifies Pause() behavior instead of a tautology.