Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion projects/core/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

from .task import task, retry, when, always
from .runtime import execute_tasks, clear_tasks
from .script_manager import get_script_manager, reset_script_manager
from . import shell
from . import toolbox
from . import template
from . import context

__all__ = ['always', 'clear_tasks', 'context', 'execute_tasks', 'retry', 'shell', 'task', 'template', 'when']
__all__ = [
'always', 'clear_tasks', 'context', 'execute_tasks', 'get_script_manager',
'reset_script_manager', 'retry', 'shell', 'task', 'template', 'toolbox', 'when'
]
21 changes: 19 additions & 2 deletions projects/core/dsl/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,26 @@

LINE_WIDTH = 80

# Factory for loggers that print bare messages (no level/name prefix)
def setup_clean_logger(name: str):
    """Return a logger configured to emit only the raw message text."""
    clean_logger = logging.getLogger(name)
    clean_logger.setLevel(logging.INFO)

    # Attach the console handler just once; re-requesting the same logger
    # name must not produce duplicated output lines.
    if not clean_logger.handlers:
        handler = logging.StreamHandler()
        handler.setLevel(logging.INFO)
        handler.setFormatter(logging.Formatter('%(message)s'))
        clean_logger.addHandler(handler)

    clean_logger.propagate = False  # Don't propagate to root logger
    return clean_logger

# Configure clean logging for DSL operations
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)
logger = setup_clean_logger('DSL')


def log_task_header(task_name: str, task_doc: str, rel_filename: str, line_no: int):
Expand Down
3 changes: 2 additions & 1 deletion projects/core/dsl/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)
logger = logging.getLogger('DSL')
logger.propagate = False # Don't show logger prefix

@dataclass
class CommandResult:
Expand Down
77 changes: 49 additions & 28 deletions projects/core/dsl/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import os
from datetime import datetime
from pathlib import Path
from typing import List

import projects.core.library.env as env
from projects.core.library.run import SignalError
from .log import log_execution_banner, log_completion_banner, logger
from .context import create_task_parameters

# Import from task.py to avoid circular imports
from .task import _task_registry, _task_results, ConditionError, RetryFailure
from .task import ConditionError, RetryFailure
from .script_manager import get_script_manager


class TaskExecutionError(Exception):
Expand Down Expand Up @@ -66,6 +66,12 @@ def execute_tasks(function_args: dict = None):
filename = caller_frame.f_code.co_filename
command_name = _get_toolbox_function_name(filename)

# Get relative filename to match task registration
try:
rel_filename = str(Path(filename).relative_to(env.FORGE_HOME))
except ValueError:
rel_filename = filename

# Use NextArtifactDir for proper storage management
with env.NextArtifactDir(command_name) as artifact_dir:

Expand All @@ -91,16 +97,22 @@ def execute_tasks(function_args: dict = None):
_generate_execution_metadata(function_args, caller_frame, meta_dir)
_generate_restart_script(function_args, caller_frame, meta_dir)

# Separate normal tasks from always tasks
normal_tasks = [t for t in _task_registry if not t.get('always_execute', False)]
always_tasks = [t for t in _task_registry if t.get('always_execute', False)]
# Execute tasks only from the calling file
script_manager = get_script_manager()
file_tasks = list(script_manager.get_tasks_from_file(rel_filename))

if not file_tasks:
logger.error(f"No tasks found for file: {rel_filename}")
log_completion_banner(function_args, status="NO_TASKS")
raise RuntimeError(f"No tasks found for file: {rel_filename}")

execution_error = None

# Execute normal tasks
try:
for task_info in normal_tasks:
_execute_single_task(task_info, args, shared_context)
while file_tasks:
current_task_info = file_tasks.pop(0)
_execute_single_task(current_task_info, args, shared_context)
Comment on lines +100 to +114
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

@always handling is broken in both execution phases.

The first loop never checks always_execute, so @always tasks run like normal tasks. After a failure, the second loop then runs every remaining task as if it were @always, and Line 139 calls _execute_single_task(task_info, ...) with an undefined name. A normal task failure will either execute non-@always tasks unexpectedly or crash with NameError.

🛠️ Suggested structure
-            file_tasks = list(script_manager.get_tasks_from_file(rel_filename))
-
-            if not file_tasks:
+            registered_tasks = list(script_manager.get_tasks_from_file(rel_filename))
+
+            if not registered_tasks:
                 logger.error(f"No tasks found for file: {rel_filename}")
                 log_completion_banner(function_args, status="NO_TASKS")
                 raise RuntimeError(f"No tasks found for file: {rel_filename}")
+
+            file_tasks = [task for task in registered_tasks if not task.get('always_execute')]
+            always_tasks = [task for task in registered_tasks if task.get('always_execute')]
@@
-            if file_tasks:
-                logging.warning("Executing the `@always` tasks ...")
-                while file_tasks:
+            if always_tasks:
+                logger.warning("Executing the `@always` tasks ...")
+                while always_tasks:
                     try:
-                        current_task_info = file_tasks.pop(0)
-                        _execute_single_task(task_info, args, shared_context)
+                        current_task_info = always_tasks.pop(0)
+                        _execute_single_task(current_task_info, args, shared_context)

Also applies to: 133-140

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@projects/core/dsl/runtime.py` around lines 100 - 114, The first execution
loop should skip tasks marked with always_execute and only run non-always tasks:
in the loop that pops from file_tasks (using
get_script_manager().get_tasks_from_file and variable current_task_info), if
current_task_info.always_execute is True then defer it (e.g., append to a
separate always_tasks list) instead of executing it; call
_execute_single_task(current_task_info, args, shared_context) only for
non-always tasks. On failure, the recovery loop must iterate over that
always_tasks list (not all remaining file_tasks) and call
_execute_single_task(task_info, args, shared_context) for each, ensuring the
loop variable is task_info (not the undefined name `task`) so no NameError
occurs.


except (KeyboardInterrupt, SignalError):
logger.info("")
logger.fatal("==> INTERRUPTED: Received KeyboardInterrupt (Ctrl+C)")
Expand All @@ -118,21 +130,25 @@ def execute_tasks(function_args: dict = None):
# Save error to re-raise after always tasks execute
execution_error = e

# Always execute "always" tasks
try:
for task_info in always_tasks:
_execute_single_task(task_info, args, shared_context)
except Exception as e:
# If normal tasks succeeded but always task failed, raise always task error
if execution_error is None:
raise
# If both failed, log always task error but preserve original error
logger.error(f"==> ALWAYS TASK ALSO FAILED: {e}")
logger.info("")
# Always execute "always" tasks from this file
if file_tasks:
logging.warning("Executing the @always tasks ...")
while file_tasks:
try:
current_task_info = file_tasks.pop(0)
_execute_single_task(task_info, args, shared_context)

except Exception as e:
# If normal tasks succeeded but always task failed, raise always task error
if execution_error is None:
execution_error = e

logger.error(f"==> ALWAYS TASK ALSO FAILED: {e}")
logger.info("")

# Re-raise original error if normal tasks failed
if execution_error:
log_completion_banner(function_args, status=f"COMPLETED ({execution_error.__class__.__name__})")
log_completion_banner(function_args, status=f"FAILED ({execution_error.__class__.__name__})")
raise execution_error

# Log completion banner if execution was successful
Expand All @@ -146,7 +162,7 @@ def execute_tasks(function_args: dict = None):
return shared_context
finally:
# Clean up the file handler to prevent leaks
dsl_logger = logging.getLogger('projects.core.dsl')
dsl_logger = logging.getLogger('DSL')
dsl_logger.removeHandler(file_handler)
file_handler.close()

Expand Down Expand Up @@ -235,11 +251,16 @@ def _execute_single_task(task_info, args, shared_context):
raise task_error


def clear_tasks():
"""Clear the task registry (useful for testing)"""
global _task_registry, _task_results
_task_registry = []
_task_results = {}
def clear_tasks(file_path=None):
    """
    Clear the task registry (useful for testing)

    Args:
        file_path: If specified, only clear tasks from this file.
                   If None, clear all tasks from all files.
    """
    # All task state is owned by the script manager; just delegate.
    get_script_manager().clear_tasks(file_path)


def _generate_execution_metadata(function_args: dict, caller_frame, meta_dir):
Expand Down Expand Up @@ -287,8 +308,8 @@ def _setup_execution_logging(artifact_dir):
# Use same format as console output
file_handler.setFormatter(logging.Formatter('%(message)s'))

# Add handler to the parent DSL logger so all DSL modules inherit it
dsl_logger = logging.getLogger('projects.core.dsl')
# Add handler to the DSL logger so all DSL modules inherit it
dsl_logger = logging.getLogger('DSL')
dsl_logger.addHandler(file_handler)

return log_file, file_handler
Expand Down
147 changes: 147 additions & 0 deletions projects/core/dsl/script_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""
ScriptManager - Task registry and state management for DSL framework

Provides clean separation between task definitions and execution state.
"""

from typing import Dict, List, Optional
from pathlib import Path
import logging

logger = logging.getLogger('DSL')


class TaskResult:
    """Holds one task's outcome so conditions can reference it later."""

    def __init__(self, task_name: str):
        # Name of the task this container belongs to.
        self.task_name = task_name
        # Captured return value; stays None until the task runs.
        self._result = None
        # Tracks whether the task has executed yet.
        self._executed = False

    @property
    def return_value(self):
        """Return value produced by the task (None before execution)."""
        return self._result

    def _set_result(self, result):
        """Record the task's return value and flag it as executed."""
        self._result = result
        self._executed = True


class ScriptManager:
    """
    Manages task registration and execution state per script file

    Provides clean separation between task definitions and runtime state,
    avoiding global variable pollution and enabling better testing/isolation.
    """

    def __init__(self):
        # Maps source-file relative path -> list of task-info dicts
        # registered from that file (insertion order = definition order).
        self._task_registry: Dict[str, List[dict]] = {}
        # Maps task name -> TaskResult container.
        # NOTE(review): keyed by task name only, so two files that define
        # the same task name share/overwrite one TaskResult — confirm this
        # is intended before relying on cross-file results.
        self._task_results: Dict[str, TaskResult] = {}
Comment on lines +41 to +43
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Task results need a file-scoped key, not just task_name.

_task_results[task_name] collides as soon as two scripts define the same task function name. The later registration replaces the earlier TaskResult, so some_task.status.return_value can point at the wrong execution, and clear_tasks(source_file) can delete another file's status object.

Also applies to: 45-60, 81-107

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@projects/core/dsl/script_manager.py` around lines 41 - 43, The _task_results
dict currently keys only by task_name causing collisions across scripts; change
the keying to a file-scoped composite key (e.g. f"{source_file}:{task_name}" or
a tuple (source_file, task_name)) wherever TaskResult entries are created,
accessed, or deleted: update the registration logic that populates
self._task_results (refer to _task_registry and the method that registers
tasks), update lookups that read task results/status (wherever
_task_results[task_name] is used), and update clear_tasks(source_file) to remove
only entries whose composite key matches the given source_file; ensure
TaskResult creation, updates, and any status.return_value references use the new
composite key consistently to avoid cross-file replacement.


def register_task(self, task_info: dict, source_file: str) -> None:
"""
Register a task from a specific source file

Args:
task_info: Dictionary containing task metadata
source_file: Relative path to the source file defining the task
"""
if source_file not in self._task_registry:
self._task_registry[source_file] = []

self._task_registry[source_file].append(task_info)

# Create result container for this task
task_name = task_info['name']
self._task_results[task_name] = TaskResult(task_name)

logger.debug(f"Registered task '{task_name}' from {source_file}")

def get_task_result(self, task_name: str) -> Optional[TaskResult]:
"""Get the result container for a specific task"""
return self._task_results.get(task_name)


def get_tasks_from_file(self, source_file: str) -> List[dict]:
"""
Get tasks from a specific source file

Args:
source_file: Relative path to the source file

Returns:
List of task info dictionaries from that file
"""
return self._task_registry.get(source_file, [])

def clear_tasks(self, source_file: Optional[str] = None) -> None:
"""
Clear tasks from registry

Args:
source_file: If specified, only clear tasks from this file.
If None, clear all tasks from all files.
"""
if source_file is None:
# Clear all tasks from all files
logger.debug("Clearing all tasks from script manager")
self._task_registry.clear()
self._task_results.clear()
else:
# Clear tasks from specific file
if source_file in self._task_registry:
tasks_to_remove = self._task_registry[source_file]

# Clear task results for tasks from this file
for task_info in tasks_to_remove:
task_name = task_info['name']
if task_name in self._task_results:
del self._task_results[task_name]

# Remove tasks from this file
del self._task_registry[source_file]
logger.debug(f"Cleared {len(tasks_to_remove)} tasks from {source_file}")

def get_registry_summary(self) -> Dict[str, int]:
"""
Get a summary of the current task registry

Returns:
Dictionary mapping source files to task counts
"""
return {
file_path: len(tasks)
for file_path, tasks in self._task_registry.items()
}

def has_tasks(self) -> bool:
"""Check if any tasks are registered"""
return bool(self._task_registry)

def get_file_count(self) -> int:
"""Get the number of files with registered tasks"""
return len(self._task_registry)

def get_total_task_count(self) -> int:
"""Get the total number of registered tasks across all files"""
return sum(len(tasks) for tasks in self._task_registry.values())


# Process-wide singleton: all task state lives inside this instance,
# keeping module globals to a single well-known name.
_script_manager = ScriptManager()


def get_script_manager() -> ScriptManager:
    """Return the process-wide ScriptManager singleton."""
    return _script_manager


def reset_script_manager() -> None:
    """Replace the singleton with a fresh instance (useful for testing)."""
    global _script_manager
    _script_manager = ScriptManager()
Loading
Loading