[feat] Implement comprehensive system state capture for batch records

This commit is contained in:
bymyself 2025-06-17 13:08:35 -07:00
parent 350cb767c3
commit 03ecda3cfe

View File

@ -73,6 +73,10 @@ from ..data_models import (
DisablePackParams, DisablePackParams,
EnablePackParams, EnablePackParams,
ModelMetadata, ModelMetadata,
OperationType,
OperationResult,
ManagerDatabaseSource,
SecurityLevel,
) )
from .constants import ( from .constants import (
@ -100,9 +104,6 @@ def is_loopback(address):
return False return False
is_local_mode = is_loopback(args.listen)
def validate_required_params(request: web.Request, required_params: List[str]) -> Optional[web.Response]: def validate_required_params(request: web.Request, required_params: List[str]) -> Optional[web.Response]:
"""Validate that all required query parameters are present. """Validate that all required query parameters are present.
@ -222,8 +223,10 @@ class TaskQueue:
def start_worker(self) -> bool: def start_worker(self) -> bool:
"""Start the task worker if not already running. Returns True if started, False if already running.""" """Start the task worker if not already running. Returns True if started, False if already running."""
if self._worker_task is not None and self._worker_task.is_alive(): if self._worker_task is not None and self._worker_task.is_alive():
logging.debug("[ComfyUI-Manager] Worker already running, skipping start")
return False # Already running return False # Already running
logging.debug("[ComfyUI-Manager] Starting task worker thread")
self._worker_task = threading.Thread(target=lambda: asyncio.run(task_worker())) self._worker_task = threading.Thread(target=lambda: asyncio.run(task_worker()))
self._worker_task.start() self._worker_task.start()
return True return True
@ -266,6 +269,10 @@ class TaskQueue:
item = QueueTaskItem(**item) item = QueueTaskItem(**item)
heapq.heappush(self.pending_tasks, item) heapq.heappush(self.pending_tasks, item)
logging.debug(
"[ComfyUI-Manager] Task added to queue: kind=%s, ui_id=%s, client_id=%s, pending_count=%d",
item.kind, item.ui_id, item.client_id, len(self.pending_tasks)
)
self.not_empty.notify() self.not_empty.notify()
def _start_new_batch(self) -> None: def _start_new_batch(self) -> None:
@ -284,11 +291,16 @@ class TaskQueue:
while len(self.pending_tasks) == 0: while len(self.pending_tasks) == 0:
self.not_empty.wait(timeout=timeout) self.not_empty.wait(timeout=timeout)
if timeout is not None and len(self.pending_tasks) == 0: if timeout is not None and len(self.pending_tasks) == 0:
logging.debug("[ComfyUI-Manager] Task queue get timed out")
return None return None
item = heapq.heappop(self.pending_tasks) item = heapq.heappop(self.pending_tasks)
task_index = self.task_counter task_index = self.task_counter
self.running_tasks[task_index] = copy.deepcopy(item) self.running_tasks[task_index] = copy.deepcopy(item)
self.task_counter += 1 self.task_counter += 1
logging.debug(
"[ComfyUI-Manager] Task retrieved from queue: kind=%s, ui_id=%s, task_index=%d, running_count=%d, pending_count=%d",
item.kind, item.ui_id, task_index, len(self.running_tasks), len(self.pending_tasks)
)
TaskQueue.send_queue_state_update( TaskQueue.send_queue_state_update(
ManagerMessageName.cm_task_started.value, ManagerMessageName.cm_task_started.value,
MessageTaskStarted( MessageTaskStarted(
@ -315,6 +327,12 @@ class TaskQueue:
# Remove task from running_tasks using the task_index # Remove task from running_tasks using the task_index
self.running_tasks.pop(task_index, None) self.running_tasks.pop(task_index, None)
logging.debug(
"[ComfyUI-Manager] Task completed: kind=%s, ui_id=%s, task_index=%d, status=%s, running_count=%d",
item.kind, item.ui_id, task_index,
status.status_str if status else "unknown",
len(self.running_tasks)
)
# Manage history size # Manage history size
if len(self.history_tasks) > MAXIMUM_HISTORY_SIZE: if len(self.history_tasks) > MAXIMUM_HISTORY_SIZE:
@ -332,12 +350,16 @@ class TaskQueue:
) )
# Force cache refresh for successful pack-modifying operations # Force cache refresh for successful pack-modifying operations
pack_modifying_tasks = {"install", "uninstall", "enable", "disable"} pack_modifying_tasks = {OperationType.install.value, OperationType.uninstall.value, OperationType.enable.value, OperationType.disable.value}
if (item.kind in pack_modifying_tasks and if (item.kind in pack_modifying_tasks and
status and status.status_str == "success"): status and status.status_str == OperationResult.success.value):
try: try:
logging.debug(
"[ComfyUI-Manager] Refreshing cache after successful %s operation",
item.kind
)
# Force unified_manager to refresh its installed packages cache # Force unified_manager to refresh its installed packages cache
await core.unified_manager.reload("cache", dont_wait=True, update_cnr_map=False) await core.unified_manager.reload(ManagerDatabaseSource.cache.value, dont_wait=True, update_cnr_map=False)
except Exception as e: except Exception as e:
logging.warning(f"[ComfyUI-Manager] Failed to refresh cache after {item.kind}: {e}") logging.warning(f"[ComfyUI-Manager] Failed to refresh cache after {item.kind}: {e}")
@ -370,13 +392,21 @@ class TaskQueue:
def wipe_queue(self) -> None: def wipe_queue(self) -> None:
"""Clear all task queue""" """Clear all task queue"""
with self.mutex: with self.mutex:
pending_count = len(self.pending_tasks)
self.pending_tasks = [] self.pending_tasks = []
logging.debug("[ComfyUI-Manager] Queue wiped: cleared %d pending tasks", pending_count)
def abort(self) -> None: def abort(self) -> None:
"""Abort current operations""" """Abort current operations"""
with self.mutex: with self.mutex:
pending_count = len(self.pending_tasks)
running_count = len(self.running_tasks)
self.pending_tasks = [] self.pending_tasks = []
self.running_tasks = {} self.running_tasks = {}
logging.debug(
"[ComfyUI-Manager] Queue aborted: cleared %d pending and %d running tasks",
pending_count, running_count
)
def delete_history_item(self, ui_id: str) -> None: def delete_history_item(self, ui_id: str) -> None:
"""Delete specific task from history""" """Delete specific task from history"""
@ -441,6 +471,10 @@ class TaskQueue:
batch_path = os.path.join( batch_path = os.path.join(
context.manager_batch_history_path, self.batch_id + ".json" context.manager_batch_history_path, self.batch_id + ".json"
) )
logging.debug(
"[ComfyUI-Manager] Finalizing batch: batch_id=%s, history_count=%d",
self.batch_id, len(self.history_tasks)
)
try: try:
end_time = datetime.now().isoformat() end_time = datetime.now().isoformat()
@ -456,13 +490,13 @@ class TaskQueue:
operations=operations, operations=operations,
total_operations=len(operations), total_operations=len(operations),
successful_operations=len( successful_operations=len(
[op for op in operations if op.result == "success"] [op for op in operations if op.result == OperationResult.success.value]
), ),
failed_operations=len( failed_operations=len(
[op for op in operations if op.result == "failed"] [op for op in operations if op.result == OperationResult.failed.value]
), ),
skipped_operations=len( skipped_operations=len(
[op for op in operations if op.result == "skipped"] [op for op in operations if op.result == OperationResult.skipped.value]
), ),
) )
@ -472,7 +506,12 @@ class TaskQueue:
batch_record.model_dump(), json_file, indent=4, default=str batch_record.model_dump(), json_file, indent=4, default=str
) )
logging.debug(f"[ComfyUI-Manager] Batch history saved: {batch_path}") logging.debug(
"[ComfyUI-Manager] Batch history saved: batch_id=%s, path=%s, total_ops=%d, successful=%d, failed=%d, skipped=%d",
self.batch_id, batch_path, batch_record.total_operations,
batch_record.successful_operations, batch_record.failed_operations,
batch_record.skipped_operations
)
# Reset batch tracking # Reset batch tracking
self.batch_id = None self.batch_id = None
@ -484,12 +523,22 @@ class TaskQueue:
def _capture_system_state(self) -> ComfyUISystemState: def _capture_system_state(self) -> ComfyUISystemState:
"""Capture current ComfyUI system state for batch record.""" """Capture current ComfyUI system state for batch record."""
logging.debug("[ComfyUI-Manager] Capturing system state for batch record")
return ComfyUISystemState( return ComfyUISystemState(
snapshot_time=datetime.now().isoformat(), snapshot_time=datetime.now().isoformat(),
comfyui_version=self._get_comfyui_version_info(), comfyui_version=self._get_comfyui_version_info(),
python_version=platform.python_version(), python_version=platform.python_version(),
platform_info=f"{platform.system()} {platform.release()} ({platform.machine()})", platform_info=f"{platform.system()} {platform.release()} ({platform.machine()})",
installed_nodes=self._get_installed_nodes(), installed_nodes=self._get_installed_nodes(),
comfyui_root_path=self._get_comfyui_root_path(),
model_paths=self._get_model_paths(),
manager_version=self._get_manager_version(),
security_level=self._get_security_level(),
network_mode=self._get_network_mode(),
cli_args=self._get_cli_args(),
custom_nodes_count=self._get_custom_nodes_count(),
failed_imports=self._get_failed_imports(),
pip_packages=self._get_pip_packages(),
) )
def _get_comfyui_version_info(self) -> ComfyUIVersionInfo: def _get_comfyui_version_info(self) -> ComfyUIVersionInfo:
@ -508,10 +557,19 @@ class TaskQueue:
try: try:
node_packs = core.get_installed_node_packs() node_packs = core.get_installed_node_packs()
for pack_name, pack_info in node_packs.items(): for pack_name, pack_info in node_packs.items():
# Determine install method and repository URL
install_method = "git" if pack_info.get("aux_id") else "cnr"
repository_url = None
if pack_info.get("aux_id"):
# It's a git-based node, construct GitHub URL
repository_url = f"https://github.com/{pack_info['aux_id']}"
installed_nodes[pack_name] = InstalledNodeInfo( installed_nodes[pack_name] = InstalledNodeInfo(
name=pack_name, name=pack_name,
version=pack_info.get("ver", "unknown"), version=pack_info.get("ver", "unknown"),
install_method="unknown", install_method=install_method,
repository_url=repository_url,
enabled=pack_info.get("enabled", True), enabled=pack_info.get("enabled", True),
) )
except Exception as e: except Exception as e:
@ -519,25 +577,121 @@ class TaskQueue:
return installed_nodes return installed_nodes
def _get_comfyui_root_path(self) -> str:
"""Get ComfyUI root installation directory."""
try:
return os.path.dirname(folder_paths.__file__)
except Exception:
return None
def _get_model_paths(self) -> dict[str, list[str]]:
"""Get model paths for different model types."""
try:
model_paths = {}
for model_type in model_dir_name_map.keys():
try:
paths = folder_paths.get_folder_paths(model_type)
if paths:
model_paths[model_type] = paths
except Exception:
continue
return model_paths
except Exception:
return {}
def _get_manager_version(self) -> str:
"""Get ComfyUI Manager version."""
try:
version_code = getattr(core, 'version_code', [4, 0])
return f"V{version_code[0]}.{version_code[1]}"
except Exception:
return None
def _get_security_level(self) -> SecurityLevel:
"""Get current security level."""
try:
config = core.get_config()
level_str = config.get('security_level', 'normal')
# Map the string to SecurityLevel enum
level_mapping = {
'strong': SecurityLevel.strong,
'normal': SecurityLevel.normal,
'normal-': SecurityLevel.normal_,
'weak': SecurityLevel.weak
}
return level_mapping.get(level_str, SecurityLevel.normal)
except Exception:
return None
def _get_network_mode(self) -> str:
"""Get current network mode."""
try:
config = core.get_config()
return config.get('network_mode', 'online')
except Exception:
return None
def _get_cli_args(self) -> dict[str, Any]:
"""Get selected CLI arguments."""
try:
cli_args = {}
if hasattr(args, 'listen'):
cli_args['listen'] = args.listen
if hasattr(args, 'port'):
cli_args['port'] = args.port
if hasattr(args, 'preview_method'):
cli_args['preview_method'] = str(args.preview_method)
if hasattr(args, 'enable_manager_legacy_ui'):
cli_args['enable_manager_legacy_ui'] = args.enable_manager_legacy_ui
return cli_args
except Exception:
return {}
def _get_custom_nodes_count(self) -> int:
"""Get total number of custom node packages."""
try:
node_packs = core.get_installed_node_packs()
return len(node_packs)
except Exception:
return 0
def _get_failed_imports(self) -> list[str]:
"""Get list of custom nodes that failed to import."""
try:
# Check if the import_failed_extensions set is available
if hasattr(sys, '__comfyui_manager_import_failed_extensions'):
failed_set = getattr(sys, '__comfyui_manager_import_failed_extensions')
return list(failed_set) if failed_set else []
return []
except Exception:
return []
def _get_pip_packages(self) -> dict[str, str]:
"""Get installed pip packages."""
try:
return core.get_installed_pip_packages()
except Exception:
return {}
def _extract_batch_operations(self) -> list[BatchOperation]: def _extract_batch_operations(self) -> list[BatchOperation]:
"""Extract operations from completed task history for this batch.""" """Extract operations from completed task history for this batch."""
operations = [] operations = []
try: try:
for ui_id, task in self.history_tasks.items(): for ui_id, task in self.history_tasks.items():
result_status = "success" result_status = OperationResult.success
if task.status: if task.status:
status_str = task.status.get("status_str", "success") status_str = task.status.status_str if hasattr(task.status, 'status_str') else task.status.get("status_str", OperationResult.success.value)
if status_str == "error": if status_str == OperationResult.error.value:
result_status = "failed" result_status = OperationResult.failed
elif status_str == "skip": elif status_str == OperationResult.skip.value:
result_status = "skipped" result_status = OperationResult.skipped
operation = BatchOperation( operation = BatchOperation(
operation_id=ui_id, operation_id=ui_id,
operation_type=task.kind, operation_type=task.kind,
target=f"task_{ui_id}", target=f"task_{ui_id}",
result=result_status, result=result_status.value,
start_time=task.timestamp, start_time=task.timestamp,
client_id=task.client_id, client_id=task.client_id,
) )
@ -563,7 +717,8 @@ else:
async def task_worker(): async def task_worker():
await core.unified_manager.reload("cache") logging.debug("[ComfyUI-Manager] Task worker started")
await core.unified_manager.reload(ManagerDatabaseSource.cache.value)
async def do_install(params: InstallPackParams) -> str: async def do_install(params: InstallPackParams) -> str:
node_id = params.id node_id = params.id
@ -571,6 +726,11 @@ async def task_worker():
channel = params.channel channel = params.channel
mode = params.mode mode = params.mode
skip_post_install = params.skip_post_install skip_post_install = params.skip_post_install
logging.debug(
"[ComfyUI-Manager] Installing node: id=%s, version=%s, channel=%s, mode=%s",
node_id, node_version, channel, mode
)
try: try:
node_spec = core.unified_manager.resolve_node_spec( node_spec = core.unified_manager.resolve_node_spec(
@ -605,19 +765,25 @@ async def task_worker():
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}") logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
return res.msg return res.msg
return "success" return OperationResult.success.value
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
return "Installation failed" return "Installation failed"
async def do_enable(params: EnablePackParams) -> str: async def do_enable(params: EnablePackParams) -> str:
cnr_id = params.cnr_id cnr_id = params.cnr_id
logging.debug("[ComfyUI-Manager] Enabling node: cnr_id=%s", cnr_id)
core.unified_manager.unified_enable(cnr_id) core.unified_manager.unified_enable(cnr_id)
return "success" return OperationResult.success.value
async def do_update(params: UpdatePackParams) -> str: async def do_update(params: UpdatePackParams) -> str:
node_name = params.node_name node_name = params.node_name
node_ver = params.node_ver node_ver = params.node_ver
logging.debug(
"[ComfyUI-Manager] Updating node: name=%s, version=%s",
node_name, node_ver
)
try: try:
res = core.unified_manager.unified_update(node_name, node_ver) res = core.unified_manager.unified_update(node_name, node_ver)
@ -641,10 +807,10 @@ async def task_worker():
if res.result: if res.result:
if res.action == "skip": if res.action == "skip":
base_res["msg"] = "skip" base_res["msg"] = OperationResult.skip.value
return base_res return base_res
else: else:
base_res["msg"] = "success" base_res["msg"] = OperationResult.success.value
return base_res return base_res
base_res["msg"] = f"An error occurred while updating '{node_name}'." base_res["msg"] = f"An error occurred while updating '{node_name}'."
@ -670,6 +836,10 @@ async def task_worker():
else: else:
# Regular update operation # Regular update operation
is_stable = params.is_stable if params.is_stable is not None else True is_stable = params.is_stable if params.is_stable is not None else True
logging.debug(
"[ComfyUI-Manager] Updating ComfyUI: is_stable=%s, repo_path=%s",
is_stable, repo_path
)
latest_tag = None latest_tag = None
if is_stable: if is_stable:
res, latest_tag = core.update_to_stable_comfyui(repo_path) res, latest_tag = core.update_to_stable_comfyui(repo_path)
@ -688,7 +858,7 @@ async def task_worker():
return "success-nightly" return "success-nightly"
else: # skipped else: # skipped
logging.info("ComfyUI is up-to-date.") logging.info("ComfyUI is up-to-date.")
return "skip" return OperationResult.skip.value
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
@ -703,7 +873,7 @@ async def task_worker():
res = core.unified_manager.unified_fix(node_name, node_ver) res = core.unified_manager.unified_fix(node_name, node_ver)
if res.result: if res.result:
return "success" return OperationResult.success.value
else: else:
logging.error(res.msg) logging.error(res.msg)
@ -718,12 +888,17 @@ async def task_worker():
async def do_uninstall(params: UninstallPackParams) -> str: async def do_uninstall(params: UninstallPackParams) -> str:
node_name = params.node_name node_name = params.node_name
is_unknown = params.is_unknown is_unknown = params.is_unknown
logging.debug(
"[ComfyUI-Manager] Uninstalling node: name=%s, is_unknown=%s",
node_name, is_unknown
)
try: try:
res = core.unified_manager.unified_uninstall(node_name, is_unknown) res = core.unified_manager.unified_uninstall(node_name, is_unknown)
if res.result: if res.result:
return "success" return OperationResult.success.value
logging.error( logging.error(
f"\nERROR: An error occurred while uninstalling '{node_name}'." f"\nERROR: An error occurred while uninstalling '{node_name}'."
@ -735,13 +910,19 @@ async def task_worker():
async def do_disable(params: DisablePackParams) -> str: async def do_disable(params: DisablePackParams) -> str:
node_name = params.node_name node_name = params.node_name
logging.debug(
"[ComfyUI-Manager] Disabling node: name=%s, is_unknown=%s",
node_name, params.is_unknown
)
try: try:
res = core.unified_manager.unified_disable( res = core.unified_manager.unified_disable(
node_name, params.is_unknown node_name, params.is_unknown
) )
if res: if res:
return "success" return OperationResult.success.value
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
@ -778,7 +959,7 @@ async def task_worker():
repo_id=model_url, local_dir=model_path repo_id=model_url, local_dir=model_path
) )
return "success" return OperationResult.success.value
elif not core.get_config()["model_download_by_agent"] and ( elif not core.get_config()["model_download_by_agent"] and (
model_url.startswith("https://github.com") model_url.startswith("https://github.com")
@ -793,7 +974,7 @@ async def task_worker():
res = True res = True
if res: if res:
return "success" return OperationResult.success.value
else: else:
res = download_url_with_agent(model_url, model_path) res = download_url_with_agent(model_url, model_path)
if res and model_path.endswith(".zip"): if res and model_path.endswith(".zip"):
@ -804,7 +985,7 @@ async def task_worker():
) )
if res: if res:
return "success" return OperationResult.success.value
except Exception as e: except Exception as e:
logging.error(f"[ComfyUI-Manager] ERROR: {e}", file=sys.stderr) logging.error(f"[ComfyUI-Manager] ERROR: {e}", file=sys.stderr)
@ -823,53 +1004,62 @@ async def task_worker():
if task is None: if task is None:
# Check if queue is truly empty (no pending or running tasks) # Check if queue is truly empty (no pending or running tasks)
if task_queue.total_count() == 0 and len(task_queue.running_tasks) == 0: if task_queue.total_count() == 0 and len(task_queue.running_tasks) == 0:
logging.debug("\n[ComfyUI-Manager] All tasks are completed.") logging.debug("[ComfyUI-Manager] Queue empty - all tasks completed")
# Trigger batch history serialization if there are completed tasks # Trigger batch history serialization if there are completed tasks
if task_queue.done_count() > 0: if task_queue.done_count() > 0:
logging.debug("[ComfyUI-Manager] Finalizing batch history...") logging.debug("[ComfyUI-Manager] Finalizing batch history with %d completed tasks", task_queue.done_count())
task_queue.finalize() task_queue.finalize()
logging.debug("[ComfyUI-Manager] Batch history saved.") logging.debug("[ComfyUI-Manager] Batch finalization complete")
logging.info("\nAfter restarting ComfyUI, please refresh the browser.") logging.info("\nAfter restarting ComfyUI, please refresh the browser.")
res = {"status": "all-done"} res = {"status": "all-done"}
# Broadcast general status updates to all clients # Broadcast general status updates to all clients
logging.debug("[ComfyUI-Manager] Broadcasting queue all-done status")
PromptServer.instance.send_sync("cm-queue-status", res) PromptServer.instance.send_sync("cm-queue-status", res)
logging.debug("[ComfyUI-Manager] Task worker exiting")
return return
item, task_index = task item, task_index = task
kind = item.kind kind = item.kind
logging.debug(f"Processing task: {kind} with item: {item} at index: {task_index}") logging.debug(
"[ComfyUI-Manager] Processing task: kind=%s, ui_id=%s, client_id=%s, task_index=%d",
kind, item.ui_id, item.client_id, task_index
)
try: try:
if kind == "install": if kind == OperationType.install.value:
msg = await do_install(item.params) msg = await do_install(item.params)
elif kind == "enable": elif kind == OperationType.enable.value:
msg = await do_enable(item.params) msg = await do_enable(item.params)
elif kind == "install-model": elif kind == OperationType.install_model.value:
msg = await do_install_model(item.params) msg = await do_install_model(item.params)
elif kind == "update": elif kind == OperationType.update.value:
msg = await do_update(item.params) msg = await do_update(item.params)
elif kind == "update-main": elif kind == "update-main":
msg = await do_update(item.params) msg = await do_update(item.params)
elif kind == "update-comfyui": elif kind == OperationType.update_comfyui.value:
msg = await do_update_comfyui(item.params) msg = await do_update_comfyui(item.params)
elif kind == "fix": elif kind == OperationType.fix.value:
msg = await do_fix(item.params) msg = await do_fix(item.params)
elif kind == "uninstall": elif kind == OperationType.uninstall.value:
msg = await do_uninstall(item.params) msg = await do_uninstall(item.params)
elif kind == "disable": elif kind == OperationType.disable.value:
msg = await do_disable(item.params) msg = await do_disable(item.params)
else: else:
msg = "Unexpected kind: " + kind msg = "Unexpected kind: " + kind
except Exception: except Exception:
msg = f"Exception: {(kind, item)}" msg = f"Exception: {(kind, item)}"
logging.error(
"[ComfyUI-Manager] Task execution exception: kind=%s, ui_id=%s, error=%s",
kind, item.ui_id, traceback.format_exc()
)
await task_queue.task_done( await task_queue.task_done(
item, task_index, msg, TaskExecutionStatus(status_str="error", completed=True, messages=[msg]) item, task_index, msg, TaskExecutionStatus(status_str=OperationResult.error, completed=True, messages=[msg])
) )
return return
@ -880,13 +1070,17 @@ async def task_worker():
result_msg = msg result_msg = msg
# Determine status # Determine status
if result_msg == "success": if result_msg == OperationResult.success.value:
status = TaskExecutionStatus(status_str="success", completed=True, messages=[]) status = TaskExecutionStatus(status_str=OperationResult.success, completed=True, messages=[])
elif result_msg == "skip": elif result_msg == OperationResult.skip.value:
status = TaskExecutionStatus(status_str="skip", completed=True, messages=[]) status = TaskExecutionStatus(status_str=OperationResult.skip, completed=True, messages=[])
else: else:
status = TaskExecutionStatus(status_str="error", completed=True, messages=[result_msg]) status = TaskExecutionStatus(status_str=OperationResult.error, completed=True, messages=[result_msg])
logging.debug(
"[ComfyUI-Manager] Task execution completed: kind=%s, ui_id=%s, status=%s, result=%s",
kind, item.ui_id, status.status_str, result_msg
)
await task_queue.task_done(item, task_index, result_msg, status) await task_queue.task_done(item, task_index, result_msg, status)
@ -907,6 +1101,10 @@ async def queue_task(request) -> web.Response:
json_data = await request.json() json_data = await request.json()
# Validate input using Pydantic model # Validate input using Pydantic model
task_item = QueueTaskItem.model_validate(json_data) task_item = QueueTaskItem.model_validate(json_data)
logging.debug(
"[ComfyUI-Manager] Queueing task via API: kind=%s, ui_id=%s, client_id=%s",
task_item.kind, task_item.ui_id, task_item.client_id
)
TaskQueue.instance.put(task_item) TaskQueue.instance.put(task_item)
# maybe start worker # maybe start worker
return web.Response(status=200) return web.Response(status=200)
@ -966,6 +1164,10 @@ async def get_history(request):
if "id" in request.rel_url.query: if "id" in request.rel_url.query:
json_name = request.rel_url.query["id"] + ".json" json_name = request.rel_url.query["id"] + ".json"
batch_path = os.path.join(context.manager_batch_history_path, json_name) batch_path = os.path.join(context.manager_batch_history_path, json_name)
logging.debug(
"[ComfyUI-Manager] Fetching batch history: id=%s",
request.rel_url.query["id"]
)
with open(batch_path, "r", encoding="utf-8") as file: with open(batch_path, "r", encoding="utf-8") as file:
json_str = file.read() json_str = file.read()
@ -977,6 +1179,11 @@ async def get_history(request):
ui_id = request.rel_url.query.get("ui_id") ui_id = request.rel_url.query.get("ui_id")
max_items = request.rel_url.query.get("max_items") max_items = request.rel_url.query.get("max_items")
offset = request.rel_url.query.get("offset", -1) offset = request.rel_url.query.get("offset", -1)
logging.debug(
"[ComfyUI-Manager] Fetching history: client_id=%s, ui_id=%s, max_items=%s",
client_id, ui_id, max_items
)
if max_items: if max_items:
max_items = int(max_items) max_items = int(max_items)
@ -1080,6 +1287,11 @@ async def _update_all(json_data: Dict[str, Any]) -> web.Response:
base_ui_id = json_data["ui_id"] base_ui_id = json_data["ui_id"]
client_id = json_data["client_id"] client_id = json_data["client_id"]
mode = json_data.get("mode", "remote") mode = json_data.get("mode", "remote")
logging.debug(
"[ComfyUI-Manager] Update all requested: client_id=%s, base_ui_id=%s, mode=%s",
client_id, base_ui_id, mode
)
if mode == "local": if mode == "local":
channel = "local" channel = "local"
@ -1089,6 +1301,7 @@ async def _update_all(json_data: Dict[str, Any]) -> web.Response:
await core.unified_manager.reload(mode) await core.unified_manager.reload(mode)
await core.unified_manager.get_custom_nodes(channel, mode) await core.unified_manager.get_custom_nodes(channel, mode)
update_count = 0
for k, v in core.unified_manager.active_nodes.items(): for k, v in core.unified_manager.active_nodes.items():
if k == "comfyui-manager": if k == "comfyui-manager":
# skip updating comfyui-manager if desktop version # skip updating comfyui-manager if desktop version
@ -1096,12 +1309,13 @@ async def _update_all(json_data: Dict[str, Any]) -> web.Response:
continue continue
update_task = QueueTaskItem( update_task = QueueTaskItem(
kind="update", kind=OperationType.update.value,
ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name
client_id=client_id, client_id=client_id,
params=UpdatePackParams(node_name=k, node_ver=v[0]) params=UpdatePackParams(node_name=k, node_ver=v[0])
) )
task_queue.put(update_task) task_queue.put(update_task)
update_count += 1
for k, v in core.unified_manager.unknown_active_nodes.items(): for k, v in core.unified_manager.unknown_active_nodes.items():
if k == "comfyui-manager": if k == "comfyui-manager":
@ -1110,13 +1324,18 @@ async def _update_all(json_data: Dict[str, Any]) -> web.Response:
continue continue
update_task = QueueTaskItem( update_task = QueueTaskItem(
kind="update", kind=OperationType.update.value,
ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name
client_id=client_id, client_id=client_id,
params=UpdatePackParams(node_name=k, node_ver="unknown") params=UpdatePackParams(node_name=k, node_ver="unknown")
) )
task_queue.put(update_task) task_queue.put(update_task)
update_count += 1
logging.debug(
"[ComfyUI-Manager] Update all queued %d tasks for client_id=%s",
update_count, client_id
)
return web.Response(status=200) return web.Response(status=200)
@ -1363,6 +1582,7 @@ async def import_fail_info(request):
@routes.get("/v2/manager/queue/reset") @routes.get("/v2/manager/queue/reset")
async def reset_queue(request): async def reset_queue(request):
logging.debug("[ComfyUI-Manager] Queue reset requested")
task_queue.wipe_queue() task_queue.wipe_queue()
return web.Response(status=200) return web.Response(status=200)
@ -1427,11 +1647,14 @@ async def queue_count(request):
@routes.get("/v2/manager/queue/start") @routes.get("/v2/manager/queue/start")
async def queue_start(request): async def queue_start(request):
# finalize_temp_queue_batch() # finalize_temp_queue_batch()
logging.debug("[ComfyUI-Manager] Queue start requested")
started = task_queue.start_worker() started = task_queue.start_worker()
if started: if started:
logging.debug("[ComfyUI-Manager] Queue worker started successfully")
return web.Response(status=200) # Started successfully return web.Response(status=200) # Started successfully
else: else:
logging.debug("[ComfyUI-Manager] Queue worker already in progress")
return web.Response(status=201) # Already in-progress return web.Response(status=201) # Already in-progress
@ -1451,7 +1674,7 @@ async def update_comfyui(request):
task = QueueTaskItem( task = QueueTaskItem(
ui_id=ui_id, ui_id=ui_id,
client_id=client_id, client_id=client_id,
kind="update-comfyui", kind=OperationType.update_comfyui.value,
params=UpdateComfyUIParams(is_stable=is_stable) params=UpdateComfyUIParams(is_stable=is_stable)
) )
@ -1490,7 +1713,7 @@ async def comfyui_switch_version(request):
task = QueueTaskItem( task = QueueTaskItem(
ui_id=ui_id, ui_id=ui_id,
client_id=client_id, client_id=client_id,
kind="update-comfyui", kind=OperationType.update_comfyui.value,
params=UpdateComfyUIParams(target_version=target_version) params=UpdateComfyUIParams(target_version=target_version)
) )
@ -1502,7 +1725,7 @@ async def comfyui_switch_version(request):
async def check_whitelist_for_model(item): async def check_whitelist_for_model(item):
json_obj = await core.get_data_by_mode("cache", "model-list.json") json_obj = await core.get_data_by_mode(ManagerDatabaseSource.cache.value, "model-list.json")
for x in json_obj.get("models", []): for x in json_obj.get("models", []):
if ( if (
@ -1512,7 +1735,7 @@ async def check_whitelist_for_model(item):
): ):
return True return True
json_obj = await core.get_data_by_mode("local", "model-list.json") json_obj = await core.get_data_by_mode(ManagerDatabaseSource.local.value, "model-list.json")
for x in json_obj.get("models", []): for x in json_obj.get("models", []):
if ( if (
@ -1543,7 +1766,7 @@ async def install_model(request):
task = QueueTaskItem( task = QueueTaskItem(
ui_id=json_data['ui_id'], ui_id=json_data['ui_id'],
client_id=json_data['client_id'], client_id=json_data['client_id'],
kind="install-model", kind=OperationType.install_model.value,
params=model_data params=model_data
) )
@ -1717,11 +1940,11 @@ async def default_cache_update():
) )
else: else:
# load at least once # load at least once
await core.unified_manager.reload("remote", dont_wait=False) await core.unified_manager.reload(ManagerDatabaseSource.remote.value, dont_wait=False)
await core.unified_manager.get_custom_nodes(channel_url, "remote") await core.unified_manager.get_custom_nodes(channel_url, ManagerDatabaseSource.remote.value)
else: else:
await core.unified_manager.reload( await core.unified_manager.reload(
"remote", dont_wait=False, update_cnr_map=False ManagerDatabaseSource.remote.value, dont_wait=False, update_cnr_map=False
) )
logging.info("[ComfyUI-Manager] All startup tasks have been completed.") logging.info("[ComfyUI-Manager] All startup tasks have been completed.")