53 changes: 42 additions & 11 deletions queue_job/jobrunner/runner.py
@@ -156,6 +156,7 @@

 from . import queue_job_config
 from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager
+from ..job import FAILED
 
 SELECT_TIMEOUT = 60
 ERROR_RECOVERY_DELAY = 5
@@ -214,20 +215,50 @@ def set_job_pending():
         conn = psycopg2.connect(**connection_info)
         conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
         with closing(conn.cursor()) as cr:
+            # First, get the current retry count and max_retries to check limits
             cr.execute(
-                "UPDATE queue_job SET state=%s, "
-                "date_enqueued=NULL, date_started=NULL "
-                "WHERE uuid=%s and state=%s "
-                "RETURNING uuid",
-                (PENDING, job_uuid, ENQUEUED),
+                "SELECT retry, max_retries FROM queue_job WHERE uuid=%s AND state=%s",
+                (job_uuid, ENQUEUED),
             )
-            if cr.fetchone():
-                _logger.warning(
-                    "state of job %s was reset from %s to %s",
-                    job_uuid,
-                    ENQUEUED,
-                    PENDING,
+            result = cr.fetchone()
+            if not result:
+                return
+            current_retry, max_retries = result
+
+            # Increment retry counter for timeout
+            new_retry = current_retry + 1
+
+            # Check if we have exceeded max retries
+            if max_retries and new_retry >= max_retries:
+                # Mark job as failed due to max retries exceeded
+                cr.execute(
+                    "UPDATE queue_job SET state=%s, retry=%s, "
+                    "date_enqueued=NULL, date_started=NULL, "
+                    "exc_name=%s, exc_message=%s, exc_info=%s "
+                    "WHERE uuid=%s and state=%s",
+                    (FAILED, new_retry, 'TimeoutError',
+                     f'Job failed due to timeout after {new_retry} retries (max: {max_retries})',
+                     f'Timeout occurred during job execution. Retry count: {new_retry}/{max_retries}',
+                     job_uuid, ENQUEUED),
                 )
+                if cr.rowcount:
+                    _logger.warning(
+                        "Job %s failed due to timeout after %d retries (max: %d)",
+                        job_uuid, new_retry, max_retries,
+                    )
+            else:
+                # Reset to pending with incremented retry count
+                cr.execute(
+                    "UPDATE queue_job SET state=%s, retry=%s, "
+                    "date_enqueued=NULL, date_started=NULL "
+                    "WHERE uuid=%s and state=%s",
+                    (PENDING, new_retry, job_uuid, ENQUEUED),
+                )
+                if cr.rowcount:
+                    _logger.warning(
+                        "Job %s timeout, retry %d/%s, reset to %s",
+                        job_uuid, new_retry, max_retries or '∞', PENDING,
+                    )
 
     # TODO: better way to HTTP GET asynchronously (grequest, ...)?
     # if this was python3 I would be doing this with
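
Reviewer note: a minimal standalone sketch of the retry decision this hunk adds, with the database access stubbed out. `decide_timeout_action` and the inline state constants are hypothetical stand-ins for illustration only; queue_job itself imports `PENDING`, `ENQUEUED`, and `FAILED` from its own modules, as the diff shows.

```python
# Sketch of the branch added to set_job_pending(): given a job whose HTTP
# request timed out while ENQUEUED, decide its next state and retry count.
PENDING = "pending"  # stand-in for queue_job's PENDING constant
FAILED = "failed"    # stand-in for queue_job's FAILED constant


def decide_timeout_action(current_retry, max_retries):
    """Return (new_state, new_retry), mirroring the diff's logic."""
    new_retry = current_retry + 1
    # A falsy max_retries (0, or NULL read back as None) means "retry
    # forever", matching the `if max_retries and new_retry >= max_retries`
    # guard in the diff.
    if max_retries and new_retry >= max_retries:
        return FAILED, new_retry
    return PENDING, new_retry


assert decide_timeout_action(0, 3) == (PENDING, 1)     # first timeout: retry
assert decide_timeout_action(2, 3) == (FAILED, 3)      # limit reached: fail
assert decide_timeout_action(99, 0) == (PENDING, 100)  # no limit: keep retrying
```

The `WHERE uuid=%s and state=%s` guard on both UPDATEs means the write is a no-op if another worker already moved the job out of ENQUEUED between the SELECT and the UPDATE, which is why the code checks `cr.rowcount` before logging.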