diff --git a/langship-cli/README.md b/langship-cli/README.md index 6b76c2e..dab818a 100644 --- a/langship-cli/README.md +++ b/langship-cli/README.md @@ -91,6 +91,7 @@ langship envs reorder ... langship pipelines list [-o ...] langship pipelines get [-o json|yaml|table] langship pipelines push [--id ] [--name ...] +langship pipelines validate langship pipelines delete [-y] langship creds list [-o ...] diff --git a/langship-cli/pyproject.toml b/langship-cli/pyproject.toml index f86d957..3b1b153 100644 --- a/langship-cli/pyproject.toml +++ b/langship-cli/pyproject.toml @@ -21,5 +21,14 @@ dependencies = [ [project.scripts] langship = "langship.cli:main" +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", +] + [tool.hatch.build.targets.wheel] packages = ["src/langship"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["src"] diff --git a/langship-cli/src/langship/cli.py b/langship-cli/src/langship/cli.py index 2458405..a5b78f7 100644 --- a/langship-cli/src/langship/cli.py +++ b/langship-cli/src/langship/cli.py @@ -30,7 +30,7 @@ app.add_typer(agents_cmd.app, name="agents", help="Manage agents (repos + env subscriptions).") app.add_typer(envs_cmd.app, name="envs", help="Manage environments (named, ordered pipeline lists).") -app.add_typer(pipelines_cmd.app, name="pipelines", help="Manage pipeline definitions (push from file, dump).") +app.add_typer(pipelines_cmd.app, name="pipelines", help="Manage pipeline definitions (push, validate, list, get, delete).") app.add_typer(creds_cmd.app, name="creds", help="Manage the global credential pool (aws / gcp / kv).") app.add_typer(runs_cmd.app, name="runs", help="Inspect executions and stream logs.") diff --git a/langship-cli/src/langship/commands/pipelines.py b/langship-cli/src/langship/commands/pipelines.py index 2dfdfb1..50a9465 100644 --- a/langship-cli/src/langship/commands/pipelines.py +++ b/langship-cli/src/langship/commands/pipelines.py @@ -1,15 +1,18 @@ -"""pipelines list / get / push / delete. +"""pipelines list / get / push / delete / validate. `push` takes a local file containing the pipeline definition (n8n-format JSON, or YAML if PyYAML is installed). If the file (or the top-level object) carries an `id`, the pipeline is updated; otherwise a new one is created and its id printed. The file is the source of truth — GitOps. + +`validate` checks the definition locally before hitting the server. """ from __future__ import annotations import json from pathlib import Path +from typing import Any import typer @@ -18,6 +21,36 @@ app = typer.Typer(no_args_is_help=True) +# n8n trigger types that map to flow-nodes-base.trigger — kept in sync with +# pkg/engine/parser.go so local validation matches server behaviour. +_TRIGGER_TYPES = frozenset({ + "n8n-nodes-base.webhook", + "n8n-nodes-base.scheduleTrigger", + "n8n-nodes-base.manualTrigger", + "n8n-nodes-base.formTrigger", + "n8n-nodes-base.activationTrigger", + "n8n-nodes-base.errorTrigger", + "n8n-nodes-base.executeWorkflowTrigger", + "n8n-nodes-base.n8nTrigger", + "n8n-nodes-base.sseTrigger", + "n8n-nodes-base.workflowTrigger", + "n8n-nodes-base.localFileTrigger", + "n8n-nodes-base.emailReadImap", + "n8n-nodes-base.rssFeedReadTrigger", + "n8n-nodes-base.evaluationTrigger", + "n8n-nodes-base.start", + "n8n-nodes-base.cron", + "n8n-nodes-base.interval", + "n8n-nodes-langchain.chatTrigger", + "n8n-nodes-langchain.mcpTrigger", + "flow-nodes-base.trigger", +}) + + +def _is_trigger_type(node_type: str) -> bool: + """Return True if the node type is treated as a trigger.""" + return node_type in _TRIGGER_TYPES + def _load_file(path: Path) -> dict: text = path.read_text() @@ -31,6 +64,128 @@ def _load_file(path: Path) -> dict: return json.loads(text) +def _validate_definition(doc: dict) -> list[str]: + """Validate a parsed pipeline definition, returning a list of errors. + + Checks match the server-side validations in pkg/engine/parser.go plus + common-sense structural rules. + """ + errors: list[str] = [] + + definition = doc.get("definition", doc) + + nodes = definition.get("nodes") + if not isinstance(nodes, list): + errors.append("'nodes' must be a non-empty array") + return errors + if len(nodes) == 0: + errors.append("'nodes' array is empty") + return errors + + seen_ids: set[str] = set() + seen_names: set[str] = set() + trigger_count = 0 + + for i, node in enumerate(nodes): + prefix = f"nodes[{i}]" + if not isinstance(node, dict): + errors.append(f"{prefix}: expected an object, got {type(node).__name__}") + continue + + node_id = node.get("id") + if not node_id or not isinstance(node_id, str): + errors.append(f"{prefix}: missing or invalid 'id' (must be a non-empty string)") + elif node_id in seen_ids: + errors.append(f"{prefix}: duplicate node id '{node_id}'") + else: + seen_ids.add(node_id) + + node_name = node.get("name") + if not node_name or not isinstance(node_name, str): + errors.append(f"{prefix}: missing or invalid 'name' (must be a non-empty string)") + elif node_name in seen_names: + errors.append(f"{prefix}: duplicate node name '{node_name}'") + else: + seen_names.add(node_name) + + node_type = node.get("type") + if not node_type or not isinstance(node_type, str): + errors.append(f"{prefix}: missing or invalid 'type' (must be a non-empty string)") + elif _is_trigger_type(node_type): + trigger_count += 1 + + pos = node.get("position") + if pos is not None and (not isinstance(pos, (list, tuple)) or len(pos) != 2): + errors.append(f"{prefix}: 'position' must be an array of 2 numbers") + + if trigger_count == 0: + errors.append("workflow must have exactly one trigger node (found 0)") + elif trigger_count > 1: + errors.append(f"workflow must have exactly one trigger node (found {trigger_count})") + + connections = definition.get("connections") + if connections is not None: + if not isinstance(connections, dict): + errors.append("'connections' must be an object") + else: + for source_name, conn_targets in connections.items(): + if source_name not in seen_names: + errors.append(f"connections: source node '{source_name}' not found in nodes") + continue + if not isinstance(conn_targets, dict): + errors.append(f"connections['{source_name}']: expected an object with 'main' key") + continue + main = conn_targets.get("main") + if not isinstance(main, list): + errors.append(f"connections['{source_name}']: 'main' must be an array") + continue + for output_idx, output_group in enumerate(main): + if not isinstance(output_group, list): + errors.append(f"connections['{source_name}'].main[{output_idx}]: expected an array of targets") + continue + for target_idx, target in enumerate(output_group): + if not isinstance(target, dict): + errors.append(f"connections['{source_name}'].main[{output_idx}][{target_idx}]: expected an object with 'node' field") + continue + target_node = target.get("node") + if not target_node or not isinstance(target_node, str): + errors.append(f"connections['{source_name}'].main[{output_idx}][{target_idx}]: missing or invalid 'node'") + elif target_node not in seen_names: + errors.append(f"connections['{source_name}'].main[{output_idx}][{target_idx}]: references unknown node '{target_node}'") + + return errors + + +@app.command("validate") +def validate_pipeline( + file: Path = typer.Argument(..., exists=True, readable=True, help="JSON/YAML file with the pipeline definition."), +) -> None: + """Validate a pipeline definition locally without contacting the server. + + Checks structure, required fields, duplicate names/ids, exactly one + trigger node, and that connections reference valid nodes. + """ + from ..utils import err_console + + try: + doc = _load_file(file) + except (json.JSONDecodeError, ValueError) as e: + die(f"invalid {file.suffix} file: {e}") + + if not isinstance(doc, dict): + die("file must contain a JSON/YAML object") + + errors = _validate_definition(doc) + if not errors: + console.print(f"[green]✓[/green] [bold]{file}[/bold] is valid") + return + + err_console.print(f"[red]✗[/red] [bold]{file}[/bold] has {len(errors)} error{'s' if len(errors) > 1 else ''}:") + for err in errors: + err_console.print(f" [red]·[/red] {err}") + raise typer.Exit(code=1) + + @app.command("list") def list_pipelines(output: str = typer.Option("table", "--output", "-o", help="table | json | yaml")) -> None: """List pipeline definitions.""" diff --git a/langship-cli/tests/__init__.py b/langship-cli/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/langship-cli/tests/test_load_file.py b/langship-cli/tests/test_load_file.py new file mode 100644 index 0000000..d8b730c --- /dev/null +++ b/langship-cli/tests/test_load_file.py @@ -0,0 +1,32 @@ +"""Tests for the pipeline file loader (JSON and YAML).""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from langship.commands.pipelines import _load_file + + +def test_load_json_file(tmp_path: Path): + p = tmp_path / "pipeline.json" + p.write_text(json.dumps({"name": "x", "nodes": []})) + assert _load_file(p) == {"name": "x", "nodes": []} + + +def test_load_invalid_json_raises(tmp_path: Path): + p = tmp_path / "bad.json" + p.write_text("{not valid json") + with pytest.raises(json.JSONDecodeError): + _load_file(p) + + +def test_load_yaml_file_requires_pyyaml(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + pytest.importorskip("yaml") + import yaml as _yaml # noqa: F401 (importorskip guarantees availability) + + p = tmp_path / "pipeline.yaml" + p.write_text("name: x\nnodes: []\n") + assert _load_file(p) == {"name": "x", "nodes": []} diff --git a/langship-cli/tests/test_validate_pipeline.py b/langship-cli/tests/test_validate_pipeline.py new file mode 100644 index 0000000..42e2b4d --- /dev/null +++ b/langship-cli/tests/test_validate_pipeline.py @@ -0,0 +1,240 @@ +"""Tests for the pipeline validation logic. + +Covers both the happy path (valid pipelines) and the error cases +(trigger, nodes, connections) raised by `_validate_definition`. +""" + +from __future__ import annotations + +import pytest + +from langship.commands.pipelines import _is_trigger_type, _validate_definition + + +# A valid hello-world definition used as the baseline. +VALID_PIPELINE = { + "name": "hello", + "nodes": [ + { + "id": "1", + "name": "Trigger", + "type": "n8n-nodes-base.manualTrigger", + "typeVersion": 1, + "parameters": {}, + "position": [0, 0], + }, + { + "id": "2", + "name": "Set Greeting", + "type": "n8n-nodes-base.set", + "typeVersion": 3, + "parameters": {}, + "position": [200, 0], + }, + { + "id": "3", + "name": "End", + "type": "n8n-nodes-base.noop", + "typeVersion": 1, + "parameters": {}, + "position": [400, 0], + }, + ], + "connections": { + "Trigger": {"main": [[{"node": "Set Greeting", "type": "main", "index": 0}]]}, + "Set Greeting": {"main": [[{"node": "End", "type": "main", "index": 0}]]}, + }, +} + + +# --- happy path ------------------------------------------------------------- + + +def test_valid_pipeline_has_no_errors(): + assert _validate_definition(VALID_PIPELINE) == [] + + +def test_valid_pipeline_in_definition_wrapper(): + # The CLI accepts both bare n8n-format and {definition: {...}} shapes. + wrapped = {"name": "hello", "definition": VALID_PIPELINE} + assert _validate_definition(wrapped) == [] + + +def test_valid_pipeline_without_connections(): + doc = { + "name": "t", + "nodes": [ + {"id": "1", "name": "Trigger", "type": "flow-nodes-base.trigger", "position": [0, 0]}, + ], + } + assert _validate_definition(doc) == [] + + +# --- trigger rules ---------------------------------------------------------- + + +@pytest.mark.parametrize( + "node_type", + [ + "n8n-nodes-base.manualTrigger", + "n8n-nodes-base.webhook", + "n8n-nodes-base.scheduleTrigger", + "n8n-nodes-base.cron", + "n8n-nodes-langchain.chatTrigger", + "flow-nodes-base.trigger", + ], +) +def test_is_trigger_type_recognizes_known_triggers(node_type): + assert _is_trigger_type(node_type) is True + + +def test_non_trigger_type_is_not_trigger(): + assert _is_trigger_type("n8n-nodes-base.set") is False + assert _is_trigger_type("flow-nodes-base.code") is False + + +def test_no_trigger_node_is_an_error(): + doc = { + "nodes": [ + {"id": "1", "name": "A", "type": "n8n-nodes-base.set", "position": [0, 0]}, + {"id": "2", "name": "B", "type": "n8n-nodes-base.noop", "position": [200, 0]}, + ], + "connections": {"A": {"main": [[{"node": "B", "type": "main", "index": 0}]]}}, + } + errors = _validate_definition(doc) + assert any("trigger node (found 0)" in e for e in errors) + + +def test_multiple_triggers_is_an_error(): + doc = { + "nodes": [ + {"id": "1", "name": "A", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + {"id": "2", "name": "B", "type": "n8n-nodes-base.webhook", "position": [200, 0]}, + ], + } + errors = _validate_definition(doc) + assert any("trigger node (found 2)" in e for e in errors) + + +# --- nodes shape ------------------------------------------------------------ + + +def test_missing_nodes_array_is_an_error(): + errors = _validate_definition({"name": "x"}) + assert errors and "'nodes' must be a non-empty array" in errors[0] + + +def test_empty_nodes_array_is_an_error(): + errors = _validate_definition({"nodes": []}) + assert errors and "'nodes' array is empty" in errors[0] + + +def test_node_missing_id_is_an_error(): + doc = { + "nodes": [ + {"name": "Trigger", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + ], + } + errors = _validate_definition(doc) + assert any("missing or invalid 'id'" in e for e in errors) + + +def test_duplicate_node_ids_is_an_error(): + doc = { + "nodes": [ + {"id": "1", "name": "A", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + {"id": "1", "name": "B", "type": "n8n-nodes-base.set", "position": [200, 0]}, + ], + } + errors = _validate_definition(doc) + assert any("duplicate node id '1'" in e for e in errors) + + +def test_duplicate_node_names_is_an_error(): + doc = { + "nodes": [ + {"id": "1", "name": "X", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + {"id": "2", "name": "X", "type": "n8n-nodes-base.set", "position": [200, 0]}, + ], + } + errors = _validate_definition(doc) + assert any("duplicate node name 'X'" in e for e in errors) + + +def test_node_missing_type_is_an_error(): + doc = { + "nodes": [ + {"id": "1", "name": "X", "position": [0, 0]}, + ], + } + errors = _validate_definition(doc) + assert any("missing or invalid 'type'" in e for e in errors) + + +def test_position_must_be_pair(): + doc = { + "nodes": [ + { + "id": "1", + "name": "Trigger", + "type": "n8n-nodes-base.manualTrigger", + "position": [0], + }, + ], + } + errors = _validate_definition(doc) + assert any("'position' must be an array of 2 numbers" in e for e in errors) + + +# --- connections ------------------------------------------------------------ + + +def test_connection_to_unknown_node_is_an_error(): + doc = { + "nodes": [ + {"id": "1", "name": "Trigger", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + ], + "connections": { + "Trigger": {"main": [[{"node": "Missing", "type": "main", "index": 0}]]}, + }, + } + errors = _validate_definition(doc) + assert any("references unknown node 'Missing'" in e for e in errors) + + +def test_connection_with_unknown_source_is_an_error(): + doc = { + "nodes": [ + {"id": "1", "name": "Trigger", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + ], + "connections": { + "Ghost": {"main": [[{"node": "Trigger", "type": "main", "index": 0}]]}, + }, + } + errors = _validate_definition(doc) + assert any("source node 'Ghost' not found in nodes" in e for e in errors) + + +def test_connections_must_be_object(): + doc = { + "nodes": [ + {"id": "1", "name": "Trigger", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + ], + "connections": "not a dict", + } + errors = _validate_definition(doc) + assert any("'connections' must be an object" in e for e in errors) + + +def test_connection_target_must_have_node_field(): + doc = { + "nodes": [ + {"id": "1", "name": "Trigger", "type": "n8n-nodes-base.manualTrigger", "position": [0, 0]}, + {"id": "2", "name": "End", "type": "n8n-nodes-base.noop", "position": [200, 0]}, + ], + "connections": { + "Trigger": {"main": [[{"type": "main", "index": 0}]]}, # no 'node' field + }, + } + errors = _validate_definition(doc) + assert any("missing or invalid 'node'" in e for e in errors)