
Core SDK: sync/async workflow duplication, retry thundering herd, and silent exception suppression #1553

@MervinPraison

Description


Summary

In-depth analysis of src/praisonai-agents/praisonaiagents identified three high-impact architectural gaps in the core SDK that affect reliability, multi-agent scalability, and debuggability. Each gap has been validated against the source code with specific file paths and line numbers.


Gap 1: workflow() / aworkflow() duplication with feature-parity drift

Problem

Process.workflow() (lines 912–1462, ~550 LOC) and Process.aworkflow() (lines 426–793, ~370 LOC) in process/process.py are near-identical copies differing only in sync/async mechanics. This violates the DRY principle and has already caused concrete feature-parity gaps:

1. Sync workflow() is missing timeout enforcement

aworkflow() enforces workflow_timeout (lines 430, 467–471):

# process/process.py:430
workflow_start = time.monotonic()  # For timeout enforcement

# process/process.py:467-471
if self.workflow_timeout is not None:
    elapsed = time.monotonic() - workflow_start
    if elapsed > self.workflow_timeout:
        logging.warning(f"Workflow timeout ({self.workflow_timeout}s) exceeded ...")
        self.workflow_cancelled = True
        break

But workflow() (line 942 onward) has no timeout logic at all. A sync workflow with workflow_timeout=60 will run forever — the parameter is silently ignored.
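Mirroring the async timeout check into the sync loop is mechanical. A minimal standalone sketch of the pattern (run_sync_loop and its task list are stand-ins for illustration, not the real Process API):

```python
import time
import logging

def run_sync_loop(tasks, workflow_timeout=None):
    """Stand-in for the sync workflow() loop with timeout enforcement
    mirrored from aworkflow(): record a monotonic start time, then check
    elapsed time at the top of every iteration before dispatching."""
    workflow_start = time.monotonic()  # For timeout enforcement
    completed = []
    for task in tasks:
        if workflow_timeout is not None:
            elapsed = time.monotonic() - workflow_start
            if elapsed > workflow_timeout:
                logging.warning(f"Workflow timeout ({workflow_timeout}s) exceeded")
                break
        task()  # stand-in for real task dispatch
        completed.append(task)
    return completed
```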

2. Async aworkflow() is missing start-task loop/CSV handling

workflow() has ~100 lines of CSV/file parsing for loop-type start tasks (lines 962–1059) that create subtasks from input files. aworkflow() explicitly acknowledges this with a TODO:

# process/process.py:458
# TODO: start task with loop feature is not available in aworkflow method

This means any async workflow with a loop-type start task that reads from a CSV file will silently skip task creation.

3. Same duplication in hierarchical methods

hierarchical() (lines 1470–1574) and ahierarchical() (lines 801–910) are also near-identical copies. Same for _get_manager_instructions_with_fallback() / _get_manager_instructions_with_fallback_async() (lines 319–365 / 289–317). Additionally, the sync workflow() task-reset logic (lines 1311–1332) runs without the async state lock that aworkflow() uses (lines 642–664):

# aworkflow() — protected by async lock (lines 642-644):
lock = await self._get_state_lock()
async with lock:
    if self.tasks[task_id].status == "completed":
        ...

# workflow() — NO lock protection (lines 1312-1313):
if self.tasks[task_id].status == "completed":
    # Direct mutation, no synchronization
    ...
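If the sync path can be reached from multiple threads, a threading.Lock would give it the same protection the async path gets from its asyncio lock. A minimal sketch of the guarded reset (the _state_lock attribute and dict-shaped tasks are assumptions for illustration):

```python
import threading

class ProcessStateSketch:
    """Illustrates guarding sync task-state mutation with a threading.Lock,
    mirroring the asyncio lock used in aworkflow()."""
    def __init__(self):
        self._state_lock = threading.Lock()
        self.tasks = {}

    def reset_if_completed(self, task_id):
        # All reads and writes of task status happen under the lock, so a
        # concurrent completion cannot interleave with the reset check.
        with self._state_lock:
            if self.tasks[task_id]["status"] == "completed":
                self.tasks[task_id]["status"] = "not started"
                return True
            return False
```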

Resolution

Extract the shared workflow logic into a single private method that accepts a strategy parameter (sync vs async), or use an internal async-first implementation with a sync wrapper using asyncio.run(). The key shared logic is:

  • Workflow relationship building (previous_tasks wiring)
  • Start task discovery
  • Loop/decision routing
  • Task reset logic
  • Timeout enforcement
  • Validation feedback propagation

This would eliminate ~500 lines of duplication and ensure feature parity automatically.
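The async-first option could look roughly like this; a minimal sketch with placeholder logic, not the actual Process API:

```python
import asyncio

class WorkflowSketch:
    """Single async implementation; the sync entry point is a thin wrapper,
    so feature parity is automatic because only one body of logic exists."""

    async def _aworkflow_impl(self, items):
        results = []
        for item in items:
            # Shared logic (routing, resets, timeout checks, validation
            # feedback) would live here exactly once.
            results.append(item * 2)
        return results

    async def aworkflow(self, items):
        return await self._aworkflow_impl(items)

    def workflow(self, items):
        # Sync wrapper around the async implementation.
        return asyncio.run(self._aworkflow_impl(items))
```

One caveat with this design: asyncio.run() raises RuntimeError if called from a thread that already has a running event loop, so a real sync wrapper would need to detect that case (e.g. run the coroutine on a dedicated thread).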


Gap 2: LLM retry backoff has no jitter — thundering herd in multi-agent setups

Problem

The error classifier's get_retry_delay() in llm/error_classifier.py returns deterministic delays with no randomization:

# llm/error_classifier.py — get_retry_delay()
def get_retry_delay(category: ErrorCategory, attempt: int = 1, base_delay: float = 1.0) -> float:
    attempt = max(1, attempt)
    if not should_retry(category):
        return 0
    if category == ErrorCategory.RATE_LIMIT:
        return min(base_delay * (3 ** attempt), 60.0)      # Always exactly 3.0, 9.0, 27.0, 60.0
    elif category == ErrorCategory.CONTEXT_LIMIT:
        return base_delay * 0.5                              # Always exactly 0.5
    elif category == ErrorCategory.TRANSIENT:
        return min(base_delay * (2 ** attempt), 30.0)       # Always exactly 2.0, 4.0, 8.0, 16.0, 30.0
    return 0

Both _call_with_retry() (line 718) and _call_with_retry_async() (line 807) in llm/llm.py consume these deterministic delays directly:

# llm/llm.py:798-800 (sync path)
if self._rate_limiter is not None:
    self._rate_limiter.wait_for_retry(retry_delay)
else:
    time.sleep(retry_delay)

# llm/llm.py:874-877 (async path)
if self._rate_limiter is not None:
    await self._rate_limiter.wait_for_retry_async(retry_delay)
else:
    await asyncio.sleep(retry_delay)

Impact: When N agents running in a multi-agent workflow all hit a rate limit at the same time (common with shared API keys), they all compute the exact same retry delay (e.g., 3.0s) and retry simultaneously — causing another rate limit, then all wait 9.0s, retry again simultaneously, etc. This is the classic thundering herd problem and can cause cascading failures that exhaust all retry attempts without ever succeeding.
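The collision is easy to reproduce: with a deterministic formula, every agent computes the identical delay for the same attempt number. A standalone reproduction of the RATE_LIMIT branch shown above:

```python
def rate_limit_delay(attempt, base_delay=1.0):
    # Mirrors the RATE_LIMIT branch: min(base * 3^attempt, 60.0), no jitter.
    return min(base_delay * (3 ** attempt), 60.0)

# Ten agents hitting a rate limit on the same attempt all compute the same
# delay, so they retry in lockstep and collide again.
delays = [rate_limit_delay(attempt=1) for _ in range(10)]
```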

Resolution

Add jitter to get_retry_delay(). The standard approach is "full jitter" as recommended by AWS:

import random

def get_retry_delay(category: ErrorCategory, attempt: int = 1, base_delay: float = 1.0) -> float:
    attempt = max(1, attempt)
    if not should_retry(category):
        return 0
    
    if category == ErrorCategory.RATE_LIMIT:
        max_delay = min(base_delay * (3 ** attempt), 60.0)
    elif category == ErrorCategory.CONTEXT_LIMIT:
        return base_delay * 0.5  # No jitter needed — not a contention issue
    elif category == ErrorCategory.TRANSIENT:
        max_delay = min(base_delay * (2 ** attempt), 30.0)
    else:
        return 0
    
    # Full jitter: uniform random between 0 and computed max
    return random.uniform(0, max_delay)

This ensures agents desynchronize their retries naturally. No changes needed in the callers (_call_with_retry / _call_with_retry_async).
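A quick check of the jittered version shows the delays spread across the backoff window instead of collapsing to a single value (standalone reproduction of the proposed RATE_LIMIT branch; the seed is for demo reproducibility only):

```python
import random

def jittered_rate_limit_delay(attempt, base_delay=1.0):
    # Full jitter: uniform random in [0, capped exponential backoff].
    max_delay = min(base_delay * (3 ** attempt), 60.0)
    return random.uniform(0, max_delay)

random.seed(42)  # reproducibility for the demo only
delays = [jittered_rate_limit_delay(attempt=1) for _ in range(10)]
```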


Gap 3: Silent exception suppression in task execution pipeline hides failures

Problem

task/task.py systematically catches and suppresses exceptions in the task execution pipeline, storing them in a non_fatal_errors list that callers must remember to inspect; nothing in the framework enforces or prompts that check. This creates invisible failures that are extremely difficult to debug in production.

Three escalating levels of suppression:

Level 1 — store_in_memory() drops exceptions entirely (lines 637–639):

# task/task.py:623-639
def store_in_memory(self, content: str, agent_name: str = None, task_id: str = None):
    if self.memory:
        try:
            self.memory.store_long_term(text=content, metadata={...})
        except Exception as e:
            logger.error(f"Task {self.id}: Failed to store content in memory: {e}")
            logger.exception(e)
            # Exception logged but NOT added to non_fatal_errors
            # Caller has ZERO visibility that storage failed

Unlike the execute_callback path, this method doesn't even append to non_fatal_errors. The failure is completely invisible to the orchestrator.

Level 2 — execute_callback() catches all memory + callback errors (lines 732–758):

# task/task.py:732-737 (memory operations)
except Exception as e:
    error_msg = f"memory operations: {e}"
    self.non_fatal_errors.append(error_msg)
    logger.error(f"Task {self.id}: Failed to process memory operations: {e}")
    logger.exception(e)
    # Continue execution even if memory operations fail

# task/task.py:747-751 (user callbacks)
except Exception as e:
    error_msg = f"callback: {e}"
    self.non_fatal_errors.append(error_msg)
    logger.error(f"Task {self.id}: Failed to execute callback: {e}")
    logger.exception(e)

The task reports "success" even when the user's callback throws. The commented-out TODO at lines 754–756 acknowledges this:

# TODO: Consider raising if callback is marked as critical
# if getattr(self, 'callback_critical', False):
#     raise

Level 3 — _verify_memory_ready() uses bare except to mask config errors (lines 618–621):

# task/task.py:618-621
except Exception:
    # If any error occurs during readiness check, consider memory not ready
    return False

If memory configuration is wrong (e.g., missing ChromaDB, bad connection string), this silently returns False instead of surfacing the configuration error. The downstream effect: execute_callback skips all memory operations because _verify_memory_ready() said "not ready", and the user sees no error — just silently missing quality metrics and memory storage.

The non_fatal_errors list itself is problematic:

# output/models.py:28
non_fatal_errors: Optional[list[str]] = None

# main.py:680
non_fatal_errors: Optional[list[str]] = None

The field exists on TaskOutput but nothing in the framework checks it by default. Users must know to inspect task.result.non_fatal_errors after every task — there's no callback, no warning in the output, no configurable policy.
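Until a policy exists, callers have to poll manually after every task, along these lines (TaskOutputSketch is a simplified stand-in for TaskOutput carrying only the field under discussion):

```python
from dataclasses import dataclass
from typing import Optional
import logging

@dataclass
class TaskOutputSketch:
    # Simplified stand-in for TaskOutput: only the relevant field.
    non_fatal_errors: Optional[list] = None

def check_task_warnings(task_id, output):
    """The manual inspection every caller currently has to remember to do."""
    if output.non_fatal_errors:
        logging.warning("Task %s completed with suppressed errors: %s",
                        task_id, output.non_fatal_errors)
        return list(output.non_fatal_errors)
    return []
```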

Resolution

  1. Add a fail_on_error policy to Task that controls whether suppressed errors should raise:

class Task:
    def __init__(self, ..., fail_on_callback_error: bool = False, fail_on_memory_error: bool = False):
        self.fail_on_callback_error = fail_on_callback_error
        self.fail_on_memory_error = fail_on_memory_error

  2. Make store_in_memory() consistent — append to non_fatal_errors like execute_callback does:

def store_in_memory(self, content: str, ...):
    if self.memory:
        try:
            self.memory.store_long_term(...)
        except Exception as e:
            error_msg = f"store_in_memory: {e}"
            self.non_fatal_errors.append(error_msg)
            logger.error(...)
            if self.fail_on_memory_error:
                raise

  3. Surface non_fatal_errors in task output — emit a warning via the hooks system when non_fatal_errors is non-empty so the orchestrator and user are aware without having to poll:

if self.non_fatal_errors:
    task_output.non_fatal_errors = list(self.non_fatal_errors)
    # Emit hook so orchestrators can react
    if self._hook_runner:
        self._hook_runner.run("on_task_warnings", {
            "task_id": self.id,
            "warnings": self.non_fatal_errors
        })

  4. Make _verify_memory_ready() surface config errors instead of silently returning False:

def _verify_memory_ready(self) -> bool:
    if not self.memory:
        return False
    has_adapter = hasattr(self.memory, 'memory_adapter') and self.memory.memory_adapter is not None
    has_sqlite = hasattr(self.memory, '_sqlite_adapter') and self.memory._sqlite_adapter is not None
    if not (has_adapter or has_sqlite):
        logger.warning(f"Task {self.id}: Memory initialized but no adapter available — check memory configuration")
    return has_adapter or has_sqlite

Validation Summary

| Gap | File | Lines | Validated | Impact |
| --- | --- | --- | --- | --- |
| Sync workflow missing timeout | process/process.py | 942–1462 (missing) vs 467–471 | workflow_timeout param silently ignored in sync mode | Workflows can hang forever |
| Async workflow missing loop/CSV | process/process.py | 458 (TODO) vs 962–1059 | Confirmed by explicit TODO comment | Async loop tasks silently skip CSV input |
| Sync workflow reset without lock | process/process.py | 1312 vs 642–644 | Sync path has no state lock | Race condition in concurrent task completion |
| Retry no jitter | llm/error_classifier.py | get_retry_delay() | Deterministic: 3^attempt, 2^attempt | Thundering herd exhausts retries |
| store_in_memory drops errors | task/task.py | 637–639 | Not appended to non_fatal_errors | Invisible data loss |
| execute_callback suppresses all | task/task.py | 732–751 | Errors stored in list nobody checks | User callbacks fail silently |
| _verify_memory_ready bare except | task/task.py | 618–621 | Returns False on config errors | Misconfiguration hidden |
