[refactor] Remove legacy thread management for TaskQueue

- Add proper async worker management to TaskQueue class
- Remove redundant task_worker_thread and task_worker_lock global variables
- Replace manual threading with async task management
- Update is_processing() logic to use TaskQueue state instead of thread status
- Implement automatic worker cleanup when queue processing completes
- Simplify queue start endpoint to use TaskQueue.start_worker()
This commit is contained in:
bymyself 2025-06-13 14:46:11 -07:00
parent a59e6e176e
commit 706b6d8317

View File

@ -210,7 +210,31 @@ class TaskQueue:
self.batch_id = None
self.batch_start_time = None
self.batch_state_before = None
self._worker_task = None
def is_processing(self) -> bool:
"""Check if the queue is currently processing tasks"""
return (
self._worker_task is not None
and not self._worker_task.done()
and (len(self.running_tasks) > 0 or len(self.pending_tasks) > 0)
)
async def start_worker(self) -> bool:
"""Start the task worker if not already running. Returns True if started, False if already running."""
if self._worker_task is not None and not self._worker_task.done():
return False # Already running
self._worker_task = asyncio.create_task(self._worker())
return True
async def _worker(self):
"""Internal worker that processes the task queue"""
try:
await task_worker()
finally:
# Clean up worker reference when done
self._worker_task = None
def get_current_state(self) -> TaskStateMessage:
return TaskStateMessage(
@ -332,7 +356,7 @@ class TaskQueue:
ui_id=item.ui_id,
result=result_msg,
kind=item.kind,
status=pydantic_status,
status=status,
timestamp=datetime.fromisoformat(timestamp),
state=self.get_current_state(),
),
@ -542,10 +566,6 @@ if args.preview_method == latent_preview.LatentPreviewMethod.NoPreviews:
else:
logging.warning("[ComfyUI-Manager] Since --preview-method is set, ComfyUI-Manager's preview method feature will be ignored.")
# Legacy variables for compatibility
task_worker_thread = None
task_worker_lock = threading.Lock()
# Note: Model path utilities moved to model_utils.py to avoid duplication
@ -1394,8 +1414,7 @@ async def queue_count(request):
"done_count": len(history_client_tasks),
"in_progress_count": len(running_client_tasks),
"pending_count": len(pending_client_tasks),
"is_processing": task_worker_thread is not None
and task_worker_thread.is_alive(),
"is_processing": len(running_client_tasks) > 0,
}
)
else:
@ -1406,32 +1425,22 @@ async def queue_count(request):
"done_count": task_queue.done_count(),
"in_progress_count": len(task_queue.running_tasks),
"pending_count": len(task_queue.pending_tasks),
"is_processing": task_worker_thread is not None
and task_worker_thread.is_alive(),
"is_processing": task_queue.is_processing(),
}
)
task_worker_thread: threading.Thread = None
@routes.get("/v2/manager/queue/start")
async def queue_start(request):
with task_worker_lock:
# finalize_temp_queue_batch()
return _queue_start()
def _queue_start():
global task_worker_thread
if task_worker_thread is not None and task_worker_thread.is_alive():
return web.Response(status=201) # already in-progress
task_worker_thread = threading.Thread(target=lambda: asyncio.run(task_worker()))
task_worker_thread.start()
return web.Response(status=200)
# finalize_temp_queue_batch()
started = await task_queue.start_worker()
if started:
return web.Response(status=200) # Started successfully
else:
return web.Response(status=201) # Already in-progress
@routes.get("/v2/manager/queue/update_comfyui")