diff --git a/rivet-cli/src/main.rs b/rivet-cli/src/main.rs index 702ceb5..33a7cde 100644 --- a/rivet-cli/src/main.rs +++ b/rivet-cli/src/main.rs @@ -18,6 +18,7 @@ use rivet_core::store::Store; use rivet_core::validate; mod docs; +mod mcp; mod render; mod schema_cmd; mod serve; @@ -186,6 +187,20 @@ enum Command { /// Scope validation to a named baseline (cumulative) #[arg(long)] baseline: Option, + + /// Track failure convergence across runs to detect agent retry loops + #[arg(long)] + track_convergence: bool, + }, + + /// Show a single artifact by ID + Get { + /// Artifact ID to retrieve + id: String, + + /// Output format: "text" (default), "json", or "yaml" + #[arg(short, long, default_value = "text")] + format: String, }, /// List artifacts, optionally filtered by type @@ -597,6 +612,9 @@ enum Command { /// Start the language server (LSP over stdio) Lsp, + + /// Start the MCP server (stdio transport) + Mcp, } #[derive(Subcommand)] @@ -727,25 +745,31 @@ fn run(cli: Cli) -> Result { if let Command::Lsp = &cli.command { return cmd_lsp(&cli); } + if let Command::Mcp = &cli.command { + return cmd_mcp(); + } match &cli.command { Command::Init { .. } | Command::Docs { .. } | Command::Context | Command::CommitMsgCheck { .. 
} - | Command::Lsp => unreachable!(), + | Command::Lsp + | Command::Mcp => unreachable!(), Command::Stpa { path, schema } => cmd_stpa(path, schema.as_deref(), &cli), Command::Validate { format, direct, skip_external_validation, baseline, + track_convergence, } => cmd_validate( &cli, format, *direct, *skip_external_validation, baseline.as_deref(), + *track_convergence, ), Command::List { r#type, @@ -759,6 +783,7 @@ fn run(cli: Cli) -> Result { format, baseline.as_deref(), ), + Command::Get { id, format } => cmd_get(&cli, id, format), Command::Stats { format, baseline } => cmd_stats(&cli, format, baseline.as_deref()), Command::Coverage { format, @@ -1792,6 +1817,7 @@ fn cmd_validate( direct: bool, skip_external_validation: bool, baseline_name: Option<&str>, + track_convergence: bool, ) -> Result { check_for_updates(); @@ -2078,6 +2104,51 @@ fn cmd_validate( } } + // ── Convergence tracking ──────────────────────────────────────────── + if track_convergence { + let convergence_dir = cli.project.join(".rivet"); + let convergence_path = convergence_dir.join("convergence.json"); + + let mut tracker = if convergence_path.exists() { + let json = std::fs::read_to_string(&convergence_path) + .context("failed to read convergence state")?; + rivet_core::convergence::ConvergenceTracker::from_json(&json) + .context("failed to parse convergence state")? + } else { + rivet_core::convergence::ConvergenceTracker::new() + }; + + let report = tracker.record_run(&diagnostics); + + // Save updated state. + std::fs::create_dir_all(&convergence_dir).context("failed to create .rivet directory")?; + let json = tracker + .to_json() + .context("failed to serialize convergence state")?; + std::fs::write(&convergence_path, json).context("failed to write convergence state")?; + + // Print convergence guidance. 
+ if !report.repeated_failures.is_empty() || !report.resolved_failures.is_empty() { + println!(); + println!("Convergence tracking (run #{}):", report.run_number); + + if !report.resolved_failures.is_empty() { + println!( + " \u{2714} {} failure(s) resolved since last run", + report.resolved_failures.len() + ); + } + + if !report.repeated_failures.is_empty() { + println!( + " \u{26a0} {} repeated failure(s) \u{2014} {}", + report.repeated_failures.len(), + report.strategy.guidance() + ); + } + } + } + Ok(errors == 0 && cross_errors == 0) } @@ -2187,6 +2258,92 @@ fn collect_yaml_files(path: &std::path::Path, out: &mut Vec<(String, String)>) - Ok(()) } +/// Show a single artifact by ID. +fn cmd_get(cli: &Cli, id: &str, format: &str) -> Result { + let ctx = ProjectContext::load(cli)?; + + let Some(artifact) = ctx.store.get(id) else { + eprintln!("error: artifact '{}' not found", id); + return Ok(false); + }; + + match format { + "json" => { + let links_json: Vec = artifact + .links + .iter() + .map(|l| { + serde_json::json!({ + "type": l.link_type, + "target": l.target, + }) + }) + .collect(); + let fields_json: serde_json::Value = artifact + .fields + .iter() + .map(|(k, v)| { + let json_val = serde_json::to_value(v).unwrap_or(serde_json::Value::Null); + (k.clone(), json_val) + }) + .collect::>() + .into(); + let output = serde_json::json!({ + "command": "get", + "id": artifact.id, + "type": artifact.artifact_type, + "title": artifact.title, + "status": artifact.status.as_deref().unwrap_or(""), + "description": artifact.description.as_deref().unwrap_or(""), + "tags": artifact.tags, + "links": links_json, + "fields": fields_json, + }); + println!("{}", serde_json::to_string_pretty(&output).unwrap()); + } + "yaml" => { + // Serialize the artifact back to YAML + let yaml = serde_yaml::to_string(artifact) + .unwrap_or_else(|e| format!("# failed to serialize: {e}")); + print!("{yaml}"); + } + _ => { + // Human-readable text format + println!("ID: {}", artifact.id); + 
println!("Type: {}", artifact.artifact_type); + println!("Title: {}", artifact.title); + println!("Status: {}", artifact.status.as_deref().unwrap_or("-")); + if let Some(desc) = &artifact.description { + println!("Description: {}", desc.trim()); + } + if !artifact.tags.is_empty() { + println!("Tags: {}", artifact.tags.join(", ")); + } + if !artifact.fields.is_empty() { + println!("Fields:"); + for (key, value) in &artifact.fields { + let val_str = match value { + serde_yaml::Value::String(s) => s.clone(), + other => serde_yaml::to_string(other) + .unwrap_or_default() + .trim() + .to_string(), + }; + println!(" {}: {}", key, val_str); + } + } + if !artifact.links.is_empty() { + println!("Links:"); + for link in &artifact.links { + println!(" {} -> {}", link.link_type, link.target); + } + } + } + } + + Ok(true) +} + /// List artifacts. fn cmd_list( cli: &Cli, @@ -3026,6 +3183,7 @@ fn cmd_diff( direct: false, skip_external_validation: false, baseline: None, + track_convergence: false, }, }; let head_cli = Cli { @@ -3037,6 +3195,7 @@ fn cmd_diff( direct: false, skip_external_validation: false, baseline: None, + track_convergence: false, }, }; let bc = ProjectContext::load(&base_cli)?; @@ -5536,6 +5695,11 @@ fn strip_html_tags(html: &str) -> String { .replace(""", "\"") } +fn cmd_mcp() -> Result { + mcp::run()?; + Ok(true) +} + fn cmd_lsp(cli: &Cli) -> Result { use lsp_server::{Connection, Message, Response}; use lsp_types::*; @@ -5612,23 +5776,8 @@ fn cmd_lsp(cli: &Cli) -> Result { (source_set, schema_set) }; - // Publish initial diagnostics from salsa - let store = db.store(source_set); - let diagnostics = db.diagnostics(source_set, schema_set); - let mut prev_diagnostic_files: std::collections::HashSet = - std::collections::HashSet::new(); - lsp_publish_salsa_diagnostics( - &connection, - &diagnostics, - &store, - &mut prev_diagnostic_files, - ); - eprintln!( - "rivet lsp: initialized with {} artifacts (salsa incremental)", - store.len() - ); - // Build supplementary 
state for rendering + let store = db.store(source_set); let render_schema = db.schema(schema_set); let mut render_graph = rivet_core::links::LinkGraph::build(&store, &render_schema); @@ -5657,6 +5806,26 @@ fn cmd_lsp(cli: &Cli) -> Result { } } + // Publish initial diagnostics from salsa, plus document [[ID]] reference + // validation. validate_documents() checks that every [[ID]] wiki-link in + // markdown documents points to an artifact that exists in the store; + // broken refs are surfaced as LSP warnings in the source .md file. + let mut diagnostics = db.diagnostics(source_set, schema_set); + diagnostics.extend(validate::validate_documents(&doc_store, &store)); + let mut prev_diagnostic_files: std::collections::HashSet = + std::collections::HashSet::new(); + lsp_publish_salsa_diagnostics( + &connection, + &diagnostics, + &store, + &mut prev_diagnostic_files, + ); + eprintln!( + "rivet lsp: initialized with {} artifacts, {} documents (salsa incremental)", + store.len(), + doc_store.len() + ); + let repo_context = crate::serve::RepoContext { project_name: project_dir .file_name() @@ -5988,8 +6157,12 @@ fn cmd_lsp(cli: &Cli) -> Result { } } // Re-query diagnostics (salsa recomputes only what changed) - let new_diagnostics = db.diagnostics(source_set, schema_set); + // and append document [[ID]] reference validation so + // broken wiki-links in markdown files are reported. 
+ let mut new_diagnostics = db.diagnostics(source_set, schema_set); let new_store = db.store(source_set); + new_diagnostics + .extend(validate::validate_documents(&doc_store, &new_store)); lsp_publish_salsa_diagnostics( &connection, &new_diagnostics, @@ -6010,6 +6183,10 @@ fn cmd_lsp(cli: &Cli) -> Result { &render_schema, ); diagnostics_cache = db.diagnostics(source_set, schema_set); + diagnostics_cache.extend(validate::validate_documents( + &doc_store, + &render_store, + )); // Send artifactsChanged notification let changed_notification = lsp_server::Notification { @@ -6042,9 +6219,14 @@ fn cmd_lsp(cli: &Cli) -> Result { change.text.clone(), ); if updated { - // Re-query diagnostics incrementally - let diagnostics = db.diagnostics(source_set, schema_set); + // Re-query diagnostics incrementally, + // including document [[ID]] reference validation. + let mut diagnostics = + db.diagnostics(source_set, schema_set); let store = db.store(source_set); + diagnostics.extend(validate::validate_documents( + &doc_store, &store, + )); lsp_publish_salsa_diagnostics( &connection, &diagnostics, diff --git a/rivet-cli/src/mcp.rs b/rivet-cli/src/mcp.rs new file mode 100644 index 0000000..2853c45 --- /dev/null +++ b/rivet-cli/src/mcp.rs @@ -0,0 +1,381 @@ +//! MCP (Model Context Protocol) server for Rivet. +//! +//! Implements the MCP protocol over stdio using JSON-RPC 2.0. +//! This allows AI coding assistants (Claude Code, Cursor, etc.) to interact +//! with Rivet projects programmatically — validating artifacts, listing them, +//! and querying project statistics. 
+ +use std::io::{self, BufRead, Write}; +use std::path::Path; + +use anyhow::{Context, Result}; +use serde_json::{Value, json}; + +use rivet_core::links::LinkGraph; +use rivet_core::schema::Severity; +use rivet_core::store::Store; +use rivet_core::validate; + +// ── JSON-RPC helpers ──────────────────────────────────────────────────── + +fn jsonrpc_result(id: Value, result: Value) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "result": result, + }) +} + +fn jsonrpc_error(id: Value, code: i64, message: &str) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "error": { + "code": code, + "message": message, + }, + }) +} + +// ── Tool definitions ──────────────────────────────────────────────────── + +fn tool_definitions() -> Vec { + vec![ + json!({ + "name": "rivet_validate", + "description": "Validate artifacts against schemas and return diagnostics. Returns errors, warnings, and informational messages about the project's artifact consistency.", + "inputSchema": { + "type": "object", + "properties": { + "project_dir": { + "type": "string", + "description": "Path to the project directory containing rivet.yaml. Defaults to the current working directory." + } + }, + "required": [] + } + }), + json!({ + "name": "rivet_list", + "description": "List artifacts in the project, optionally filtered by type. Returns artifact IDs, types, titles, statuses, and link counts.", + "inputSchema": { + "type": "object", + "properties": { + "project_dir": { + "type": "string", + "description": "Path to the project directory containing rivet.yaml. Defaults to the current working directory." 
+ }, + "type_filter": { + "type": "string", + "description": "Filter by artifact type (e.g., 'requirement', 'design-decision')" + }, + "status_filter": { + "type": "string", + "description": "Filter by lifecycle status (e.g., 'draft', 'active', 'approved')" + } + }, + "required": [] + } + }), + json!({ + "name": "rivet_stats", + "description": "Return project statistics: artifact counts by type, total count, orphan artifacts (no links), and broken link count.", + "inputSchema": { + "type": "object", + "properties": { + "project_dir": { + "type": "string", + "description": "Path to the project directory containing rivet.yaml. Defaults to the current working directory." + } + }, + "required": [] + } + }), + ] +} + +// ── Project loading (simplified from main.rs) ─────────────────────────── + +struct McpProject { + store: Store, + schema: rivet_core::schema::Schema, + graph: LinkGraph, +} + +fn load_project(project_dir: &Path) -> Result { + let config_path = project_dir.join("rivet.yaml"); + let config = rivet_core::load_project_config(&config_path) + .with_context(|| format!("loading {}", config_path.display()))?; + + // Resolve schemas directory + let schemas_dir = { + let project_schemas = project_dir.join("schemas"); + if project_schemas.exists() { + project_schemas + } else if let Ok(exe) = std::env::current_exe() { + if let Some(parent) = exe.parent() { + let bin_schemas = parent.join("../schemas"); + if bin_schemas.exists() { + bin_schemas + } else { + project_schemas + } + } else { + project_schemas + } + } else { + project_schemas + } + }; + + let schema = rivet_core::load_schemas(&config.project.schemas, &schemas_dir) + .context("loading schemas")?; + + let mut store = Store::new(); + for source in &config.sources { + let artifacts = rivet_core::load_artifacts(source, project_dir) + .with_context(|| format!("loading source '{}'", source.path))?; + for artifact in artifacts { + store.upsert(artifact); + } + } + + let graph = LinkGraph::build(&store, &schema); 
+ Ok(McpProject { + store, + schema, + graph, + }) +} + +// ── Tool implementations ──────────────────────────────────────────────── + +fn tool_validate(project_dir: &Path) -> Result { + let proj = load_project(project_dir)?; + let diagnostics = validate::validate(&proj.store, &proj.schema, &proj.graph); + + let errors = diagnostics + .iter() + .filter(|d| d.severity == Severity::Error) + .count(); + let warnings = diagnostics + .iter() + .filter(|d| d.severity == Severity::Warning) + .count(); + let infos = diagnostics + .iter() + .filter(|d| d.severity == Severity::Info) + .count(); + + let diag_json: Vec = diagnostics + .iter() + .map(|d| { + json!({ + "severity": format!("{:?}", d.severity).to_lowercase(), + "artifact_id": d.artifact_id, + "message": d.message, + }) + }) + .collect(); + + let result_str = if errors > 0 { "FAIL" } else { "PASS" }; + Ok(json!({ + "result": result_str, + "errors": errors, + "warnings": warnings, + "infos": infos, + "diagnostics": diag_json, + })) +} + +fn tool_list( + project_dir: &Path, + type_filter: Option<&str>, + status_filter: Option<&str>, +) -> Result { + let proj = load_project(project_dir)?; + + let query = rivet_core::query::Query { + artifact_type: type_filter.map(|s| s.to_string()), + status: status_filter.map(|s| s.to_string()), + ..Default::default() + }; + let results = rivet_core::query::execute(&proj.store, &query); + + let artifacts_json: Vec = results + .iter() + .map(|a| { + json!({ + "id": a.id, + "type": a.artifact_type, + "title": a.title, + "status": a.status.as_deref().unwrap_or("-"), + "links": a.links.len(), + }) + }) + .collect(); + + Ok(json!({ + "count": results.len(), + "artifacts": artifacts_json, + })) +} + +fn tool_stats(project_dir: &Path) -> Result { + let proj = load_project(project_dir)?; + let orphans = proj.graph.orphans(&proj.store); + + let mut types = serde_json::Map::new(); + let mut type_names: Vec<&str> = proj.store.types().collect(); + type_names.sort(); + for t in &type_names { + 
types.insert(t.to_string(), json!(proj.store.count_by_type(t))); + } + + Ok(json!({ + "total": proj.store.len(), + "types": types, + "orphans": orphans, + "broken_links": proj.graph.broken.len(), + })) +} + +// ── Tool dispatch ─────────────────────────────────────────────────────── + +fn dispatch_tool(name: &str, arguments: &Value) -> Value { + let project_dir_str = arguments + .get("project_dir") + .and_then(Value::as_str) + .unwrap_or("."); + let project_dir = std::path::PathBuf::from(project_dir_str); + + let result = match name { + "rivet_validate" => tool_validate(&project_dir), + "rivet_list" => { + let type_filter = arguments.get("type_filter").and_then(Value::as_str); + let status_filter = arguments.get("status_filter").and_then(Value::as_str); + tool_list(&project_dir, type_filter, status_filter) + } + "rivet_stats" => tool_stats(&project_dir), + _ => { + return json!({ + "content": [{ + "type": "text", + "text": format!("Unknown tool: {name}"), + }], + "isError": true, + }); + } + }; + + match result { + Ok(value) => json!({ + "content": [{ + "type": "text", + "text": serde_json::to_string_pretty(&value).unwrap_or_default(), + }], + }), + Err(e) => json!({ + "content": [{ + "type": "text", + "text": format!("Error: {e:#}"), + }], + "isError": true, + }), + } +} + +// ── Request handler ───────────────────────────────────────────────────── + +fn handle_request(method: &str, params: &Value, id: Value) -> Option { + match method { + "initialize" => Some(jsonrpc_result( + id, + json!({ + "protocolVersion": "2024-11-05", + "capabilities": { + "tools": {} + }, + "serverInfo": { + "name": "rivet-mcp", + "version": env!("CARGO_PKG_VERSION"), + } + }), + )), + "notifications/initialized" => { + // Client acknowledges initialization — no response needed. 
+ None + } + "tools/list" => Some(jsonrpc_result( + id, + json!({ + "tools": tool_definitions(), + }), + )), + "tools/call" => { + let name = params.get("name").and_then(Value::as_str).unwrap_or(""); + let arguments = params.get("arguments").cloned().unwrap_or(json!({})); + + let result = dispatch_tool(name, &arguments); + Some(jsonrpc_result(id, result)) + } + "ping" => Some(jsonrpc_result(id, json!({}))), + _ => Some(jsonrpc_error( + id, + -32601, + &format!("Method not found: {method}"), + )), + } +} + +// ── Main server loop ──────────────────────────────────────────────────── + +/// Run the MCP server, reading JSON-RPC messages from stdin and writing +/// responses to stdout. Diagnostics go to stderr. +pub fn run() -> Result<()> { + eprintln!("rivet mcp: starting MCP server (stdio transport)..."); + + let stdin = io::stdin(); + let mut stdout = io::stdout(); + + for line in stdin.lock().lines() { + let line = line.context("reading stdin")?; + let line = line.trim(); + if line.is_empty() { + continue; + } + + let msg: Value = match serde_json::from_str(line) { + Ok(v) => v, + Err(e) => { + eprintln!("rivet mcp: invalid JSON: {e}"); + let err = jsonrpc_error(Value::Null, -32700, &format!("Parse error: {e}")); + writeln!(stdout, "{}", serde_json::to_string(&err).unwrap())?; + stdout.flush()?; + continue; + } + }; + + let method = msg.get("method").and_then(Value::as_str).unwrap_or(""); + let params = msg.get("params").cloned().unwrap_or(json!({})); + let id = msg.get("id").cloned().unwrap_or(Value::Null); + + // Notifications have no id — we still process them but don't respond. + let is_notification = !msg.as_object().is_some_and(|o| o.contains_key("id")); + + if is_notification { + // Process the notification (side effects only). 
+ let _ = handle_request(method, ¶ms, Value::Null); + continue; + } + + if let Some(response) = handle_request(method, ¶ms, id.clone()) { + let response_str = serde_json::to_string(&response).context("serializing response")?; + writeln!(stdout, "{response_str}")?; + stdout.flush()?; + } + } + + eprintln!("rivet mcp: stdin closed, shutting down."); + Ok(()) +} diff --git a/rivet-cli/tests/cli_commands.rs b/rivet-cli/tests/cli_commands.rs index 7b75488..8c5a808 100644 --- a/rivet-cli/tests/cli_commands.rs +++ b/rivet-cli/tests/cli_commands.rs @@ -772,3 +772,109 @@ fn embed_matrix() { "should contain matrix output. Got: {stdout}" ); } + +// ── rivet get ────────────────────────────────────────────────────────── + +/// `rivet get REQ-001` succeeds and shows the artifact in text format. +#[test] +fn get_text_shows_artifact() { + let output = Command::new(rivet_bin()) + .args([ + "--project", + project_root().to_str().unwrap(), + "get", + "REQ-001", + ]) + .output() + .expect("failed to execute rivet get REQ-001"); + + assert!( + output.status.success(), + "rivet get REQ-001 must exit 0. stderr: {}", + String::from_utf8_lossy(&output.stderr) + ); + + let stdout = String::from_utf8_lossy(&output.stdout); + assert!( + stdout.contains("REQ-001"), + "output must contain artifact ID. Got:\n{stdout}" + ); + assert!( + stdout.contains("requirement"), + "output must contain artifact type. Got:\n{stdout}" + ); + assert!( + stdout.contains("Text-file-first"), + "output must contain artifact title. Got:\n{stdout}" + ); +} + +/// `rivet get REQ-001 --format json` produces valid JSON with id, type, title. +#[test] +fn get_json_produces_valid_output() { + let output = Command::new(rivet_bin()) + .args([ + "--project", + project_root().to_str().unwrap(), + "get", + "REQ-001", + "--format", + "json", + ]) + .output() + .expect("failed to execute rivet get REQ-001 --format json"); + + assert!( + output.status.success(), + "rivet get REQ-001 --format json must exit 0. 
stderr: {}", + String::from_utf8_lossy(&output.stderr) + ); + + let stdout = String::from_utf8_lossy(&output.stdout); + let parsed: serde_json::Value = serde_json::from_str(&stdout).expect("get JSON must be valid"); + + assert_eq!( + parsed.get("command").and_then(|v| v.as_str()), + Some("get"), + "JSON envelope must have command 'get'" + ); + assert_eq!( + parsed.get("id").and_then(|v| v.as_str()), + Some("REQ-001"), + "JSON must contain correct id" + ); + assert_eq!( + parsed.get("type").and_then(|v| v.as_str()), + Some("requirement"), + "JSON must contain correct type" + ); + assert!( + parsed.get("title").and_then(|v| v.as_str()).is_some(), + "JSON must contain title" + ); +} + +/// `rivet get NONEXISTENT` returns non-zero exit code and prints an error. +#[test] +fn get_nonexistent_returns_error() { + let output = Command::new(rivet_bin()) + .args([ + "--project", + project_root().to_str().unwrap(), + "get", + "NONEXISTENT", + ]) + .output() + .expect("failed to execute rivet get NONEXISTENT"); + + assert!( + !output.status.success(), + "rivet get NONEXISTENT must exit non-zero" + ); + + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("not found"), + "stderr must mention 'not found'. Got:\n{stderr}" + ); +} diff --git a/rivet-core/src/convergence.rs b/rivet-core/src/convergence.rs new file mode 100644 index 0000000..10d3eb9 --- /dev/null +++ b/rivet-core/src/convergence.rs @@ -0,0 +1,596 @@ +//! Agent convergence tracking — detects when AI agents get stuck in retry loops +//! by tracking validation failure signatures across runs. + +use std::collections::HashMap; +use std::fmt; + +use serde::{Deserialize, Serialize}; + +use crate::schema::Severity; +use crate::validate::Diagnostic; + +// ── Failure signature ────────────────────────────────────────────────── + +/// Normalized fingerprint for a validation failure. 
+/// +/// Format: `{severity}:{rule}:{artifact_id}:{message_hash}` +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct FailureSignature(pub String); + +impl FailureSignature { + /// Build a deterministic signature from a diagnostic. + pub fn from_diagnostic(d: &Diagnostic) -> Self { + let severity = severity_str(d.severity); + let artifact = d.artifact_id.as_deref().unwrap_or("_"); + let msg_hash = simple_hash(&d.message); + FailureSignature(format!("{severity}:{}:{artifact}:{msg_hash:016x}", d.rule)) + } +} + +impl fmt::Display for FailureSignature { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +/// Deterministic (non-cryptographic) hash for message strings. +/// +/// Uses FNV-1a so the result is stable across runs and platforms. +fn simple_hash(s: &str) -> u64 { + let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis + for byte in s.bytes() { + hash ^= u64::from(byte); + hash = hash.wrapping_mul(0x100000001b3); // FNV prime + } + hash +} + +fn severity_str(s: Severity) -> &'static str { + match s { + Severity::Error => "error", + Severity::Warning => "warning", + Severity::Info => "info", + } +} + +// ── Failure record ───────────────────────────────────────────────────── + +/// Tracks how many times a particular failure has occurred. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FailureRecord { + /// Number of validation runs where this failure appeared. + pub occurrence_count: u32, + /// Run number when this failure was first seen. + pub first_seen: u32, + /// Run number when this failure was last seen. + pub last_seen: u32, +} + +// ── Retry strategy ───────────────────────────────────────────────────── + +/// Recommended strategy based on how many times a failure has recurred. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum RetryStrategy { + /// First occurrence — just show the diagnostic. 
+ Normal, + /// Second occurrence — read the diagnostic more carefully. + ExpandedContext, + /// Third occurrence — try something fundamentally different. + DifferentApproach, + /// Four or more — flag for human review. + HumanReview, +} + +impl RetryStrategy { + /// Select a strategy based on occurrence count. + pub fn from_count(count: u32) -> Self { + match count { + 0 | 1 => RetryStrategy::Normal, + 2 => RetryStrategy::ExpandedContext, + 3 => RetryStrategy::DifferentApproach, + _ => RetryStrategy::HumanReview, + } + } + + /// Human-readable guidance message. + pub fn guidance(&self) -> &'static str { + match self { + RetryStrategy::Normal => "New failure — review the diagnostic.", + RetryStrategy::ExpandedContext => "This failed before. Read the diagnostic carefully.", + RetryStrategy::DifferentApproach => { + "Your approach is NOT working. Try something fundamentally different." + } + RetryStrategy::HumanReview => "Flagged for human review.", + } + } +} + +impl fmt::Display for RetryStrategy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.guidance()) + } +} + +// ── Convergence report ───────────────────────────────────────────────── + +/// Report produced after recording a validation run. +#[derive(Debug, Clone)] +pub struct ConvergenceReport { + /// Overall recommended strategy (worst-case across all repeated failures). + pub strategy: RetryStrategy, + /// Failures that appeared in a previous run and are still present. + pub repeated_failures: Vec<(FailureSignature, FailureRecord)>, + /// Failures appearing for the first time in this run. + pub new_failures: Vec, + /// Failures that were previously tracked but did not appear in this run. + pub resolved_failures: Vec, + /// Current run number. + pub run_number: u32, +} + +impl ConvergenceReport { + /// Human-readable summary of the convergence state. 
+ pub fn summary(&self) -> String { + let mut parts = Vec::new(); + + if !self.repeated_failures.is_empty() { + parts.push(format!( + "{} repeated failure(s)", + self.repeated_failures.len() + )); + } + if !self.new_failures.is_empty() { + parts.push(format!("{} new failure(s)", self.new_failures.len())); + } + if !self.resolved_failures.is_empty() { + parts.push(format!( + "{} resolved failure(s)", + self.resolved_failures.len() + )); + } + + if parts.is_empty() { + "No failures tracked.".to_string() + } else { + format!("Run #{}: {}", self.run_number, parts.join(", ")) + } + } +} + +// ── Convergence tracker ──────────────────────────────────────────────── + +/// Persistent tracker that records failure signatures across validation runs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConvergenceTracker { + /// Total number of validation runs recorded. + pub run_count: u32, + /// Map from failure signature to its record. + pub failures: HashMap, +} + +impl Default for ConvergenceTracker { + fn default() -> Self { + Self::new() + } +} + +impl ConvergenceTracker { + /// Create a new, empty tracker. + pub fn new() -> Self { + Self { + run_count: 0, + failures: HashMap::new(), + } + } + + /// Record a validation run and produce a convergence report. + /// + /// `diagnostics` are the diagnostics from the current validation run. + pub fn record_run(&mut self, diagnostics: &[Diagnostic]) -> ConvergenceReport { + self.run_count += 1; + let current_run = self.run_count; + + // Compute signatures for current diagnostics. + let current_sigs: HashMap = diagnostics + .iter() + .map(|d| (FailureSignature::from_diagnostic(d), ())) + .collect(); + + let mut repeated = Vec::new(); + let mut new_failures = Vec::new(); + + // Update records for current failures. 
+ for sig in current_sigs.keys() { + let record = self.failures.entry(sig.clone()).or_insert(FailureRecord { + occurrence_count: 0, + first_seen: current_run, + last_seen: current_run, + }); + + let was_new = record.occurrence_count == 0; + record.occurrence_count += 1; + record.last_seen = current_run; + + if was_new { + new_failures.push(sig.clone()); + } else { + repeated.push((sig.clone(), record.clone())); + } + } + + // Detect resolved failures (present in tracker but not in current run). + let resolved: Vec = self + .failures + .keys() + .filter(|sig| !current_sigs.contains_key(*sig)) + .cloned() + .collect(); + + // Remove resolved failures from the tracker. + for sig in &resolved { + self.failures.remove(sig); + } + + // Determine overall strategy (worst-case across repeated failures). + let strategy = repeated + .iter() + .map(|(_, rec)| RetryStrategy::from_count(rec.occurrence_count)) + .max_by_key(|s| match s { + RetryStrategy::Normal => 0, + RetryStrategy::ExpandedContext => 1, + RetryStrategy::DifferentApproach => 2, + RetryStrategy::HumanReview => 3, + }) + .unwrap_or(RetryStrategy::Normal); + + ConvergenceReport { + strategy, + repeated_failures: repeated, + new_failures, + resolved_failures: resolved, + run_number: current_run, + } + } + + /// Serialize the tracker to JSON. + pub fn to_json(&self) -> Result { + serde_json::to_string_pretty(self) + } + + /// Deserialize a tracker from JSON. 
+ pub fn from_json(json: &str) -> Result { + serde_json::from_str(json) + } +} + +// ── Tests ────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::schema::Severity; + use crate::validate::Diagnostic; + + fn make_diag( + severity: Severity, + rule: &str, + artifact_id: Option<&str>, + msg: &str, + ) -> Diagnostic { + Diagnostic { + severity, + artifact_id: artifact_id.map(String::from), + rule: rule.to_string(), + message: msg.to_string(), + source_file: None, + line: None, + column: None, + } + } + + // ── Signature generation ─────────────────────────────────────────── + + #[test] + fn signature_is_deterministic() { + let d = make_diag( + Severity::Error, + "missing-link", + Some("REQ-001"), + "no trace link", + ); + let s1 = FailureSignature::from_diagnostic(&d); + let s2 = FailureSignature::from_diagnostic(&d); + assert_eq!(s1, s2); + } + + #[test] + fn signature_varies_by_severity() { + let d1 = make_diag(Severity::Error, "rule-a", Some("X"), "msg"); + let d2 = make_diag(Severity::Warning, "rule-a", Some("X"), "msg"); + assert_ne!( + FailureSignature::from_diagnostic(&d1), + FailureSignature::from_diagnostic(&d2), + ); + } + + #[test] + fn signature_varies_by_rule() { + let d1 = make_diag(Severity::Error, "rule-a", Some("X"), "msg"); + let d2 = make_diag(Severity::Error, "rule-b", Some("X"), "msg"); + assert_ne!( + FailureSignature::from_diagnostic(&d1), + FailureSignature::from_diagnostic(&d2), + ); + } + + #[test] + fn signature_varies_by_artifact() { + let d1 = make_diag(Severity::Error, "rule-a", Some("A"), "msg"); + let d2 = make_diag(Severity::Error, "rule-a", Some("B"), "msg"); + assert_ne!( + FailureSignature::from_diagnostic(&d1), + FailureSignature::from_diagnostic(&d2), + ); + } + + #[test] + fn signature_varies_by_message() { + let d1 = make_diag(Severity::Error, "rule-a", Some("X"), "alpha"); + let d2 = make_diag(Severity::Error, "rule-a", Some("X"), "beta"); + assert_ne!( + 
/// A signature is `severity:rule:artifact:hash`, i.e. exactly four
/// colon-separated fields with a 16-character hash at the end.
#[test]
fn signature_format_has_four_parts() {
    let diagnostic = make_diag(Severity::Error, "my-rule", Some("ART-1"), "something broke");
    let signature = FailureSignature::from_diagnostic(&diagnostic);

    let fields: Vec<&str> = signature.0.split(':').collect();
    assert_eq!(
        fields.len(),
        4,
        "expected 4 colon-separated parts, got: {signature}"
    );

    // The first three fields echo the diagnostic verbatim.
    for (index, expected) in ["error", "my-rule", "ART-1"].iter().enumerate() {
        assert_eq!(fields[index], *expected);
    }

    // The last field is the hex hash of the message.
    let digest = fields[3];
    assert_eq!(digest.len(), 16, "hash should be 16 hex chars");
}
/// The fourth consecutive occurrence of the same failure must escalate to
/// `HumanReview`, and any further repeats must stay there.
#[test]
fn fourth_run_escalates_to_human_review() {
    let mut tracker = ConvergenceTracker::new();
    let diags = vec![make_diag(Severity::Error, "r1", Some("A"), "fail")];

    // Runs 1-3 only build up history; their reports are covered by other tests.
    for _ in 0..3 {
        tracker.record_run(&diags);
    }

    // Run 4 is the threshold: assert on its report directly so an off-by-one
    // in the escalation boundary cannot hide behind a later run.
    let r4 = tracker.record_run(&diags);
    assert_eq!(r4.run_number, 4);
    assert_eq!(r4.repeated_failures[0].1.occurrence_count, 4);
    assert_eq!(r4.strategy, RetryStrategy::HumanReview);

    // A 5th run verifies that 4+ occurrences stay at HumanReview.
    let r5 = tracker.record_run(&diags);
    assert_eq!(r5.repeated_failures[0].1.occurrence_count, 5);
    assert_eq!(r5.strategy, RetryStrategy::HumanReview);
}
/// A failure that was present in one run and absent in the next is reported
/// as resolved.
#[test]
fn resolved_failures_detected() {
    let still_failing = make_diag(Severity::Error, "r1", Some("A"), "fail");
    let fixed_later = make_diag(Severity::Warning, "r2", Some("B"), "warn");
    let expected_resolved = FailureSignature::from_diagnostic(&fixed_later);

    let mut tracker = ConvergenceTracker::new();

    // First run sees both diagnostics; the second run sees only one of them.
    tracker.record_run(&[still_failing.clone(), fixed_later]);
    let second = tracker.record_run(&[still_failing]);

    assert_eq!(second.resolved_failures.len(), 1);
    assert!(second
        .resolved_failures
        .iter()
        .any(|sig| *sig == expected_resolved));
}
/// Serializing a populated tracker and reading it back yields the same run
/// count and the same record for every tracked signature.
#[test]
fn json_round_trip() {
    let run = [
        make_diag(Severity::Error, "r1", Some("A"), "fail"),
        make_diag(Severity::Warning, "r2", Some("B"), "warn"),
    ];

    // Two identical runs so the stored records carry non-trivial counters.
    let mut source = ConvergenceTracker::new();
    for _ in 0..2 {
        source.record_run(&run);
    }

    let encoded = source.to_json().unwrap();
    let decoded = ConvergenceTracker::from_json(&encoded).unwrap();

    assert_eq!(decoded.run_count, source.run_count);
    assert_eq!(decoded.failures.len(), source.failures.len());
    source.failures.iter().for_each(|(sig, expected)| {
        let actual = decoded.failures.get(sig).expect("signature should exist");
        assert_eq!(actual, expected);
    });
}
/// The same diagnostic reported twice within one run is tracked as a single
/// failure.
#[test]
fn duplicate_diagnostics_collapsed() {
    let diagnostic = make_diag(Severity::Error, "r1", Some("A"), "fail");
    // Two copies of the identical diagnostic in a single run.
    let doubled = vec![diagnostic; 2];

    let mut tracker = ConvergenceTracker::new();
    let report = tracker.record_run(&doubled);

    // One signature in the report, one entry in the tracker.
    assert_eq!(report.new_failures.len(), 1);
    assert_eq!(tracker.failures.len(), 1);
}