From ea672bdd1557a8af5a871350f0f07d09ccc8825b Mon Sep 17 00:00:00 2001 From: Hai Huang Date: Thu, 30 Apr 2026 10:08:09 -0400 Subject: [PATCH] feat: deferred credential resolution with user consent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add runtime credential resolution that prompts the user (via an OS-native dialog) before sharing secrets with sandboxes. When a provider is created with the sentinel value "openshell:deferred", credentials are not stored on the gateway. Instead, the CLI registers as a credential authority via a bidirectional gRPC stream, and the sandbox L7 proxy requests credentials on demand when outbound API calls require them. Key components: - Proto: ResolveCredential and RegisterCredentialAuthority RPCs - Server: CredentialAuthorityRegistry with generation-based unregister - Sandbox: DeferredCredentialResolver with fail-closed semantics - CLI: credential_authority module with macOS/Linux dialog support The feature is fully backward compatible — existing providers with real credential values continue to work unchanged. Deferred mode is opt-in via the "openshell:deferred" sentinel in the credential value. Note that, independently of deferred mode, the CLI now attempts to open the credential authority stream during `sandbox create` and `sandbox connect`; when that connection succeeds, `sandbox connect` (without an editor) uses the non-exec SSH path so the credential authority task stays alive. 
Signed-off-by: Hai Huang Signed-off-by: Hai Huang --- crates/openshell-cli/Cargo.toml | 1 + .../openshell-cli/src/credential_authority.rs | 215 +++++++++++++++ crates/openshell-cli/src/lib.rs | 1 + crates/openshell-cli/src/main.rs | 17 ++ crates/openshell-cli/src/run.rs | 59 ++++ crates/openshell-cli/src/ssh.rs | 2 +- .../src/deferred_credentials.rs | 65 +++++ crates/openshell-sandbox/src/grpc_client.rs | 11 +- crates/openshell-sandbox/src/l7/relay.rs | 81 +++++- crates/openshell-sandbox/src/l7/rest.rs | 72 ++++- crates/openshell-sandbox/src/lib.rs | 103 ++++--- crates/openshell-sandbox/src/proxy.rs | 21 +- crates/openshell-sandbox/src/secrets.rs | 60 +++- .../openshell-server/src/grpc/credentials.rs | 259 ++++++++++++++++++ crates/openshell-server/src/grpc/mod.rs | 24 +- crates/openshell-server/src/grpc/policy.rs | 4 +- crates/openshell-server/src/grpc/provider.rs | 45 ++- crates/openshell-server/src/lib.rs | 4 + crates/openshell-server/src/sandbox_index.rs | 10 + proto/openshell.proto | 51 +++- 20 files changed, 1021 insertions(+), 84 deletions(-) create mode 100644 crates/openshell-cli/src/credential_authority.rs create mode 100644 crates/openshell-sandbox/src/deferred_credentials.rs create mode 100644 crates/openshell-server/src/grpc/credentials.rs diff --git a/crates/openshell-cli/Cargo.toml b/crates/openshell-cli/Cargo.toml index b3a006fdd..bf6065194 100644 --- a/crates/openshell-cli/Cargo.toml +++ b/crates/openshell-cli/Cargo.toml @@ -63,6 +63,7 @@ tokio-tungstenite = { workspace = true } # Streams futures = { workspace = true } +tokio-stream = { workspace = true } nix = { workspace = true } # URL parsing diff --git a/crates/openshell-cli/src/credential_authority.rs b/crates/openshell-cli/src/credential_authority.rs new file mode 100644 index 000000000..964f9c2f2 --- /dev/null +++ b/crates/openshell-cli/src/credential_authority.rs @@ -0,0 +1,215 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +//! CLI-side credential authority for deferred provider secrets. +//! +//! When a sandbox uses deferred providers, the CLI opens a bidirectional gRPC +//! stream to the gateway (`RegisterCredentialAuthority`). The gateway relays +//! credential requests from sandbox supervisors; the CLI prompts the user via +//! an OS-native dialog and responds with the secret read from local env vars. + +use std::collections::HashMap; + +use miette::{IntoDiagnostic, Result}; +use openshell_core::proto::{CredentialRequest, CredentialResponse}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::codec::Streaming; +use tracing::{debug, info, warn}; + +use crate::tls::GrpcClient; + +#[derive(Clone, Debug)] +enum ApprovalDecision { + Once(String), + Always(String), + Deny, +} + +/// Spawn the credential authority in the background (fire-and-forget). +pub fn spawn_credential_authority(client: GrpcClient) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = run_credential_authority(client).await { + tracing::debug!("Credential authority exited: {e}"); + } + }) +} + +/// Run the credential authority event loop. +/// +/// Opens a bidirectional `RegisterCredentialAuthority` stream to the gateway +/// and handles incoming credential requests by prompting the user. This blocks +/// until the stream is closed (Ctrl-C or gateway disconnect). +pub async fn run_credential_authority(mut client: GrpcClient) -> Result<()> { + let (resp_tx, resp_rx) = mpsc::channel::(16); + + // Seed the client→server stream so the HTTP/2 body has an initial DATA + // frame. Without this, the empty stream is dropped by intermediaries + // (NodePort, Docker port mapping) before real traffic arrives. 
+ resp_tx + .send(CredentialResponse { + request_id: String::new(), + approved: false, + value: String::new(), + }) + .await + .into_diagnostic()?; + + let response_stream = ReceiverStream::new(resp_rx); + + let mut request_stream: Streaming = client + .register_credential_authority(response_stream) + .await + .into_diagnostic()? + .into_inner(); + + let mut approval_cache: HashMap = HashMap::new(); + + info!("Listening for credential requests... (Ctrl-C to detach)"); + + while let Some(req) = request_stream + .message() + .await + .into_diagnostic()? + { + debug!( + env_key = %req.env_key, + destination_host = %req.destination_host, + sandbox_name = %req.sandbox_name, + "Credential request received" + ); + + let decision = if let Some(cached) = approval_cache.get(&req.env_key) { + cached.clone() + } else { + prompt_user(&req.sandbox_name, &req.env_key, &req.destination_host)? + }; + + let response = match &decision { + ApprovalDecision::Always(value) => { + approval_cache + .insert(req.env_key.clone(), ApprovalDecision::Always(value.clone())); + CredentialResponse { + request_id: req.request_id, + approved: true, + value: value.clone(), + } + } + ApprovalDecision::Once(value) => CredentialResponse { + request_id: req.request_id, + approved: true, + value: value.clone(), + }, + ApprovalDecision::Deny => { + approval_cache.insert(req.env_key.clone(), ApprovalDecision::Deny); + CredentialResponse { + request_id: req.request_id, + approved: false, + value: String::new(), + } + } + }; + + if resp_tx.send(response).await.is_err() { + warn!("Gateway stream closed, exiting credential authority"); + break; + } + } + + Ok(()) +} + +/// Show an OS-native dialog and read the secret from a local env var. 
+fn prompt_user( + sandbox_name: &str, + env_key: &str, + destination_host: &str, +) -> Result { + let choice = show_native_dialog(sandbox_name, env_key, destination_host)?; + + match choice.as_str() { + "Once" | "Always" => { + let value = std::env::var(env_key).map_err(|_| { + miette::miette!( + "{env_key} is not set in the local environment. \ + Set it and retry, or deny the request." + ) + })?; + if choice == "Always" { + Ok(ApprovalDecision::Always(value)) + } else { + Ok(ApprovalDecision::Once(value)) + } + } + _ => Ok(ApprovalDecision::Deny), + } +} + +/// Display an OS-native dialog asking the user to approve credential sharing. +/// +/// Returns "Once", "Always", or "Deny". +fn show_native_dialog( + sandbox_name: &str, + env_key: &str, + destination_host: &str, +) -> Result { + #[cfg(target_os = "macos")] + { + let script = format!( + r#"display dialog "Sandbox '{}' requests {}\nDestination: {}" buttons {{"Deny", "Once", "Always"}} default button "Once" with title "OpenShell Credential Request""#, + sandbox_name, env_key, destination_host + ); + let output = std::process::Command::new("osascript") + .arg("-e") + .arg(&script) + .output() + .into_diagnostic()?; + + let stdout = String::from_utf8_lossy(&output.stdout); + if stdout.contains("Always") { + Ok("Always".to_string()) + } else if stdout.contains("Once") { + Ok("Once".to_string()) + } else { + Ok("Deny".to_string()) + } + } + + #[cfg(all(not(target_os = "macos"), target_os = "linux"))] + { + let text = format!( + "Sandbox '{}' requests {}\nDestination: {}", + sandbox_name, env_key, destination_host + ); + let output = std::process::Command::new("zenity") + .args([ + "--list", + "--radiolist", + "--title=OpenShell Credential Request", + &format!("--text={text}"), + "--column=", + "--column=Choice", + "TRUE", + "Once", + "FALSE", + "Always", + "FALSE", + "Deny", + ]) + .output() + .into_diagnostic()?; + + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if stdout == "Always" 
|| stdout == "Once" { + Ok(stdout) + } else { + Ok("Deny".to_string()) + } + } + + #[cfg(all(not(target_os = "macos"), not(target_os = "linux")))] + { + warn!("No native dialog available on this platform, defaulting to Deny"); + Ok("Deny".to_string()) + } +} diff --git a/crates/openshell-cli/src/lib.rs b/crates/openshell-cli/src/lib.rs index 1746547ef..a4868b56e 100644 --- a/crates/openshell-cli/src/lib.rs +++ b/crates/openshell-cli/src/lib.rs @@ -11,6 +11,7 @@ pub(crate) static TEST_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(() pub mod auth; pub mod bootstrap; pub mod completers; +pub mod credential_authority; pub mod edge_tunnel; pub(crate) mod policy_update; pub mod run; diff --git a/crates/openshell-cli/src/main.rs b/crates/openshell-cli/src/main.rs index 3502c2b07..537a0da00 100644 --- a/crates/openshell-cli/src/main.rs +++ b/crates/openshell-cli/src/main.rs @@ -2532,11 +2532,28 @@ async fn main() -> Result<()> { } SandboxCommands::Connect { name, editor } => { let name = resolve_sandbox_name(name, &ctx.name)?; + // Spawn credential authority for deferred providers. + let has_cred_authority = { + match openshell_cli::tls::grpc_client(endpoint, &tls).await { + Ok(cred_client) => { + tracing::debug!("Credential authority: connecting to gateway"); + openshell_cli::credential_authority::spawn_credential_authority(cred_client); + true + } + Err(e) => { + tracing::debug!("Credential authority: failed to connect: {e}"); + false + } + } + }; if let Some(editor) = editor.map(Into::into) { run::sandbox_connect_editor( endpoint, &ctx.name, &name, editor, &tls, ) .await?; + } else if has_cred_authority { + // Avoid exec() so the credential authority task stays alive. 
+ openshell_cli::ssh::sandbox_connect_without_exec(endpoint, &name, &tls).await?; } else { run::sandbox_connect(endpoint, &name, &tls).await?; } diff --git a/crates/openshell-cli/src/run.rs b/crates/openshell-cli/src/run.rs index 87489014a..ca03bb75d 100644 --- a/crates/openshell-cli/src/run.rs +++ b/crates/openshell-cli/src/run.rs @@ -2458,6 +2458,19 @@ pub async fn sandbox_create( ); } + // Spawn credential authority in the background for deferred providers. + { + match grpc_client(&effective_server, &effective_tls).await { + Ok(cred_client) => { + tracing::debug!("Credential authority: connecting to gateway"); + crate::credential_authority::spawn_credential_authority(cred_client); + } + Err(e) => { + tracing::debug!("Credential authority: failed to connect: {e}"); + } + } + } + if let Some(editor) = editor { let ssh_gateway_name = effective_tls.gateway_name().unwrap_or(gateway_name); sandbox_connect_editor( @@ -3354,6 +3367,52 @@ async fn auto_create_provider( .discover_existing(provider_type) .map_err(|err| miette::miette!("failed to discover provider '{provider_type}': {err}"))?; let Some(discovered) = discovered else { + // No local credentials found — offer deferred mode + let env_keys = registry.credential_env_vars(provider_type); + if !env_keys.is_empty() && std::io::stdin().is_terminal() { + let use_deferred = Confirm::new() + .with_prompt("No local credentials found. 
Share on demand (deferred)?") + .default(true) + .interact() + .into_diagnostic()?; + if use_deferred { + let deferred_creds: HashMap = env_keys + .iter() + .map(|k| (k.to_string(), "openshell:deferred".to_string())) + .collect(); + let name = preferred_name + .map(|s| s.to_string()) + .unwrap_or_else(|| provider_type.to_string()); + let request = CreateProviderRequest { + provider: Some(Provider { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: String::new(), + name: name.clone(), + created_at_ms: 0, + labels: std::collections::HashMap::new(), + }), + r#type: provider_type.to_string(), + credentials: deferred_creds, + config: std::collections::HashMap::new(), + }), + }; + client + .create_provider(request) + .await + .map_err(|status| miette::miette!("failed to create deferred provider: {status}"))?; + eprintln!( + "{} Created provider {} ({}) [deferred — credentials will be requested on demand]", + "✓".green().bold(), + name, + provider_type, + ); + if seen_names.insert(name.clone()) { + configured_names.push(name); + } + eprintln!(); + return Ok(()); + } + } eprintln!( "{} No existing local credentials/config found for '{}'. 
You can configure it from inside the sandbox.", "!".yellow(), diff --git a/crates/openshell-cli/src/ssh.rs b/crates/openshell-cli/src/ssh.rs index f883d9684..600c4164e 100644 --- a/crates/openshell-cli/src/ssh.rs +++ b/crates/openshell-cli/src/ssh.rs @@ -277,7 +277,7 @@ pub async fn sandbox_connect(server: &str, name: &str, tls: &TlsOptions) -> Resu sandbox_connect_with_mode(server, name, tls, true).await } -pub(crate) async fn sandbox_connect_without_exec( +pub async fn sandbox_connect_without_exec( server: &str, name: &str, tls: &TlsOptions, diff --git a/crates/openshell-sandbox/src/deferred_credentials.rs b/crates/openshell-sandbox/src/deferred_credentials.rs new file mode 100644 index 000000000..520f39b93 --- /dev/null +++ b/crates/openshell-sandbox/src/deferred_credentials.rs @@ -0,0 +1,65 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use miette::{IntoDiagnostic, Result}; +use openshell_core::proto::open_shell_client::OpenShellClient; +use openshell_core::proto::{ResolveCredentialRequest, ResolveCredentialResponse}; +use tonic::transport::Channel; +use tracing::debug; + +use crate::secrets::SecretResolver; + +/// Handle for resolving deferred credentials via gRPC callback to the gateway, +/// which relays to the CLI credential authority. +#[derive(Clone)] +pub struct DeferredCredentialResolver { + client: OpenShellClient, + sandbox_id: String, +} + +impl DeferredCredentialResolver { + pub fn new(client: OpenShellClient, sandbox_id: String) -> Self { + Self { client, sandbox_id } + } + + /// Resolve a deferred placeholder by calling the gateway. + /// + /// The gateway relays the request to the CLI's bidirectional + /// `RegisterCredentialAuthority` stream, where the user approves/denies + /// via an OS-native dialog. This call blocks until a response arrives + /// or the RPC times out. 
+ pub async fn resolve( + &self, + placeholder: &str, + destination_host: &str, + ) -> Result { + let env_key = SecretResolver::env_key_for_placeholder(placeholder) + .unwrap_or(placeholder) + .to_string(); + + debug!( + sandbox_id = %self.sandbox_id, + env_key = %env_key, + destination_host = %destination_host, + "Requesting deferred credential from CLI" + ); + + let response: ResolveCredentialResponse = self + .client + .clone() + .resolve_credential(ResolveCredentialRequest { + sandbox_id: self.sandbox_id.clone(), + env_key, + destination_host: destination_host.to_string(), + }) + .await + .into_diagnostic()? + .into_inner(); + + if response.approved { + Ok(response.value) + } else { + Err(miette::miette!("credential request denied by user")) + } + } +} diff --git a/crates/openshell-sandbox/src/grpc_client.rs b/crates/openshell-sandbox/src/grpc_client.rs index 0af6476c5..dfff60955 100644 --- a/crates/openshell-sandbox/src/grpc_client.rs +++ b/crates/openshell-sandbox/src/grpc_client.rs @@ -193,13 +193,13 @@ pub async fn sync_policy(endpoint: &str, sandbox: &str, policy: &ProtoSandboxPol /// Fetch provider environment variables for a sandbox from OpenShell server via gRPC. /// -/// Returns a map of environment variable names to values derived from provider -/// credentials configured on the sandbox. Returns an empty map if the sandbox -/// has no providers or the call fails. 
+/// Returns two maps: +/// - credentials: env vars subject to placeholder resolution (deferred/proxy) +/// - config: env vars injected literally into the child process pub async fn fetch_provider_environment( endpoint: &str, sandbox_id: &str, -) -> Result> { +) -> Result<(HashMap, HashMap)> { debug!(endpoint = %endpoint, sandbox_id = %sandbox_id, "Fetching provider environment"); let mut client = connect(endpoint).await?; @@ -211,7 +211,8 @@ pub async fn fetch_provider_environment( .await .into_diagnostic()?; - Ok(response.into_inner().environment) + let inner = response.into_inner(); + Ok((inner.environment, inner.config_environment)) } /// A reusable gRPC client for the OpenShell service. diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index a0e54062d..37cd7a9b4 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -35,6 +35,8 @@ pub struct L7EvalContext { pub cmdline_paths: Vec, /// Supervisor-only placeholder resolver for outbound headers. pub(crate) secret_resolver: Option>, + /// Handle for resolving deferred credentials via gateway→CLI callback. + pub(crate) credential_resolver: Option>, } /// Run protocol-aware L7 inspection on a tunnel. @@ -136,6 +138,10 @@ where allow_encoded_slash: config.allow_encoded_slash, ..Default::default() }); + // Local resolver that can be updated when deferred credentials are resolved. + // Cloned from the shared context so mutations don't affect other connections. + let mut local_resolver: Option> = ctx.secret_resolver.clone(); + loop { // Parse one HTTP request from client let req = match provider.parse_request(client).await { @@ -165,20 +171,55 @@ where // Rewrite credential placeholders in the request target BEFORE OPA // evaluation. OPA sees the redacted path; the resolved path goes only // to the upstream write. 
- let (eval_target, redacted_target) = if let Some(ref resolver) = ctx.secret_resolver { - match secrets::rewrite_target_for_eval(&req.target, resolver) { + let (eval_target, redacted_target) = if local_resolver.is_some() { + let resolver_ref = local_resolver.as_ref().unwrap(); + match secrets::rewrite_target_for_eval(&req.target, resolver_ref) { Ok(result) => (result.resolved, result.redacted), Err(e) => { - warn!( - host = %ctx.host, - port = ctx.port, - error = %e, - "credential resolution failed in request target, rejecting" - ); - let response = b"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; - client.write_all(response).await.into_diagnostic()?; - client.flush().await.into_diagnostic()?; - return Ok(()); + // Check if this is a deferred credential that can be resolved at runtime + if let (Some(placeholder), Some(cred_resolver)) = (e.placeholder.as_ref(), &ctx.credential_resolver) { + debug!( + host = %ctx.host, + placeholder = %placeholder, + "Deferred credential in request target, resolving via CLI callback" + ); + match cred_resolver.resolve(placeholder, &ctx.host).await { + Ok(value) => { + let mut updated = (*resolver_ref).as_ref().clone(); + updated.insert_resolved(placeholder.clone(), value); + local_resolver = Some(Arc::new(updated)); + // Retry with the updated resolver + match secrets::rewrite_target_for_eval(&req.target, local_resolver.as_ref().unwrap()) { + Ok(result) => (result.resolved, result.redacted), + Err(retry_err) => { + warn!(host = %ctx.host, error = %retry_err, "credential resolution still failed after deferred resolve"); + let response = b"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; + client.write_all(response).await.into_diagnostic()?; + client.flush().await.into_diagnostic()?; + return Ok(()); + } + } + } + Err(deny_err) => { + info!(host = %ctx.host, error = %deny_err, "Deferred credential denied by user"); + let response = b"HTTP/1.1 403 
Forbidden\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; + client.write_all(response).await.into_diagnostic()?; + client.flush().await.into_diagnostic()?; + return Ok(()); + } + } + } else { + warn!( + host = %ctx.host, + port = ctx.port, + error = %e, + "credential resolution failed in request target, rejecting" + ); + let response = b"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; + client.write_all(response).await.into_diagnostic()?; + client.flush().await.into_diagnostic()?; + return Ok(()); + } } } } else { @@ -258,12 +299,22 @@ where let _ = &eval_target; if allowed || config.enforcement == EnforcementMode::Audit { - // Forward request to upstream and relay response + // Forward request to upstream and relay response. + // Use deferred-aware relay that can resolve credentials mid-request. + tracing::debug!( + host = %ctx.host, + has_resolver = local_resolver.is_some(), + has_cred_resolver = ctx.credential_resolver.is_some(), + "L7 relay: forwarding request" + ); let outcome = crate::l7::rest::relay_http_request_with_resolver( &req, client, upstream, - ctx.secret_resolver.as_deref(), + None, + Some(&mut local_resolver), + ctx.credential_resolver.as_ref(), + Some(&ctx.host), ) .await?; match outcome { @@ -452,7 +503,7 @@ where // relay_http_request_with_resolver handles both directions: it sends // the request upstream and reads the response back to the client. 
let outcome = - crate::l7::rest::relay_http_request_with_resolver(&req, client, upstream, resolver) + crate::l7::rest::relay_http_request_with_resolver(&req, client, upstream, resolver, None, None, None) .await?; match outcome { diff --git a/crates/openshell-sandbox/src/l7/rest.rs b/crates/openshell-sandbox/src/l7/rest.rs index 4d8909b9e..10fe176de 100644 --- a/crates/openshell-sandbox/src/l7/rest.rs +++ b/crates/openshell-sandbox/src/l7/rest.rs @@ -11,6 +11,7 @@ use crate::l7::provider::{BodyLength, L7Provider, L7Request, RelayOutcome}; use crate::secrets::rewrite_http_header_block; use miette::{IntoDiagnostic, Result, miette}; use std::collections::HashMap; +use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tracing::{debug, warn}; @@ -325,7 +326,7 @@ where C: AsyncRead + AsyncWrite + Unpin, U: AsyncRead + AsyncWrite + Unpin, { - relay_http_request_with_resolver(req, client, upstream, None).await + relay_http_request_with_resolver(req, client, upstream, None, None, None, None).await } pub(crate) async fn relay_http_request_with_resolver( @@ -333,6 +334,9 @@ pub(crate) async fn relay_http_request_with_resolver( client: &mut C, upstream: &mut U, resolver: Option<&crate::secrets::SecretResolver>, + local_resolver: Option<&mut Option>>, + credential_resolver: Option<&Arc>, + host: Option<&str>, ) -> Result where C: AsyncRead + AsyncWrite + Unpin, @@ -344,8 +348,48 @@ where .position(|w| w == b"\r\n\r\n") .map_or(req.raw_header.len(), |p| p + 4); - let rewrite_result = rewrite_http_header_block(&req.raw_header[..header_end], resolver) - .map_err(|e| miette!("credential injection failed: {e}"))?; + let rewrite_result = if let (Some(local_resolver), Some(host)) = (local_resolver, host) { + // Deferred path: try rewrite, on unresolved placeholder resolve via CLI callback. 
+ match rewrite_http_header_block(&req.raw_header[..header_end], local_resolver.as_deref()) { + Ok(result) => result, + Err(e) => { + tracing::debug!( + host = %host, + placeholder = ?e.placeholder, + has_credential_resolver = credential_resolver.is_some(), + "Unresolved placeholder detected in headers" + ); + if let (Some(placeholder), Some(cred_resolver)) = (e.placeholder.as_ref(), credential_resolver) { + tracing::debug!( + host = %host, + placeholder = %placeholder, + "Deferred credential in headers, resolving via CLI callback" + ); + match cred_resolver.resolve(placeholder, host).await { + Ok(value) => { + tracing::debug!(host = %host, "Deferred credential resolved successfully"); + let mut updated = (**local_resolver.as_ref().unwrap()).clone(); + updated.insert_resolved(placeholder.clone(), value); + *local_resolver = Some(Arc::new(updated)); + rewrite_http_header_block(&req.raw_header[..header_end], local_resolver.as_deref()) + .map_err(|e| miette!("credential injection failed after deferred resolve: {e}"))? + } + Err(resolve_err) => { + tracing::warn!(host = %host, error = %resolve_err, "Deferred credential resolution failed"); + return Err(miette!("deferred credential denied by user for {host}")); + } + } + } else { + tracing::warn!(host = %host, "No credential resolver available, failing"); + return Err(miette!("credential injection failed: {e}")); + } + } + } + } else { + // Simple path: direct rewrite with optional static resolver. + rewrite_http_header_block(&req.raw_header[..header_end], resolver) + .map_err(|e| miette!("credential injection failed: {e}"))? + }; upstream .write_all(&rewrite_result.rewritten) @@ -374,11 +418,6 @@ where let outcome = relay_response(&req.action, upstream, client).await?; - // Validate that the client actually requested an upgrade before accepting - // a 101 from upstream. Per RFC 9110 Section 7.8, the server MUST NOT send - // 101 unless the client sent Upgrade + Connection: Upgrade headers. 
A - // non-compliant or malicious upstream could send an unsolicited 101 to - // bypass L7 inspection. if matches!(outcome, RelayOutcome::Upgraded { .. }) { let header_str = String::from_utf8_lossy(&req.raw_header[..header_end]); if !client_requested_upgrade(&header_str) { @@ -1886,6 +1925,9 @@ mod tests { &mut proxy_to_client, &mut proxy_to_upstream, None, + None, + None, + None, ), ) .await @@ -1943,6 +1985,9 @@ mod tests { &mut proxy_to_client, &mut proxy_to_upstream, None, + None, + None, + None, ), ) .await @@ -2068,6 +2113,9 @@ mod tests { &mut proxy_to_client, &mut proxy_to_upstream, resolver.as_ref(), + None, + None, + None, ), ) .await @@ -2151,7 +2199,10 @@ mod tests { &req, &mut proxy_to_client, &mut proxy_to_upstream, - None, // <-- No resolver, as in the L4 raw tunnel path + None, + None, + None, + None, ), ) .await @@ -2240,6 +2291,9 @@ mod tests { &mut proxy_to_client, &mut proxy_to_upstream, resolver, + None, + None, + None, ), ) .await diff --git a/crates/openshell-sandbox/src/lib.rs b/crates/openshell-sandbox/src/lib.rs index 34ee80bb5..bb64ff2d2 100644 --- a/crates/openshell-sandbox/src/lib.rs +++ b/crates/openshell-sandbox/src/lib.rs @@ -7,6 +7,7 @@ pub mod bypass_monitor; mod child_env; +pub mod deferred_credentials; pub mod denial_aggregator; mod grpc_client; mod identity; @@ -267,43 +268,82 @@ pub async fn run_sandbox( // Fetch provider environment variables from the server. // This is done after loading the policy so the sandbox can still start // even if provider env fetch fails (graceful degradation). 
- let provider_env = if let (Some(id), Some(endpoint)) = (&sandbox_id, &openshell_endpoint) { - match grpc_client::fetch_provider_environment(endpoint, id).await { - Ok(env) => { - ocsf_emit!( - ConfigStateChangeBuilder::new(ocsf_ctx()) - .severity(SeverityId::Informational) - .status(StatusId::Success) - .state(StateId::Enabled, "loaded") - .message(format!( - "Fetched provider environment [env_count:{}]", - env.len() - )) - .build() - ); - env + let (credential_env, config_env) = + if let (Some(id), Some(endpoint)) = (&sandbox_id, &openshell_endpoint) { + match grpc_client::fetch_provider_environment(endpoint, id).await { + Ok((creds, config)) => { + ocsf_emit!( + ConfigStateChangeBuilder::new(ocsf_ctx()) + .severity(SeverityId::Informational) + .status(StatusId::Success) + .state(StateId::Enabled, "loaded") + .message(format!( + "Fetched provider environment [cred_count:{}, config_count:{}]", + creds.len(), + config.len() + )) + .build() + ); + (creds, config) + } + Err(e) => { + ocsf_emit!( + ConfigStateChangeBuilder::new(ocsf_ctx()) + .severity(SeverityId::Medium) + .status(StatusId::Failure) + .state(StateId::Other, "degraded") + .message(format!( + "Failed to fetch provider environment, continuing without: {e}" + )) + .build() + ); + (std::collections::HashMap::new(), std::collections::HashMap::new()) + } } - Err(e) => { - ocsf_emit!( - ConfigStateChangeBuilder::new(ocsf_ctx()) - .severity(SeverityId::Medium) - .status(StatusId::Failure) - .state(StateId::Other, "degraded") - .message(format!( - "Failed to fetch provider environment, continuing without: {e}" - )) - .build() - ); - std::collections::HashMap::new() + } else { + (std::collections::HashMap::new(), std::collections::HashMap::new()) + }; + + let (mut provider_env, secret_resolver) = SecretResolver::from_provider_env(credential_env); + // Config env vars are injected literally (not through SecretResolver). 
+ for (key, value) in config_env { + provider_env.entry(key).or_insert(value); + } + let secret_resolver = secret_resolver.map(Arc::new); + + // Create deferred credential resolver if any provider uses deferred secrets. + let has_deferred = secret_resolver.as_ref().map_or(false, |r| r.has_deferred()); + tracing::debug!( + has_deferred = has_deferred, + has_sandbox_id = sandbox_id.is_some(), + has_endpoint = openshell_endpoint.is_some(), + "Deferred credential resolver check" + ); + let deferred_resolver = if has_deferred { + if let (Some(id), Some(endpoint)) = (&sandbox_id, &openshell_endpoint) { + match grpc_client::CachedOpenShellClient::connect(endpoint).await { + Ok(client) => { + tracing::debug!("Deferred credential resolver created successfully"); + Some(Arc::new( + deferred_credentials::DeferredCredentialResolver::new( + client.raw_client(), + id.clone(), + ), + )) + } + Err(e) => { + tracing::warn!("Failed to create deferred credential resolver: {e}"); + None + } } + } else { + tracing::warn!("Missing sandbox_id or endpoint for deferred resolver"); + None } } else { - std::collections::HashMap::new() + None }; - let (provider_env, secret_resolver) = SecretResolver::from_provider_env(provider_env); - let secret_resolver = secret_resolver.map(Arc::new); - // Create identity cache for SHA256 TOFU when OPA is active let identity_cache = opa_engine .as_ref() @@ -480,6 +520,7 @@ pub async fn run_sandbox( inference_ctx, secret_resolver.clone(), denial_tx, + deferred_resolver.clone(), ) .await?; (Some(proxy_handle), denial_rx, bypass_denial_tx) diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index 69ff97b04..ed8367d7f 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -150,6 +150,7 @@ impl ProxyHandle { inference_ctx: Option>, secret_resolver: Option>, denial_tx: Option>, + deferred_resolver: Option>, ) -> Result { // Use override bind_addr, fall back to policy http_addr, then 
default // to loopback:3128. The default allows the proxy to function when no @@ -189,9 +190,10 @@ impl ProxyHandle { let inf = inference_ctx.clone(); let resolver = secret_resolver.clone(); let dtx = denial_tx.clone(); + let deferred = deferred_resolver.clone(); tokio::spawn(async move { if let Err(err) = handle_tcp_connection( - stream, opa, cache, spid, tls, inf, resolver, dtx, + stream, opa, cache, spid, tls, inf, resolver, dtx, deferred, ) .await { @@ -304,6 +306,7 @@ async fn handle_tcp_connection( inference_ctx: Option>, secret_resolver: Option>, denial_tx: Option>, + deferred_resolver: Option>, ) -> Result<()> { let mut buf = vec![0u8; MAX_HEADER_BYTES]; let mut used = 0usize; @@ -348,6 +351,7 @@ async fn handle_tcp_connection( entrypoint_pid, secret_resolver, denial_tx.as_ref(), + deferred_resolver, ) .await; } @@ -661,6 +665,16 @@ async fn handle_tcp_connection( // Check if endpoint has L7 config for protocol-aware inspection let l7_config = query_l7_config(&opa_engine, &decision, &host_lc, port); + tracing::debug!( + "CONNECT {host}:{port} l7={l7} tls={tls} resolver={resolver} deferred={deferred}", + host = host_lc, + port = port, + l7 = l7_config.is_some(), + tls = tls_state.is_some(), + resolver = secret_resolver.is_some(), + deferred = deferred_resolver.is_some(), + ); + // Log the allowed CONNECT — use CONNECT_L7 when L7 inspection follows, // so log consumers can distinguish L4-only decisions from tunnel lifecycle events. 
let connect_msg = if l7_config.is_some() { @@ -713,6 +727,7 @@ async fn handle_tcp_connection( .map(|p| p.to_string_lossy().into_owned()) .collect(), secret_resolver: secret_resolver.clone(), + credential_resolver: deferred_resolver.clone(), }; if effective_tls_skip { @@ -2243,7 +2258,7 @@ fn rewrite_forward_request( if secret_resolver.is_some() { let output_str = String::from_utf8_lossy(&output); if output_str.contains(crate::secrets::PLACEHOLDER_PREFIX_PUBLIC) { - return Err(crate::secrets::UnresolvedPlaceholderError { location: "header" }); + return Err(crate::secrets::UnresolvedPlaceholderError { location: "header", placeholder: None }); } } @@ -2267,6 +2282,7 @@ async fn handle_forward_proxy( entrypoint_pid: Arc, secret_resolver: Option>, denial_tx: Option<&mpsc::UnboundedSender>, + deferred_resolver: Option>, ) -> Result<()> { // 1. Parse the absolute-form URI. `path` is marked `mut` so that, when an // L7 config applies, the canonicalized form produced below replaces it @@ -2450,6 +2466,7 @@ async fn handle_forward_proxy( .map(|p| p.to_string_lossy().into_owned()) .collect(), secret_resolver: secret_resolver.clone(), + credential_resolver: deferred_resolver.clone(), }; // Canonicalize the request-target. The canonical form is fed to OPA diff --git a/crates/openshell-sandbox/src/secrets.rs b/crates/openshell-sandbox/src/secrets.rs index a27537c91..5e7abb68e 100644 --- a/crates/openshell-sandbox/src/secrets.rs +++ b/crates/openshell-sandbox/src/secrets.rs @@ -3,10 +3,14 @@ use base64::Engine as _; use std::collections::HashMap; +use std::collections::HashSet; use std::fmt; const PLACEHOLDER_PREFIX: &str = "openshell:resolve:env:"; +/// Sentinel value stored in provider credentials for deferred (on-demand) secrets. +pub(crate) const DEFERRED_SENTINEL: &str = "openshell:deferred"; + /// Public access to the placeholder prefix for fail-closed scanning in other modules. 
pub(crate) const PLACEHOLDER_PREFIX_PUBLIC: &str = PLACEHOLDER_PREFIX; @@ -16,6 +20,19 @@ fn is_env_key_char(b: u8) -> bool { b.is_ascii_alphanumeric() || b == b'_' } +fn extract_placeholder_from_str(s: &str) -> Option { + if let Some(start) = s.find(PLACEHOLDER_PREFIX) { + let rest = &s[start..]; + let end = rest + .bytes() + .position(|b| !is_env_key_char(b) && b != b':') + .unwrap_or(rest.len()); + Some(rest[..end].to_string()) + } else { + None + } +} + // --------------------------------------------------------------------------- // Error and result types // --------------------------------------------------------------------------- @@ -25,6 +42,7 @@ fn is_env_key_char(b: u8) -> bool { #[derive(Debug)] pub(crate) struct UnresolvedPlaceholderError { pub location: &'static str, // "header", "query_param", "path" + pub placeholder: Option, } impl fmt::Display for UnresolvedPlaceholderError { @@ -64,6 +82,7 @@ pub(crate) struct RewriteTargetResult { #[derive(Debug, Clone, Default)] pub struct SecretResolver { by_placeholder: HashMap, + deferred_keys: HashSet, } impl SecretResolver { @@ -76,14 +95,36 @@ impl SecretResolver { let mut child_env = HashMap::with_capacity(provider_env.len()); let mut by_placeholder = HashMap::with_capacity(provider_env.len()); + let mut deferred_keys = HashSet::new(); for (key, value) in provider_env { let placeholder = placeholder_for_env_key(&key); child_env.insert(key, placeholder.clone()); - by_placeholder.insert(placeholder, value); + if value == DEFERRED_SENTINEL { + deferred_keys.insert(placeholder); + } else { + by_placeholder.insert(placeholder, value); + } } - (child_env, Some(Self { by_placeholder })) + (child_env, Some(Self { by_placeholder, deferred_keys })) + } + + pub(crate) fn has_deferred(&self) -> bool { + !self.deferred_keys.is_empty() + } + + pub(crate) fn is_deferred(&self, placeholder: &str) -> bool { + self.deferred_keys.contains(placeholder) + } + + pub(crate) fn insert_resolved(&mut self, placeholder: String, 
value: String) { + self.deferred_keys.remove(&placeholder); + self.by_placeholder.insert(placeholder, value); + } + + pub(crate) fn env_key_for_placeholder(placeholder: &str) -> Option<&str> { + placeholder.strip_prefix(PLACEHOLDER_PREFIX) } /// Resolve a placeholder string to the real secret value. @@ -485,12 +526,12 @@ fn rewrite_path_segment( %reason, "credential resolution rejected: resolved value unsafe for path" ); - UnresolvedPlaceholderError { location: "path" } + UnresolvedPlaceholderError { location: "path", placeholder: Some(full_placeholder.to_string()) } })?; resolved.push_str(secret); redacted.push_str("[CREDENTIAL]"); } else { - return Err(UnresolvedPlaceholderError { location: "path" }); + return Err(UnresolvedPlaceholderError { location: "path", placeholder: Some(full_placeholder.to_string()) }); } pos = key_end; } else { @@ -527,9 +568,9 @@ fn rewrite_uri_query_params( redacted_params.push(format!("{key}=[CREDENTIAL]")); any_rewritten = true; } else if decoded_value.contains(PLACEHOLDER_PREFIX) { - // Placeholder detected but not resolved return Err(UnresolvedPlaceholderError { location: "query_param", + placeholder: Some(decoded_value.to_string()), }); } else { resolved_params.push(param.to_string()); @@ -611,14 +652,16 @@ pub(crate) fn rewrite_http_header_block( // in both raw form and percent-decoded form of the output header block. 
let output_header = String::from_utf8_lossy(&output[..output.len().min(header_end + 256)]); if output_header.contains(PLACEHOLDER_PREFIX) { - return Err(UnresolvedPlaceholderError { location: "header" }); + let ph = extract_placeholder_from_str(&output_header); + return Err(UnresolvedPlaceholderError { location: "header", placeholder: ph }); } // Also check percent-decoded form of the request line (F5 — encoded placeholder bypass) let rewritten_rl = output_header.split("\r\n").next().unwrap_or(""); let decoded_rl = percent_decode(rewritten_rl); if decoded_rl.contains(PLACEHOLDER_PREFIX) { - return Err(UnresolvedPlaceholderError { location: "path" }); + let ph = extract_placeholder_from_str(&decoded_rl); + return Err(UnresolvedPlaceholderError { location: "path", placeholder: ph }); } Ok(RewriteResult { @@ -650,7 +693,8 @@ pub(crate) fn rewrite_target_for_eval( // Also check percent-decoded form let decoded = percent_decode(target); if decoded.contains(PLACEHOLDER_PREFIX) { - return Err(UnresolvedPlaceholderError { location: "path" }); + let ph = extract_placeholder_from_str(&decoded); + return Err(UnresolvedPlaceholderError { location: "path", placeholder: ph }); } return Ok(RewriteTargetResult { resolved: target.to_string(), diff --git a/crates/openshell-server/src/grpc/credentials.rs b/crates/openshell-server/src/grpc/credentials.rs new file mode 100644 index 000000000..e8b384dd0 --- /dev/null +++ b/crates/openshell-server/src/grpc/credentials.rs @@ -0,0 +1,259 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Deferred credential relay between sandbox supervisors and CLI credential authorities. 
+ +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use openshell_core::proto::{ + CredentialRequest, CredentialResponse, ResolveCredentialRequest, ResolveCredentialResponse, +}; +use tokio::sync::{Mutex, mpsc, oneshot}; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status}; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +use crate::ServerState; + +/// Registry of connected CLI credential authorities and pending resolution requests. +#[derive(Debug, Default)] +pub struct CredentialAuthorityRegistry { + /// CLI authority streams, keyed by a session identifier (typically user/sandbox scope). + /// Each entry includes a generation ID to prevent stale reader tasks from + /// removing newer registrations. + authorities: Mutex)>>, + /// Monotonically increasing generation counter for authority registrations. + generation: AtomicU64, + /// Pending credential resolutions waiting for a CLI response. + /// Keyed by request_id, completed when the CLI sends back a CredentialResponse. + pending: Mutex>>, +} + +impl CredentialAuthorityRegistry { + pub fn new() -> Self { + Self::default() + } + + /// Resolve a deferred credential by forwarding to the registered CLI authority. + /// + /// Returns the CLI's response or an error if no authority is registered or the + /// authority disconnected/timed out. + pub async fn resolve( + &self, + req: ResolveCredentialRequest, + sandbox_name: Option, + ) -> Result { + let request_id = Uuid::new_v4().to_string(); + let sandbox_id = req.sandbox_id.clone(); + + debug!( + request_id = %request_id, + sandbox_id = %sandbox_id, + env_key = %req.env_key, + "resolve: looking up authority" + ); + + // Find a registered authority for this sandbox (or a global authority). 
+ let (authority_tx, registered_keys) = { + let authorities = self.authorities.lock().await; + let keys: Vec<_> = authorities.keys().cloned().collect(); + debug!(registered_authorities = ?keys, "resolve: registered authorities"); + let tx = authorities + .get(&sandbox_id) + .or_else(|| authorities.get("global")) + .map(|(_, sender)| sender.clone()); + (tx, keys) + }; + + let Some(authority_tx) = authority_tx else { + warn!("resolve: no authority found (sandbox_id={sandbox_id}, registered={registered_keys:?})"); + return Err(Status::unavailable( + "no credential authority registered — is the CLI connected?", + )); + }; + + debug!(request_id = %request_id, "resolve: authority found, creating oneshot"); + + // Create a oneshot channel for the response + let (resp_tx, resp_rx) = oneshot::channel(); + { + let mut pending = self.pending.lock().await; + pending.insert(request_id.clone(), resp_tx); + } + + // Send the request to the CLI + let cred_request = CredentialRequest { + request_id: request_id.clone(), + sandbox_id: sandbox_id.clone(), + sandbox_name: sandbox_name.unwrap_or_else(|| sandbox_id.clone()), + env_key: req.env_key.clone(), + destination_host: req.destination_host.clone(), + }; + + if authority_tx.send(cred_request).await.is_err() { + warn!(request_id = %request_id, "resolve: authority_tx.send failed — CLI disconnected"); + self.pending.lock().await.remove(&request_id); + return Err(Status::unavailable("credential authority disconnected")); + } + + debug!(request_id = %request_id, "resolve: request sent to CLI, waiting for response"); + + // Wait for the CLI response (with timeout) + let timeout_result = + tokio::time::timeout(std::time::Duration::from_secs(60), resp_rx).await; + + let response = match timeout_result { + Ok(Ok(resp)) => { + debug!(request_id = %request_id, approved = resp.approved, "resolve: got CLI response"); + resp + } + Ok(Err(_)) => { + warn!(request_id = %request_id, "resolve: oneshot channel dropped"); + 
self.pending.lock().await.remove(&request_id); + return Err(Status::internal("credential authority channel dropped")); + } + Err(_) => { + warn!(request_id = %request_id, "resolve: 60s timeout"); + self.pending.lock().await.remove(&request_id); + return Err(Status::deadline_exceeded( + "credential request timed out (60s)", + )); + } + }; + + Ok(ResolveCredentialResponse { + approved: response.approved, + value: response.value, + }) + } + + /// Register a CLI as credential authority and relay requests/responses. + /// Returns a generation ID that must be passed to `unregister_authority`. + pub async fn register_authority( + &self, + scope: String, + request_tx: mpsc::Sender, + ) -> u64 { + let reg_id = self.generation.fetch_add(1, Ordering::Relaxed); + let mut authorities = self.authorities.lock().await; + authorities.insert(scope.clone(), (reg_id, request_tx)); + let keys: Vec<_> = authorities.keys().cloned().collect(); + info!("register_authority: scope={scope} reg_id={reg_id} total={keys:?}"); + reg_id + } + + /// Remove a registered authority, but only if the generation matches. + /// Prevents stale reader tasks from removing a newer registration. + pub async fn unregister_authority(&self, scope: &str, reg_id: u64) { + let mut authorities = self.authorities.lock().await; + if let Some((stored_reg_id, _)) = authorities.get(scope) { + if *stored_reg_id == reg_id { + authorities.remove(scope); + let keys: Vec<_> = authorities.keys().cloned().collect(); + info!("unregister_authority: scope={scope} reg_id={reg_id} remaining={keys:?}"); + } else { + debug!("unregister_authority: scope={scope} reg_id={reg_id} skipped (current reg_id={})", stored_reg_id); + } + } + } + + /// Complete a pending credential resolution with the CLI's response. 
+ pub async fn complete_resolution(&self, response: CredentialResponse) { + let mut pending = self.pending.lock().await; + if let Some(tx) = pending.remove(&response.request_id) { + let _ = tx.send(response); + } else { + warn!( + request_id = %response.request_id, + "Received credential response for unknown request" + ); + } + } +} + +/// Handle `ResolveCredential` RPC (called by sandbox supervisor). +pub async fn handle_resolve_credential( + state: &ServerState, + request: Request, +) -> Result, Status> { + let req = request.into_inner(); + debug!( + sandbox_id = %req.sandbox_id, + env_key = %req.env_key, + destination_host = %req.destination_host, + "Received deferred credential resolution request" + ); + + let sandbox_name = state.sandbox_index.sandbox_name_for_id(&req.sandbox_id); + let response = state.credential_authority_registry.resolve(req, sandbox_name).await?; + Ok(Response::new(response)) +} + +/// Handle `RegisterCredentialAuthority` bidirectional streaming RPC (called by CLI). +/// +/// The CLI sends `CredentialResponse` messages on its stream; the gateway pushes +/// `CredentialRequest` messages back. The stream stays open for the sandbox lifetime. 
+pub async fn handle_register_credential_authority( + state: &Arc, + request: Request>, +) -> Result< + Response> + Send>>>, + Status, +> { + let mut inbound = request.into_inner(); + + // Create a channel for sending requests to the CLI + let (request_tx, request_rx) = mpsc::channel::(16); + + // Register as authority (use "global" scope for now — PoC simplification) + let scope = "global".to_string(); + let reg_id = state + .credential_authority_registry + .register_authority(scope.clone(), request_tx) + .await; + + debug!("CLI registered as credential authority (scope: {scope}, reg_id: {reg_id})"); + + // Spawn a task to read responses from the CLI and complete pending resolutions + let state_clone = state.clone(); + let scope_clone = scope.clone(); + tokio::spawn(async move { + debug!("credential authority reader task started (scope: {scope_clone}, reg_id: {reg_id})"); + loop { + match inbound.message().await { + Ok(Some(response)) => { + if response.request_id.is_empty() { + debug!("credential authority: seed frame received (scope: {scope_clone}, reg_id: {reg_id})"); + continue; + } + debug!(request_id = %response.request_id, approved = response.approved, "credential authority: received CLI response"); + state_clone + .credential_authority_registry + .complete_resolution(response) + .await; + } + Ok(None) => { + info!("credential authority: CLI stream ended (scope: {scope_clone}, reg_id: {reg_id})"); + break; + } + Err(e) => { + warn!("credential authority: CLI stream error: {e} (scope: {scope_clone}, reg_id: {reg_id})"); + break; + } + } + } + state_clone + .credential_authority_registry + .unregister_authority(&scope_clone, reg_id) + .await; + }); + + // Convert the request receiver into a stream for the response + let stream = tokio_stream::wrappers::ReceiverStream::new(request_rx) + .map(Ok); + + Ok(Response::new(Box::pin(stream))) +} diff --git a/crates/openshell-server/src/grpc/mod.rs b/crates/openshell-server/src/grpc/mod.rs index 969204dad..fee6fae0a 
100644 --- a/crates/openshell-server/src/grpc/mod.rs +++ b/crates/openshell-server/src/grpc/mod.rs @@ -3,6 +3,7 @@ //! gRPC service implementation. +pub(crate) mod credentials; pub(crate) mod policy; mod provider; mod sandbox; @@ -12,6 +13,7 @@ use openshell_core::proto::{ ApproveAllDraftChunksRequest, ApproveAllDraftChunksResponse, ApproveDraftChunkRequest, ApproveDraftChunkResponse, ClearDraftChunksRequest, ClearDraftChunksResponse, CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, + CredentialRequest, CredentialResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, EditDraftChunkRequest, EditDraftChunkResponse, ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetDraftHistoryRequest, GetDraftHistoryResponse, GetDraftPolicyRequest, @@ -23,7 +25,8 @@ use openshell_core::proto::{ ListSandboxPoliciesRequest, ListSandboxPoliciesResponse, ListSandboxesRequest, ListSandboxesResponse, ProviderResponse, PushSandboxLogsRequest, PushSandboxLogsResponse, RejectDraftChunkRequest, RejectDraftChunkResponse, RelayFrame, ReportPolicyStatusRequest, - ReportPolicyStatusResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse, + ReportPolicyStatusResponse, ResolveCredentialRequest, ResolveCredentialResponse, + RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse, SandboxStreamEvent, ServiceStatus, SubmitPolicyAnalysisRequest, SubmitPolicyAnalysisResponse, SupervisorMessage, UndoDraftChunkRequest, UndoDraftChunkResponse, UpdateConfigRequest, UpdateConfigResponse, UpdateProviderRequest, WatchSandboxRequest, open_shell_server::OpenShell, @@ -418,6 +421,25 @@ impl OpenShell for OpenShellService { crate::supervisor_session::handle_relay_stream(&self.state.supervisor_sessions, request) .await } + + // --- Deferred credentials --- + + async fn resolve_credential( + &self, + request: Request, + ) -> Result, Status> { + 
credentials::handle_resolve_credential(&self.state, request).await + } + + type RegisterCredentialAuthorityStream = + Pin> + Send + 'static>>; + + async fn register_credential_authority( + &self, + request: Request>, + ) -> Result, Status> { + credentials::handle_register_credential_authority(&self.state, request).await + } } // --------------------------------------------------------------------------- diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index b0cfc6f38..7ca759758 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -449,7 +449,7 @@ pub(super) async fn handle_get_sandbox_provider_environment( .spec .ok_or_else(|| Status::internal("sandbox has no spec"))?; - let environment = + let (environment, config_environment) = super::provider::resolve_provider_environment(state.store.as_ref(), &spec.providers) .await?; @@ -457,11 +457,13 @@ pub(super) async fn handle_get_sandbox_provider_environment( sandbox_id = %sandbox_id, provider_count = spec.providers.len(), env_count = environment.len(), + config_count = config_environment.len(), "GetSandboxProviderEnvironment request completed successfully" ); Ok(Response::new(GetSandboxProviderEnvironmentResponse { environment, + config_environment, })) } diff --git a/crates/openshell-server/src/grpc/provider.rs b/crates/openshell-server/src/grpc/provider.rs index 2e3876fb8..16f587a0d 100644 --- a/crates/openshell-server/src/grpc/provider.rs +++ b/crates/openshell-server/src/grpc/provider.rs @@ -213,15 +213,28 @@ fn merge_map( /// collects credential key-value pairs. Returns a map of environment variables /// to inject into the sandbox. When duplicate keys appear across providers, the /// first provider's value wins. +/// +/// Returns `(credentials, config)` — credentials are subject to placeholder +/// resolution by the sandbox L7 proxy; config values are injected literally. 
pub(super) async fn resolve_provider_environment( store: &Store, provider_names: &[String], -) -> Result, Status> { +) -> Result< + ( + std::collections::HashMap, + std::collections::HashMap, + ), + Status, +> { if provider_names.is_empty() { - return Ok(std::collections::HashMap::new()); + return Ok(( + std::collections::HashMap::new(), + std::collections::HashMap::new(), + )); } let mut env = std::collections::HashMap::new(); + let mut config_env = std::collections::HashMap::new(); for name in provider_names { let provider = store @@ -241,9 +254,15 @@ pub(super) async fn resolve_provider_environment( ); } } + + for (key, value) in &provider.config { + if is_valid_env_key(key) { + config_env.entry(key.clone()).or_insert_with(|| value.clone()); + } + } } - Ok(env) + Ok((env, config_env)) } pub(super) fn is_valid_env_key(key: &str) -> bool { @@ -726,8 +745,9 @@ mod tests { #[tokio::test] async fn resolve_provider_env_empty_list_returns_empty() { let store = Store::connect("sqlite::memory:").await.unwrap(); - let result = resolve_provider_environment(&store, &[]).await.unwrap(); + let (result, config) = resolve_provider_environment(&store, &[]).await.unwrap(); assert!(result.is_empty()); + assert!(config.is_empty()); } #[tokio::test] @@ -755,12 +775,16 @@ mod tests { }; create_provider_record(&store, provider).await.unwrap(); - let result = resolve_provider_environment(&store, &["claude-local".to_string()]) + let (result, config) = resolve_provider_environment(&store, &["claude-local".to_string()]) .await .unwrap(); assert_eq!(result.get("ANTHROPIC_API_KEY"), Some(&"sk-abc".to_string())); assert_eq!(result.get("CLAUDE_API_KEY"), Some(&"sk-abc".to_string())); assert!(!result.contains_key("endpoint")); + assert_eq!( + config.get("endpoint"), + Some(&"https://api.anthropic.com".to_string()) + ); } #[tokio::test] @@ -795,7 +819,7 @@ mod tests { }; create_provider_record(&store, provider).await.unwrap(); - let result = resolve_provider_environment(&store, 
&["test-provider".to_string()]) + let (result, _config) = resolve_provider_environment(&store, &["test-provider".to_string()]) .await .unwrap(); assert_eq!(result.get("VALID_KEY"), Some(&"value".to_string())); @@ -844,7 +868,7 @@ mod tests { .await .unwrap(); - let result = resolve_provider_environment( + let (result, _config) = resolve_provider_environment( &store, &["claude-local".to_string(), "gitlab-local".to_string()], ) @@ -895,7 +919,7 @@ mod tests { .await .unwrap(); - let result = resolve_provider_environment( + let (result, _config) = resolve_provider_environment( &store, &["provider-a".to_string(), "provider-b".to_string()], ) @@ -954,7 +978,7 @@ mod tests { .unwrap() .unwrap(); let spec = loaded.spec.unwrap(); - let env = resolve_provider_environment(&store, &spec.providers) + let (env, _config) = resolve_provider_environment(&store, &spec.providers) .await .unwrap(); @@ -987,11 +1011,12 @@ mod tests { .unwrap() .unwrap(); let spec = loaded.spec.unwrap(); - let env = resolve_provider_environment(&store, &spec.providers) + let (env, config) = resolve_provider_environment(&store, &spec.providers) .await .unwrap(); assert!(env.is_empty()); + assert!(config.is_empty()); } #[tokio::test] diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index d587c0bb3..8039d61eb 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -94,6 +94,9 @@ pub struct ServerState { /// can be constructed before `ServerState` and still /// query session state to surface supervisor readiness. pub supervisor_sessions: Arc, + + /// Registry of connected CLI credential authorities for deferred credential resolution. 
+ pub credential_authority_registry: grpc::credentials::CredentialAuthorityRegistry, } fn is_benign_tls_handshake_failure(error: &std::io::Error) -> bool { @@ -126,6 +129,7 @@ impl ServerState { ssh_connections_by_sandbox: Mutex::new(HashMap::new()), settings_mutex: tokio::sync::Mutex::new(()), supervisor_sessions, + credential_authority_registry: grpc::credentials::CredentialAuthorityRegistry::new(), } } } diff --git a/crates/openshell-server/src/sandbox_index.rs b/crates/openshell-server/src/sandbox_index.rs index 9ad7c941b..c7b1724e7 100644 --- a/crates/openshell-server/src/sandbox_index.rs +++ b/crates/openshell-server/src/sandbox_index.rs @@ -61,4 +61,14 @@ impl SandboxIndex { let inner = self.inner.read().expect("sandbox index lock poisoned"); inner.agent_pod_to_id.get(pod).cloned() } + + #[must_use] + pub fn sandbox_name_for_id(&self, id: &str) -> Option { + let inner = self.inner.read().expect("sandbox index lock poisoned"); + inner + .sandbox_name_to_id + .iter() + .find(|(_, v)| v.as_str() == id) + .map(|(k, _)| k.clone()) + } } diff --git a/proto/openshell.proto b/proto/openshell.proto index 48a8fece2..b4c20b35c 100644 --- a/proto/openshell.proto +++ b/proto/openshell.proto @@ -155,6 +155,17 @@ service OpenShell { // Get decision history for a sandbox's draft policy. rpc GetDraftHistory(GetDraftHistoryRequest) returns (GetDraftHistoryResponse); + + // --------------------------------------------------------------------------- + // Deferred credential RPCs + // --------------------------------------------------------------------------- + + // Resolve a deferred credential at runtime (called by sandbox supervisor). + rpc ResolveCredential(ResolveCredentialRequest) returns (ResolveCredentialResponse); + + // Register as credential authority for deferred providers (called by CLI). + // The CLI sends CredentialResponse messages; the gateway pushes CredentialRequest messages. 
+ rpc RegisterCredentialAuthority(stream CredentialResponse) returns (stream CredentialRequest); } // Health check request. @@ -571,8 +582,10 @@ message GetSandboxProviderEnvironmentRequest { // Get sandbox provider environment response. message GetSandboxProviderEnvironmentResponse { - // Provider credential environment variables. + // Provider credential environment variables (subject to placeholder resolution). map environment = 1; + // Provider config environment variables (injected literally, not placeholder'd). + map config_environment = 2; } // --------------------------------------------------------------------------- @@ -1136,3 +1149,39 @@ message GetDraftHistoryResponse { // Chronological decision history. repeated DraftHistoryEntry entries = 1; } + +// --------------------------------------------------------------------------- +// Deferred credential messages +// --------------------------------------------------------------------------- + +// Sandbox→Gateway: resolve a deferred credential at runtime. +message ResolveCredentialRequest { + string sandbox_id = 1; + // Environment variable key, e.g. "ANTHROPIC_API_KEY". + string env_key = 2; + // Destination host the request is heading to, e.g. "api.anthropic.com". + string destination_host = 3; +} + +message ResolveCredentialResponse { + bool approved = 1; + // The secret value (only set when approved=true). + string value = 2; +} + +// Gateway→CLI: request a credential from the registered authority. +message CredentialRequest { + string request_id = 1; + string sandbox_id = 2; + string sandbox_name = 3; + string env_key = 4; + string destination_host = 5; +} + +// CLI→Gateway: respond to a credential request. +message CredentialResponse { + string request_id = 1; + bool approved = 2; + // The secret value (only set when approved=true). + string value = 3; +}