diff --git a/comfyui_manager/glob/manager_server.py b/comfyui_manager/glob/manager_server.py index 8f00dd39..799cb953 100644 --- a/comfyui_manager/glob/manager_server.py +++ b/comfyui_manager/glob/manager_server.py @@ -105,21 +105,23 @@ def is_loopback(address): return False -def error_response(status: int, message: str, error_type: Optional[str] = None) -> web.Response: +def error_response( + status: int, message: str, error_type: Optional[str] = None +) -> web.Response: """Create a standardized error response. - + Args: status: HTTP status code message: Error message error_type: Optional error type/category - + Returns: web.Response with JSON error body """ error_data = {"error": message} if error_type: error_data["error_type"] = error_type - + return web.json_response(error_data, status=status) @@ -192,17 +194,14 @@ class TaskQueue: def is_processing(self) -> bool: """Check if the queue is currently processing tasks""" - return ( - self._worker_task is not None - and self._worker_task.is_alive() - ) - + return self._worker_task is not None and self._worker_task.is_alive() + def start_worker(self) -> bool: """Start the task worker if not already running. Returns True if started, False if already running.""" if self._worker_task is not None and self._worker_task.is_alive(): logging.debug("[ComfyUI-Manager] Worker already running, skipping start") return False # Already running - + logging.debug("[ComfyUI-Manager] Starting task worker thread") self._worker_task = threading.Thread(target=lambda: asyncio.run(task_worker())) self._worker_task.start() @@ -228,7 +227,7 @@ class TaskQueue: client_id: Optional client ID. If None, broadcasts to all clients. If provided, sends only to that specific client. """ - PromptServer.instance.send_sync(msg, update.model_dump(mode='json'), client_id) + PromptServer.instance.send_sync(msg, update.model_dump(mode="json"), client_id) def put(self, item) -> None: """Add a task to the queue. Item can be a dict or QueueTaskItem model.""" @@ -248,13 +247,16 @@ class TaskQueue: # Use current_index as priority (earlier tasks have lower numbers) priority = self.current_index self.current_index += 1 - + # Push tuple: (priority, task_counter, item) # task_counter ensures stable sort for items with same priority heapq.heappush(self.pending_tasks, (priority, self.task_counter, item)) logging.debug( "[ComfyUI-Manager] Task added to queue: kind=%s, ui_id=%s, client_id=%s, pending_count=%d", - item.kind, item.ui_id, item.client_id, len(self.pending_tasks) + item.kind, + item.ui_id, + item.client_id, + len(self.pending_tasks), ) self.not_empty.notify() @@ -283,7 +285,11 @@ class TaskQueue: self.task_counter += 1 logging.debug( "[ComfyUI-Manager] Task retrieved from queue: kind=%s, ui_id=%s, task_index=%d, running_count=%d, pending_count=%d", - item.kind, item.ui_id, task_index, len(self.running_tasks), len(self.pending_tasks) + item.kind, + item.ui_id, + task_index, + len(self.running_tasks), + len(self.pending_tasks), ) TaskQueue.send_queue_state_update( ManagerMessageName.cm_task_started.value, @@ -313,16 +319,17 @@ class TaskQueue: self.running_tasks.pop(task_index, None) logging.debug( "[ComfyUI-Manager] Task completed: kind=%s, ui_id=%s, task_index=%d, status=%s, running_count=%d", - item.kind, item.ui_id, task_index, + item.kind, + item.ui_id, + task_index, status.status_str if status else "unknown", - len(self.running_tasks) + len(self.running_tasks), ) # Manage history size if len(self.history_tasks) > MAXIMUM_HISTORY_SIZE: self.history_tasks.pop(next(iter(self.history_tasks))) - # Update history self.history_tasks[item.ui_id] = TaskHistoryItem( ui_id=item.ui_id, @@ -334,18 +341,32 @@ class TaskQueue: ) # Force cache refresh for successful pack-modifying operations - pack_modifying_tasks = {OperationType.install.value, OperationType.uninstall.value, OperationType.enable.value, OperationType.disable.value} - if (item.kind in pack_modifying_tasks and - status and status.status_str == OperationResult.success.value): + pack_modifying_tasks = { + OperationType.install.value, + OperationType.uninstall.value, + OperationType.enable.value, + OperationType.disable.value, + } + if ( + item.kind in pack_modifying_tasks + and status + and status.status_str == OperationResult.success.value + ): try: logging.debug( "[ComfyUI-Manager] Refreshing cache after successful %s operation", - item.kind + item.kind, ) # Force unified_manager to refresh its installed packages cache - await core.unified_manager.reload(ManagerDatabaseSource.cache.value, dont_wait=True, update_cnr_map=False) + await core.unified_manager.reload( + ManagerDatabaseSource.cache.value, + dont_wait=True, + update_cnr_map=False, + ) except Exception as e: - logging.warning(f"[ComfyUI-Manager] Failed to refresh cache after {item.kind}: {e}") + logging.warning( + f"[ComfyUI-Manager] Failed to refresh cache after {item.kind}: {e}" + ) # Send WebSocket message indicating task is complete TaskQueue.send_queue_state_update( @@ -379,7 +400,9 @@ class TaskQueue: with self.mutex: pending_count = len(self.pending_tasks) self.pending_tasks = [] - logging.debug("[ComfyUI-Manager] Queue wiped: cleared %d pending tasks", pending_count) + logging.debug( + "[ComfyUI-Manager] Queue wiped: cleared %d pending tasks", pending_count + ) def abort(self) -> None: """Abort current operations""" @@ -390,7 +413,8 @@ class TaskQueue: self.running_tasks = {} logging.debug( "[ComfyUI-Manager] Queue aborted: cleared %d pending and %d running tasks", - pending_count, running_count + pending_count, + running_count, ) def delete_history_item(self, ui_id: str) -> None: @@ -457,8 +481,9 @@ class TaskQueue: context.manager_batch_history_path, self.batch_id + ".json" ) logging.debug( - "[ComfyUI-Manager] Finalizing batch: batch_id=%s, history_count=%d", - self.batch_id, len(self.history_tasks) + "[ComfyUI-Manager] Finalizing batch: batch_id=%s, history_count=%d", + self.batch_id, + len(self.history_tasks), ) try: @@ -475,13 +500,25 @@ class TaskQueue: operations=operations, total_operations=len(operations), successful_operations=len( - [op for op in operations if op.result == OperationResult.success.value] + [ + op + for op in operations + if op.result == OperationResult.success.value + ] ), failed_operations=len( - [op for op in operations if op.result == OperationResult.failed.value] + [ + op + for op in operations + if op.result == OperationResult.failed.value + ] ), skipped_operations=len( - [op for op in operations if op.result == OperationResult.skipped.value] + [ + op + for op in operations + if op.result == OperationResult.skipped.value + ] ), ) @@ -493,9 +530,12 @@ class TaskQueue: logging.debug( "[ComfyUI-Manager] Batch history saved: batch_id=%s, path=%s, total_ops=%d, successful=%d, failed=%d, skipped=%d", - self.batch_id, batch_path, batch_record.total_operations, - batch_record.successful_operations, batch_record.failed_operations, - batch_record.skipped_operations + self.batch_id, + batch_path, + batch_record.total_operations, + batch_record.successful_operations, + batch_record.failed_operations, + batch_record.skipped_operations, ) # Reset batch tracking @@ -545,11 +585,11 @@ class TaskQueue: # Determine install method and repository URL install_method = "git" if pack_info.get("aux_id") else "cnr" repository_url = None - + if pack_info.get("aux_id"): # It's a git-based node, construct GitHub URL repository_url = f"https://github.com/{pack_info['aux_id']}" - + installed_nodes[pack_name] = InstalledNodeInfo( name=pack_name, version=pack_info.get("ver", "unknown"), @@ -587,7 +627,7 @@ class TaskQueue: def _get_manager_version(self) -> str: """Get ComfyUI Manager version.""" try: - version_code = getattr(core, 'version_code', [4, 0]) + version_code = getattr(core, "version_code", [4, 0]) return f"V{version_code[0]}.{version_code[1]}" except Exception: return None @@ -596,13 +636,13 @@ class TaskQueue: """Get current security level.""" try: config = core.get_config() - level_str = config.get('security_level', 'normal') + level_str = config.get("security_level", "normal") # Map the string to SecurityLevel enum level_mapping = { - 'strong': SecurityLevel.strong, - 'normal': SecurityLevel.normal, - 'normal-': SecurityLevel.normal_, - 'weak': SecurityLevel.weak + "strong": SecurityLevel.strong, + "normal": SecurityLevel.normal, + "normal-": SecurityLevel.normal_, + "weak": SecurityLevel.weak, } return level_mapping.get(level_str, SecurityLevel.normal) except Exception: @@ -612,7 +652,7 @@ class TaskQueue: """Get current network mode.""" try: config = core.get_config() - return config.get('network_mode', 'online') + return config.get("network_mode", "online") except Exception: return None @@ -620,14 +660,14 @@ class TaskQueue: """Get selected CLI arguments.""" try: cli_args = {} - if hasattr(args, 'listen'): - cli_args['listen'] = args.listen - if hasattr(args, 'port'): - cli_args['port'] = args.port - if hasattr(args, 'preview_method'): - cli_args['preview_method'] = str(args.preview_method) - if hasattr(args, 'enable_manager_legacy_ui'): - cli_args['enable_manager_legacy_ui'] = args.enable_manager_legacy_ui + if hasattr(args, "listen"): + cli_args["listen"] = args.listen + if hasattr(args, "port"): + cli_args["port"] = args.port + if hasattr(args, "preview_method"): + cli_args["preview_method"] = str(args.preview_method) + if hasattr(args, "enable_manager_legacy_ui"): + cli_args["enable_manager_legacy_ui"] = args.enable_manager_legacy_ui return cli_args except Exception: return {} @@ -644,8 +684,8 @@ class TaskQueue: """Get list of custom nodes that failed to import.""" try: # Check if the import_failed_extensions set is available - if hasattr(sys, '__comfyui_manager_import_failed_extensions'): - failed_set = getattr(sys, '__comfyui_manager_import_failed_extensions') + if hasattr(sys, "__comfyui_manager_import_failed_extensions"): + failed_set = getattr(sys, "__comfyui_manager_import_failed_extensions") return list(failed_set) if failed_set else [] return [] except Exception: @@ -666,7 +706,13 @@ class TaskQueue: for ui_id, task in self.history_tasks.items(): result_status = OperationResult.success if task.status: - status_str = task.status.status_str if hasattr(task.status, 'status_str') else task.status.get("status_str", OperationResult.success.value) + status_str = ( + task.status.status_str + if hasattr(task.status, "status_str") + else task.status.get( + "status_str", OperationResult.success.value + ) + ) if status_str == OperationResult.error.value: result_status = OperationResult.failed elif status_str == OperationResult.skip.value: @@ -693,9 +739,11 @@ task_queue = TaskQueue() # Preview method initialization if args.preview_method == latent_preview.LatentPreviewMethod.NoPreviews: - environment_utils.set_preview_method(core.get_config()['preview_method']) + environment_utils.set_preview_method(core.get_config()["preview_method"]) else: - logging.warning("[ComfyUI-Manager] Since --preview-method is set, ComfyUI-Manager's preview method feature will be ignored.") + logging.warning( + "[ComfyUI-Manager] Since --preview-method is set, ComfyUI-Manager's preview method feature will be ignored." + ) async def task_worker(): @@ -708,10 +756,13 @@ async def task_worker(): channel = params.channel mode = params.mode skip_post_install = params.skip_post_install - + logging.debug( "[ComfyUI-Manager] Installing node: id=%s, version=%s, channel=%s, mode=%s", - node_id, node_version, channel, mode + node_id, + node_version, + channel, + mode, ) try: @@ -761,10 +812,9 @@ async def task_worker(): async def do_update(params: UpdatePackParams) -> str: node_name = params.node_name node_ver = params.node_ver - + logging.debug( - "[ComfyUI-Manager] Updating node: name=%s, version=%s", - node_name, node_ver + "[ComfyUI-Manager] Updating node: name=%s, version=%s", node_name, node_ver ) try: @@ -808,7 +858,7 @@ async def task_worker(): async def do_update_comfyui(params: UpdateComfyUIParams) -> str: try: repo_path = os.path.dirname(folder_paths.__file__) - + # Check if this is a version switch operation if params.target_version: # Switch to specific version @@ -820,7 +870,8 @@ async def task_worker(): is_stable = params.is_stable if params.is_stable is not None else True logging.debug( "[ComfyUI-Manager] Updating ComfyUI: is_stable=%s, repo_path=%s", - is_stable, repo_path + is_stable, + repo_path, ) latest_tag = None if is_stable: @@ -870,10 +921,11 @@ async def task_worker(): async def do_uninstall(params: UninstallPackParams) -> str: node_name = params.node_name is_unknown = params.is_unknown - + logging.debug( "[ComfyUI-Manager] Uninstalling node: name=%s, is_unknown=%s", - node_name, is_unknown + node_name, + is_unknown, ) try: @@ -892,16 +944,15 @@ async def task_worker(): async def do_disable(params: DisablePackParams) -> str: node_name = params.node_name - + logging.debug( "[ComfyUI-Manager] Disabling node: name=%s, is_unknown=%s", - node_name, params.is_unknown + node_name, + params.is_unknown, ) - + try: - res = core.unified_manager.unified_disable( - node_name, params.is_unknown - ) + res = core.unified_manager.unified_disable(node_name, params.is_unknown) if res: return OperationResult.success.value @@ -978,13 +1029,18 @@ async def task_worker(): timeout = 4096 task = task_queue.get(timeout) if task is None: - is_empty_queue = task_queue.total_count() == 0 and len(task_queue.running_tasks) == 0 + is_empty_queue = ( + task_queue.total_count() == 0 and len(task_queue.running_tasks) == 0 + ) if is_empty_queue: logging.debug("[ComfyUI-Manager] Queue empty - all tasks completed") did_complete_tasks = task_queue.done_count() > 0 if did_complete_tasks: - logging.debug("[ComfyUI-Manager] Finalizing batch history with %d completed tasks", task_queue.done_count()) + logging.debug( + "[ComfyUI-Manager] Finalizing batch history with %d completed tasks", + task_queue.done_count(), + ) task_queue.finalize() logging.debug("[ComfyUI-Manager] Batch finalization complete") @@ -1004,7 +1060,10 @@ async def task_worker(): logging.debug( "[ComfyUI-Manager] Processing task: kind=%s, ui_id=%s, client_id=%s, task_index=%d", - kind, item.ui_id, item.client_id, task_index + kind, + item.ui_id, + item.client_id, + task_index, ) try: @@ -1032,10 +1091,17 @@ async def task_worker(): msg = f"Exception: {(kind, item)}" logging.error( "[ComfyUI-Manager] Task execution exception: kind=%s, ui_id=%s, error=%s", - kind, item.ui_id, traceback.format_exc() + kind, + item.ui_id, + traceback.format_exc(), ) await task_queue.task_done( - item, task_index, msg, TaskExecutionStatus(status_str=OperationResult.error, completed=True, messages=[msg]) + item, + task_index, + msg, + TaskExecutionStatus( + status_str=OperationResult.error, completed=True, messages=[msg] + ), ) return @@ -1047,15 +1113,24 @@ async def task_worker(): # Determine status if result_msg == OperationResult.success.value: - status = TaskExecutionStatus(status_str=OperationResult.success, completed=True, messages=[]) + status = TaskExecutionStatus( + status_str=OperationResult.success, completed=True, messages=[] + ) elif result_msg == OperationResult.skip.value: - status = TaskExecutionStatus(status_str=OperationResult.skip, completed=True, messages=[]) + status = TaskExecutionStatus( + status_str=OperationResult.skip, completed=True, messages=[] + ) else: - status = TaskExecutionStatus(status_str=OperationResult.error, completed=True, messages=[result_msg]) + status = TaskExecutionStatus( + status_str=OperationResult.error, completed=True, messages=[result_msg] + ) logging.debug( "[ComfyUI-Manager] Task execution completed: kind=%s, ui_id=%s, status=%s, result=%s", - kind, item.ui_id, status.status_str, result_msg + kind, + item.ui_id, + status.status_str, + result_msg, ) await task_queue.task_done(item, task_index, result_msg, status) @@ -1079,7 +1154,9 @@ async def queue_task(request) -> web.Response: task_item = QueueTaskItem.model_validate(json_data) logging.debug( "[ComfyUI-Manager] Queueing task via API: kind=%s, ui_id=%s, client_id=%s", - task_item.kind, task_item.ui_id, task_item.client_id + task_item.kind, + task_item.ui_id, + task_item.client_id, ) TaskQueue.instance.put(task_item) # maybe start worker @@ -1142,7 +1219,7 @@ async def get_history(request): batch_path = os.path.join(context.manager_batch_history_path, json_name) logging.debug( "[ComfyUI-Manager] Fetching batch history: id=%s", - request.rel_url.query["id"] + request.rel_url.query["id"], ) with open(batch_path, "r", encoding="utf-8") as file: @@ -1155,10 +1232,12 @@ async def get_history(request): ui_id = request.rel_url.query.get("ui_id") max_items = request.rel_url.query.get("max_items") offset = request.rel_url.query.get("offset", -1) - + logging.debug( "[ComfyUI-Manager] Fetching history: client_id=%s, ui_id=%s, max_items=%s", - client_id, ui_id, max_items + client_id, + ui_id, + max_items, ) if max_items: @@ -1229,7 +1308,7 @@ async def fetch_customnode_mappings(request): async def fetch_updates(request): """ DEPRECATED: This endpoint is no longer supported. - + Repository fetching has been removed from the API. Updates should be performed through the queue system using update operations. """ @@ -1237,9 +1316,9 @@ async def fetch_updates(request): { "error": "This endpoint has been deprecated", "message": "Repository fetching is no longer supported. Please use the update operations through the queue system.", - "deprecated": True + "deprecated": True, }, - status=410 # 410 Gone + status=410, # 410 Gone ) @@ -1250,7 +1329,9 @@ async def update_all(request: web.Request) -> web.Response: query_params = UpdateAllQueryParams.model_validate(dict(request.rel_url.query)) return await _update_all(query_params) except ValidationError as e: - return web.json_response({"error": "Validation error", "details": e.errors()}, status=400) + return web.json_response( + {"error": "Validation error", "details": e.errors()}, status=400 + ) async def _update_all(params: UpdateAllQueryParams) -> web.Response: @@ -1262,10 +1343,12 @@ async def _update_all(params: UpdateAllQueryParams) -> web.Response: base_ui_id = params.ui_id client_id = params.client_id mode = params.mode.value if params.mode else ManagerDatabaseSource.remote.value - + logging.debug( "[ComfyUI-Manager] Update all requested: client_id=%s, base_ui_id=%s, mode=%s", - client_id, base_ui_id, mode + client_id, + base_ui_id, + mode, ) if mode == ManagerDatabaseSource.local.value: @@ -1284,10 +1367,10 @@ async def _update_all(params: UpdateAllQueryParams) -> web.Response: continue update_task = QueueTaskItem( - kind=OperationType.update.value, + kind=OperationType.update.value, ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name client_id=client_id, - params=UpdatePackParams(node_name=k, node_ver=v[0]) + params=UpdatePackParams(node_name=k, node_ver=v[0]), ) task_queue.put(update_task) update_count += 1 @@ -1299,17 +1382,18 @@ async def _update_all(params: UpdateAllQueryParams) -> web.Response: continue update_task = QueueTaskItem( - kind=OperationType.update.value, + kind=OperationType.update.value, ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name client_id=client_id, - params=UpdatePackParams(node_name=k, node_ver="unknown") + params=UpdatePackParams(node_name=k, node_ver="unknown"), ) task_queue.put(update_task) update_count += 1 logging.debug( "[ComfyUI-Manager] Update all queued %d tasks for client_id=%s", - update_count, client_id + update_count, + client_id, ) return web.Response(status=200) @@ -1476,7 +1560,6 @@ async def import_fail_info(request): return web.Response(status=500, text="Internal server error") - @routes.get("/v2/manager/queue/reset") async def reset_queue(request): logging.debug("[ComfyUI-Manager] Queue reset requested") @@ -1537,13 +1620,11 @@ async def queue_count(request): ) - - @routes.get("/v2/manager/queue/start") async def queue_start(request): logging.debug("[ComfyUI-Manager] Queue start requested") started = task_queue.start_worker() - + if started: logging.debug("[ComfyUI-Manager] Queue worker started successfully") return web.Response(status=200) # Started successfully @@ -1557,27 +1638,31 @@ async def update_comfyui(request): """Queue a ComfyUI update based on the configured update policy.""" try: # Validate query parameters using Pydantic model - query_params = UpdateComfyUIQueryParams.model_validate(dict(request.rel_url.query)) - + query_params = UpdateComfyUIQueryParams.model_validate( + dict(request.rel_url.query) + ) + # Check if stable parameter was provided, otherwise use config if query_params.stable is None: is_stable = core.get_config()["update_policy"] != "nightly-comfyui" else: is_stable = query_params.stable - + client_id = query_params.client_id ui_id = query_params.ui_id except ValidationError as e: - return web.json_response({"error": "Validation error", "details": e.errors()}, status=400) - + return web.json_response( + {"error": "Validation error", "details": e.errors()}, status=400 + ) + # Create update-comfyui task task = QueueTaskItem( ui_id=ui_id, client_id=client_id, kind=OperationType.update_comfyui.value, - params=UpdateComfyUIParams(is_stable=is_stable) + params=UpdateComfyUIParams(is_stable=is_stable), ) - + task_queue.put(task) return web.Response(status=200) @@ -1601,24 +1686,28 @@ async def comfyui_versions(request): async def comfyui_switch_version(request): try: # Validate query parameters using Pydantic model - query_params = ComfyUISwitchVersionQueryParams.model_validate(dict(request.rel_url.query)) - + query_params = ComfyUISwitchVersionQueryParams.model_validate( + dict(request.rel_url.query) + ) + target_version = query_params.ver client_id = query_params.client_id ui_id = query_params.ui_id - + # Create update-comfyui task with target version task = QueueTaskItem( ui_id=ui_id, client_id=client_id, kind=OperationType.update_comfyui.value, - params=UpdateComfyUIParams(target_version=target_version) + params=UpdateComfyUIParams(target_version=target_version), ) - + task_queue.put(task) return web.Response(status=200) except ValidationError as e: - return web.json_response({"error": "Validation error", "details": e.errors()}, status=400) + return web.json_response( + {"error": "Validation error", "details": e.errors()}, status=400 + ) except Exception as e: logging.error(f"ComfyUI version switch fail: {e}", file=sys.stderr) return web.Response(status=400) @@ -1628,24 +1717,24 @@ async def comfyui_switch_version(request): async def install_model(request): try: json_data = await request.json() - + # Validate required fields - if 'client_id' not in json_data: + if "client_id" not in json_data: return web.Response(status=400, text="Missing required field: client_id") - if 'ui_id' not in json_data: + if "ui_id" not in json_data: return web.Response(status=400, text="Missing required field: ui_id") - + # Validate model metadata model_data = ModelMetadata.model_validate(json_data) - + # Create install-model task with client-provided IDs task = QueueTaskItem( - ui_id=json_data['ui_id'], - client_id=json_data['client_id'], + ui_id=json_data["ui_id"], + client_id=json_data["client_id"], kind=OperationType.install_model.value, - params=model_data + params=model_data, ) - + task_queue.put(task) return web.Response(status=200) except ValidationError as e: @@ -1816,8 +1905,12 @@ async def default_cache_update(): ) else: # load at least once - await core.unified_manager.reload(ManagerDatabaseSource.remote.value, dont_wait=False) - await core.unified_manager.get_custom_nodes(channel_url, ManagerDatabaseSource.remote.value) + await core.unified_manager.reload( + ManagerDatabaseSource.remote.value, dont_wait=False + ) + await core.unified_manager.get_custom_nodes( + channel_url, ManagerDatabaseSource.remote.value + ) else: await core.unified_manager.reload( ManagerDatabaseSource.remote.value, dont_wait=False, update_cnr_map=False