Dr.Lt.Data 2866193baf ● feat: Draft pip package policy management system (not yet integrated)
Add comprehensive pip dependency conflict resolution framework as draft implementation. This is self-contained and does not affect existing
ComfyUI Manager functionality.

Key components:
- pip_util.py with PipBatch class for policy-driven package management
- Lazy-loaded policy system supporting base + user overrides
- Multi-stage policy execution (uninstall → apply_first_match → apply_all_matches → restore)
- Conditional policies based on platform, installed packages, and ComfyUI version
- Comprehensive test suite covering edge cases, workflows, and platform scenarios
- Design and implementation documentation

Policy capabilities (draft):
- Package replacement (e.g., PIL → Pillow, opencv-python → opencv-contrib-python)
- Version pinning to prevent dependency conflicts
- Dependency protection during installations
- Platform-specific handling (Linux/Windows, GPU detection)
- Pre-removal and post-restoration workflows

Testing infrastructure:
- Pytest-based test suite with isolated environments
- Dependency analysis tools for conflict detection
- Coverage for policy priority, edge cases, and environment recovery

Status: Draft implementation complete, integration with manager workflows pending.
2025-10-04 08:55:59 +09:00

630 lines
24 KiB
Python

"""
pip_util - Policy-based pip package management system
This module provides a policy-based approach to pip package installation
to minimize dependency conflicts and protect existing installed packages.
Usage:
# Batch operations (policy auto-loaded)
with PipBatch() as batch:
batch.ensure_not_installed()
batch.install("numpy>=1.20")
batch.install("pandas>=2.0")
batch.install("scipy>=1.7")
batch.ensure_installed()
"""
import json
import logging
import platform
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from . import manager_util, context
logger = logging.getLogger(__name__)
# Global policy cache (lazy loaded on first access)
_pip_policy_cache: Optional[Dict] = None
def get_pip_policy() -> Dict:
"""
Get pip policy with lazy loading.
Returns the cached policy if available, otherwise loads it from files.
This function automatically loads the policy on first access.
Thread safety: This function is NOT thread-safe.
Ensure single-threaded access during initialization.
Returns:
Dictionary of merged pip policies
Example:
>>> policy = get_pip_policy()
>>> numpy_policy = policy.get("numpy", {})
"""
global _pip_policy_cache
# Return cached policy if already loaded
if _pip_policy_cache is not None:
logger.debug("Returning cached pip policy")
return _pip_policy_cache
logger.info("Loading pip policies...")
# Load base policy
base_policy = {}
base_policy_path = Path(manager_util.comfyui_manager_path) / "pip-policy.json"
try:
if base_policy_path.exists():
with open(base_policy_path, 'r', encoding='utf-8') as f:
base_policy = json.load(f)
logger.debug(f"Loaded base policy from {base_policy_path}")
else:
logger.warning(f"Base policy file not found: {base_policy_path}")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse base policy JSON: {e}")
base_policy = {}
except Exception as e:
logger.warning(f"Failed to read base policy file: {e}")
base_policy = {}
# Load user policy
user_policy = {}
user_policy_path = Path(context.manager_files_path) / "pip-policy.user.json"
try:
if user_policy_path.exists():
with open(user_policy_path, 'r', encoding='utf-8') as f:
user_policy = json.load(f)
logger.debug(f"Loaded user policy from {user_policy_path}")
else:
# Create empty user policy file
user_policy_path.parent.mkdir(parents=True, exist_ok=True)
with open(user_policy_path, 'w', encoding='utf-8') as f:
json.dump({"_comment": "User-specific pip policy overrides"}, f, indent=2)
logger.info(f"Created empty user policy file: {user_policy_path}")
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse user policy JSON: {e}")
user_policy = {}
except Exception as e:
logger.warning(f"Failed to read user policy file: {e}")
user_policy = {}
# Merge policies (package-level override: user completely replaces base per package)
merged_policy = base_policy.copy()
for package_name, package_policy in user_policy.items():
if package_name.startswith("_"): # Skip metadata fields like _comment
continue
merged_policy[package_name] = package_policy # Complete package replacement
# Store in global cache
_pip_policy_cache = merged_policy
logger.info(f"Policy loaded successfully: {len(_pip_policy_cache)} package policies")
return _pip_policy_cache
class PipBatch:
"""
Pip package installation batch manager.
Maintains pip freeze cache during a batch of operations for performance optimization.
Usage pattern:
# Batch operations (policy auto-loaded)
with PipBatch() as batch:
batch.ensure_not_installed()
batch.install("numpy>=1.20")
batch.install("pandas>=2.0")
batch.install("scipy>=1.7")
batch.ensure_installed()
Attributes:
_installed_cache: Cache of installed packages from pip freeze
"""
def __init__(self):
"""Initialize PipBatch with empty cache."""
self._installed_cache: Optional[Dict[str, str]] = None
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager and clear cache."""
self._installed_cache = None
return False
def _refresh_installed_cache(self) -> None:
"""
Refresh the installed packages cache by executing pip freeze.
Parses pip freeze output into a dictionary of {package_name: version}.
Ignores editable packages and comments.
Raises:
No exceptions raised - failures result in empty cache with warning log
"""
try:
cmd = manager_util.make_pip_cmd(["freeze"])
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
packages = {}
for line in result.stdout.strip().split('\n'):
line = line.strip()
# Skip empty lines
if not line:
continue
# Skip editable packages (-e /path/to/package or -e git+https://...)
# Editable packages don't have version info and are typically development-only
if line.startswith('-e '):
continue
# Skip comments (defensive: pip freeze typically doesn't output comments,
# but this handles manually edited requirements.txt or future pip changes)
if line.startswith('#'):
continue
# Parse package==version
if '==' in line:
try:
package_name, version = line.split('==', 1)
packages[package_name.strip()] = version.strip()
except ValueError:
logger.warning(f"Failed to parse pip freeze line: {line}")
continue
self._installed_cache = packages
logger.debug(f"Refreshed installed packages cache: {len(packages)} packages")
except subprocess.CalledProcessError as e:
logger.warning(f"pip freeze failed: {e}")
self._installed_cache = {}
except Exception as e:
logger.warning(f"Failed to refresh installed packages cache: {e}")
self._installed_cache = {}
def _get_installed_packages(self) -> Dict[str, str]:
"""
Get cached installed packages, refresh if cache is None.
Returns:
Dictionary of {package_name: version}
"""
if self._installed_cache is None:
self._refresh_installed_cache()
return self._installed_cache
def _invalidate_cache(self) -> None:
"""
Invalidate the installed packages cache.
Should be called after install/uninstall operations.
"""
self._installed_cache = None
def _parse_package_spec(self, package_info: str) -> Tuple[str, Optional[str]]:
"""
Parse package spec string into package name and version spec using PEP 508.
Uses the packaging library to properly parse package specifications according to
PEP 508 standard, which handles complex cases like extras and multiple version
constraints that simple regex cannot handle correctly.
Args:
package_info: Package specification like "numpy", "numpy==1.26.0", "numpy>=1.20.0",
or complex specs like "package[extra]>=1.0,<2.0"
Returns:
Tuple of (package_name, version_spec)
Examples: ("numpy", "==1.26.0"), ("pandas", ">=2.0.0"), ("scipy", None)
Package names are normalized (e.g., "NumPy" -> "numpy")
Raises:
ValueError: If package_info cannot be parsed according to PEP 508
Example:
>>> batch._parse_package_spec("numpy>=1.20")
("numpy", ">=1.20")
>>> batch._parse_package_spec("requests[security]>=2.0,<3.0")
("requests", ">=2.0,<3.0")
"""
try:
req = Requirement(package_info)
package_name = req.name # Normalized package name
version_spec = str(req.specifier) if req.specifier else None
return package_name, version_spec
except Exception as e:
raise ValueError(f"Invalid package spec: {package_info}") from e
def _evaluate_condition(self, condition: Optional[Dict], package_name: str,
installed_packages: Dict[str, str]) -> bool:
"""
Evaluate policy condition and return whether it's satisfied.
Args:
condition: Policy condition object (dict) or None
package_name: Current package being processed
installed_packages: Dictionary of {package_name: version}
Returns:
True if condition is satisfied, False otherwise
None condition always returns True
Example:
>>> condition = {"type": "installed", "package": "numpy", "spec": ">=1.20"}
>>> batch._evaluate_condition(condition, "numba", {"numpy": "1.26.0"})
True
"""
# No condition means always satisfied
if condition is None:
return True
condition_type = condition.get("type")
if condition_type == "installed":
# Check if a package is installed with optional version spec
target_package = condition.get("package", package_name)
installed_version = installed_packages.get(target_package)
# Package not installed
if installed_version is None:
return False
# Check version spec if provided
spec = condition.get("spec")
if spec:
try:
specifier = SpecifierSet(spec)
return Version(installed_version) in specifier
except Exception as e:
logger.warning(f"Failed to compare version {installed_version} with spec {spec}: {e}")
return False
# Package is installed (no spec check)
return True
elif condition_type == "platform":
# Check platform conditions (os, has_gpu, comfyui_version)
conditions_met = True
# Check OS
if "os" in condition:
expected_os = condition["os"].lower()
actual_os = platform.system().lower()
if expected_os not in actual_os and actual_os not in expected_os:
conditions_met = False
# Check GPU availability
if "has_gpu" in condition:
expected_gpu = condition["has_gpu"]
try:
import torch
has_gpu = torch.cuda.is_available()
except ImportError:
has_gpu = False
if expected_gpu != has_gpu:
conditions_met = False
# Check ComfyUI version
if "comfyui_version" in condition:
# TODO: Implement ComfyUI version check
logger.warning("ComfyUI version condition not yet implemented")
return conditions_met
else:
logger.warning(f"Unknown condition type: {condition_type}")
return False
def install(self, package_info: str, extra_index_url: Optional[str] = None,
override_policy: bool = False) -> bool:
"""
Install a pip package with policy-based modifications.
Args:
package_info: Package specification (e.g., "numpy", "numpy==1.26.0", "numpy>=1.20.0")
extra_index_url: Additional package repository URL (optional)
override_policy: If True, skip policy application and install directly (default: False)
Returns:
True if installation succeeded, False if skipped by policy
Raises:
ValueError: If package_info cannot be parsed
subprocess.CalledProcessError: If installation fails (depending on policy on_failure settings)
Example:
>>> with PipBatch() as batch:
... batch.install("numpy>=1.20")
... batch.install("torch", override_policy=True)
"""
# Parse package spec
try:
package_name, version_spec = self._parse_package_spec(package_info)
except ValueError as e:
logger.error(f"Invalid package spec: {e}")
raise
# Get installed packages cache
installed_packages = self._get_installed_packages()
# Override policy - skip to direct installation
if override_policy:
logger.info(f"Installing {package_info} (policy override)")
cmd = manager_util.make_pip_cmd(["install", package_info])
if extra_index_url:
cmd.extend(["--extra-index-url", extra_index_url])
try:
subprocess.run(cmd, check=True)
self._invalidate_cache()
logger.info(f"Successfully installed {package_info}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to install {package_info}: {e}")
raise
# Get policy (lazy loading)
pip_policy = get_pip_policy()
policy = pip_policy.get(package_name, {})
# If no policy, proceed with default installation
if not policy:
logger.debug(f"No policy found for {package_name}, proceeding with default installation")
cmd = manager_util.make_pip_cmd(["install", package_info])
if extra_index_url:
cmd.extend(["--extra-index-url", extra_index_url])
try:
subprocess.run(cmd, check=True)
self._invalidate_cache()
logger.info(f"Successfully installed {package_info}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to install {package_info}: {e}")
raise
# Apply apply_first_match policies (exclusive - first match only)
final_package_info = package_info
final_extra_index_url = extra_index_url
policy_reason = None
apply_first_match = policy.get("apply_first_match", [])
for policy_item in apply_first_match:
condition = policy_item.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
policy_type = policy_item.get("type")
if policy_type == "skip":
reason = policy_item.get("reason", "No reason provided")
logger.info(f"Skipping installation of {package_name}: {reason}")
return False
elif policy_type == "force_version":
forced_version = policy_item.get("version")
final_package_info = f"{package_name}=={forced_version}"
policy_reason = policy_item.get("reason")
if "extra_index_url" in policy_item:
final_extra_index_url = policy_item["extra_index_url"]
logger.info(f"Force version for {package_name}: {forced_version} ({policy_reason})")
break # First match only
elif policy_type == "replace":
replacement = policy_item.get("replacement")
replacement_version = policy_item.get("version", "")
if replacement_version:
final_package_info = f"{replacement}{replacement_version}"
else:
final_package_info = replacement
policy_reason = policy_item.get("reason")
if "extra_index_url" in policy_item:
final_extra_index_url = policy_item["extra_index_url"]
logger.info(f"Replacing {package_name} with {final_package_info}: {policy_reason}")
break # First match only
# Apply apply_all_matches policies (cumulative - all matches)
additional_packages = []
pinned_packages = []
pin_on_failure = "fail"
apply_all_matches = policy.get("apply_all_matches", [])
for policy_item in apply_all_matches:
condition = policy_item.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
policy_type = policy_item.get("type")
if policy_type == "pin_dependencies":
pin_list = policy_item.get("pinned_packages", [])
for pkg in pin_list:
installed_version = installed_packages.get(pkg)
if installed_version:
pinned_packages.append(f"{pkg}=={installed_version}")
else:
logger.warning(f"Cannot pin {pkg}: not currently installed")
pin_on_failure = policy_item.get("on_failure", "fail")
reason = policy_item.get("reason", "")
logger.info(f"Pinning dependencies: {pinned_packages} ({reason})")
elif policy_type == "install_with":
additional = policy_item.get("additional_packages", [])
additional_packages.extend(additional)
reason = policy_item.get("reason", "")
logger.info(f"Installing additional packages: {additional} ({reason})")
elif policy_type == "warn":
message = policy_item.get("message", "")
allow_continue = policy_item.get("allow_continue", True)
logger.warning(f"Policy warning for {package_name}: {message}")
if not allow_continue:
# TODO: Implement user confirmation
logger.info("User confirmation required (not implemented, continuing)")
# Build final package list
packages_to_install = [final_package_info] + pinned_packages + additional_packages
# Execute installation
cmd = manager_util.make_pip_cmd(["install"] + packages_to_install)
if final_extra_index_url:
cmd.extend(["--extra-index-url", final_extra_index_url])
try:
subprocess.run(cmd, check=True)
self._invalidate_cache()
if policy_reason:
logger.info(f"Successfully installed {final_package_info}: {policy_reason}")
else:
logger.info(f"Successfully installed {final_package_info}")
return True
except subprocess.CalledProcessError as e:
# Handle installation failure
if pinned_packages and pin_on_failure == "retry_without_pin":
logger.warning(f"Installation failed with pinned dependencies, retrying without pins")
retry_cmd = manager_util.make_pip_cmd(["install", final_package_info])
if final_extra_index_url:
retry_cmd.extend(["--extra-index-url", final_extra_index_url])
try:
subprocess.run(retry_cmd, check=True)
self._invalidate_cache()
logger.info(f"Successfully installed {final_package_info} (without pins)")
return True
except subprocess.CalledProcessError as retry_error:
logger.error(f"Retry installation also failed: {retry_error}")
raise
elif pin_on_failure == "fail":
logger.error(f"Installation failed: {e}")
raise
else:
logger.warning(f"Installation failed, but continuing: {e}")
return False
def ensure_not_installed(self) -> List[str]:
"""
Remove all packages matching uninstall policies (batch processing).
Iterates through all package policies and executes uninstall actions
where conditions are satisfied.
Returns:
List of removed package names
Example:
>>> with PipBatch() as batch:
... removed = batch.ensure_not_installed()
... print(f"Removed: {removed}")
"""
# Get policy (lazy loading)
pip_policy = get_pip_policy()
installed_packages = self._get_installed_packages()
removed_packages = []
for package_name, policy in pip_policy.items():
uninstall_policies = policy.get("uninstall", [])
for uninstall_policy in uninstall_policies:
condition = uninstall_policy.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
target = uninstall_policy.get("target")
reason = uninstall_policy.get("reason", "No reason provided")
# Check if target is installed
if target in installed_packages:
try:
cmd = manager_util.make_pip_cmd(["uninstall", "-y", target])
subprocess.run(cmd, check=True)
logger.info(f"Uninstalled {target}: {reason}")
removed_packages.append(target)
# Remove from cache
del installed_packages[target]
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to uninstall {target}: {e}")
# First match only per package
break
return removed_packages
def ensure_installed(self) -> List[str]:
"""
Restore all packages matching restore policies (batch processing).
Iterates through all package policies and executes restore actions
where conditions are satisfied.
Returns:
List of restored package names
Example:
>>> with PipBatch() as batch:
... batch.install("numpy>=1.20")
... restored = batch.ensure_installed()
... print(f"Restored: {restored}")
"""
# Get policy (lazy loading)
pip_policy = get_pip_policy()
installed_packages = self._get_installed_packages()
restored_packages = []
for package_name, policy in pip_policy.items():
restore_policies = policy.get("restore", [])
for restore_policy in restore_policies:
condition = restore_policy.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
target = restore_policy.get("target")
version = restore_policy.get("version")
reason = restore_policy.get("reason", "No reason provided")
extra_index_url = restore_policy.get("extra_index_url")
# Check if target needs restoration
current_version = installed_packages.get(target)
if current_version is None or current_version != version:
try:
package_spec = f"{target}=={version}"
cmd = manager_util.make_pip_cmd(["install", package_spec])
if extra_index_url:
cmd.extend(["--extra-index-url", extra_index_url])
subprocess.run(cmd, check=True)
logger.info(f"Restored {package_spec}: {reason}")
restored_packages.append(target)
# Update cache
installed_packages[target] = version
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to restore {target}: {e}")
# First match only per package
break
return restored_packages