diff --git a/README.md b/README.md index 835798b..ef5abcf 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@
-Unix-style pipelines for MCP tools — compose complex tool workflows as single pipeline requests
+Unix-style pipelines for MCP tools: compose complex tool workflows as single pipeline requests
Introduction · @@ -14,7 +14,7 @@ ## Introduction -Model Context Shell is a system that lets AI agents compose [MCP](https://modelcontextprotocol.io/) tool calls similar to Unix shell scripting. Instead of the agent orchestrating each tool call individually (loading all intermediate data into context), agents can express complex workflows as pipelines that execute server-side. +Model Context Shell lets AI agents compose [MCP](https://modelcontextprotocol.io/) tool calls in the style of Unix shell scripting. Instead of the agent orchestrating each tool call individually (loading all intermediate data into context), it can express a workflow as a pipeline that executes server-side. For example, an agent can express a multi-step workflow as a single pipeline: @@ -23,11 +23,11 @@ flowchart LR A["Fetch users (MCP)"] --> B["Extract profile URLs (Shell)"] --> C["for_each (Shell)"] --> C1["Fetch profile (MCP)"] --> D["Filter and sort (Shell)"] ``` -This pipeline fetches a list, extracts URLs, fetches each one, filters the results, and returns only the final output to the agent — no intermediate data in context. +This pipeline fetches a list, extracts URLs, fetches each one, filters the results, and returns only the final output to the agent. No intermediate data in context. ### Why this matters -[MCP](https://modelcontextprotocol.io/) is great — standardized interfaces, structured data, extensible ecosystem. But for complex workflows, the agent has to orchestrate each tool call individually, loading all intermediate results into context. Model Context Shell adds a pipeline layer — the agent sends a single pipeline, and the server coordinates the tools, returning only the final result: +[MCP](https://modelcontextprotocol.io/) is great, but for complex workflows the agent has to orchestrate each tool call individually, loading all intermediate results into context. 
Model Context Shell adds a pipeline layer: the agent sends a single pipeline, and the server coordinates the tools, returning only the final result: ```mermaid flowchart LR @@ -60,27 +60,27 @@ flowchart LR Example query: "List all Pokemon over 50 kg that have the chlorophyll ability" -Instead of 7+ separate tool calls loading all Pokemon data into context, the agent constructed a single pipeline that: +Instead of 7+ separate tool calls loading all Pokemon data into context, the agent built a single pipeline that: - Fetched the ability data - Extracted Pokemon URLs - Fetched each Pokemon's details (7 API calls) - Filtered by weight and formatted the results -**Result**: Only the final answer is loaded into context — no intermediate API responses. +Only the final answer is loaded into context, not the intermediate API responses. -In practice, agents don't construct the perfect pipeline on the first try. They typically run a few exploratory queries first to understand the shape of the data before building the final pipeline. To keep this process fast and cheap, the server includes a preview stage powered by [headson](https://github.com/kantord/headson) that returns a compact structural summary of the data — enough for the agent to plan its transformations without loading the full dataset into context. +In practice, agents don't get the pipeline right on the first try. They typically run a few exploratory queries to understand the shape of the data before building the final pipeline. To keep this fast and cheap, the server includes a preview stage powered by [headson](https://github.com/kantord/headson) that returns a compact structural summary of the data, enough for the agent to plan its transformations without loading the full dataset into context. ### Design -Agents already have access to full shell environments and can call any CLI tool, which has significant overlap with what MCP tools provide. 
Rather than duplicating that, Model Context Shell explores whether similar workflows can be achieved in a safer, simpler MCP-native environment. Patterns like parallel map-reduce over tool call results are not common today because MCP doesn't natively support them, but they seem like a natural fit for coordinating tool calls — imagine fetching all console errors via a Chrome DevTools MCP server and creating a separate GitHub issue for each one. A system tailored to these patterns can make them first-class operations. +Agents already have access to full shell environments and can call any CLI tool, which overlaps a lot with what MCP tools provide. Rather than duplicating that, Model Context Shell tries to achieve similar workflows in a safer, simpler MCP-native environment. Patterns like parallel map-reduce over tool call results are uncommon today because MCP doesn't natively support them, but they're a natural fit for coordinating tool calls: imagine fetching all console errors via a Chrome DevTools MCP server and creating a separate GitHub issue for each one. -The execution engine works with JSON pipeline definitions directly — agents construct pipelines from the MCP tool schema alone, without needing shell syntax. Commands are never passed through a shell interpreter; each command and its arguments are passed as separate elements to the underlying process (`shell=False`), eliminating shell injection risks entirely. Data flows between stages as JSON, preserving types through the pipeline rather than reducing everything to strings. MCP tool arguments are validated against their JSON Schema by the receiving server, giving agents type-checked feedback when they construct pipelines incorrectly. +The execution engine works with JSON pipeline definitions directly. Agents construct pipelines from the MCP tool schema alone, without needing shell syntax. 
Commands are never passed through a shell interpreter; each command and its arguments are passed as separate elements to the underlying process (`shell=False`), so there's no shell injection. Data flows between stages as JSON, preserving types through the pipeline rather than reducing everything to strings. -The result is a more constrained system compared to a general-purpose shell — only a fixed set of data transformation commands is available, and all execution happens inside a container. +It's more constrained than a general-purpose shell: only a fixed set of data transformation commands is available, and all execution happens inside a container. ### How it works -Model Context Shell is packaged as an MCP server, which makes it easy to use with any agent that supports the protocol. It could also be packaged as a library built directly into an agent. +Model Context Shell is packaged as an MCP server, so any agent that supports the protocol can use it. It could also be packaged as a library built directly into an agent. The server exposes four tools to the agent via MCP: @@ -89,7 +89,7 @@ The server exposes four tools to the agent via MCP: | `execute_pipeline` | Execute a pipeline of tool calls and shell commands | | `list_all_tools` | Discover all tools available from MCP servers via [ToolHive](https://stacklok.com/download/) | | `get_tool_details` | Get the full schema and description for a specific tool | -| `list_available_shell_commands` | Show the whitelist of allowed CLI commands | +| `list_available_shell_commands` | Show the allowlist of CLI commands | The agent constructs pipelines as JSON arrays of stages. Data flows from one stage to the next, similar to Unix pipes. There are three stage types: @@ -98,7 +98,7 @@ The agent constructs pipelines as JSON arrays of stages. 
Data flows from one sta {"type": "tool", "name": "fetch", "server": "fetch", "args": {"url": "https://..."}} ``` -**Command stages** transform data using whitelisted shell commands: +**Command stages** transform data using allowed shell commands: ```json {"type": "command", "command": "jq", "args": ["-c", ".results[] | {id, name}"]} ``` @@ -108,9 +108,9 @@ The agent constructs pipelines as JSON arrays of stages. Data flows from one sta {"type": "preview", "chars": 3000} ``` -Any tool stage can set `"for_each": true` to process items one-by-one. The preceding stage must output JSONL (one JSON object per line), and the tool is called once per line. Results are collected into an array. This enables patterns like "fetch a list of URLs, then fetch each one" in a single pipeline call, using a single reused connection for efficiency. +Any tool stage can set `"for_each": true` to process items one-by-one. The preceding stage must output JSONL (one JSON object per line), and the tool is called once per line. Results are collected into an array. So "fetch a list of URLs, then fetch each one" is a single pipeline call, using a single reused connection. 
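The per-line mapping that `for_each` performs can be sketched in a few lines of stdlib Python. This is a hypothetical illustration of the semantics described above, not the project's actual API; `run_for_each` and `call_tool` are invented names.

```python
import json

def run_for_each(upstream: str, call_tool) -> str:
    """Illustrative sketch: the upstream stage emits JSONL, the tool is
    called once per line, and the results are collected into a JSON array."""
    results = []
    for line in upstream.splitlines():
        if line.strip():  # skip blank lines between JSONL records
            results.append(call_tool(json.loads(line)))
    return json.dumps(results)

# Example: "fetch" each URL emitted by a previous stage
urls = '{"url": "https://a"}\n{"url": "https://b"}\n'
out = run_for_each(urls, lambda item: {"fetched": item["url"]})
```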
-Here is a full example — a pipeline that fetches users, extracts their profile URLs, fetches each profile, and filters for active users: +Full example: fetch users, extract their profile URLs, fetch each profile, filter for active users: ```json [ @@ -125,7 +125,7 @@ Here is a full example — a pipeline that fetches users, extracts their profile ### Prerequisites -- [ToolHive](https://stacklok.com/download/) (`thv`) — a runtime for managing MCP servers +- [ToolHive](https://stacklok.com/download/) (`thv`): a runtime for managing MCP servers ### Quick start @@ -139,7 +139,7 @@ thv run ghcr.io/stackloklabs/model-context-shell:latest --network host --foregro thv run ghcr.io/stackloklabs/model-context-shell:latest --foreground --transport streamable-http ``` -Once running, you can find the server's address with `thv list`, which shows the URL and port for each running server. If you've registered your AI client with `thv client setup`, ToolHive configures it to discover running servers automatically — see the [CLI quickstart](https://docs.stacklok.com/toolhive/tutorials/quickstart-cli) for details. +Once running, `thv list` shows the URL and port for each running server. If you've registered your AI client with `thv client setup`, ToolHive configures it to discover running servers automatically. See the [CLI quickstart](https://docs.stacklok.com/toolhive/tutorials/quickstart-cli) for details. Model Context Shell works with any existing MCP servers running through ToolHive, and relies on ToolHive's authentication model for connected servers. @@ -174,15 +174,15 @@ See the [ToolHive documentation](https://docs.stacklok.com/toolhive) for the ful ### Tips -**Connect only Model Context Shell to your agent** — For best results, don't connect individual MCP servers directly to the agent alongside Model Context Shell. When agents have direct access to tools, they may call them individually instead of composing efficient pipelines. 
The server can access all your MCP servers through ToolHive automatically. +**Connect only Model Context Shell to your agent.** Don't connect individual MCP servers directly to the agent alongside Model Context Shell. When agents have direct access to tools, they tend to call them individually instead of composing pipelines. The server can access all your MCP servers through ToolHive automatically. -**Some agents need encouragement** — Most agents will use the shell naturally for complex tasks, but some may need a hint in their system prompt (e.g., "Use Model Context Shell pipelines to combine multiple tool calls efficiently"). +**Some agents need encouragement.** Most agents will use the shell naturally for complex tasks, but some may need a hint in their system prompt (e.g., "Use Model Context Shell pipelines to combine multiple tool calls efficiently"). ## Security ToolHive runs Model Context Shell in an isolated container, so shell commands have no access to the host filesystem or network. The MCP servers it coordinates also run in their own separate containers, managed by ToolHive. -- **Allowed commands only**: A fixed whitelist of safe, read-only data transformation commands (`jq`, `grep`, `sed`, `awk`, `sort`, `uniq`, `cut`, `wc`, `head`, `tail`, `tr`, `date`, `bc`, `paste`, `shuf`, `join`, `sleep`) +- **Allowed commands only**: A fixed allowlist of safe, read-only data transformation commands (`jq`, `grep`, `sed`, `awk`, `sort`, `uniq`, `cut`, `wc`, `head`, `tail`, `tr`, `date`, `bc`, `paste`, `shuf`, `join`, `sleep`) - **No shell injection**: Commands are executed with `shell=False`, arguments passed separately - **MCP tools only**: All external operations go through approved MCP servers @@ -215,23 +215,23 @@ uv run pyright ## Specification -For now, this project serves as a living specification — the implementation _is_ the spec. As the idea matures, a more formal specification may be extracted from it. 
+For now, this project serves as a living specification: the implementation _is_ the spec. A more formal specification may be extracted later. -**Execution model.** The current execution model is a scriptable map-reduce pipeline. Stages run sequentially, with `for_each` providing the map step over tool calls. This could be extended with a more generic mini-interpreter for evaluating more complex pipelines, but the current thinking is that it would never grow into a full-blown programming language. After a certain level of complexity, it makes more sense for agents to write a larger piece of code directly, or combine written code with the shell approach. That said, the built-in access to tools like `jq` and `awk` already makes the pipeline model surprisingly capable for most data transformation tasks. +**Execution model.** The current execution model is a scriptable map-reduce pipeline. Stages run sequentially, with `for_each` providing the map step over tool calls. This could be extended with a more generic mini-interpreter, but it probably shouldn't grow into a full programming language. Past a certain complexity, it makes more sense for agents to write code directly, or combine written code with the shell approach. That said, built-in access to tools like `jq` and `awk` already makes the pipeline model pretty capable for most data transformation tasks. -**Pipeline schema.** The pipeline format is defined by the `execute_pipeline` tool in [`main.py`](https://github.com/StacklokLabs/model-context-shell/blob/main/main.py). Since FastMCP generates the JSON Schema from the function signature and docstring, this serves as the canonical schema definition. +**Pipeline schema.** The pipeline stages are defined as typed Pydantic models in [`models.py`](https://github.com/StacklokLabs/model-context-shell/blob/main/models.py). FastMCP generates a discriminated-union JSON Schema from these models, so MCP clients can validate pipelines before sending them. 
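The discriminated-union routing can be approximated without Pydantic. This is a hypothetical stdlib sketch of the idea (the real project uses the Pydantic models in `models.py`); the required fields below are taken from those models, but `check_stage` and `REQUIRED` are invented names.

```python
# Hypothetical sketch: route a raw stage dict on its "type" discriminator
# and check the fields the corresponding stage model requires.
REQUIRED = {
    "tool": {"name", "server"},
    "command": {"command"},
    "preview": set(),
}

def check_stage(stage: dict) -> dict:
    kind = stage.get("type")
    if kind not in REQUIRED:
        raise ValueError(f"unknown stage type: {kind!r}")
    missing = REQUIRED[kind] - stage.keys()
    if missing:
        raise ValueError(f"{kind} stage missing: {sorted(missing)}")
    return stage

ok = check_stage({"type": "command", "command": "jq", "args": ["-c", "."]})
```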
-**ToolHive and security.** The reliance on ToolHive and container isolation is a practical choice — it was the simplest way to get a working, secure system. ToolHive handles tool discovery, container management, and networking, which let this project focus on the pipeline execution model itself. A different deployment model could be used in the future without changing the core concept. +**ToolHive and security.** The reliance on ToolHive and container isolation is a practical choice: it was the simplest way to get a working, secure system. ToolHive handles tool discovery, container management, and networking, which lets this project focus on the pipeline execution model itself. A different deployment model could be used without changing the core concept. ## RFC -This project is both a working tech demo and an early-stage RFC for the concept of composable MCP tool pipelines. Rather than writing a detailed specification upfront, the goal is to gather feedback on the idea by providing something concrete to try. +This is both a working tech demo and an early-stage RFC for composable MCP tool pipelines. Rather than writing a detailed spec upfront, the goal is to gather feedback by providing something concrete to try. -If you have thoughts on the approach, ideas for improvements, or use cases we haven't considered, please share them in the [Discussions](https://github.com/StacklokLabs/model-context-shell/discussions) section. +If you have thoughts, ideas, or use cases we haven't considered, share them in the [Discussions](https://github.com/StacklokLabs/model-context-shell/discussions) section. ## Contributing -Contributions, ideas, and feedback are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines, including our DCO sign-off requirement. +Contributions and feedback welcome. See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines, including the DCO sign-off requirement. 
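The no-shell-injection property described in the Security section comes from passing argv as a list rather than a command string. A minimal sketch of the difference (`grep` here stands in for any allowlisted command; the exact call site is in `shell_engine.py`):

```python
import subprocess

# Sketch: with an argv list (the project's shell=False rule), shell
# metacharacters in the data are treated as literal text, not commands.
malicious = "hello; rm -rf /"  # would be dangerous if interpolated under shell=True
result = subprocess.run(
    ["grep", "-c", "hello"],  # command and args as separate elements
    input=malicious,
    capture_output=True,
    text=True,
)
# grep simply counts the one matching line; nothing is executed.
```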
## License diff --git a/main.py b/main.py index d9a0f9a..e5c1a3c 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,7 @@ import mcp_client import toolhive_client +from models import PipelineStage from shell_engine import ShellEngine mcp = FastMCP( @@ -50,7 +51,7 @@ def list_available_shell_commands() -> list[str]: @mcp.tool() -async def execute_pipeline(pipeline: list[dict]) -> str: +async def execute_pipeline(pipeline: list[PipelineStage]) -> str: """ Execute a pipeline of tool calls and shell commands to coordinate multiple operations. @@ -79,7 +80,7 @@ async def execute_pipeline(pipeline: list[dict]) -> str: Command Stage: {"type": "command", "command": "jq", "args": ["-c", ".field"]} - - Runs whitelisted shell commands (see list_available_shell_commands) + - Runs allowed shell commands (see list_available_shell_commands) - Command and args MUST be separate (security requirement) Preview Stage: diff --git a/models.py b/models.py new file mode 100644 index 0000000..f643589 --- /dev/null +++ b/models.py @@ -0,0 +1,41 @@ +"""Pydantic models for pipeline stages. + +These models generate a discriminated-union JSON Schema so that MCP clients +and agents can validate pipelines before sending them. 
+""" + +from typing import Annotated, Literal + +from pydantic import BaseModel, Field + + +class ToolStage(BaseModel): + """Call an external tool from an MCP server.""" + + type: Literal["tool"] + name: str = Field(min_length=1) + server: str = Field(min_length=1) + args: dict = Field(default_factory=dict) + for_each: bool = False + + +class CommandStage(BaseModel): + """Run an allowed shell command.""" + + type: Literal["command"] + command: str = Field(min_length=1) + args: list[str] = Field(default_factory=list) + for_each: bool = False + timeout: float | None = None + + +class PreviewStage(BaseModel): + """Summarize upstream data for inspection.""" + + type: Literal["preview"] + chars: int = Field(default=3000, gt=0) + + +PipelineStage = Annotated[ + ToolStage | CommandStage | PreviewStage, Field(discriminator="type") +] diff --git a/pyproject.toml b/pyproject.toml index 4e919c6..f547b2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ ignore = [ quote-style = "double" [tool.coverage.run] -source = ["main", "shell_engine", "mcp_client", "toolhive_client"] +source = ["main", "shell_engine", "mcp_client", "toolhive_client", "models"] branch = true [tool.coverage.report] @@ -50,6 +50,6 @@ show_missing = true skip_covered = false [tool.pyright] -include = ["main.py", "mcp_client.py", "shell_engine.py", "toolhive_client.py", "tests"] +include = ["main.py", "mcp_client.py", "shell_engine.py", "toolhive_client.py", "models.py", "tests"] pythonVersion = "3.13" typeCheckingMode = "standard" diff --git a/shell_engine.py b/shell_engine.py index 7f4de14..33c5049 100644 --- a/shell_engine.py +++ b/shell_engine.py @@ -15,6 +15,8 @@ import headson +from models import CommandStage, PipelineStage, PreviewStage, ToolStage + def _running_in_container() -> bool: """Detect if we're running inside a container (Docker, Podman, etc.). 
@@ -49,7 +51,7 @@ def _running_in_container() -> bool: return False -# Whitelist of allowed shell commands +# Allowlist of shell commands # Note: Commands that only generate hardcoded text (echo, printf) are excluded # to enforce tool-first architecture where all data comes from real sources ALLOWED_COMMANDS = [ @@ -488,12 +490,12 @@ async def tool_stage( # Fallback to string representation return str(result) - async def execute_pipeline(self, pipeline: list[dict]) -> str: + async def execute_pipeline(self, pipeline: list[PipelineStage]) -> str: """ Execute a pipeline of tool calls and shell commands. Args: - pipeline: List of pipeline stage dictionaries + pipeline: List of typed pipeline stages Returns: Final output of the pipeline @@ -503,54 +505,32 @@ async def execute_pipeline(self, pipeline: list[dict]) -> str: upstream: Iterable[str] = iter([]) # Process each stage in the pipeline - for idx, item in enumerate(pipeline): - item_type = item.get("type") - for_each = item.get("for_each", False) - - if item_type == "command": - command = item.get("command", "") - cmd_args = item.get("args", []) - cmd_timeout = item.get( - "timeout" - ) # Optional timeout for this specific command - - if not command: - raise ValueError("Command stage missing 'command' field") - - if not isinstance(cmd_args, list): - raise ValueError( - f"Command 'args' must be an array, got {type(cmd_args).__name__}" - ) - + for idx, stage in enumerate(pipeline): + if isinstance(stage, CommandStage): try: # Validate command before execution - self.validate_command(command) + self.validate_command(stage.command) upstream = self.shell_stage( - command, - cmd_args, + stage.command, + stage.args, upstream, - for_each=for_each, - timeout=cmd_timeout, + for_each=stage.for_each, + timeout=stage.timeout, ) except Exception as e: raise RuntimeError( f"Stage {idx + 1} (command) failed: {str(e)}" ) - elif item_type == "tool": - tool_name = item.get("name", "") - server_name = item.get("server", "") - args = 
item.get("args", {}) - - if not tool_name: - raise ValueError("Tool stage missing 'name' field") - if not server_name: - raise ValueError("Tool stage missing 'server' field") - + elif isinstance(stage, ToolStage): try: # Tool stages consume all upstream and return a result result = await self.tool_stage( - server_name, tool_name, args, upstream, for_each=for_each + stage.server, + stage.name, + stage.args, + upstream, + for_each=stage.for_each, ) # Convert result back to a stream for next stage # Ensure result ends with newline for proper shell command processing @@ -560,20 +540,13 @@ async def execute_pipeline(self, pipeline: list[dict]) -> str: upstream = iter([result]) except Exception as e: raise RuntimeError( - f"Stage {idx + 1} (tool {server_name}/{tool_name}) failed: {str(e)}" + f"Stage {idx + 1} (tool {stage.server}/{stage.name}) failed: {str(e)}" ) - elif item_type == "preview": + elif isinstance(stage, PreviewStage): # Preview stage: summarize upstream data for the agent to inspect # Uses headson to create a structure-aware preview within a char budget # Output is NOT valid JSON - it uses pseudo-format with /* N more */ markers - chars = item.get("chars", 3000) - - if not isinstance(chars, int) or chars <= 0: - raise ValueError( - f"Preview 'chars' must be a positive integer, got {chars}" - ) - try: # Collect upstream data input_data = "".join(upstream) @@ -585,7 +558,7 @@ async def execute_pipeline(self, pipeline: list[dict]) -> str: format="json", style="detailed", input_format="json", - byte_budget=chars, # headson uses byte_budget param + byte_budget=stage.chars, # headson uses byte_budget param ) # Add clear marker that this is a preview, not real data @@ -601,9 +574,6 @@ async def execute_pipeline(self, pipeline: list[dict]) -> str: f"Stage {idx + 1} (preview) failed: {str(e)}" ) - else: - raise ValueError(f"Unknown pipeline item type: {item_type}") - # Collect final output output = "".join(upstream) return output diff --git 
a/tests/test_bwrap_integration.py b/tests/test_bwrap_integration.py index 91cb607..c673a36 100644 --- a/tests/test_bwrap_integration.py +++ b/tests/test_bwrap_integration.py @@ -1,7 +1,17 @@ import pytest +from pydantic import TypeAdapter +from models import PipelineStage from shell_engine import ShellEngine +_pipeline_adapter = TypeAdapter(list[PipelineStage]) + + +def _make_pipeline(raw: list[dict]) -> list[PipelineStage]: + """Convert a list of plain dicts into validated PipelineStage models.""" + return _pipeline_adapter.validate_python(raw) + + # Skip all bwrap tests - bwrap doesn't work reliably in CI/Docker environments pytestmark = pytest.mark.skip(reason="bwrap tests disabled - not reliable in CI/Docker") @@ -17,13 +27,15 @@ async def dummy_tool_caller(server, tool, args): async def test_proc_is_mounted_and_readable(): engine = await _new_engine() - pipeline = [ - { - "type": "command", - "command": "head", - "args": ["-n", "1", "/proc/self/mountinfo"], - } - ] + pipeline = _make_pipeline( + [ + { + "type": "command", + "command": "head", + "args": ["-n", "1", "/proc/self/mountinfo"], + } + ] + ) out = await engine.execute_pipeline(pipeline) assert out.strip() != "" @@ -43,7 +55,7 @@ async def test_tmp_is_writable_tmpfs_and_readable_within_command(): "close(f) }" ) - pipeline = [{"type": "command", "command": "awk", "args": [prog]}] + pipeline = _make_pipeline([{"type": "command", "command": "awk", "args": [prog]}]) out = await engine.execute_pipeline(pipeline) assert "hello" in out @@ -63,7 +75,7 @@ async def test_root_is_read_only_cannot_create_files(): 'close(f); if (c>0) { print "WROTE" } else { print "NOPE" } }' ) - pipeline = [{"type": "command", "command": "awk", "args": [prog]}] + pipeline = _make_pipeline([{"type": "command", "command": "awk", "args": [prog]}]) with pytest.raises(RuntimeError, match="(Read-only file system|Permission denied)"): await engine.execute_pipeline(pipeline) @@ -82,7 +94,7 @@ async def 
test_usr_is_read_only_cannot_create_files(): 'close(f); if (c>0) { print "WROTE" } else { print "NOPE" } }' ) - pipeline = [{"type": "command", "command": "awk", "args": [prog]}] + pipeline = _make_pipeline([{"type": "command", "command": "awk", "args": [prog]}]) with pytest.raises(RuntimeError, match="(Read-only file system|Permission denied)"): await engine.execute_pipeline(pipeline) diff --git a/tests/test_shell_engine.py b/tests/test_shell_engine.py index 40cf968..e9a24c8 100644 --- a/tests/test_shell_engine.py +++ b/tests/test_shell_engine.py @@ -6,9 +6,18 @@ from unittest.mock import AsyncMock import pytest +from pydantic import TypeAdapter, ValidationError +from models import PipelineStage from shell_engine import ShellEngine +_pipeline_adapter = TypeAdapter(list[PipelineStage]) + + +def _make_pipeline(raw: list[dict]) -> list[PipelineStage]: + """Convert a list of plain dicts into validated PipelineStage models.""" + return _pipeline_adapter.validate_python(raw) + def _bwrap_functional() -> bool: """Check if bwrap is installed and functional (can create namespaces).""" @@ -400,10 +409,12 @@ async def test_execute_pipeline_single_command(self): mock_caller = AsyncMock(return_value=MockToolResult("test output")) engine = ShellEngine(tool_caller=mock_caller) - pipeline = [ - {"type": "tool", "name": "generate", "server": "test", "args": {}}, - {"type": "command", "command": "grep", "args": ["test"]}, - ] + pipeline = _make_pipeline( + [ + {"type": "tool", "name": "generate", "server": "test", "args": {}}, + {"type": "command", "command": "grep", "args": ["test"]}, + ] + ) result = await engine.execute_pipeline(pipeline) @@ -415,10 +426,12 @@ async def test_execute_pipeline_multiple_commands(self): mock_caller = AsyncMock(return_value=MockToolResult("apple\nbanana\ncherry")) engine = ShellEngine(tool_caller=mock_caller) - pipeline = [ - {"type": "tool", "name": "get_data", "server": "test", "args": {}}, - {"type": "command", "command": "grep", "args": ["a"]}, - 
] + pipeline = _make_pipeline( + [ + {"type": "tool", "name": "get_data", "server": "test", "args": {}}, + {"type": "command", "command": "grep", "args": ["a"]}, + ] + ) result = await engine.execute_pipeline(pipeline) @@ -431,14 +444,16 @@ async def test_execute_pipeline_with_tool(self): mock_caller = AsyncMock(return_value=MockToolResult("tool result")) engine = ShellEngine(tool_caller=mock_caller) - pipeline = [ - { - "type": "tool", - "name": "test_tool", - "server": "test_server", - "args": {"key": "value"}, - } - ] + pipeline = _make_pipeline( + [ + { + "type": "tool", + "name": "test_tool", + "server": "test_server", + "args": {"key": "value"}, + } + ] + ) result = await engine.execute_pipeline(pipeline) @@ -451,15 +466,17 @@ async def test_execute_pipeline_mixed_stages(self): mock_caller = AsyncMock(return_value=MockToolResult('{"data": "test"}')) engine = ShellEngine(tool_caller=mock_caller) - pipeline = [ - { - "type": "tool", - "name": "fetch", - "server": "http", - "args": {"url": "http://example.com"}, - }, - {"type": "command", "command": "jq", "args": [".data"]}, - ] + pipeline = _make_pipeline( + [ + { + "type": "tool", + "name": "fetch", + "server": "http", + "args": {"url": "http://example.com"}, + }, + {"type": "command", "command": "jq", "args": [".data"]}, + ] + ) result = await engine.execute_pipeline(pipeline) @@ -470,7 +487,9 @@ async def test_execute_pipeline_invalid_command(self): mock_caller = AsyncMock() engine = ShellEngine(tool_caller=mock_caller) - pipeline = [{"type": "command", "command": "rm", "args": ["-rf", "/"]}] + pipeline = _make_pipeline( + [{"type": "command", "command": "rm", "args": ["-rf", "/"]}] + ) with pytest.raises( RuntimeError, match="Pipeline execution failed.*not allowed" @@ -478,75 +497,42 @@ async def test_execute_pipeline_invalid_command(self): await engine.execute_pipeline(pipeline) async def test_execute_pipeline_missing_command_field(self): - """Test pipeline with missing command field.""" - mock_caller = 
AsyncMock()
-        engine = ShellEngine(tool_caller=mock_caller)
-
-        pipeline = [
-            {"type": "command", "args": ["test"]}  # Missing "command" field
-        ]
-
-        with pytest.raises(
-            RuntimeError, match="Pipeline execution failed.*missing 'command' field"
-        ):
-            await engine.execute_pipeline(pipeline)
+        """Test pipeline with missing command field raises ValidationError."""
+        with pytest.raises(ValidationError):
+            _make_pipeline(
+                [
+                    {"type": "command", "args": ["test"]}  # Missing "command" field
+                ]
+            )

     async def test_execute_pipeline_missing_tool_name_field(self):
-        """Test pipeline with missing tool name field."""
-        mock_caller = AsyncMock()
-        engine = ShellEngine(tool_caller=mock_caller)
-
-        # Missing "name" field
-        pipeline = [{"type": "tool", "server": "test_server", "args": {}}]
-
-        with pytest.raises(
-            RuntimeError, match="Pipeline execution failed.*missing 'name' field"
-        ):
-            await engine.execute_pipeline(pipeline)
+        """Test pipeline with missing tool name field raises ValidationError."""
+        with pytest.raises(ValidationError):
+            _make_pipeline([{"type": "tool", "server": "test_server", "args": {}}])

     async def test_execute_pipeline_missing_tool_server_field(self):
-        """Test pipeline with missing tool server field."""
-        mock_caller = AsyncMock()
-        engine = ShellEngine(tool_caller=mock_caller)
-
-        # Missing "server" field
-        pipeline = [{"type": "tool", "name": "test_tool", "args": {}}]
-
-        with pytest.raises(
-            RuntimeError, match="Pipeline execution failed.*missing 'server' field"
-        ):
-            await engine.execute_pipeline(pipeline)
+        """Test pipeline with missing tool server field raises ValidationError."""
+        with pytest.raises(ValidationError):
+            _make_pipeline([{"type": "tool", "name": "test_tool", "args": {}}])

     async def test_execute_pipeline_invalid_args_type(self):
-        """Test pipeline with invalid args type (not a list)."""
-        mock_caller = AsyncMock()
-        engine = ShellEngine(tool_caller=mock_caller)
-
-        pipeline = [{"type": "command", "command": "grep", "args": "not-a-list"}]
-
-        with pytest.raises(
-            RuntimeError, match="Pipeline execution failed.*must be an array"
-        ):
-            await engine.execute_pipeline(pipeline)
+        """Test pipeline with invalid args type (not a list) raises ValidationError."""
+        with pytest.raises(ValidationError):
+            _make_pipeline(
+                [{"type": "command", "command": "grep", "args": "not-a-list"}]
+            )

     async def test_execute_pipeline_unknown_stage_type(self):
-        """Test pipeline with unknown stage type."""
-        mock_caller = AsyncMock()
-        engine = ShellEngine(tool_caller=mock_caller)
-
-        pipeline = [{"type": "unknown_type", "data": "test"}]
-
-        with pytest.raises(
-            RuntimeError, match="Pipeline execution failed.*Unknown pipeline item type"
-        ):
-            await engine.execute_pipeline(pipeline)
+        """Test pipeline with unknown stage type raises ValidationError."""
+        with pytest.raises(ValidationError):
+            _make_pipeline([{"type": "unknown_type", "data": "test"}])

     async def test_execute_pipeline_empty_pipeline(self):
         """Test pipeline with no stages returns empty string."""
         mock_caller = AsyncMock()
         engine = ShellEngine(tool_caller=mock_caller)

-        pipeline = []
+        pipeline = _make_pipeline([])

         result = await engine.execute_pipeline(pipeline)

@@ -568,10 +554,12 @@ async def mock_caller(server, tool, args):
         engine = ShellEngine(tool_caller=mock_caller)

-        pipeline = [
-            {"type": "tool", "name": "list_urls", "server": "api", "args": {}},
-            {"type": "tool", "name": "fetch", "server": "http", "for_each": True},
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "list_urls", "server": "api", "args": {}},
+                {"type": "tool", "name": "fetch", "server": "http", "for_each": True},
+            ]
+        )

         result = await engine.execute_pipeline(pipeline)

@@ -594,10 +582,12 @@ async def test_preview_stage_basic(self):
         mock_caller = AsyncMock(return_value=MockToolResult(large_data))
         engine = ShellEngine(tool_caller=mock_caller)

-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {"type": "preview", "chars": 500},
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                {"type": "preview", "chars": 500},
+            ]
+        )

         result = await engine.execute_pipeline(pipeline)

@@ -616,10 +606,12 @@ async def test_preview_stage_shows_omission_markers(self):
         mock_caller = AsyncMock(return_value=MockToolResult(large_array))
         engine = ShellEngine(tool_caller=mock_caller)

-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {"type": "preview", "chars": 200},
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                {"type": "preview", "chars": 200},
+            ]
+        )

         result = await engine.execute_pipeline(pipeline)

@@ -631,10 +623,12 @@ async def test_preview_stage_default_chars(self):
         mock_caller = AsyncMock(return_value=MockToolResult('{"test": "data"}'))
         engine = ShellEngine(tool_caller=mock_caller)

-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {"type": "preview"},  # No chars specified
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                {"type": "preview"},  # No chars specified
+            ]
+        )

         # Should not raise, uses default
         result = await engine.execute_pipeline(pipeline)

@@ -642,16 +636,13 @@ async def test_preview_stage_invalid_chars(self):
         """Test that preview stage rejects invalid chars parameter."""
-        mock_caller = AsyncMock(return_value=MockToolResult('{"test": "data"}'))
-        engine = ShellEngine(tool_caller=mock_caller)
-
-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {"type": "preview", "chars": -100},
-        ]
-
-        with pytest.raises(RuntimeError, match="chars.*must be a positive integer"):
-            await engine.execute_pipeline(pipeline)
+        with pytest.raises(ValidationError):
+            _make_pipeline(
+                [
+                    {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                    {"type": "preview", "chars": -100},
+                ]
+            )

     @requires_bwrap
     async def test_preview_stage_in_middle_of_pipeline(self):
@@ -660,11 +651,13 @@ async def test_preview_stage_in_middle_of_pipeline(self):
         engine = ShellEngine(tool_caller=mock_caller)

         # Preview in the middle - subsequent stages see preview output, not original data
-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {"type": "preview", "chars": 500},
-            {"type": "command", "command": "wc", "args": ["-l"]},  # Count lines
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                {"type": "preview", "chars": 500},
+                {"type": "command", "command": "wc", "args": ["-l"]},  # Count lines
+            ]
+        )

         result = await engine.execute_pipeline(pipeline)
         # wc -l should return a number (the line count of the preview)

@@ -683,7 +676,9 @@ async def failing_caller(server, tool, args):
         engine = ShellEngine(tool_caller=failing_caller)

-        pipeline = [{"type": "tool", "name": "test", "server": "test", "args": {}}]
+        pipeline = _make_pipeline(
+            [{"type": "tool", "name": "test", "server": "test", "args": {}}]
+        )

         with pytest.raises(
             RuntimeError, match="Pipeline execution failed.*Tool call failed"

@@ -695,15 +690,17 @@ async def test_stage_error_includes_stage_number(self):
         mock_caller = AsyncMock(return_value=MockToolResult("ok"))
         engine = ShellEngine(tool_caller=mock_caller)

-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {
-                "type": "command",
-                "command": "rm",
-                "args": ["-rf", "/"],
-            },  # Stage 2 - forbidden
-            {"type": "command", "command": "grep", "args": ["never reached"]},
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                {
+                    "type": "command",
+                    "command": "rm",
+                    "args": ["-rf", "/"],
+                },  # Stage 2 - forbidden
+                {"type": "command", "command": "grep", "args": ["never reached"]},
+            ]
+        )

         with pytest.raises(RuntimeError, match="Pipeline execution failed.*Stage 2"):
             await engine.execute_pipeline(pipeline)

@@ -716,10 +713,12 @@ async def test_shell_command_error_with_stderr(self):
         mock_caller = AsyncMock(return_value=MockToolResult('"just a string"'))
         engine = ShellEngine(tool_caller=mock_caller)

-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {"type": "command", "command": "jq", "args": [".name"]},
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                {"type": "command", "command": "jq", "args": [".name"]},
+            ]
+        )

         with pytest.raises(
             RuntimeError, match="Command 'jq' failed with exit code.*Stderr:"

@@ -734,10 +733,12 @@ async def test_shell_command_grep_no_match_no_error(self):

         # grep exits with 1 when no match, but doesn't write to stderr
         # This should NOT raise an error - just return empty output
-        pipeline = [
-            {"type": "tool", "name": "get_data", "server": "test", "args": {}},
-            {"type": "command", "command": "grep", "args": ["nonexistent_pattern"]},
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_data", "server": "test", "args": {}},
+                {"type": "command", "command": "grep", "args": ["nonexistent_pattern"]},
+            ]
+        )

         result = await engine.execute_pipeline(pipeline)
         # Should complete successfully with empty output

@@ -806,9 +807,9 @@ async def test_execute_pipeline_command_timeout(self):
         engine = ShellEngine(tool_caller=mock_caller)

         # Pipeline with a command that times out
-        pipeline = [
-            {"type": "command", "command": "sleep", "args": ["10"], "timeout": 0.2}
-        ]
+        pipeline = _make_pipeline(
+            [{"type": "command", "command": "sleep", "args": ["10"], "timeout": 0.2}]
+        )

         start = time.time()
         with pytest.raises(RuntimeError, match="Pipeline execution failed.*timed out"):

@@ -824,16 +825,18 @@ async def test_execute_pipeline_for_each_with_timeout(self):
         engine = ShellEngine(tool_caller=mock_caller)

         # Each line would cause a slow command, but should timeout
-        pipeline = [
-            {"type": "tool", "name": "get_lines", "server": "test", "args": {}},
-            {
-                "type": "command",
-                "command": "sleep",
-                "args": ["10"],
-                "for_each": True,
-                "timeout": 0.2,
-            },
-        ]
+        pipeline = _make_pipeline(
+            [
+                {"type": "tool", "name": "get_lines", "server": "test", "args": {}},
+                {
+                    "type": "command",
+                    "command": "sleep",
+                    "args": ["10"],
+                    "for_each": True,
+                    "timeout": 0.2,
+                },
+            ]
+        )

         start = time.time()
         with pytest.raises(RuntimeError, match="Pipeline execution failed.*timed out"):