diff --git a/modelq/app/base.py b/modelq/app/base.py
index 3f96ff1..a5d5664 100644
--- a/modelq/app/base.py
+++ b/modelq/app/base.py
@@ -152,6 +152,9 @@ def __init__(
         )
 
         self.worker_threads = []
+        # Set on SIGTERM/SIGINT so worker loops stop pulling and exit cleanly
+        # after finishing their current task (see shutdown()).
+        self._shutdown_event = threading.Event()
         if server_id is None:
             # Attempt to load the server_id from a local file:
             server_id = self._get_or_create_server_id_file()
@@ -662,6 +665,42 @@ def wrapper(*args, **kwargs):
             return wrapper
         return decorator
 
+    def shutdown(self, timeout: float = 300.0) -> bool:
+        """Signal worker loops to stop after their current task, then wait.
+
+        Called on SIGTERM/SIGINT (see the CLI) so an in-flight task finishes —
+        and, for synchronous task handlers, its upload completes — before the
+        process exits, instead of daemon worker threads being killed mid-task
+        (which orphans the task and loses its result).
+
+        Args:
+            timeout: Upper bound, in seconds, on the total time spent waiting
+                for all threads in ``self.worker_threads`` combined.
+
+        Returns:
+            True if every joined thread had exited when the wait ended,
+            False if at least one was still running.
+        """
+        self._shutdown_event.set()
+        # Monotonic clock: a wall-clock adjustment (NTP step, manual change)
+        # during the drain must not stretch or cut short the deadline.
+        deadline = time.monotonic() + timeout
+        current = threading.current_thread()
+        # NOTE(review): any thread in worker_threads that does not observe
+        # _shutdown_event will hold this up for the full timeout — confirm
+        # the list only contains worker loops.
+        # Snapshot the list; skip ourselves and threads that are not running,
+        # since join() raises RuntimeError for the current/unstarted thread.
+        pending = [
+            t for t in list(self.worker_threads)
+            if t is not current and t.is_alive()
+        ]
+        for thread in pending:
+            # Clamp at zero so that, once the deadline has passed, remaining
+            # threads are only polled and the total wait stays within timeout.
+            thread.join(timeout=max(0.0, deadline - time.monotonic()))
+        return not any(t.is_alive() for t in pending)
+
     def start_workers(self, no_of_workers: int = 1):
         """
         Starts worker threads to pop tasks from 'ml_tasks' and process them.
@@ -703,7 +721,7 @@ def start_workers(self, no_of_workers: int = 1):
         # 4) Worker threads
         def worker_loop(worker_id):
             self.check_middleware("after_worker_boot")
-            while True:
+            while not self._shutdown_event.is_set():
                 try:
                     # Check worker health before picking up tasks
                     if not self.worker_healthy:
@@ -712,7 +730,7 @@ def worker_loop(worker_id):
                         continue
 
                     self.update_server_status(f"worker_{worker_id}: idle")
-                    task_data = self.redis_client.blpop("ml_tasks")  # blocks until a task is available
+                    task_data = self.redis_client.blpop("ml_tasks", timeout=5)  # give up after 5s idle so the loop re-checks _shutdown_event
                     if not task_data:
                         continue
 
diff --git a/modelq/app/cli/main.py b/modelq/app/cli/main.py
index a18989c..0b8c24b 100644
--- a/modelq/app/cli/main.py
+++ b/modelq/app/cli/main.py
@@ -93,7 +93,11 @@ def run_workers(
         raise typer.Exit(1)
     finally:
         logger.info("Shutting down workers...")
-        typer.echo("🛑 Shutting down workers...")
+        typer.echo("🛑 Draining workers (finishing any in-flight task)...")
+        try:
+            app_instance.shutdown()
+        except Exception as e:
+            logger.warning(f"Error while draining workers: {e}")
         typer.echo("✅ Shutdown complete")
 
 @app.command()
diff --git a/pyproject.toml b/pyproject.toml
index bba1b93..2d5ad6b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "modelq"
-version = "1.0.14"
+version = "1.0.15"
 description = "Celery-like task queue for ML inference."
 authors = ["Tanmaypatil123 "]
 readme = "README.md"