2025-06-13 19:37:57 -07:00

1747 lines
58 KiB
Python

"""
ComfyUI Manager Server
Main server implementation providing REST API endpoints for ComfyUI Manager functionality.
Handles task queue management, custom node operations, model installation, and system configuration.
"""
import asyncio
import concurrent
import copy
import heapq
import json
import logging
import os
import platform
import re
import shutil
import subprocess # don't remove this
import sys
import threading
import traceback
import urllib.request
import uuid
import zipfile
from datetime import datetime
from typing import Any, Dict, List, Literal, NamedTuple, Optional
import folder_paths
import latent_preview
import nodes
from aiohttp import web
from comfy.cli_args import args
from pydantic import ValidationError
from comfyui_manager.glob.utils import (
formatting_utils,
model_utils,
security_utils,
node_pack_utils,
environment_utils,
)
from server import PromptServer
from . import manager_core as core
from ..common import manager_util
from ..common import cm_global
from ..common import manager_downloader
from ..common import context
from ..data_models import (
QueueTaskItem,
TaskHistoryItem,
TaskStateMessage,
TaskExecutionStatus,
MessageTaskDone,
MessageTaskStarted,
MessageUpdate,
ManagerMessageName,
BatchExecutionRecord,
ComfyUISystemState,
BatchOperation,
InstalledNodeInfo,
ComfyUIVersionInfo,
InstallPackParams,
UpdatePackParams,
UpdateAllPacksParams,
UpdateComfyUIParams,
FixPackParams,
UninstallPackParams,
DisablePackParams,
EnablePackParams,
ModelMetadata,
)
from .constants import (
model_dir_name_map,
SECURITY_MESSAGE_MIDDLE_OR_BELOW,
)
if not manager_util.is_manager_pip_package():
network_mode_description = "offline"
else:
network_mode_description = core.get_config()["network_mode"]
logging.info("[ComfyUI-Manager] network_mode: " + network_mode_description)
MAXIMUM_HISTORY_SIZE = 10000
routes = PromptServer.instance.routes
def is_loopback(address):
import ipaddress
try:
return ipaddress.ip_address(address).is_loopback
except ValueError:
return False
is_local_mode = is_loopback(args.listen)
def validate_required_params(request: web.Request, required_params: List[str]) -> Optional[web.Response]:
"""Validate that all required query parameters are present.
Args:
request: The aiohttp request object
required_params: List of required parameter names
Returns:
web.Response with 400 status if validation fails, None if validation passes
"""
missing_params = []
for param in required_params:
if param not in request.rel_url.query:
missing_params.append(param)
if missing_params:
missing_str = ", ".join(missing_params)
return web.Response(
status=400,
text=f"Missing required parameter(s): {missing_str}"
)
return None
def error_response(status: int, message: str, error_type: Optional[str] = None) -> web.Response:
"""Create a standardized error response.
Args:
status: HTTP status code
message: Error message
error_type: Optional error type/category
Returns:
web.Response with JSON error body
"""
error_data = {"error": message}
if error_type:
error_data["error_type"] = error_type
return web.json_response(error_data, status=status)
class ManagerFuncsInComfyUI(core.ManagerFuncs):
def get_current_preview_method(self):
if args.preview_method == latent_preview.LatentPreviewMethod.Auto:
return "auto"
elif args.preview_method == latent_preview.LatentPreviewMethod.Latent2RGB:
return "latent2rgb"
elif args.preview_method == latent_preview.LatentPreviewMethod.TAESD:
return "taesd"
else:
return "none"
def run_script(self, cmd, cwd="."):
if len(cmd) > 0 and cmd[0].startswith("#"):
logging.error(f"[ComfyUI-Manager] Unexpected behavior: `{cmd}`")
return 0
process = subprocess.Popen(
cmd,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=core.get_script_env(),
)
stdout_thread = threading.Thread(
target=formatting_utils.handle_stream, args=(process.stdout, "")
)
stderr_thread = threading.Thread(
target=formatting_utils.handle_stream, args=(process.stderr, "[!]")
)
stdout_thread.start()
stderr_thread.start()
stdout_thread.join()
stderr_thread.join()
return process.wait()
core.manager_funcs = ManagerFuncsInComfyUI()
from comfyui_manager.common.manager_downloader import (
download_url,
download_url_with_agent,
)
class TaskQueue:
instance = None
def __init__(self):
TaskQueue.instance = self
self.mutex = threading.RLock()
self.not_empty = threading.Condition(self.mutex)
self.current_index = 0
self.pending_tasks = []
self.running_tasks = {}
self.history_tasks = {}
self.task_counter = 0
self.batch_id = None
self.batch_start_time = None
self.batch_state_before = None
class ExecutionStatus(NamedTuple):
status_str: Literal["success", "error", "skip"]
completed: bool
messages: List[str]
def get_current_state(self) -> TaskStateMessage:
return TaskStateMessage(
history=self.get_history(),
running_queue=self.get_current_queue()[0],
pending_queue=self.get_current_queue()[1],
installed_packs=core.get_installed_node_packs(),
)
@staticmethod
def send_queue_state_update(
msg: str, update: MessageUpdate, client_id: Optional[str] = None
) -> None:
"""Send queue state update to clients.
Args:
msg: Message type/event name
update: Update data to send
client_id: Optional client ID. If None, broadcasts to all clients.
If provided, sends only to that specific client.
"""
PromptServer.instance.send_sync(msg, update.model_dump(mode='json'), client_id)
def put(self, item) -> None:
"""Add a task to the queue. Item can be a dict or QueueTaskItem model."""
with self.mutex:
# Start a new batch if this is the first task after queue was empty
if (
self.batch_id is None
and len(self.pending_tasks) == 0
and len(self.running_tasks) == 0
):
self._start_new_batch()
# Convert to Pydantic model if it's a dict
if isinstance(item, dict):
item = QueueTaskItem(**item)
heapq.heappush(self.pending_tasks, item)
self.not_empty.notify()
def _start_new_batch(self) -> None:
"""Start a new batch session for tracking operations."""
self.batch_id = (
f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
)
self.batch_start_time = datetime.now().isoformat()
self.batch_state_before = self._capture_system_state()
logging.info("[ComfyUI-Manager] Started new batch: %s", self.batch_id)
def get(
self, timeout: Optional[float] = None
) -> tuple[Optional[QueueTaskItem], int]:
with self.not_empty:
while len(self.pending_tasks) == 0:
self.not_empty.wait(timeout=timeout)
if timeout is not None and len(self.pending_tasks) == 0:
return None
item = heapq.heappop(self.pending_tasks)
task_index = self.task_counter
self.running_tasks[task_index] = copy.deepcopy(item)
self.task_counter += 1
TaskQueue.send_queue_state_update(
ManagerMessageName.cm_task_started.value,
MessageTaskStarted(
ui_id=item.ui_id,
kind=item.kind,
timestamp=datetime.now(),
state=self.get_current_state(),
),
client_id=item.client_id, # Send task started only to the client that requested it
)
return item, task_index
def task_done(
self,
item: QueueTaskItem,
task_index: int,
result_msg: str,
status: Optional["TaskQueue.ExecutionStatus"] = None,
) -> None:
"""Mark task as completed and add to history"""
with self.mutex:
timestamp = datetime.now().isoformat()
# Remove task from running_tasks using the task_index
self.running_tasks.pop(task_index, None)
# Manage history size
if len(self.history_tasks) > MAXIMUM_HISTORY_SIZE:
self.history_tasks.pop(next(iter(self.history_tasks)))
# Convert TaskQueue.ExecutionStatus to TaskExecutionStatus Pydantic model
pydantic_status: Optional[TaskExecutionStatus] = None
if status is not None:
pydantic_status = TaskExecutionStatus(
status_str=status.status_str,
completed=status.completed,
messages=status.messages
)
# Update history
self.history_tasks[item.ui_id] = TaskHistoryItem(
ui_id=item.ui_id,
client_id=item.client_id,
timestamp=datetime.fromisoformat(timestamp),
result=result_msg,
kind=item.kind,
status=pydantic_status,
)
# Send WebSocket message indicating task is complete
TaskQueue.send_queue_state_update(
ManagerMessageName.cm_task_completed.value,
MessageTaskDone(
ui_id=item.ui_id,
result=result_msg,
kind=item.kind,
status=pydantic_status,
timestamp=datetime.fromisoformat(timestamp),
state=self.get_current_state(),
),
client_id=item.client_id, # Send completion only to the client that requested it
)
def get_current_queue(self) -> tuple[list[QueueTaskItem], list[QueueTaskItem]]:
"""Get current running and remaining tasks"""
with self.mutex:
running = list(self.running_tasks.values())
remaining = copy.copy(self.pending_tasks)
return running, remaining
def get_tasks_remaining(self) -> int:
"""Get number of tasks remaining"""
with self.mutex:
return len(self.pending_tasks) + len(self.running_tasks)
def wipe_queue(self) -> None:
"""Clear all task queue"""
with self.mutex:
self.pending_tasks = []
def abort(self) -> None:
"""Abort current operations"""
with self.mutex:
self.pending_tasks = []
self.running_tasks = {}
def delete_history_item(self, ui_id: str) -> None:
"""Delete specific task from history"""
with self.mutex:
self.history_tasks.pop(ui_id, None)
def get_history(
self,
ui_id: Optional[str] = None,
max_items: Optional[int] = None,
offset: int = -1,
) -> dict[str, TaskHistoryItem]:
"""Get task history. If ui_id (task id) is passsed, only return that task's history item entry."""
with self.mutex:
if ui_id is None:
out = {}
i = 0
if offset < 0 and max_items is not None:
offset = len(self.history_tasks) - max_items
for k in self.history_tasks:
if i >= offset:
out[k] = self.history_tasks[k]
if max_items is not None and len(out) >= max_items:
break
i += 1
return out
elif ui_id in self.history_tasks:
return self.history_tasks[ui_id]
else:
return {}
def done_count(self) -> int:
"""Get the number of completed tasks in history.
Returns:
int: Number of tasks that have been completed and are stored in history.
Returns 0 if history_tasks is None (defensive programming).
"""
return len(self.history_tasks) if self.history_tasks is not None else 0
def total_count(self) -> int:
"""Get the total number of tasks currently in the system (pending + running).
Returns:
int: Combined count of pending and running tasks.
Returns 0 if either collection is None (defensive programming).
"""
return (
len(self.pending_tasks) + len(self.running_tasks)
if self.pending_tasks is not None and self.running_tasks is not None
else 0
)
def finalize(self) -> None:
"""Finalize a completed task batch by saving execution history to disk.
This method is intended to be called when the queue transitions from having
tasks to being completely empty (no pending or running tasks). It will create
a comprehensive snapshot of the ComfyUI state and all operations performed.
"""
if self.batch_id is not None:
batch_path = os.path.join(
context.manager_batch_history_path, self.batch_id + ".json"
)
try:
end_time = datetime.now().isoformat()
state_after = self._capture_system_state()
operations = self._extract_batch_operations()
batch_record = BatchExecutionRecord(
batch_id=self.batch_id,
start_time=self.batch_start_time,
end_time=end_time,
state_before=self.batch_state_before,
state_after=state_after,
operations=operations,
total_operations=len(operations),
successful_operations=len(
[op for op in operations if op.result == "success"]
),
failed_operations=len(
[op for op in operations if op.result == "failed"]
),
skipped_operations=len(
[op for op in operations if op.result == "skipped"]
),
)
# Save to disk
with open(batch_path, "w", encoding="utf-8") as json_file:
json.dump(
batch_record.model_dump(), json_file, indent=4, default=str
)
logging.info(f"[ComfyUI-Manager] Batch history saved: {batch_path}")
# Reset batch tracking
self.batch_id = None
self.batch_start_time = None
self.batch_state_before = None
except Exception as e:
logging.error(f"[ComfyUI-Manager] Failed to save batch history: {e}")
def _capture_system_state(self) -> ComfyUISystemState:
"""Capture current ComfyUI system state for batch record."""
return ComfyUISystemState(
snapshot_time=datetime.now().isoformat(),
comfyui_version=self._get_comfyui_version_info(),
python_version=platform.python_version(),
platform_info=f"{platform.system()} {platform.release()} ({platform.machine()})",
installed_nodes=self._get_installed_nodes(),
)
def _get_comfyui_version_info(self) -> ComfyUIVersionInfo:
"""Get ComfyUI version information."""
try:
version_info = core.get_comfyui_versions()
current_version = version_info[1] if len(version_info) > 1 else "unknown"
return ComfyUIVersionInfo(version=current_version)
except Exception:
return ComfyUIVersionInfo(version="unknown")
def _get_installed_nodes(self) -> dict[str, InstalledNodeInfo]:
"""Get information about installed node packages."""
installed_nodes = {}
try:
node_packs = core.get_installed_node_packs()
for pack_name, pack_info in node_packs.items():
installed_nodes[pack_name] = InstalledNodeInfo(
name=pack_name,
version=pack_info.get("ver", "unknown"),
install_method="unknown",
enabled=pack_info.get("enabled", True),
)
except Exception as e:
logging.warning(f"[ComfyUI-Manager] Failed to get installed nodes: {e}")
return installed_nodes
def _extract_batch_operations(self) -> list[BatchOperation]:
"""Extract operations from completed task history for this batch."""
operations = []
try:
for ui_id, task in self.history_tasks.items():
result_status = "success"
if task.status:
status_str = task.status.get("status_str", "success")
if status_str == "error":
result_status = "failed"
elif status_str == "skip":
result_status = "skipped"
operation = BatchOperation(
operation_id=ui_id,
operation_type=task.kind,
target=f"task_{ui_id}",
result=result_status,
start_time=task.timestamp,
client_id=task.client_id,
)
operations.append(operation)
except Exception as e:
logging.warning(
f"[ComfyUI-Manager] Failed to extract batch operations: {e}"
)
return operations
task_queue = TaskQueue()
# Preview method initialization
if args.preview_method == latent_preview.LatentPreviewMethod.NoPreviews:
environment_utils.set_preview_method(core.get_config()['preview_method'])
else:
logging.warning("[ComfyUI-Manager] Since --preview-method is set, ComfyUI-Manager's preview method feature will be ignored.")
# Legacy variables for compatibility
task_worker_thread = None
task_worker_lock = threading.Lock()
# Note: Model path utilities moved to model_utils.py to avoid duplication
async def task_worker():
await core.unified_manager.reload("cache")
async def do_install(params: InstallPackParams) -> str:
node_id = params.id
node_version = params.selected_version
channel = params.channel
mode = params.mode
skip_post_install = params.skip_post_install
try:
node_spec = core.unified_manager.resolve_node_spec(
f"{node_id}@{node_version}"
)
if node_spec is None:
logging.error(
f"Cannot resolve install target: '{node_id}@{node_version}'"
)
return f"Cannot resolve install target: '{node_id}@{node_version}'"
node_name, version_spec, is_specified = node_spec
res = await core.unified_manager.install_by_id(
node_name,
version_spec,
channel,
mode,
return_postinstall=skip_post_install,
) # discard post install if skip_post_install mode
if res.action not in [
"skip",
"enable",
"install-git",
"install-cnr",
"switch-cnr",
]:
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
return res.msg
elif not res.result:
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
return res.msg
return "success"
except Exception:
traceback.print_exc()
return "Installation failed"
async def do_enable(params: EnablePackParams) -> str:
cnr_id = params.cnr_id
core.unified_manager.unified_enable(cnr_id)
return "success"
async def do_update(params: UpdatePackParams) -> str:
node_name = params.node_name
node_ver = params.node_ver
try:
res = core.unified_manager.unified_update(node_name, node_ver)
if res.ver == "unknown":
url = core.unified_manager.unknown_active_nodes[node_name][0]
try:
title = os.path.basename(url)
except Exception:
title = node_name
else:
url = core.unified_manager.cnr_map[node_name].get("repository")
title = core.unified_manager.cnr_map[node_name]["name"]
manager_util.clear_pip_cache()
if url is not None:
base_res = {"url": url, "title": title}
else:
base_res = {"title": title}
if res.result:
if res.action == "skip":
base_res["msg"] = "skip"
return base_res
else:
base_res["msg"] = "success"
return base_res
base_res["msg"] = f"An error occurred while updating '{node_name}'."
logging.error(
f"\nERROR: An error occurred while updating '{node_name}'. (res.result={res.result}, res.action={res.action})"
)
return base_res
except Exception:
traceback.print_exc()
return {"msg": f"An error occurred while updating '{node_name}'."}
async def do_update_comfyui(params: UpdateComfyUIParams) -> str:
try:
repo_path = os.path.dirname(folder_paths.__file__)
# Check if this is a version switch operation
if params.target_version:
# Switch to specific version
logging.info(f"Switching ComfyUI to version: {params.target_version}")
core.switch_comfyui(params.target_version)
return f"success-switched-{params.target_version}"
else:
# Regular update operation
is_stable = params.is_stable if params.is_stable is not None else True
latest_tag = None
if is_stable:
res, latest_tag = core.update_to_stable_comfyui(repo_path)
else:
res = core.update_path(repo_path)
if res == "fail":
logging.error("ComfyUI update failed")
return "fail"
elif res == "updated":
if is_stable:
logging.info("ComfyUI is updated to latest stable version.")
return "success-stable-" + latest_tag
else:
logging.info("ComfyUI is updated to latest nightly version.")
return "success-nightly"
else: # skipped
logging.info("ComfyUI is up-to-date.")
return "skip"
except Exception:
traceback.print_exc()
return "An error occurred while updating 'comfyui'."
async def do_fix(params: FixPackParams) -> str:
node_name = params.node_name
node_ver = params.node_ver
try:
res = core.unified_manager.unified_fix(node_name, node_ver)
if res.result:
return "success"
else:
logging.error(res.msg)
logging.error(
f"\nERROR: An error occurred while fixing '{node_name}@{node_ver}'."
)
except Exception:
traceback.print_exc()
return f"An error occurred while fixing '{node_name}@{node_ver}'."
async def do_uninstall(params: UninstallPackParams) -> str:
node_name = params.node_name
is_unknown = params.is_unknown
try:
res = core.unified_manager.unified_uninstall(node_name, is_unknown)
if res.result:
return "success"
logging.error(
f"\nERROR: An error occurred while uninstalling '{node_name}'."
)
except Exception:
traceback.print_exc()
return f"An error occurred while uninstalling '{node_name}'."
async def do_disable(params: DisablePackParams) -> str:
node_name = params.node_name
try:
res = core.unified_manager.unified_disable(
node_name, params.is_unknown
)
if res:
return "success"
except Exception:
traceback.print_exc()
return f"Failed to disable: '{node_name}'"
async def do_install_model(params: ModelMetadata) -> str:
json_data = params.model_dump()
model_path = model_utils.get_model_path(json_data)
model_url = json_data.get("url")
res = False
try:
if model_path is not None:
logging.info(
f"Install model '{json_data['name']}' from '{model_url}' into '{model_path}'"
)
if json_data["filename"] == "<huggingface>":
if os.path.exists(
os.path.join(model_path, os.path.dirname(json_data["url"]))
):
logging.error(
f"[ComfyUI-Manager] the model path already exists: {model_path}"
)
return f"The model path already exists: {model_path}"
logging.info(
f"[ComfyUI-Manager] Downloading '{model_url}' into '{model_path}'"
)
manager_downloader.download_repo_in_bytes(
repo_id=model_url, local_dir=model_path
)
return "success"
elif not core.get_config()["model_download_by_agent"] and (
model_url.startswith("https://github.com")
or model_url.startswith("https://huggingface.co")
or model_url.startswith("https://heibox.uni-heidelberg.de")
):
model_dir = model_utils.get_model_dir(json_data, True)
download_url(model_url, model_dir, filename=json_data["filename"])
if model_path.endswith(".zip"):
res = core.unzip(model_path)
else:
res = True
if res:
return "success"
else:
res = download_url_with_agent(model_url, model_path)
if res and model_path.endswith(".zip"):
res = core.unzip(model_path)
else:
logging.error(
f"[ComfyUI-Manager] Model installation error: invalid model type - {json_data['type']}"
)
if res:
return "success"
except Exception as e:
logging.error(f"[ComfyUI-Manager] ERROR: {e}", file=sys.stderr)
return f"Model installation error: {model_url}"
async def do_update_all(params: UpdateAllPacksParams):
# For update-all tasks, we need client info from the original task
# This should not be called anymore since update_all now creates individual tasks
return "error: update_all should create individual tasks, not use task worker"
while True:
timeout = 4096
task = task_queue.get(timeout)
if task is None:
# Check if queue is truly empty (no pending or running tasks)
if task_queue.total_count() == 0 and len(task_queue.running_tasks) == 0:
logging.info("\n[ComfyUI-Manager] All tasks are completed.")
# Trigger batch history serialization if there are completed tasks
if task_queue.done_count() > 0:
logging.info("[ComfyUI-Manager] Finalizing batch history...")
task_queue.finalize()
logging.info("[ComfyUI-Manager] Batch history saved.")
logging.info("\nAfter restarting ComfyUI, please refresh the browser.")
res = {"status": "all-done"}
# Broadcast general status updates to all clients
PromptServer.instance.send_sync("cm-queue-status", res)
return
item, task_index = task
kind = item.kind
print(f"Processing task: {kind} with item: {item} at index: {task_index}")
try:
if kind == "install":
msg = await do_install(item.params)
elif kind == "enable":
msg = await do_enable(item.params)
elif kind == "install-model":
msg = await do_install_model(item.params)
elif kind == "update":
msg = await do_update(item.params)
elif kind == "update-main":
msg = await do_update(item.params)
elif kind == "update-comfyui":
msg = await do_update_comfyui(item.params)
elif kind == "fix":
msg = await do_fix(item.params)
elif kind == "uninstall":
msg = await do_uninstall(item.params)
elif kind == "disable":
msg = await do_disable(item.params)
else:
msg = "Unexpected kind: " + kind
except Exception:
msg = f"Exception: {(kind, item)}"
task_queue.task_done(
item, task_index, msg, TaskQueue.ExecutionStatus("error", True, [msg])
)
return
# Determine status and message for task completion
if isinstance(msg, dict) and "msg" in msg:
result_msg = msg["msg"]
else:
result_msg = msg
# Determine status
if result_msg == "success":
status = TaskQueue.ExecutionStatus("success", True, [])
elif result_msg == "skip":
status = TaskQueue.ExecutionStatus("skip", True, [])
else:
status = TaskQueue.ExecutionStatus("error", True, [result_msg])
task_queue.task_done(item, task_index, result_msg, status)
@routes.post("/v2/manager/queue/task")
async def queue_task(request) -> web.Response:
"""Add a new task to the processing queue.
Accepts task data via JSON POST and adds it to the TaskQueue for processing.
The task worker will automatically pick up and process queued tasks.
Args:
request: aiohttp request containing JSON task data
Returns:
web.Response: HTTP 200 on successful queueing, HTTP 400 on validation error
"""
try:
json_data = await request.json()
# Validate input using Pydantic model
task_item = QueueTaskItem.model_validate(json_data)
TaskQueue.instance.put(task_item)
# maybe start worker
return web.Response(status=200)
except ValidationError as e:
logging.error(f"[ComfyUI-Manager] Invalid task data: {e}")
return web.Response(status=400, text=f"Invalid task data: {e}")
except Exception as e:
logging.error(f"[ComfyUI-Manager] Error processing task: {e}")
return web.Response(status=500, text="Internal server error")
@routes.get("/v2/manager/queue/history_list")
async def get_history_list(request) -> web.Response:
"""Get list of available batch history files.
Returns a list of batch history IDs sorted by modification time (newest first).
These IDs can be used with the history endpoint to retrieve detailed batch information.
Returns:
web.Response: JSON response with 'ids' array of history file IDs
"""
history_path = context.manager_batch_history_path
try:
files = [
os.path.join(history_path, f)
for f in os.listdir(history_path)
if os.path.isfile(os.path.join(history_path, f))
]
files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
history_ids = [os.path.basename(f)[:-5] for f in files]
return web.json_response(
{"ids": list(history_ids)}, content_type="application/json"
)
except Exception as e:
logging.error(f"[ComfyUI-Manager] /v2/manager/queue/history_list - {e}")
return web.Response(status=400)
@routes.get("/v2/manager/queue/history")
async def get_history(request):
"""Get task history with optional client filtering.
Query parameters:
id: Batch history ID (for file-based history)
client_id: Optional client ID to filter current session history
ui_id: Optional specific task ID to get single task history
max_items: Maximum number of items to return
offset: Offset for pagination
Returns:
JSON with filtered history data
"""
try:
# Handle file-based batch history
if "id" in request.rel_url.query:
json_name = request.rel_url.query["id"] + ".json"
batch_path = os.path.join(context.manager_batch_history_path, json_name)
with open(batch_path, "r", encoding="utf-8") as file:
json_str = file.read()
json_obj = json.loads(json_str)
return web.json_response(json_obj, content_type="application/json")
# Handle current session history with optional filtering
client_id = request.rel_url.query.get("client_id")
ui_id = request.rel_url.query.get("ui_id")
max_items = request.rel_url.query.get("max_items")
offset = request.rel_url.query.get("offset", -1)
if max_items:
max_items = int(max_items)
if offset:
offset = int(offset)
# Get history from TaskQueue
if ui_id:
history = task_queue.get_history(ui_id=ui_id)
else:
history = task_queue.get_history(max_items=max_items, offset=offset)
# Filter by client_id if provided
if client_id and isinstance(history, dict):
filtered_history = {
task_id: task_data
for task_id, task_data in history.items()
if hasattr(task_data, "client_id") and task_data.client_id == client_id
}
history = filtered_history
return web.json_response({"history": history}, content_type="application/json")
except Exception as e:
logging.error(f"[ComfyUI-Manager] /v2/manager/queue/history - {e}")
return web.Response(status=400)
@routes.get("/v2/customnode/getmappings")
async def fetch_customnode_mappings(request):
"""
provide unified (node -> node pack) mapping list
"""
mode = request.rel_url.query["mode"]
nickname_mode = False
if mode == "nickname":
mode = "local"
nickname_mode = True
json_obj = await core.get_data_by_mode(mode, "extension-node-map.json")
json_obj = core.map_to_unified_keys(json_obj)
if nickname_mode:
json_obj = node_pack_utils.nickname_filter(json_obj)
all_nodes = set()
patterns = []
for k, x in json_obj.items():
all_nodes.update(set(x[0]))
if "nodename_pattern" in x[1]:
patterns.append((x[1]["nodename_pattern"], x[0]))
missing_nodes = set(nodes.NODE_CLASS_MAPPINGS.keys()) - all_nodes
for x in missing_nodes:
for pat, item in patterns:
if re.match(pat, x):
item.append(x)
return web.json_response(json_obj, content_type="application/json")
@routes.get("/v2/customnode/fetch_updates")
async def fetch_updates(request):
"""
DEPRECATED: This endpoint is no longer supported.
Repository fetching has been removed from the API.
Updates should be performed through the queue system using update operations.
"""
return web.json_response(
{
"error": "This endpoint has been deprecated",
"message": "Repository fetching is no longer supported. Please use the update operations through the queue system.",
"deprecated": True
},
status=410 # 410 Gone
)
@routes.get("/v2/manager/queue/update_all")
async def update_all(request: web.Request) -> web.Response:
# Validate required query parameters
validation_error = validate_required_params(request, ["client_id", "ui_id"])
if validation_error:
return validation_error
json_data = dict(request.rel_url.query)
return await _update_all(json_data)
async def _update_all(json_data: Dict[str, Any]) -> web.Response:
if not security_utils.is_allowed_security_level("middle"):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403)
# Extract client info
base_ui_id = json_data["ui_id"]
client_id = json_data["client_id"]
mode = json_data.get("mode", "remote")
if mode == "local":
channel = "local"
else:
channel = core.get_config()["channel_url"]
await core.unified_manager.reload(mode)
await core.unified_manager.get_custom_nodes(channel, mode)
for k, v in core.unified_manager.active_nodes.items():
if k == "comfyui-manager":
# skip updating comfyui-manager if desktop version
if os.environ.get("__COMFYUI_DESKTOP_VERSION__"):
continue
update_task = QueueTaskItem(
kind="update",
ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name
client_id=client_id,
params=UpdatePackParams(node_name=k, node_ver=v[0])
)
task_queue.put(update_task)
for k, v in core.unified_manager.unknown_active_nodes.items():
if k == "comfyui-manager":
# skip updating comfyui-manager if desktop version
if os.environ.get("__COMFYUI_DESKTOP_VERSION__"):
continue
update_task = QueueTaskItem(
kind="update",
ui_id=f"{base_ui_id}_{k}", # Use client's base ui_id + node name
client_id=client_id,
params=UpdatePackParams(node_name=k, node_ver="unknown")
)
task_queue.put(update_task)
return web.Response(status=200)
@routes.get("/v2/manager/is_legacy_manager_ui")
async def is_legacy_manager_ui(request):
return web.json_response(
{"is_legacy_manager_ui": args.enable_manager_legacy_ui},
content_type="application/json",
status=200,
)
# freeze imported version
startup_time_installed_node_packs = core.get_installed_node_packs()
@routes.get("/v2/customnode/installed")
async def installed_list(request):
mode = request.query.get("mode", "default")
if mode == "imported":
res = startup_time_installed_node_packs
else:
res = core.get_installed_node_packs()
return web.json_response(res, content_type="application/json")
def check_model_installed(json_obj):
def is_exists(model_dir_name, filename, url):
if filename == "<huggingface>":
filename = os.path.basename(url)
dirs = folder_paths.get_folder_paths(model_dir_name)
for x in dirs:
if os.path.exists(os.path.join(x, filename)):
return True
return False
model_dir_names = [
"checkpoints",
"loras",
"vae",
"text_encoders",
"diffusion_models",
"clip_vision",
"embeddings",
"diffusers",
"vae_approx",
"controlnet",
"gligen",
"upscale_models",
"hypernetworks",
"photomaker",
"classifiers",
]
total_models_files = set()
for x in model_dir_names:
for y in folder_paths.get_filename_list(x):
total_models_files.add(y)
def process_model_phase(item):
if (
"diffusion" not in item["filename"]
and "pytorch" not in item["filename"]
and "model" not in item["filename"]
):
# non-general name case
if item["filename"] in total_models_files:
item["installed"] = "True"
return
if item["save_path"] == "default":
model_dir_name = model_dir_name_map.get(item["type"].lower())
if model_dir_name is not None:
item["installed"] = str(
is_exists(model_dir_name, item["filename"], item["url"])
)
else:
item["installed"] = "False"
else:
model_dir_name = item["save_path"].split("/")[0]
if model_dir_name in folder_paths.folder_names_and_paths:
if is_exists(model_dir_name, item["filename"], item["url"]):
item["installed"] = "True"
if "installed" not in item:
if item["filename"] == "<huggingface>":
filename = os.path.basename(item["url"])
else:
filename = item["filename"]
fullpath = os.path.join(
folder_paths.models_dir, item["save_path"], filename
)
item["installed"] = "True" if os.path.exists(fullpath) else "False"
with concurrent.futures.ThreadPoolExecutor(8) as executor:
for item in json_obj["models"]:
executor.submit(process_model_phase, item)
@routes.get("/v2/snapshot/getlist")
async def get_snapshot_list(request):
items = [
f[:-5] for f in os.listdir(context.manager_snapshot_path) if f.endswith(".json")
]
items.sort(reverse=True)
return web.json_response({"items": items}, content_type="application/json")
@routes.get("/v2/snapshot/remove")
async def remove_snapshot(request):
if not security_utils.is_allowed_security_level("middle"):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403)
try:
target = request.rel_url.query["target"]
path = os.path.join(context.manager_snapshot_path, f"{target}.json")
if os.path.exists(path):
os.remove(path)
return web.Response(status=200)
except Exception:
return web.Response(status=400)
@routes.get("/v2/snapshot/restore")
async def restore_snapshot(request):
if not security_utils.is_allowed_security_level("middle"):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403)
try:
target = request.rel_url.query["target"]
path = os.path.join(context.manager_snapshot_path, f"{target}.json")
if os.path.exists(path):
if not os.path.exists(context.manager_startup_script_path):
os.makedirs(context.manager_startup_script_path)
target_path = os.path.join(
context.manager_startup_script_path, "restore-snapshot.json"
)
shutil.copy(path, target_path)
logging.info(f"Snapshot restore scheduled: `{target}`")
return web.Response(status=200)
logging.error(f"Snapshot file not found: `{path}`")
return web.Response(status=400)
except Exception:
return web.Response(status=400)
@routes.get("/v2/snapshot/get_current")
async def get_current_snapshot_api(request):
try:
return web.json_response(
await core.get_current_snapshot(), content_type="application/json"
)
except Exception:
return web.Response(status=400)
@routes.get("/v2/snapshot/save")
async def save_snapshot(request):
try:
await core.save_snapshot_with_postfix("snapshot")
return web.Response(status=200)
except Exception:
return web.Response(status=400)
def unzip_install(files):
temp_filename = "manager-temp.zip"
for url in files:
if url.endswith("/"):
url = url[:-1]
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}
req = urllib.request.Request(url, headers=headers)
response = urllib.request.urlopen(req)
data = response.read()
with open(temp_filename, "wb") as f:
f.write(data)
with zipfile.ZipFile(temp_filename, "r") as zip_ref:
zip_ref.extractall(core.get_default_custom_nodes_path())
os.remove(temp_filename)
except Exception as e:
logging.error(f"Install(unzip) error: {url} / {e}", file=sys.stderr)
return False
logging.info("Installation was successful.")
return True
@routes.post("/v2/customnode/import_fail_info")
async def import_fail_info(request):
try:
json_data = await request.json()
# Basic validation - ensure we have either cnr_id or url
if not isinstance(json_data, dict):
return web.Response(status=400, text="Request body must be a JSON object")
if "cnr_id" not in json_data and "url" not in json_data:
return web.Response(
status=400, text="Either 'cnr_id' or 'url' field is required"
)
if "cnr_id" in json_data:
if not isinstance(json_data["cnr_id"], str):
return web.Response(status=400, text="'cnr_id' must be a string")
module_name = core.unified_manager.get_module_name(json_data["cnr_id"])
else:
if not isinstance(json_data["url"], str):
return web.Response(status=400, text="'url' must be a string")
module_name = core.unified_manager.get_module_name(json_data["url"])
if module_name is not None:
info = cm_global.error_dict.get(module_name)
if info is not None:
return web.json_response(info)
return web.Response(status=400)
except Exception as e:
logging.error(f"[ComfyUI-Manager] Error processing import fail info: {e}")
return web.Response(status=500, text="Internal server error")
@routes.get("/v2/manager/queue/reset")
async def reset_queue(request):
task_queue.wipe_queue()
return web.Response(status=200)
@routes.get("/v2/manager/queue/status")
async def queue_count(request):
"""Get current queue status with optional client filtering.
Query parameters:
client_id: Optional client ID to filter tasks
Returns:
JSON with queue counts and processing status
"""
client_id = request.query.get("client_id")
if client_id:
# Filter tasks by client_id
running_client_tasks = [
task
for task in task_queue.running_tasks.values()
if task.client_id == client_id
]
pending_client_tasks = [
task
for task in task_queue.pending_tasks
if task.client_id == client_id
]
history_client_tasks = {
ui_id: task
for ui_id, task in task_queue.history_tasks.items()
if hasattr(task, "client_id") and task.client_id == client_id
}
return web.json_response(
{
"client_id": client_id,
"total_count": len(pending_client_tasks) + len(running_client_tasks),
"done_count": len(history_client_tasks),
"in_progress_count": len(running_client_tasks),
"pending_count": len(pending_client_tasks),
"is_processing": task_worker_thread is not None
and task_worker_thread.is_alive(),
}
)
else:
# Return overall status
return web.json_response(
{
"total_count": task_queue.total_count(),
"done_count": task_queue.done_count(),
"in_progress_count": len(task_queue.running_tasks),
"pending_count": len(task_queue.pending_tasks),
"is_processing": task_worker_thread is not None
and task_worker_thread.is_alive(),
}
)
task_worker_thread: threading.Thread = None
@routes.get("/v2/manager/queue/start")
async def queue_start(request):
with task_worker_lock:
# finalize_temp_queue_batch()
return _queue_start()
def _queue_start():
global task_worker_thread
if task_worker_thread is not None and task_worker_thread.is_alive():
return web.Response(status=201) # already in-progress
task_worker_thread = threading.Thread(target=lambda: asyncio.run(task_worker()))
task_worker_thread.start()
return web.Response(status=200)
@routes.get("/v2/manager/queue/update_comfyui")
async def update_comfyui(request):
"""Queue a ComfyUI update based on the configured update policy."""
# Validate required query parameters
validation_error = validate_required_params(request, ["client_id", "ui_id"])
if validation_error:
return validation_error
is_stable = core.get_config()["update_policy"] != "nightly-comfyui"
client_id = request.rel_url.query["client_id"]
ui_id = request.rel_url.query["ui_id"]
# Create update-comfyui task
task = QueueTaskItem(
ui_id=ui_id,
client_id=client_id,
kind="update-comfyui",
params=UpdateComfyUIParams(is_stable=is_stable)
)
task_queue.put(task)
return web.Response(status=200)
@routes.get("/v2/comfyui_manager/comfyui_versions")
async def comfyui_versions(request):
try:
res, current, latest = core.get_comfyui_versions()
return web.json_response(
{"versions": res, "current": current},
status=200,
content_type="application/json",
)
except Exception as e:
logging.error(f"ComfyUI update fail: {e}", file=sys.stderr)
return web.Response(status=400)
@routes.get("/v2/comfyui_manager/comfyui_switch_version")
async def comfyui_switch_version(request):
try:
# Validate required query parameters
validation_error = validate_required_params(request, ["ver", "client_id", "ui_id"])
if validation_error:
return validation_error
target_version = request.rel_url.query["ver"]
client_id = request.rel_url.query["client_id"]
ui_id = request.rel_url.query["ui_id"]
# Create update-comfyui task with target version
task = QueueTaskItem(
ui_id=ui_id,
client_id=client_id,
kind="update-comfyui",
params=UpdateComfyUIParams(target_version=target_version)
)
task_queue.put(task)
return web.Response(status=200)
except Exception as e:
logging.error(f"ComfyUI version switch fail: {e}", file=sys.stderr)
return web.Response(status=400)
async def check_whitelist_for_model(item):
json_obj = await core.get_data_by_mode("cache", "model-list.json")
for x in json_obj.get("models", []):
if (
x["save_path"] == item["save_path"]
and x["base"] == item["base"]
and x["filename"] == item["filename"]
):
return True
json_obj = await core.get_data_by_mode("local", "model-list.json")
for x in json_obj.get("models", []):
if (
x["save_path"] == item["save_path"]
and x["base"] == item["base"]
and x["filename"] == item["filename"]
):
return True
return False
@routes.post("/v2/manager/queue/install_model")
async def install_model(request):
try:
json_data = await request.json()
# Validate required fields
if 'client_id' not in json_data:
return web.Response(status=400, text="Missing required field: client_id")
if 'ui_id' not in json_data:
return web.Response(status=400, text="Missing required field: ui_id")
# Validate model metadata
model_data = ModelMetadata.model_validate(json_data)
# Create install-model task with client-provided IDs
task = QueueTaskItem(
ui_id=json_data['ui_id'],
client_id=json_data['client_id'],
kind="install-model",
params=model_data
)
task_queue.put(task)
return web.Response(status=200)
except ValidationError as e:
logging.error(f"[ComfyUI-Manager] Invalid model data: {e}")
return web.Response(status=400, text=f"Invalid model data: {e}")
except Exception as e:
logging.error(f"[ComfyUI-Manager] Error processing model install: {e}")
return web.Response(status=500, text="Internal server error")
@routes.get("/v2/manager/db_mode")
async def db_mode(request):
if "value" in request.rel_url.query:
environment_utils.set_db_mode(request.rel_url.query["value"])
core.write_config()
else:
return web.Response(text=core.get_config()["db_mode"], status=200)
return web.Response(status=200)
@routes.get("/v2/manager/policy/update")
async def update_policy(request):
if "value" in request.rel_url.query:
environment_utils.set_update_policy(request.rel_url.query["value"])
core.write_config()
else:
return web.Response(text=core.get_config()["update_policy"], status=200)
return web.Response(status=200)
@routes.get("/v2/manager/channel_url_list")
async def channel_url_list(request):
channels = core.get_channel_dict()
if "value" in request.rel_url.query:
channel_url = channels.get(request.rel_url.query["value"])
if channel_url is not None:
core.get_config()["channel_url"] = channel_url
core.write_config()
else:
selected = "custom"
selected_url = core.get_config()["channel_url"]
for name, url in channels.items():
if url == selected_url:
selected = name
break
res = {"selected": selected, "list": core.get_channel_list()}
return web.json_response(res, status=200)
return web.Response(status=200)
@routes.get("/v2/manager/reboot")
def restart(self):
if not security_utils.is_allowed_security_level("middle"):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403)
try:
sys.stdout.close_log()
except Exception:
pass
if "__COMFY_CLI_SESSION__" in os.environ:
with open(os.path.join(os.environ["__COMFY_CLI_SESSION__"] + ".reboot"), "w"):
pass
print(
"\nRestarting...\n\n"
) # This printing should not be logging - that will be ugly
exit(0)
print(
"\nRestarting... [Legacy Mode]\n\n"
) # This printing should not be logging - that will be ugly
sys_argv = sys.argv.copy()
if "--windows-standalone-build" in sys_argv:
sys_argv.remove("--windows-standalone-build")
if sys_argv[0].endswith("__main__.py"): # this is a python module
module_name = os.path.basename(os.path.dirname(sys_argv[0]))
cmds = [sys.executable, "-m", module_name] + sys_argv[1:]
elif sys.platform.startswith("win32"):
cmds = ['"' + sys.executable + '"', '"' + sys_argv[0] + '"'] + sys_argv[1:]
else:
cmds = [sys.executable] + sys_argv
print(f"Command: {cmds}", flush=True)
return os.execv(sys.executable, cmds)
@routes.get("/v2/manager/version")
async def get_version(request):
return web.Response(text=core.version_str, status=200)
async def _confirm_try_install(sender, custom_node_url, msg):
json_obj = await core.get_data_by_mode("default", "custom-node-list.json")
sender = manager_util.sanitize_tag(sender)
msg = manager_util.sanitize_tag(msg)
target = core.lookup_customnode_by_url(json_obj, custom_node_url)
if target is not None:
PromptServer.instance.send_sync(
"cm-api-try-install-customnode",
{"sender": sender, "target": target, "msg": msg},
)
else:
logging.error(
f"[ComfyUI Manager API] Failed to try install - Unknown custom node url '{custom_node_url}'"
)
def confirm_try_install(sender, custom_node_url, msg):
asyncio.run(_confirm_try_install(sender, custom_node_url, msg))
cm_global.register_api("cm.try-install-custom-node", confirm_try_install)
async def default_cache_update():
core.refresh_channel_dict()
channel_url = core.get_config()["channel_url"]
async def get_cache(filename):
try:
if core.get_config()["default_cache_as_channel_url"]:
uri = f"{channel_url}/{filename}"
else:
uri = f"{core.DEFAULT_CHANNEL}/{filename}"
cache_uri = str(manager_util.simple_hash(uri)) + "_" + filename
cache_uri = os.path.join(manager_util.cache_dir, cache_uri)
json_obj = await manager_util.get_data(uri, True)
with manager_util.cache_lock:
with open(cache_uri, "w", encoding="utf-8") as file:
json.dump(json_obj, file, indent=4, sort_keys=True)
logging.info(f"[ComfyUI-Manager] default cache updated: {uri}")
except Exception as e:
logging.error(
f"[ComfyUI-Manager] Failed to perform initial fetching '{filename}': {e}"
)
traceback.print_exc()
if (
core.get_config()["network_mode"] != "offline"
and not manager_util.is_manager_pip_package()
):
a = get_cache("custom-node-list.json")
b = get_cache("extension-node-map.json")
c = get_cache("model-list.json")
d = get_cache("alter-list.json")
e = get_cache("github-stats.json")
await asyncio.gather(a, b, c, d, e)
if core.get_config()["network_mode"] == "private":
logging.info(
"[ComfyUI-Manager] The private comfyregistry is not yet supported in `network_mode=private`."
)
else:
# load at least once
await core.unified_manager.reload("remote", dont_wait=False)
await core.unified_manager.get_custom_nodes(channel_url, "remote")
else:
await core.unified_manager.reload(
"remote", dont_wait=False, update_cnr_map=False
)
logging.info("[ComfyUI-Manager] All startup tasks have been completed.")
threading.Thread(target=lambda: asyncio.run(default_cache_update())).start()
if not os.path.exists(context.manager_config_path):
core.get_config()
core.write_config()
cm_global.register_extension(
"ComfyUI-Manager",
{
"version": core.version,
"name": "ComfyUI Manager",
"nodes": {},
"description": "This extension provides the ability to manage custom nodes in ComfyUI.",
},
)