Summary
In-depth analysis of src/praisonai-agents/praisonaiagents identified three high-impact architectural gaps in the core SDK that affect reliability, multi-agent scalability, and debuggability. Each gap has been validated against the source code with specific file paths and line numbers.
Gap 1: workflow() / aworkflow() duplication with feature-parity drift
Problem
Process.workflow() (lines 912–1462, ~550 LOC) and Process.aworkflow() (lines 426–793, ~370 LOC) in process/process.py are near-identical copies with only sync/async differences. This violates the DRY principle and has already caused concrete feature-parity gaps:
1. Sync workflow() is missing timeout enforcement
aworkflow() enforces workflow_timeout (lines 430, 467–471):
# process/process.py:430
workflow_start = time.monotonic()  # For timeout enforcement

# process/process.py:467-471
if self.workflow_timeout is not None:
    elapsed = time.monotonic() - workflow_start
    if elapsed > self.workflow_timeout:
        logging.warning(f"Workflow timeout ({self.workflow_timeout}s) exceeded ...")
        self.workflow_cancelled = True
        break
But workflow() (line 942 onward) has no timeout logic at all: a sync workflow with workflow_timeout=60 can run indefinitely, because the parameter is silently ignored.
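Until the two methods are unified (see the resolution below), a small shared helper would let both paths enforce the timeout identically. A minimal sketch; the name check_workflow_timeout is illustrative and does not exist in the codebase today:

import logging
import time
from typing import Optional

def check_workflow_timeout(workflow_start: float, workflow_timeout: Optional[float]) -> bool:
    """Return True if the workflow has exceeded its timeout (hypothetical shared helper)."""
    if workflow_timeout is None:
        return False
    elapsed = time.monotonic() - workflow_start
    if elapsed > workflow_timeout:
        logging.warning(f"Workflow timeout ({workflow_timeout}s) exceeded after {elapsed:.1f}s")
        return True
    return False

The sync loop would call this once per iteration and, when it returns True, set self.workflow_cancelled = True and break, exactly as aworkflow() does inline.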
2. Async aworkflow() is missing start-task loop/CSV handling
workflow() has ~100 lines of CSV/file parsing for loop-type start tasks (lines 962–1059) that create subtasks from input files. aworkflow() explicitly acknowledges this with a TODO:
# process/process.py:458
# TODO: start task with loop feature is not available in aworkflow method
This means any async workflow with a loop-type start task that reads from a CSV file will silently skip task creation.
3. Same duplication in hierarchical methods
hierarchical() (lines 1470–1574) and ahierarchical() (lines 801–910) are also near-identical copies. Same for _get_manager_instructions_with_fallback() / _get_manager_instructions_with_fallback_async() (lines 319–365 / 289–317). Additionally, the sync workflow() task-reset logic (lines 1311–1332) runs without the async state lock that aworkflow() uses (lines 642–664):
# aworkflow() — protected by async lock (lines 642-644):
lock = await self._get_state_lock()
async with lock:
    if self.tasks[task_id].status == "completed":
        ...

# workflow() — NO lock protection (lines 1312-1313):
if self.tasks[task_id].status == "completed":
    # Direct mutation, no synchronization
    ...
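If the sync path keeps its own implementation in the interim, the reset needs an equivalent guard, e.g. a threading.Lock held across the read-modify sequence. A minimal sketch only; the _state_lock name and the specific reset shown are illustrative, not the actual code at lines 1311–1332:

import threading

_state_lock = threading.Lock()  # hypothetical sync counterpart to aworkflow()'s asyncio lock

def reset_if_completed(tasks: dict, task_id) -> None:
    """Sketch of the sync task reset guarded the same way the async path guards it."""
    with _state_lock:
        task = tasks[task_id]
        if task.status == "completed":
            task.status = "not started"  # illustrative reset; the real logic also clears task state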
Resolution
Extract the shared workflow logic into a single private method that accepts a strategy parameter (sync vs async), or use an internal async-first implementation with a sync wrapper using asyncio.run(). The key shared logic is:
- Workflow relationship building (previous_tasks wiring)
- Start task discovery
- Loop/decision routing
- Task reset logic
- Timeout enforcement
- Validation feedback propagation
This would eliminate ~500 lines of duplication and ensure feature parity automatically.
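A minimal sketch of the async-first option, with _run_workflow as an illustrative name; the real refactor would have to preserve the methods' actual signatures (and, if they yield tasks rather than return, drain an async generator instead):

import asyncio

class Process:
    async def _run_workflow(self):
        """Single implementation of the shared logic listed above (sketch only)."""
        ...  # relationship building, start-task discovery, routing, resets, timeout, validation feedback

    async def aworkflow(self):
        return await self._run_workflow()

    def workflow(self):
        # Sync facade over the async implementation; only valid when no loop is already running.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(self._run_workflow())
        raise RuntimeError("workflow() called from a running event loop; use aworkflow() instead")

Whichever shape is chosen, the timeout check, the loop/CSV start-task handling, and the locked reset then live in exactly one place.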
Gap 2: LLM retry backoff has no jitter — thundering herd in multi-agent setups
Problem
The error classifier's get_retry_delay() in llm/error_classifier.py returns deterministic delays with no randomization:
# llm/error_classifier.py — get_retry_delay()
def get_retry_delay(category: ErrorCategory, attempt: int = 1, base_delay: float = 1.0) -> float:
    attempt = max(1, attempt)
    if not should_retry(category):
        return 0
    if category == ErrorCategory.RATE_LIMIT:
        return min(base_delay * (3 ** attempt), 60.0)   # Always exactly 3.0, 9.0, 27.0, 60.0
    elif category == ErrorCategory.CONTEXT_LIMIT:
        return base_delay * 0.5                          # Always exactly 0.5
    elif category == ErrorCategory.TRANSIENT:
        return min(base_delay * (2 ** attempt), 30.0)    # Always exactly 2.0, 4.0, 8.0, 16.0, 30.0
    return 0
Both _call_with_retry() (line 718) and _call_with_retry_async() (line 807) in llm/llm.py consume these deterministic delays directly:
# llm/llm.py:798-800 (sync path)
if self._rate_limiter is not None:
    self._rate_limiter.wait_for_retry(retry_delay)
else:
    time.sleep(retry_delay)

# llm/llm.py:874-877 (async path)
if self._rate_limiter is not None:
    await self._rate_limiter.wait_for_retry_async(retry_delay)
else:
    await asyncio.sleep(retry_delay)
Impact: When N agents running in a multi-agent workflow all hit a rate limit at the same time (common with shared API keys), they all compute the exact same retry delay (e.g., 3.0s) and retry simultaneously — causing another rate limit, then all wait 9.0s, retry again simultaneously, etc. This is the classic thundering herd problem and can cause cascading failures that exhaust all retry attempts without ever succeeding.
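The determinism is easy to demonstrate in isolation (import path assumed from the file location above):

from praisonaiagents.llm.error_classifier import ErrorCategory, get_retry_delay

# Five agents hitting a rate limit on the same attempt all compute an identical delay.
delays = [get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1) for _ in range(5)]
print(delays)  # [3.0, 3.0, 3.0, 3.0, 3.0]: every agent retries at the same instant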
Resolution
Add jitter to get_retry_delay(). The standard approach is "full jitter" as recommended by AWS:
import random

def get_retry_delay(category: ErrorCategory, attempt: int = 1, base_delay: float = 1.0) -> float:
    attempt = max(1, attempt)
    if not should_retry(category):
        return 0
    if category == ErrorCategory.RATE_LIMIT:
        max_delay = min(base_delay * (3 ** attempt), 60.0)
    elif category == ErrorCategory.CONTEXT_LIMIT:
        return base_delay * 0.5  # No jitter needed — not a contention issue
    elif category == ErrorCategory.TRANSIENT:
        max_delay = min(base_delay * (2 ** attempt), 30.0)
    else:
        return 0
    # Full jitter: uniform random between 0 and computed max
    return random.uniform(0, max_delay)
This ensures agents desynchronize their retries naturally. No changes needed in the callers (_call_with_retry / _call_with_retry_async).
Gap 3: Silent exception suppression in task execution pipeline hides failures
Problem
task/task.py systematically catches and suppresses exceptions in the task execution pipeline, storing them in a non_fatal_errors list that callers must remember to inspect; nothing in the framework enforces or even encourages that check. The result is invisible failures that are extremely difficult to debug in production.
Three escalating levels of suppression:
Level 1 — store_in_memory() drops exceptions entirely (lines 637–639):
# task/task.py:623-639
def store_in_memory(self, content: str, agent_name: str = None, task_id: str = None):
    if self.memory:
        try:
            self.memory.store_long_term(text=content, metadata={...})
        except Exception as e:
            logger.error(f"Task {self.id}: Failed to store content in memory: {e}")
            logger.exception(e)
            # Exception logged but NOT added to non_fatal_errors
            # Caller has ZERO visibility that storage failed
Unlike the execute_callback path, this method doesn't even append to non_fatal_errors. The failure is completely invisible to the orchestrator.
Level 2 — execute_callback() catches all memory + callback errors (lines 732–758):
# task/task.py:732-737 (memory operations)
except Exception as e:
    error_msg = f"memory operations: {e}"
    self.non_fatal_errors.append(error_msg)
    logger.error(f"Task {self.id}: Failed to process memory operations: {e}")
    logger.exception(e)
    # Continue execution even if memory operations fail

# task/task.py:747-751 (user callbacks)
except Exception as e:
    error_msg = f"callback: {e}"
    self.non_fatal_errors.append(error_msg)
    logger.error(f"Task {self.id}: Failed to execute callback: {e}")
    logger.exception(e)
The task reports "success" even when the user's callback throws. The commented-out TODO at lines 754–756 acknowledges this:
# TODO: Consider raising if callback is marked as critical
# if getattr(self, 'callback_critical', False):
#     raise
Level 3 — _verify_memory_ready() uses a broad except Exception to mask config errors (lines 618–621):
# task/task.py:618-621
except Exception:
    # If any error occurs during readiness check, consider memory not ready
    return False
If memory configuration is wrong (e.g., missing ChromaDB, bad connection string), this silently returns False instead of surfacing the configuration error. The downstream effect: execute_callback skips all memory operations because _verify_memory_ready() said "not ready", and the user sees no error — just silently missing quality metrics and memory storage.
The non_fatal_errors list itself is problematic:
# output/models.py:28
non_fatal_errors: Optional[list[str]] = None
# main.py:680
non_fatal_errors: Optional[list[str]] = None
The field exists on TaskOutput but nothing in the framework checks it by default. Users must know to inspect task.result.non_fatal_errors after every task — there's no callback, no warning in the output, no configurable policy.
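Today the only way to notice these failures is to poll manually after every task, along these lines (attribute names taken from the snippets above):

import logging

def report_suppressed_errors(task) -> None:
    """Manual check users must remember to run after each task today."""
    output = task.result  # TaskOutput; non_fatal_errors is Optional[list[str]] per output/models.py
    if output is not None and output.non_fatal_errors:
        for err in output.non_fatal_errors:
            logging.warning("Task %s finished with a suppressed error: %s", task.id, err)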
Resolution
- Add a fail_on_error policy to Task that controls whether suppressed errors should raise:
class Task:
    def __init__(self, ..., fail_on_callback_error: bool = False, fail_on_memory_error: bool = False):
        self.fail_on_callback_error = fail_on_callback_error
        self.fail_on_memory_error = fail_on_memory_error
- Make store_in_memory() consistent — append to non_fatal_errors like execute_callback does:
def store_in_memory(self, content: str, ...):
    if self.memory:
        try:
            self.memory.store_long_term(...)
        except Exception as e:
            error_msg = f"store_in_memory: {e}"
            self.non_fatal_errors.append(error_msg)
            logger.error(...)
            if self.fail_on_memory_error:
                raise
- Surface non_fatal_errors in task output — emit a warning via the hooks system when non_fatal_errors is non-empty so the orchestrator and user are aware without having to poll:
if self.non_fatal_errors:
    task_output.non_fatal_errors = list(self.non_fatal_errors)
    # Emit hook so orchestrators can react
    if self._hook_runner:
        self._hook_runner.run("on_task_warnings", {
            "task_id": self.id,
            "warnings": self.non_fatal_errors
        })
- Make _verify_memory_ready() surface config errors instead of silently returning False:
def _verify_memory_ready(self) -> bool:
    if not self.memory:
        return False
    has_adapter = hasattr(self.memory, 'memory_adapter') and self.memory.memory_adapter is not None
    has_sqlite = hasattr(self.memory, '_sqlite_adapter') and self.memory._sqlite_adapter is not None
    if not (has_adapter or has_sqlite):
        logger.warning(f"Task {self.id}: Memory initialized but no adapter available — check memory configuration")
    return has_adapter or has_sqlite
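Taken together, opting in could look roughly like this; fail_on_callback_error, fail_on_memory_error, and the on_task_warnings hook are all proposals from the sketches above (not existing APIs), and the Task constructor arguments are abbreviated:

from praisonaiagents import Task

task = Task(
    description="Summarize the quarterly report",
    fail_on_callback_error=True,   # proposed flag: a failing user callback now fails the task
    fail_on_memory_error=False,    # proposed flag: memory failures stay non-fatal but are surfaced
)

def on_task_warnings(payload: dict) -> None:
    # Proposed hook payload: {"task_id": ..., "warnings": [...]}; how the handler is
    # registered with the hooks system is framework-specific and omitted here.
    print(f"Task {payload['task_id']} finished with warnings: {payload['warnings']}")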
Validation Summary
| Gap | File | Lines | Validated | Impact |
| --- | --- | --- | --- | --- |
| Sync workflow missing timeout | process/process.py | 942–1462 (missing vs 467–471) | workflow_timeout param silently ignored in sync mode | Workflows can hang forever |
| Async workflow missing loop/CSV | process/process.py | 458 (TODO) vs 962–1059 | Confirmed by explicit TODO comment | Async loop tasks silently skip CSV input |
| Sync workflow reset without lock | process/process.py | 1312 vs 642–644 | Sync path has no state lock | Race condition in concurrent task completion |
| Retry no jitter | llm/error_classifier.py | get_retry_delay() | Deterministic: 3^attempt, 2^attempt | Thundering herd exhausts retries |
| store_in_memory drops errors | task/task.py | 637–639 | Not appended to non_fatal_errors | Invisible data loss |
| execute_callback suppresses all | task/task.py | 732–751 | Errors stored in list nobody checks | User callbacks fail silently |
| _verify_memory_ready broad except | task/task.py | 618–621 | Returns False on config errors | Misconfiguration hidden |