[feat] Add client_id support to task queue system

- Add client_id field to QueueTaskItem and TaskHistoryItem models
- Implement client-specific WebSocket message routing
- Add client filtering to queue status and history endpoints
- Follow ComfyUI patterns for session management
- Create data_models package for better code organization
This commit is contained in:
bymyself 2025-06-06 15:54:28 -07:00
parent 3870abfd2d
commit 601f1bf452
10 changed files with 1309 additions and 916 deletions

View File

@ -0,0 +1,26 @@
"""
Data models for ComfyUI Manager.
This package contains Pydantic models used throughout the ComfyUI Manager
for data validation, serialization, and type safety.
"""
from .task_queue import (
QueueTaskItem,
TaskHistoryItem,
TaskStateMessage,
MessageTaskDone,
MessageTaskStarted,
MessageUpdate,
ManagerMessageName,
)
__all__ = [
"QueueTaskItem",
"TaskHistoryItem",
"TaskStateMessage",
"MessageTaskDone",
"MessageTaskStarted",
"MessageUpdate",
"ManagerMessageName",
]

View File

@ -0,0 +1,69 @@
"""
Task queue data models for ComfyUI Manager.
Contains Pydantic models for task queue management, WebSocket messaging,
and task state tracking.
"""
from typing import Optional, Union, Dict
from enum import Enum
from pydantic import BaseModel
class QueueTaskItem(BaseModel):
"""Represents a task item in the queue."""
ui_id: str
client_id: str
kind: str
class TaskHistoryItem(BaseModel):
"""Represents a completed task in the history."""
ui_id: str
client_id: str
kind: str
timestamp: str
result: str
status: Optional[dict] = None
class TaskStateMessage(BaseModel):
"""Current state of the task queue system."""
history: Dict[str, TaskHistoryItem]
running_queue: list[QueueTaskItem]
pending_queue: list[QueueTaskItem]
class MessageTaskDone(BaseModel):
"""WebSocket message sent when a task completes."""
ui_id: str
result: str
kind: str
status: Optional[dict]
timestamp: str
state: TaskStateMessage
class MessageTaskStarted(BaseModel):
"""WebSocket message sent when a task starts."""
ui_id: str
kind: str
timestamp: str
state: TaskStateMessage
# Union type for all possible WebSocket message updates
MessageUpdate = Union[MessageTaskDone, MessageTaskStarted]
class ManagerMessageName(Enum):
"""WebSocket message type constants."""
TASK_DONE = "cm-task-completed"
TASK_STARTED = "cm-task-started"
STATUS = "cm-queue-status"

View File

@ -0,0 +1,39 @@
from comfy.cli_args import args
SECURITY_MESSAGE_MIDDLE_OR_BELOW = "ERROR: To use this action, a security_level of `middle or below` is required. Please contact the administrator.\nReference: https://github.com/ltdrdata/ComfyUI-Manager#security-policy"
SECURITY_MESSAGE_NORMAL_MINUS = "ERROR: To use this feature, you must either set '--listen' to a local IP and set the security level to 'normal-' or lower, or set the security level to 'middle' or 'weak'. Please contact the administrator.\nReference: https://github.com/ltdrdata/ComfyUI-Manager#security-policy"
SECURITY_MESSAGE_GENERAL = "ERROR: This installation is not allowed in this security_level. Please contact the administrator.\nReference: https://github.com/ltdrdata/ComfyUI-Manager#security-policy"
SECURITY_MESSAGE_NORMAL_MINUS_MODEL = "ERROR: Downloading models that are not in '.safetensors' format is only allowed for models registered in the 'default' channel at this security level. If you want to download this model, set the security level to 'normal-' or lower."
def is_loopback(address):
import ipaddress
try:
return ipaddress.ip_address(address).is_loopback
except ValueError:
return False
is_local_mode = is_loopback(args.listen)
model_dir_name_map = {
"checkpoints": "checkpoints",
"checkpoint": "checkpoints",
"unclip": "checkpoints",
"text_encoders": "text_encoders",
"clip": "text_encoders",
"vae": "vae",
"lora": "loras",
"t2i-adapter": "controlnet",
"t2i-style": "controlnet",
"controlnet": "controlnet",
"clip_vision": "clip_vision",
"gligen": "gligen",
"upscale": "upscale_models",
"embedding": "embeddings",
"embeddings": "embeddings",
"unet": "diffusion_models",
"diffusion_model": "diffusion_models",
}

File diff suppressed because it is too large Load Diff

View File

View File

@ -0,0 +1,141 @@
import os
import git
import logging
import traceback
from comfyui_manager.common import context, manager_util
import folder_paths
from comfy.cli_args import args
import latent_preview
from comfyui_manager.glob import manager_core as core
from comfyui_manager.common import cm_global
comfy_ui_hash = "-"
comfyui_tag = None
def print_comfyui_version():
global comfy_ui_hash
global comfyui_tag
is_detached = False
try:
repo = git.Repo(os.path.dirname(folder_paths.__file__))
core.comfy_ui_revision = len(list(repo.iter_commits("HEAD")))
comfy_ui_hash = repo.head.commit.hexsha
cm_global.variables["comfyui.revision"] = core.comfy_ui_revision
core.comfy_ui_commit_datetime = repo.head.commit.committed_datetime
cm_global.variables["comfyui.commit_datetime"] = core.comfy_ui_commit_datetime
is_detached = repo.head.is_detached
current_branch = repo.active_branch.name
comfyui_tag = context.get_comfyui_tag()
try:
if (
not os.environ.get("__COMFYUI_DESKTOP_VERSION__")
and core.comfy_ui_commit_datetime.date()
< core.comfy_ui_required_commit_datetime.date()
):
logging.warning(
f"\n\n## [WARN] ComfyUI-Manager: Your ComfyUI version ({core.comfy_ui_revision})[{core.comfy_ui_commit_datetime.date()}] is too old. Please update to the latest version. ##\n\n"
)
except Exception:
pass
# process on_revision_detected -->
if "cm.on_revision_detected_handler" in cm_global.variables:
for k, f in cm_global.variables["cm.on_revision_detected_handler"]:
try:
f(core.comfy_ui_revision)
except Exception:
logging.error(f"[ERROR] '{k}' on_revision_detected_handler")
traceback.print_exc()
del cm_global.variables["cm.on_revision_detected_handler"]
else:
logging.warning(
"[ComfyUI-Manager] Some features are restricted due to your ComfyUI being outdated."
)
# <--
if current_branch == "master":
if comfyui_tag:
logging.info(
f"### ComfyUI Version: {comfyui_tag} | Released on '{core.comfy_ui_commit_datetime.date()}'"
)
else:
logging.info(
f"### ComfyUI Revision: {core.comfy_ui_revision} [{comfy_ui_hash[:8]}] | Released on '{core.comfy_ui_commit_datetime.date()}'"
)
else:
if comfyui_tag:
logging.info(
f"### ComfyUI Version: {comfyui_tag} on '{current_branch}' | Released on '{core.comfy_ui_commit_datetime.date()}'"
)
else:
logging.info(
f"### ComfyUI Revision: {core.comfy_ui_revision} on '{current_branch}' [{comfy_ui_hash[:8]}] | Released on '{core.comfy_ui_commit_datetime.date()}'"
)
except Exception:
if is_detached:
logging.info(
f"### ComfyUI Revision: {core.comfy_ui_revision} [{comfy_ui_hash[:8]}] *DETACHED | Released on '{core.comfy_ui_commit_datetime.date()}'"
)
else:
logging.info(
"### ComfyUI Revision: UNKNOWN (The currently installed ComfyUI is not a Git repository)"
)
def set_preview_method(method):
if method == "auto":
args.preview_method = latent_preview.LatentPreviewMethod.Auto
elif method == "latent2rgb":
args.preview_method = latent_preview.LatentPreviewMethod.Latent2RGB
elif method == "taesd":
args.preview_method = latent_preview.LatentPreviewMethod.TAESD
else:
args.preview_method = latent_preview.LatentPreviewMethod.NoPreviews
core.get_config()["preview_method"] = method
def set_update_policy(mode):
core.get_config()["update_policy"] = mode
def set_db_mode(mode):
core.get_config()["db_mode"] = mode
def setup_environment():
git_exe = core.get_config()["git_exe"]
if git_exe != "":
git.Git().update_environment(GIT_PYTHON_GIT_EXECUTABLE=git_exe)
def initialize_environment():
context.comfy_path = os.path.dirname(folder_paths.__file__)
core.js_path = os.path.join(context.comfy_path, "web", "extensions")
local_db_model = os.path.join(manager_util.comfyui_manager_path, "model-list.json")
local_db_alter = os.path.join(manager_util.comfyui_manager_path, "alter-list.json")
local_db_custom_node_list = os.path.join(
manager_util.comfyui_manager_path, "custom-node-list.json"
)
local_db_extension_node_mappings = os.path.join(
manager_util.comfyui_manager_path, "extension-node-map.json"
)
set_preview_method(core.get_config()["preview_method"])
environment_utils.print_comfyui_version()
setup_environment()
core.check_invalid_nodes()

View File

@ -0,0 +1,21 @@
import locale
import sys
def handle_stream(stream, prefix):
stream.reconfigure(encoding=locale.getpreferredencoding(), errors="replace")
for msg in stream:
if (
prefix == "[!]"
and ("it/s]" in msg or "s/it]" in msg)
and ("%|" in msg or "it [" in msg)
):
if msg.startswith("100%"):
print("\r" + msg, end="", file=sys.stderr),
else:
print("\r" + msg[:-1], end="", file=sys.stderr),
else:
if prefix == "[!]":
print(prefix, msg, end="", file=sys.stderr)
else:
print(prefix, msg, end="")

View File

@ -0,0 +1,73 @@
import os
import logging
import folder_paths
from comfyui_manager.glob import manager_core as core
def get_model_dir(data, show_log=False):
if "download_model_base" in folder_paths.folder_names_and_paths:
models_base = folder_paths.folder_names_and_paths["download_model_base"][0][0]
else:
models_base = folder_paths.models_dir
# NOTE: Validate to prevent path traversal.
if any(char in data["filename"] for char in {"/", "\\", ":"}):
return None
def resolve_custom_node(save_path):
save_path = save_path[13:] # remove 'custom_nodes/'
# NOTE: Validate to prevent path traversal.
if save_path.startswith(os.path.sep) or ":" in save_path:
return None
repo_name = save_path.replace("\\", "/").split("/")[
0
] # get custom node repo name
# NOTE: The creation of files within the custom node path should be removed in the future.
repo_path = core.lookup_installed_custom_nodes_legacy(repo_name)
if repo_path is not None and repo_path[0]:
# Returns the retargeted path based on the actually installed repository
return os.path.join(os.path.dirname(repo_path[1]), save_path)
else:
return None
if data["save_path"] != "default":
if ".." in data["save_path"] or data["save_path"].startswith("/"):
if show_log:
logging.info(
f"[WARN] '{data['save_path']}' is not allowed path. So it will be saved into 'models/etc'."
)
base_model = os.path.join(models_base, "etc")
else:
if data["save_path"].startswith("custom_nodes"):
base_model = resolve_custom_node(data["save_path"])
if base_model is None:
if show_log:
logging.info(
f"[ComfyUI-Manager] The target custom node for model download is not installed: {data['save_path']}"
)
return None
else:
base_model = os.path.join(models_base, data["save_path"])
else:
model_dir_name = model_dir_name_map.get(data["type"].lower())
if model_dir_name is not None:
base_model = folder_paths.folder_names_and_paths[model_dir_name][0][0]
else:
base_model = os.path.join(models_base, "etc")
return base_model
def get_model_path(data, show_log=False):
base_model = get_model_dir(data, show_log)
if base_model is None:
return None
else:
if data["filename"] == "<huggingface>":
return os.path.join(base_model, os.path.basename(data["url"]))
else:
return os.path.join(base_model, data["filename"])

View File

@ -0,0 +1,65 @@
import concurrent.futures
from comfyui_manager.glob import manager_core as core
def check_state_of_git_node_pack(
node_packs, do_fetch=False, do_update_check=True, do_update=False
):
if do_fetch:
print("Start fetching...", end="")
elif do_update:
print("Start updating...", end="")
elif do_update_check:
print("Start update check...", end="")
def process_custom_node(item):
core.check_state_of_git_node_pack_single(
item, do_fetch, do_update_check, do_update
)
with concurrent.futures.ThreadPoolExecutor(4) as executor:
for k, v in node_packs.items():
if v.get("active_version") in ["unknown", "nightly"]:
executor.submit(process_custom_node, v)
if do_fetch:
print("\x1b[2K\rFetching done.")
elif do_update:
update_exists = any(
item.get("updatable", False) for item in node_packs.values()
)
if update_exists:
print("\x1b[2K\rUpdate done.")
else:
print("\x1b[2K\rAll extensions are already up-to-date.")
elif do_update_check:
print("\x1b[2K\rUpdate check done.")
def nickname_filter(json_obj):
preemptions_map = {}
for k, x in json_obj.items():
if "preemptions" in x[1]:
for y in x[1]["preemptions"]:
preemptions_map[y] = k
elif k.endswith("/ComfyUI"):
for y in x[0]:
preemptions_map[y] = k
updates = {}
for k, x in json_obj.items():
removes = set()
for y in x[0]:
k2 = preemptions_map.get(y)
if k2 is not None and k != k2:
removes.add(y)
if len(removes) > 0:
updates[k] = [y for y in x[0] if y not in removes]
for k, v in updates.items():
json_obj[k][0] = v
return json_obj

View File

@ -0,0 +1,42 @@
from comfyui_manager.glob import manager_core as core
def is_allowed_security_level(level):
if level == "block":
return False
elif level == "high":
if is_local_mode:
return core.get_config()["security_level"] in ["weak", "normal-"]
else:
return core.get_config()["security_level"] == "weak"
elif level == "middle":
return core.get_config()["security_level"] in ["weak", "normal", "normal-"]
else:
return True
async def get_risky_level(files, pip_packages):
json_data1 = await core.get_data_by_mode("local", "custom-node-list.json")
json_data2 = await core.get_data_by_mode(
"cache",
"custom-node-list.json",
channel_url="https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main",
)
all_urls = set()
for x in json_data1["custom_nodes"] + json_data2["custom_nodes"]:
all_urls.update(x.get("files", []))
for x in files:
if x not in all_urls:
return "high"
all_pip_packages = set()
for x in json_data1["custom_nodes"] + json_data2["custom_nodes"]:
all_pip_packages.update(x.get("pip", []))
for p in pip_packages:
if p not in all_pip_packages:
return "block"
return "middle"