From 706b6d831728ff469fe88264100ab3cefb0f0a87 Mon Sep 17 00:00:00 2001 From: bymyself Date: Fri, 13 Jun 2025 14:46:11 -0700 Subject: [PATCH] [refactor] Remove legacy thread management for TaskQueue - Add proper async worker management to TaskQueue class - Remove redundant task_worker_thread and task_worker_lock global variables - Replace manual threading with async task management - Update is_processing() logic to use TaskQueue state instead of thread status - Implement automatic worker cleanup when queue processing completes - Simplify queue start endpoint to use TaskQueue.start_worker() --- comfyui_manager/glob/manager_server.py | 59 +++++++++++++++----------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/comfyui_manager/glob/manager_server.py b/comfyui_manager/glob/manager_server.py index 1918df8a..41e2d0ac 100644 --- a/comfyui_manager/glob/manager_server.py +++ b/comfyui_manager/glob/manager_server.py @@ -210,7 +210,31 @@ class TaskQueue: self.batch_id = None self.batch_start_time = None self.batch_state_before = None + self._worker_task = None + def is_processing(self) -> bool: + """Check if the queue is currently processing tasks""" + return ( + self._worker_task is not None + and not self._worker_task.done() + and (len(self.running_tasks) > 0 or len(self.pending_tasks) > 0) + ) + + async def start_worker(self) -> bool: + """Start the task worker if not already running. Returns True if started, False if already running.""" + if self._worker_task is not None and not self._worker_task.done(): + return False # Already running + + self._worker_task = asyncio.create_task(self._worker()) + return True + + async def _worker(self): + """Internal worker that processes the task queue""" + try: + await task_worker() + finally: + # Clean up worker reference when done + self._worker_task = None def get_current_state(self) -> TaskStateMessage: return TaskStateMessage( @@ -332,7 +356,7 @@ class TaskQueue: ui_id=item.ui_id, result=result_msg, kind=item.kind, - status=pydantic_status, + status=status, timestamp=datetime.fromisoformat(timestamp), state=self.get_current_state(), ), @@ -542,10 +566,6 @@ if args.preview_method == latent_preview.LatentPreviewMethod.NoPreviews: else: logging.warning("[ComfyUI-Manager] Since --preview-method is set, ComfyUI-Manager's preview method feature will be ignored.") -# Legacy variables for compatibility -task_worker_thread = None -task_worker_lock = threading.Lock() - # Note: Model path utilities moved to model_utils.py to avoid duplication @@ -1394,8 +1414,7 @@ async def queue_count(request): "done_count": len(history_client_tasks), "in_progress_count": len(running_client_tasks), "pending_count": len(pending_client_tasks), - "is_processing": task_worker_thread is not None - and task_worker_thread.is_alive(), + "is_processing": len(running_client_tasks) > 0, } ) else: @@ -1406,32 +1425,22 @@ async def queue_count(request): "done_count": task_queue.done_count(), "in_progress_count": len(task_queue.running_tasks), "pending_count": len(task_queue.pending_tasks), - "is_processing": task_worker_thread is not None - and task_worker_thread.is_alive(), + "is_processing": task_queue.is_processing(), } ) -task_worker_thread: threading.Thread = None @routes.get("/v2/manager/queue/start") async def queue_start(request): - with task_worker_lock: - # finalize_temp_queue_batch() - return _queue_start() - - -def _queue_start(): - global task_worker_thread - - if task_worker_thread is not None and task_worker_thread.is_alive(): - return web.Response(status=201) # already in-progress - - task_worker_thread = threading.Thread(target=lambda: asyncio.run(task_worker())) - task_worker_thread.start() - - return web.Response(status=200) + # finalize_temp_queue_batch() + started = await task_queue.start_worker() + + if started: + return web.Response(status=200) # Started successfully + else: + return web.Response(status=201) # Already in-progress @routes.get("/v2/manager/queue/update_comfyui")