[refactor] Remove redundant ExecutionStatus NamedTuple

- Eliminate TaskQueue.ExecutionStatus NamedTuple in favor of generated TaskExecutionStatus Pydantic model
- Remove manual conversion logic between NamedTuple and Pydantic model
- Use single source of truth for task execution status
- Clean up unused imports (Literal, NamedTuple)
- Maintain consistent data model usage throughout TaskQueue
This commit is contained in:
bymyself 2025-06-13 14:42:00 -07:00
parent 1d575fb654
commit a59e6e176e

View File

@ -23,7 +23,7 @@ import urllib.request
import uuid
import zipfile
from datetime import datetime
from typing import Any, Dict, List, Literal, NamedTuple, Optional
from typing import Any, Dict, List, Optional
import folder_paths
import latent_preview
@ -211,10 +211,6 @@ class TaskQueue:
self.batch_start_time = None
self.batch_state_before = None
class ExecutionStatus(NamedTuple):
status_str: Literal["success", "error", "skip"]
completed: bool
messages: List[str]
def get_current_state(self) -> TaskStateMessage:
return TaskStateMessage(
@ -294,7 +290,7 @@ class TaskQueue:
item: QueueTaskItem,
task_index: int,
result_msg: str,
status: Optional["TaskQueue.ExecutionStatus"] = None,
status: Optional[TaskExecutionStatus] = None,
) -> None:
"""Mark task as completed and add to history"""
@ -308,14 +304,6 @@ class TaskQueue:
if len(self.history_tasks) > MAXIMUM_HISTORY_SIZE:
self.history_tasks.pop(next(iter(self.history_tasks)))
# Convert TaskQueue.ExecutionStatus to TaskExecutionStatus Pydantic model
pydantic_status: Optional[TaskExecutionStatus] = None
if status is not None:
pydantic_status = TaskExecutionStatus(
status_str=status.status_str,
completed=status.completed,
messages=status.messages
)
# Update history
self.history_tasks[item.ui_id] = TaskHistoryItem(
@ -324,7 +312,7 @@ class TaskQueue:
timestamp=datetime.fromisoformat(timestamp),
result=result_msg,
kind=item.kind,
status=pydantic_status,
status=status,
)
# Force cache refresh for successful pack-modifying operations
@ -869,7 +857,7 @@ async def task_worker():
except Exception:
msg = f"Exception: {(kind, item)}"
await task_queue.task_done(
item, task_index, msg, TaskQueue.ExecutionStatus("error", True, [msg])
item, task_index, msg, TaskExecutionStatus(status_str="error", completed=True, messages=[msg])
)
return
@ -881,11 +869,11 @@ async def task_worker():
# Determine status
if result_msg == "success":
status = TaskQueue.ExecutionStatus("success", True, [])
status = TaskExecutionStatus(status_str="success", completed=True, messages=[])
elif result_msg == "skip":
status = TaskQueue.ExecutionStatus("skip", True, [])
status = TaskExecutionStatus(status_str="skip", completed=True, messages=[])
else:
status = TaskQueue.ExecutionStatus("error", True, [result_msg])
status = TaskExecutionStatus(status_str="error", completed=True, messages=[result_msg])
await task_queue.task_done(item, task_index, result_msg, status)