Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,27 +207,57 @@ def _connection_info_for(db_name):


def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
# Method to set a job that failed to run (due to timeout, etc.) back to
# pending, or to failed once its retry budget is exhausted, so it does
# not stay stuck in the enqueued state. Properly handles retry counting.
def set_job_pending():
    """Reset the enqueued job after an HTTP dispatch failure.

    Uses ``db_name`` and ``job_uuid`` from the enclosing scope. Reads the
    job's current ``retry``/``max_retries``, then either:

    - marks it FAILED with a timeout error when the retry budget is
      exhausted, or
    - puts it back to PENDING with an incremented retry count.

    Both UPDATEs keep a ``state = ENQUEUED`` guard so a job that moved on
    concurrently between the SELECT and the UPDATE is left untouched.
    """
    connection_info = _connection_info_for(db_name)
    conn = psycopg2.connect(**connection_info)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    # BUGFIX: close the connection too, not only the cursor — otherwise a
    # PostgreSQL connection is leaked every time a job times out.
    with closing(conn), closing(conn.cursor()) as cr:
        # First, get the current retry count and max_retries
        cr.execute(
            "SELECT retry, max_retries FROM queue_job "
            "WHERE uuid=%s AND state=%s",
            (job_uuid, ENQUEUED),
        )
        result = cr.fetchone()
        if not result:
            # Job is no longer enqueued (finished or re-dispatched):
            # nothing to reset.
            return

        current_retry, max_retries = result
        new_retry = current_retry + 1

        # A falsy max_retries (0/NULL) means "retry forever".
        if max_retries and new_retry >= max_retries:
            # Retry budget exhausted: set job to FAILED state with a
            # timeout error message so it surfaces to the user.
            cr.execute(
                "UPDATE queue_job SET state=%s, retry=%s, "
                "date_enqueued=NULL, date_started=NULL, "
                "exc_name=%s, exc_message=%s "
                "WHERE uuid=%s AND state=%s "
                "RETURNING uuid",
                (FAILED, new_retry, "requests.Timeout",
                 "Job timed out after %d retries" % new_retry,
                 job_uuid, ENQUEUED),
            )
            if cr.fetchone():
                _logger.warning(
                    "Job %s failed due to timeout after %d retries (max: %d)",
                    job_uuid, new_retry, max_retries,
                )
        else:
            # Still within the retry budget: set job back to PENDING so
            # the runner enqueues it again.
            cr.execute(
                "UPDATE queue_job SET state=%s, retry=%s, "
                "date_enqueued=NULL, date_started=NULL "
                "WHERE uuid=%s AND state=%s "
                "RETURNING uuid",
                (PENDING, new_retry, job_uuid, ENQUEUED),
            )
            if cr.fetchone():
                # BUGFIX: use %s for the max_retries slot — the original
                # "%d" raises a log-formatting error whenever max_retries
                # is falsy and the string "unlimited" is substituted.
                _logger.warning(
                    "Job %s timed out, retry %d/%s, reset to %s",
                    job_uuid, new_retry, max_retries or "unlimited", PENDING,
                )

# TODO: better way to HTTP GET asynchronously (grequest, ...)?
# if this was python3 I would be doing this with
Expand Down
Loading
Loading