From 363c90f4e3b723bdd1a20e105f1aae0cff1043ed Mon Sep 17 00:00:00 2001 From: Christopher Anderson Date: Wed, 24 Sep 2025 17:50:24 +1000 Subject: [PATCH] Added subgraph support for Widget2Str --- nodes/nodes.py | 268 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 223 insertions(+), 45 deletions(-) diff --git a/nodes/nodes.py b/nodes/nodes.py index afc793f..663b55b 100644 --- a/nodes/nodes.py +++ b/nodes/nodes.py @@ -804,6 +804,7 @@ The choices are loaded from 'custom_dimensions.json' in the nodes folder. return (latent, int(width), int(height),) +# noinspection PyShadowingNames class WidgetToString: @classmethod def IS_CHANGED(cls,*,id,node_title,any_input,**kwargs): @@ -819,11 +820,11 @@ class WidgetToString: "return_all": ("BOOLEAN", {"default": False}), }, "optional": { - "any_input": (IO.ANY, ), - "node_title": ("STRING", {"multiline": False}), - "allowed_float_decimals": ("INT", {"default": 2, "min": 0, "max": 10, "tooltip": "Number of decimal places to display for float values"}), - - }, + "any_input": (IO.ANY, ), + "node_title": ("STRING", {"multiline": False}), + "allowed_float_decimals": ("INT", {"default": 2, "min": 0, "max": 10, "tooltip": "Number of decimal places to display for float values"}), + + }, "hidden": {"extra_pnginfo": "EXTRA_PNGINFO", "prompt": "PROMPT", "unique_id": "UNIQUE_ID",}, @@ -842,54 +843,231 @@ The 'any_input' is required for making sure the node you want the value from exi """ def get_widget_value(self, id, widget_name, extra_pnginfo, prompt, unique_id, return_all=False, any_input=None, node_title="", allowed_float_decimals=2): + """ + Retrieves the value of the specified widget from a node in the workflow and + returns it as a string. + + If no `id` or `node_title` is provided, the method attempts to identify the + node using the `any_input` connection in the workflow. Enable node ID display + in ComfyUI's "Manager" menu to view node IDs, or use a manually edited node + title for searching. NOTE: A node does not have a title unless it is manually + edited to something other than its default value. + + Args: + id (int): The unique ID of the target node. If 0, the method relies on + other methods to determine the node. TODO: change to a STRING (breaking change) + widget_name (str): The name of the widget whose value needs to be retrieved. + extra_pnginfo (dict): A dictionary containing workflow metadata, including + node connections and state. + prompt (dict): A dictionary containing node-specific data with input + settings to extract widget values. + unique_id (str): The unique identifier of the current node instance, used + to match the `any_input` connection. + return_all (bool): If True, retrieves and returns all input values from + the node, formatted as a string. + any_input (str): Optional. A link reference used to determine the node if + no `id` or `node_title` is provided. + node_title (str): Optional. The title of the node to search for. Titles + are valid only if manually assigned in ComfyUI. + allowed_float_decimals (int): The number of decimal places to which float + values should be rounded in the output. + + Returns: + str or tuple: + - If `return_all` is False, returns a tuple with the value of the + specified widget. + - If `return_all` is True, returns a formatted string containing all + input values for the node. + + Raises: + ValueError: If no matching node is found for the given `id`, `node_title`, + or `any_input`. + NameError: If the specified widget does not exist in the identified node. + """ workflow = extra_pnginfo["workflow"] - #print(json.dumps(workflow, indent=4)) results = [] - node_id = None # Initialize node_id to handle cases where no match is found - link_id = None + target_full_node_id = None # string like "5", "5:1", "5:9:6" + active_link_id = None + + # Normalize incoming ids which may be lists/tuples (e.g., ["7:9:14", 0]) + def normalize_any_id(value): + # If list/tuple, take the first element which should be the id/path + if isinstance(value, (list, tuple)) and value: + value = value[0] + # Convert ints to str + if isinstance(value, int): + return str(value) + # Pass through strings; None -> empty + return value if isinstance(value, str) else "" + + id_str = normalize_any_id(id) + unique_id_str = normalize_any_id(unique_id) + + # Map of (scope_key, link_id) -> full_node_id + # scope_key: '' for top-level, or the subgraph instance path for nested nodes (e.g., '5', '5:9') link_to_node_map = {} - for node in workflow["nodes"]: - if node_title: - if "title" in node: - if node["title"] == node_title: - node_id = node["id"] - break - else: - print("Node title not found.") - elif id != 0: - if node["id"] == id: - node_id = id - break - elif any_input is not None: - if node["type"] == "WidgetToString" and node["id"] == int(unique_id) and not link_id: - for node_input in node["inputs"]: - if node_input["name"] == "any_input": - link_id = node_input["link"] - - # Construct a map of links to node IDs for future reference - node_outputs = node.get("outputs", None) - if not node_outputs: + # Build a map of subgraph id -> definition for quick lookup + defs = workflow.get("definitions", {}) or {} + subgraph_defs = {sg.get("id"): sg for sg in (defs.get("subgraphs", []) or []) if sg.get("id")} + + # Helper: register output links -> node map (scoped) + def register_links(scope_key, node_obj, full_node_id): + outputs = node_obj.get("outputs") or [] + for out in outputs: + links = out.get("links") + if not links: continue - for output in node_outputs: - node_links = output.get("links", None) - if not node_links: - continue - for link in node_links: - link_to_node_map[link] = node["id"] - if link_id and link == link_id: - break - - if link_id: - node_id = link_to_node_map.get(link_id, None) + if isinstance(links, list): + for lid in links: + if lid is None: + continue + link_to_node_map[(scope_key, lid)] = full_node_id - if node_id is None: - raise ValueError("No matching node found for the given title or id") + # Recursive emitter for a subgraph instance + # instance_path: the full path to this subgraph instance (e.g., '5' or '5:9') + def emit_subgraph_instance(sub_def, instance_path): + for snode in (sub_def.get("nodes") or []): + child_id = str(snode.get("id")) + full_id = f"{instance_path}:{child_id}" + # Yield the node with the scope of this subgraph instance + yield full_id, instance_path, snode + # If this node itself is a subgraph instance, recurse + stype = snode.get("type") + nested_def = subgraph_defs.get(stype) + if nested_def is not None: + nested_instance_path = full_id # e.g., '5:9' + for inner in emit_subgraph_instance(nested_def, nested_instance_path): + yield inner + + # Master iterator: yields all nodes with their full_node_id and scope + def iter_all_nodes(): + # 1) Top-level nodes + for node in workflow.get("nodes", []): + full_node_id = str(node.get("id")) + scope_key = "" # top-level link id space + yield full_node_id, scope_key, node + + # 2) If a top-level node is an instance of a subgraph, emit its internal nodes + ntype = node.get("type") + sg_def = subgraph_defs.get(ntype) + if sg_def is not None: + instance_path = full_node_id # e.g., '5' + for item in emit_subgraph_instance(sg_def, instance_path): + yield item + + # Helpers for id/unique_id handling + def match_id_with_fullness(candidate_full_id, requested_id): + # Exact match if the request is fully qualified + if ":" in requested_id: + return candidate_full_id == requested_id + # Otherwise, allow exact top-level id or ending with ":child" + return candidate_full_id == requested_id or candidate_full_id.endswith(f":{requested_id}") + + def parent_scope_of(full_id): + parts = full_id.split(":") + return ":".join(parts[:-1]) if len(parts) > 1 else "" + + def resolve_scope_from_unique_id(u_str): + # Fully qualified: everything before the last segment is the scope + if ":" in u_str: + return parent_scope_of(u_str) + + # Not qualified: try to infer from prompt keys by suffix + suffix = f":{u_str}" + matches = [k for k in prompt.keys() if isinstance(k, str) and k.endswith(suffix)] + matches = list(dict.fromkeys(matches)) # dedupe + if len(matches) == 1: + return parent_scope_of(matches[0]) + elif len(matches) == 0: + return None + else: + raise ValueError( + f"Ambiguous unique_id '{u_str}'. Multiple subgraph instances match. " + f"Use a fully qualified id like 'parentPath:{u_str}' (e.g., '5:9:{u_str}')." + ) + + # First: build a complete list of nodes and the scoped link map + all_nodes = [] + for full_node_id, scope_key, node in iter_all_nodes(): + all_nodes.append((full_node_id, scope_key, node)) + register_links(scope_key, node, full_node_id) + + # Try title or id first + if node_title: + for full_node_id, _, node in all_nodes: + if "title" in node and node.get("title") == node_title: + target_full_node_id = full_node_id + break + # If title matched, do not attempt any_input fallback + any_input = None + elif id_str not in ("", "0"): + matches = [fid for fid, _, _ in all_nodes if match_id_with_fullness(fid, id_str)] + if len(matches) > 1 and ":" not in id_str and any(m != id_str for m in matches): + raise ValueError( + f"Ambiguous id '{id_str}'. Multiple nodes match across (nested) subgraphs. " + f"Use a fully qualified id like '5:9:{id_str}'." + ) + target_full_node_id = matches[0] if matches else None + + # Resolve via any_input + unique_id if still not found + if target_full_node_id is None and any_input is not None and unique_id_str: + # If unique_id is fully qualified, select that exact node + wts_full_id = None + if ":" in unique_id_str: + for fid, _, node in all_nodes: + if fid == unique_id_str and node.get("type") == "WidgetToString": + wts_full_id = fid + break + if wts_full_id is None: + raise ValueError(f"No WidgetToString found for unique_id '{unique_id_str}'") + found_scope_key = parent_scope_of(wts_full_id) + else: + # Infer scope from prompt keys when unqualified + found_scope_key = resolve_scope_from_unique_id(unique_id_str) + candidates = [] + if found_scope_key: + candidates.append(f"{found_scope_key}:{unique_id_str}") + else: + candidates.append(unique_id_str) + + for fid, scope_key, node in all_nodes: + if node.get("type") == "WidgetToString" and fid in candidates: + wts_full_id = fid + if not found_scope_key: + found_scope_key = parent_scope_of(fid) + break + + if wts_full_id is None: + raise ValueError(f"No WidgetToString found for unique_id '{unique_id_str}'") + + # With the WidgetToString located, read its any_input link id + wts_node = next(node for fid, _, node in all_nodes if fid == wts_full_id) + for node_input in (wts_node.get("inputs") or []): + if node_input.get("name") == "any_input": + active_link_id = node_input.get("link") + break + + if active_link_id is None: + raise ValueError(f"WidgetToString '{wts_full_id}' has no 'any_input' link") + + # Resolve the producer of that link within the correct scope + target_full_node_id = link_to_node_map.get((found_scope_key or "", active_link_id)) + if target_full_node_id is None: + raise ValueError( + f"Could not resolve link {active_link_id} in scope '{found_scope_key}'. " + f"The subgraph clone’s links may not have been discovered." + ) + + if target_full_node_id is None: + raise ValueError("No matching node found for the given title, id, or any_input") + + values = prompt.get(str(target_full_node_id)) + if not values: + raise ValueError(f"No prompt entry found for node id: {target_full_node_id}") - values = prompt[str(node_id)] if "inputs" in values: if return_all: - # Format items based on type formatted_items = [] for k, v in values["inputs"].items(): if isinstance(v, float): @@ -906,7 +1084,7 @@ The 'any_input' is required for making sure the node you want the value from exi v = str(v) return (v, ) else: - raise NameError(f"Widget not found: {node_id}.{widget_name}") + raise NameError(f"Widget not found: {target_full_node_id}.{widget_name}") return (', '.join(results).strip(', '), ) class DummyOut: