Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions modelq/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ def __init__(
)

self.worker_threads = []
# Set on SIGTERM/SIGINT so worker loops stop pulling and exit cleanly
# after finishing their current task (see shutdown()).
self._shutdown_event = threading.Event()
if server_id is None:
# Attempt to load the server_id from a local file:
server_id = self._get_or_create_server_id_file()
Expand Down Expand Up @@ -662,6 +665,21 @@ def wrapper(*args, **kwargs):
return wrapper
return decorator

def shutdown(self, timeout: float = 300.0):
"""Signal worker loops to stop after their current task, then wait.

Called on SIGTERM/SIGINT (see the CLI) so an in-flight task finishes —
and, for synchronous task handlers, its upload completes — before the
process exits, instead of daemon worker threads being killed mid-task
(which orphans the task and loses its result). Waits up to `timeout`
seconds in total for the workers to drain.
"""
self._shutdown_event.set()
deadline = time.time() + timeout
for thread in self.worker_threads:
remaining = max(0.1, deadline - time.time())
thread.join(timeout=remaining)

def start_workers(self, no_of_workers: int = 1):
"""
Starts worker threads to pop tasks from 'ml_tasks' and process them.
Expand Down Expand Up @@ -703,7 +721,7 @@ def start_workers(self, no_of_workers: int = 1):
# 4) Worker threads
def worker_loop(worker_id):
self.check_middleware("after_worker_boot")
while True:
while not self._shutdown_event.is_set():
try:
# Check worker health before picking up tasks
if not self.worker_healthy:
Expand All @@ -712,7 +730,7 @@ def worker_loop(worker_id):
continue

self.update_server_status(f"worker_{worker_id}: idle")
task_data = self.redis_client.blpop("ml_tasks") # blocks until a task is available
task_data = self.redis_client.blpop("ml_tasks", timeout=5) # wake every 5s so shutdown is noticed promptly
if not task_data:
continue

Expand Down
6 changes: 5 additions & 1 deletion modelq/app/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ def run_workers(
raise typer.Exit(1)
finally:
logger.info("Shutting down workers...")
typer.echo("🛑 Shutting down workers...")
typer.echo("🛑 Draining workers (finishing any in-flight task)...")
try:
app_instance.shutdown()
except Exception as e:
logger.warning(f"Error while draining workers: {e}")
typer.echo("✅ Shutdown complete")

@app.command()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "modelq"
version = "1.0.14"
version = "1.0.15"
description = "Celery-like task queue for ML inference."
authors = ["Tanmaypatil123 <tanmay@modelslab.com>"]
readme = "README.md"
Expand Down
Loading