diff --git a/Cargo.lock b/Cargo.lock index ae6a8b8ff..7aa5022ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -341,17 +341,32 @@ dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", - "anstyle-wincon", + "anstyle-wincon 1.0.1", "colorchoice", "is-terminal", "utf8parse", ] +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon 3.0.11", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" -version = "1.0.0" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" [[package]] name = "anstyle-parse" @@ -381,6 +396,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.75" @@ -1494,7 +1520,7 @@ version = "4.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636" dependencies = [ - "anstream", + "anstream 0.3.2", "anstyle", "bitflags 1.3.2", "clap_lex 0.5.0", @@ -2201,6 +2227,29 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" +dependencies = [ + "anstream 0.6.21", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -2333,6 +2382,17 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "filetime" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db" +dependencies = [ + "cfg-if", + "libc", + "libredox", +] + [[package]] name = "find-msvc-tools" version = "0.1.5" @@ -2487,6 +2547,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.28" @@ -2924,6 +2993,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + [[package]] name = "hyper" version = "0.14.26" @@ -3115,6 +3190,26 @@ dependencies = [ "serde", ] +[[package]] +name = "inotify" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.4" @@ -3174,6 +3269,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "iso8601" version = "0.6.1" @@ -3305,6 +3406,26 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -3577,6 +3698,17 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" +[[package]] +name = "libredox" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" +dependencies = [ + "bitflags 2.9.1", + "libc", + "redox_syscall 0.7.1", +] + [[package]] name = "linear-map" version = "1.2.0" @@ -3768,6 +3900,7 @@ checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ "hermit-abi 0.3.9", "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -3843,6 +3976,34 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c533b4c39709f9ba5005d8002048266593c1cfaf3c5f0739d5b8ab0c6c504009" +dependencies = [ + "bitflags 2.9.1", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 1.0.2", + "notify-types", + "walkdir", + "windows-sys 0.52.0", +] + +[[package]] +name = "notify-types" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585d3cb5e12e01aed9e8a1f70d5c6b5e86fe2a6e48fc8cd0b3e0b8df6f6eb174" +dependencies = [ + "instant", +] + [[package]] name = "nu-ansi-term" version = "0.50.1" @@ -4002,6 +4163,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "opaque-debug" version = "0.3.1" @@ -4605,6 +4772,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35985aa610addc02e24fc232012c86fd11f14111180f902b67e2d5331f8ebf2b" +dependencies = [ + "bitflags 2.9.1", +] + [[package]] name = "regex" version = "1.9.4" @@ -5648,7 +5824,9 @@ dependencies = [ "async-trait", "aws-smithy-types", "chrono", + "env_logger", "log", + "notify", "open-feature", "reqwest", "serde", @@ -5658,6 +5836,7 @@ dependencies = [ "superposition_types", "thiserror 1.0.58", "tokio", + "tokio-util", "uuid", ] @@ -6498,9 +6677,9 @@ checksum = "5190c9442dcdaf0ddd50f37420417d219ae5261bbf5db120d0f9bab996c9cba1" [[package]] name = "utf8parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" @@ -6776,6 +6955,12 @@ dependencies = [ "windows-targets 0.48.0", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.42.0" @@ -6818,6 +7003,15 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-targets" version = "0.42.2" diff --git a/crates/superposition_provider/Cargo.toml b/crates/superposition_provider/Cargo.toml index 52bc13c91..747a67f50 100644 --- a/crates/superposition_provider/Cargo.toml +++ b/crates/superposition_provider/Cargo.toml @@ -31,6 +31,11 @@ superposition_sdk = { workspace = true, features = ["behavior-version-latest"] } # Superposition types for proper type conversion superposition_types = { workspace = true } +notify = "7" +tokio-util = "0.7" + +[dev-dependencies] +env_logger = "0.11" [lints] workspace = true diff --git a/crates/superposition_provider/examples/config.toml b/crates/superposition_provider/examples/config.toml new file mode 100644 index 000000000..e4adc822a --- /dev/null +++ b/crates/superposition_provider/examples/config.toml @@ -0,0 +1,20 @@ +[default-configs] +timeout = { value = 30, schema = { type = "integer" } } +currency = { value = "Rupee", schema = { type = "string", enum = ["Rupee", "Dollar", "Euro"] } } +price = { value = 10000, schema = { type = "integer", minimum = 0 } } + +[dimensions] +os = { position = 1, schema = { type = "string" } } +city = { position = 2, schema = { type = "string" } } + +[[overrides]] +_context_ = { os = "linux" } +timeout = 45 + +[[overrides]] +_context_ = { city = "Boston" } +currency = "Dollar" + +[[overrides]] +_context_ = { city = "Berlin" } +currency = "Euro" diff --git a/crates/superposition_provider/examples/local_file_example.rs b/crates/superposition_provider/examples/local_file_example.rs new file mode 100644 index 000000000..29e51bf1d --- /dev/null +++ b/crates/superposition_provider/examples/local_file_example.rs @@ -0,0 +1,34 @@ +use std::path::PathBuf; + +use open_feature::EvaluationContext; +use superposition_provider::{ + data_source::file::FileDataSource, local_provider::LocalResolutionProvider, + traits::AllFeatureProvider, OnDemandStrategy, RefreshStrategy, +}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let file_source = FileDataSource::new(manifest_dir.join("examples/config.toml")); + + let provider = LocalResolutionProvider::new( + Box::new(file_source), + None, + RefreshStrategy::OnDemand(OnDemandStrategy { + ttl: 60, + ..Default::default() + }), + ); + provider.init().await.unwrap(); + + let context = EvaluationContext::default() + .with_custom_field("os", "linux") + .with_custom_field("city", "Boston"); + + let config = provider.resolve_all_features(&context).await.unwrap(); + println!("Config: {:?}", config); + + provider.close().await.unwrap(); +} diff --git a/crates/superposition_provider/examples/local_file_watch_example.rs b/crates/superposition_provider/examples/local_file_watch_example.rs new file mode 100644 index 000000000..deaa91ebd --- /dev/null +++ b/crates/superposition_provider/examples/local_file_watch_example.rs @@ -0,0 +1,38 @@ +use std::path::PathBuf; + +use open_feature::EvaluationContext; +use superposition_provider::{ + data_source::file::FileDataSource, local_provider::LocalResolutionProvider, + traits::AllFeatureProvider, RefreshStrategy, WatchStrategy, +}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let config_path = manifest_dir.join("examples/config.toml"); + + println!("Watching config file: {:?}", config_path); + println!("Edit the file in another terminal to see changes.\n"); + + let file_source = FileDataSource::new(config_path); + + let provider = LocalResolutionProvider::new( + Box::new(file_source), + None, + RefreshStrategy::Watch(WatchStrategy::default()), + ); + provider.init().await.unwrap(); + + let context = EvaluationContext::default() + .with_custom_field("os", "linux") + .with_custom_field("city", "Boston"); + + // Poll in a loop to show updated values after file changes + loop { + let config = provider.resolve_all_features(&context).await.unwrap(); + println!("Config: {:?}", config); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + } +} diff --git a/crates/superposition_provider/examples/local_http_example.rs b/crates/superposition_provider/examples/local_http_example.rs new file mode 100644 index 000000000..a28db8c54 --- /dev/null +++ b/crates/superposition_provider/examples/local_http_example.rs @@ -0,0 +1,41 @@ +use open_feature::EvaluationContext; +use superposition_provider::{ + data_source::http::HttpDataSource, + local_provider::LocalResolutionProvider, + traits::{AllFeatureProvider, FeatureExperimentMeta}, + PollingStrategy, RefreshStrategy, SuperpositionOptions, +}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let http_source = HttpDataSource::new(SuperpositionOptions::new( + "http://localhost:8080".to_string(), + "token".to_string(), + "localorg".to_string(), + "dev".to_string(), + )); + + let provider = LocalResolutionProvider::new( + Box::new(http_source), + None, + RefreshStrategy::Polling(PollingStrategy { + interval: 30, + timeout: Some(10), + }), + ); + provider.init().await.unwrap(); + + let context = EvaluationContext::default() + .with_targeting_key("user-1234") + .with_custom_field("dimension", "d2"); + + let all_config = provider.resolve_all_features(&context).await.unwrap(); + println!("All config: {:?}", all_config); + + let variants = provider.get_applicable_variants(&context).await.unwrap(); + println!("Variants: {:?}", variants); + + provider.close().await.unwrap(); +} diff --git a/crates/superposition_provider/examples/local_with_fallback_example.rs b/crates/superposition_provider/examples/local_with_fallback_example.rs new file mode 100644 index 000000000..d63a1d085 --- /dev/null +++ b/crates/superposition_provider/examples/local_with_fallback_example.rs @@ -0,0 +1,69 @@ +use std::path::PathBuf; + +use open_feature::{EvaluationContext, OpenFeature}; +use superposition_provider::{ + data_source::file::FileDataSource, data_source::http::HttpDataSource, + local_provider::LocalResolutionProvider, PollingStrategy, RefreshStrategy, + SuperpositionOptions, +}; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let http_source = HttpDataSource::new(SuperpositionOptions::new( + "http://localhost:8080".to_string(), + "token".to_string(), + "localorg".to_string(), + "dev".to_string(), + )); + + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let file_source = FileDataSource::new(manifest_dir.join("examples/config.toml")); + + let provider = LocalResolutionProvider::new( + Box::new(http_source), + Some(Box::new(file_source)), + RefreshStrategy::Polling(PollingStrategy { + interval: 10, + timeout: Some(10), + }), + ); + + // Register with OpenFeature and create a client + let mut api = OpenFeature::singleton_mut().await; + api.set_provider(provider).await; + let client = api.create_client(); + + // Allow time for the provider to initialize via OpenFeature + sleep(Duration::from_secs(2)).await; + + println!("=== Superposition Fallback + Polling Example ==="); + println!("Primary: HTTP (localhost:8080), Fallback: config.toml"); + println!("Polling every 10s. Printing config every 5s (Ctrl-C to stop).\n"); + + let context = EvaluationContext::default() + .with_targeting_key("user-456") + .with_custom_field("os", "linux") + .with_custom_field("city", "Berlin"); + + loop { + let ts = chrono::Utc::now().format("%H:%M:%S"); + + match client + .get_string_value("currency", Some(&context), None) + .await + { + Ok(value) => print!("[{}] currency = {}", ts, value), + Err(e) => print!("[{}] currency error: {:?}", ts, e), + } + + match client.get_int_value("timeout", Some(&context), None).await { + Ok(value) => println!(" | timeout = {}", value), + Err(e) => println!(" | timeout error: {:?}", e), + } + + sleep(Duration::from_secs(5)).await; + } +} diff --git a/crates/superposition_provider/examples/polling_example.rs b/crates/superposition_provider/examples/polling_example.rs new file mode 100644 index 000000000..31445501d --- /dev/null +++ b/crates/superposition_provider/examples/polling_example.rs @@ -0,0 +1,102 @@ +/// Demonstrates the Polling refresh strategy with LocalResolutionProvider +/// using the OpenFeature client interface. +/// +/// This example connects to a Superposition server via HTTP, polls for config +/// changes every 10 seconds, and prints a config value in a loop using the +/// standard OpenFeature client API. Change the config on the server and watch +/// the printed value update automatically. +/// +/// Usage: +/// RUST_LOG=info cargo run --example polling_example +/// +/// Environment variables (all optional, with defaults shown): +/// SUPERPOSITION_ENDPOINT http://localhost:8080 +/// SUPERPOSITION_TOKEN token +/// SUPERPOSITION_ORG_ID localorg +/// SUPERPOSITION_WORKSPACE dev +/// POLL_INTERVAL 10 (seconds between server polls) +/// PRINT_INTERVAL 5 (seconds between printing the value) +/// CONFIG_KEY max_connections (the config key to watch) +use std::env; + +use open_feature::{EvaluationContext, OpenFeature}; +use superposition_provider::{ + data_source::http::HttpDataSource, local_provider::LocalResolutionProvider, PollingStrategy, + RefreshStrategy, SuperpositionOptions, +}; +use tokio::time::{sleep, Duration}; + +fn env_or(key: &str, default: &str) -> String { + env::var(key).unwrap_or_else(|_| default.to_string()) +} + +#[tokio::main] +async fn main() { + env_logger::init(); + + let endpoint = env_or("SUPERPOSITION_ENDPOINT", "http://localhost:8080"); + let token = env_or("SUPERPOSITION_TOKEN", "token"); + let org_id = env_or("SUPERPOSITION_ORG_ID", "localorg"); + let workspace = env_or("SUPERPOSITION_WORKSPACE", "dev"); + let poll_interval: u64 = env_or("POLL_INTERVAL", "10").parse().unwrap_or(10); + let print_interval: u64 = env_or("PRINT_INTERVAL", "5").parse().unwrap_or(5); + let config_key = env_or("CONFIG_KEY", "max_connections"); + + println!("=== Superposition Polling Example ==="); + println!("Endpoint: {}", endpoint); + println!("Org / Workspace: {} / {}", org_id, workspace); + println!("Poll interval: {}s", poll_interval); + println!("Print interval: {}s", print_interval); + println!("Watching key: {}", config_key); + println!(); + + let http_source = HttpDataSource::new(SuperpositionOptions::new( + endpoint, token, org_id, workspace, + )); + + let provider = LocalResolutionProvider::new( + Box::new(http_source), + None, + RefreshStrategy::Polling(PollingStrategy { + interval: poll_interval, + timeout: Some(10), + }), + ); + + // Register with OpenFeature and create a client + let mut api = OpenFeature::singleton_mut().await; + api.set_provider(provider).await; + let client = api.create_client(); + + // Allow time for the provider to initialize via OpenFeature + sleep(Duration::from_secs(2)).await; + + println!( + "Provider ready. Printing config every {}s (Ctrl-C to stop).\n", + print_interval + ); + + let context = EvaluationContext::default(); + + loop { + match client + .get_int_value(&config_key, Some(&context), None) + .await + { + Ok(value) => println!( + "[{}] {} = {}", + chrono::Utc::now().format("%H:%M:%S"), + config_key, + value + ), + Err(e) => eprintln!( + "[{}] Error resolving {}: {:?}", + chrono::Utc::now().format("%H:%M:%S"), + config_key, + e + ), + } + + sleep(Duration::from_secs(print_interval)).await; + } +} diff --git a/crates/superposition_provider/src/client.rs b/crates/superposition_provider/src/client.rs index 8027d22bd..2b7e27176 100644 --- a/crates/superposition_provider/src/client.rs +++ b/crates/superposition_provider/src/client.rs @@ -97,6 +97,12 @@ impl CacConfig { on_demand_strategy.timeout.unwrap_or(30) ); } + RefreshStrategy::Watch(_) => { + info!("Using Watch refresh strategy"); + } + RefreshStrategy::Manual => { + info!("Using Manual refresh strategy"); + } } Ok(()) @@ -344,6 +350,12 @@ impl ExperimentationConfig { on_demand_strategy.ttl ); } + RefreshStrategy::Watch(_) => { + info!("Using Watch refresh strategy for experiments"); + } + RefreshStrategy::Manual => { + info!("Using Manual refresh strategy for experiments"); + } } Ok(()) diff --git a/crates/superposition_provider/src/data_source.rs b/crates/superposition_provider/src/data_source.rs new file mode 100644 index 000000000..10374df24 --- /dev/null +++ b/crates/superposition_provider/src/data_source.rs @@ -0,0 +1,129 @@ +pub mod file; +pub mod http; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde_json::{Map, Value}; +use superposition_core::experiment::ExperimentGroups; +use superposition_core::Experiments; +use superposition_types::Config; + +use crate::types::Result; + +/// Holds a resolved configuration along with the time it was fetched. +#[derive(Debug, Clone)] +pub struct ConfigData { + pub config: Config, + pub fetched_at: DateTime, +} + +impl ConfigData { + pub fn new(config: Config) -> Self { + Self { + config, + fetched_at: Utc::now(), + } + } +} + +/// Holds active experiments and experiment groups along with the time they were fetched. +#[derive(Debug, Clone)] +pub struct ExperimentData { + pub experiments: Experiments, + pub experiment_groups: ExperimentGroups, + pub fetched_at: DateTime, +} + +impl ExperimentData { + pub fn new(experiments: Experiments, experiment_groups: ExperimentGroups) -> Self { + Self { + experiments, + experiment_groups, + fetched_at: Utc::now(), + } + } + + /// Filter experiments by context using a matcher function (apply or partial_apply), + /// and optionally by prefix. + pub fn filter( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + matcher: fn(&Map, &Map) -> bool, + ) -> ExperimentData { + let mut filtered_experiments = self.experiments.clone(); + + // Filter by context using the provided matcher + if let Some(ctx) = context { + if !ctx.is_empty() { + filtered_experiments.retain(|exp| matcher(&exp.context, ctx)); + } + } + + // Filter by prefix: keep experiments where any variant has an override key + // starting with any of the prefixes + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + filtered_experiments.retain(|exp| { + exp.variants.iter().any(|variant| { + let overrides = variant.overrides.clone().into_inner(); + overrides.keys().any(|key| { + prefixes.iter().any(|prefix| key.starts_with(prefix)) + }) + }) + }); + } + } + + // Keep all groups (they reference experiments by ID) + ExperimentData::new(filtered_experiments, self.experiment_groups.clone()) + } +} + +/// Trait for fetching configuration and experiment data from a Superposition backend. +/// +/// Implementors provide the transport mechanism (e.g. HTTP, file-based) while consumers +/// interact with this unified interface. +#[async_trait] +pub trait SuperpositionDataSource: Send + Sync { + /// Fetch the full resolved configuration. + async fn fetch_config(&self) -> Result; + + /// Fetch a resolved configuration filtered by the given context and key prefixes. + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result; + + /// Fetch all active experiments. + async fn fetch_active_experiments(&self) -> Result>; + + /// Fetch active experiments whose conditions are candidates for the given context + /// and key prefixes. + async fn fetch_candidate_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result>; + + /// Fetch active experiments that match the given context and key prefixes. + async fn fetch_matching_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result>; + + /// Whether this data source supports experiments. + fn supports_experiments(&self) -> bool; + + /// Set up a file watcher and return a stream of change notifications. + /// + /// Returns `Ok(None)` if this data source does not support watching. + fn watch(&self) -> Result> { + Ok(None) + } + + /// Clean up any resources held by this data source. + async fn close(&self) -> Result<()>; +} diff --git a/crates/superposition_provider/src/data_source/file.rs b/crates/superposition_provider/src/data_source/file.rs new file mode 100644 index 000000000..0bed088cb --- /dev/null +++ b/crates/superposition_provider/src/data_source/file.rs @@ -0,0 +1,141 @@ +use std::collections::HashSet; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use log::error; +use notify::{Event, RecommendedWatcher, Watcher}; +use serde_json::{Map, Value}; +use superposition_core::toml::parse_toml_config; + +use super::{ConfigData, ExperimentData, SuperpositionDataSource}; +use crate::types::{Result, SuperpositionError, WatchStream}; + +pub struct FileDataSource { + file_path: PathBuf, + watcher: Arc>>, +} + +impl FileDataSource { + pub fn new(file_path: PathBuf) -> Self { + Self { + file_path, + watcher: Arc::new(Mutex::new(None)), + } + } +} + +#[async_trait] +impl SuperpositionDataSource for FileDataSource { + async fn fetch_config(&self) -> Result { + let content = + tokio::fs::read_to_string(&self.file_path) + .await + .map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to read config file {:?}: {}", + self.file_path, e + )) + })?; + + let config = parse_toml_config(&content).map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to parse TOML config: {}", + e + )) + })?; + + Ok(ConfigData::new(config)) + } + + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result { + let config_data = self.fetch_config().await?; + let mut config = config_data.config; + + if let Some(ctx) = context { + if !ctx.is_empty() { + config = config.filter_by_dimensions(ctx); + } + } + + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + let prefix_set: HashSet = + HashSet::from_iter(prefixes.iter().cloned()); + config = config.filter_by_prefix(&prefix_set); + } + } + + Ok(ConfigData::new(config)) + } + + async fn fetch_active_experiments(&self) -> Result> { + Ok(None) + } + + async fn fetch_candidate_active_experiments( + &self, + _context: Option<&Map>, + _prefix_filter: Option<&[String]>, + ) -> Result> { + Ok(None) + } + + async fn fetch_matching_active_experiments( + &self, + _context: Option<&Map>, + _prefix_filter: Option<&[String]>, + ) -> Result> { + Ok(None) + } + + fn supports_experiments(&self) -> bool { + false + } + + fn watch(&self) -> Result> { + let (tx, rx) = tokio::sync::mpsc::channel(16); + + let mut watcher = notify::recommended_watcher(move |res: std::result::Result| { + match res { + Ok(_event) => { + let _ = tx.try_send(()); + } + Err(e) => { + error!("FileDataSource: watch error: {}", e); + } + } + }) + .map_err(|e| { + SuperpositionError::ConfigError(format!("Failed to create file watcher: {}", e)) + })?; + + watcher + .watch(&self.file_path, notify::RecursiveMode::NonRecursive) + .map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to watch file {:?}: {}", + self.file_path, e + )) + })?; + + let mut guard = self.watcher.lock().map_err(|e| { + SuperpositionError::ConfigError(format!("Failed to lock watcher mutex: {}", e)) + })?; + *guard = Some(watcher); + + Ok(Some(WatchStream { receiver: rx })) + } + + async fn close(&self) -> Result<()> { + let mut guard = self.watcher.lock().map_err(|e| { + SuperpositionError::ConfigError(format!("Failed to lock watcher mutex: {}", e)) + })?; + *guard = None; + Ok(()) + } +} diff --git a/crates/superposition_provider/src/data_source/http.rs b/crates/superposition_provider/src/data_source/http.rs new file mode 100644 index 000000000..e60584227 --- /dev/null +++ b/crates/superposition_provider/src/data_source/http.rs @@ -0,0 +1,181 @@ +use std::collections::HashSet; + +use async_trait::async_trait; +use log::info; +use serde_json::{Map, Value}; +use superposition_sdk::types::ExperimentStatusType; +use superposition_sdk::{Client, Config as SdkConfig}; +use superposition_types::logic::{apply, partial_apply}; + +use super::{ConfigData, ExperimentData, SuperpositionDataSource}; +use crate::types::{Result, SuperpositionError, SuperpositionOptions}; +use crate::utils::ConversionUtils; + +pub struct HttpDataSource { + options: SuperpositionOptions, +} + +impl HttpDataSource { + pub fn new(options: SuperpositionOptions) -> Self { + Self { options } + } + + fn create_client(&self) -> Client { + let sdk_config = SdkConfig::builder() + .endpoint_url(&self.options.endpoint) + .bearer_token(self.options.token.clone().into()) + .behavior_version_latest() + .build(); + + Client::from_conf(sdk_config) + } + + async fn fetch_experiments_and_groups(&self) -> Result { + let client = self.create_client(); + + let (experiments_result, groups_result) = tokio::join!( + async { + client + .list_experiment() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id) + .all(true) + .status(ExperimentStatusType::Created) + .status(ExperimentStatusType::Inprogress) + .send() + .await + .map_err(|e| { + SuperpositionError::NetworkError(format!( + "Failed to list experiments: {}", + e + )) + }) + }, + async { + client + .list_experiment_groups() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id) + .all(true) + .send() + .await + .map_err(|e| { + SuperpositionError::NetworkError(format!( + "Failed to list experiment groups: {}", + e + )) + }) + } + ); + + let experiments_response = experiments_result?; + let groups_response = groups_result?; + + let experiments = + ConversionUtils::convert_experiments_response(&experiments_response)?; + let experiment_groups = + ConversionUtils::convert_experiment_groups_response(&groups_response)?; + + info!( + "Successfully fetched {} experiments and {} experiment groups", + experiments.len(), + experiment_groups.len() + ); + + Ok(ExperimentData::new(experiments, experiment_groups)) + } + +} + +#[async_trait] +impl SuperpositionDataSource for HttpDataSource { + async fn fetch_config(&self) -> Result { + let client = self.create_client(); + + info!("Fetching config from Superposition service using SDK"); + + let response = client + .get_config() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id) + .send() + .await + .map_err(|e| { + SuperpositionError::NetworkError(format!( + "Failed to get config: {}", + e + )) + })?; + + let config = ConversionUtils::convert_get_config_response(&response)?; + + info!( + "Successfully fetched config with {} contexts, {} overrides, {} default configs", + config.contexts.len(), + config.overrides.len(), + config.default_configs.len() + ); + + Ok(ConfigData::new(config)) + } + + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result { + let config_data = self.fetch_config().await?; + let mut config = config_data.config; + + // Filter by dimensions if context is provided and non-empty + if let Some(ctx) = context { + if !ctx.is_empty() { + config = config.filter_by_dimensions(ctx); + } + } + + // Filter by prefix if prefix_filter is provided and non-empty + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + let prefix_set: HashSet = + HashSet::from_iter(prefixes.iter().cloned()); + config = config.filter_by_prefix(&prefix_set); + } + } + + Ok(ConfigData::new(config)) + } + + async fn fetch_active_experiments(&self) -> Result> { + let data = self.fetch_experiments_and_groups().await?; + Ok(Some(data)) + } + + async fn fetch_candidate_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result> { + let data = self.fetch_experiments_and_groups().await?; + let filtered = data.filter(context, prefix_filter, partial_apply); + Ok(Some(filtered)) + } + + async fn fetch_matching_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result> { + let data = self.fetch_experiments_and_groups().await?; + let filtered = data.filter(context, prefix_filter, apply); + Ok(Some(filtered)) + } + + fn supports_experiments(&self) -> bool { + true + } + + async fn close(&self) -> Result<()> { + Ok(()) + } +} diff --git a/crates/superposition_provider/src/lib.rs b/crates/superposition_provider/src/lib.rs index 059e47cf1..a97757c15 100644 --- a/crates/superposition_provider/src/lib.rs +++ b/crates/superposition_provider/src/lib.rs @@ -1,10 +1,18 @@ pub mod client; +pub mod data_source; +pub mod local_provider; pub mod provider; +pub mod remote_provider; +pub mod traits; pub mod types; pub mod utils; pub use client::*; +pub use data_source::{ConfigData, ExperimentData, SuperpositionDataSource}; +pub use local_provider::LocalResolutionProvider; pub use provider::*; +pub use remote_provider::SuperpositionAPIProvider; +pub use traits::*; pub use types::*; pub use open_feature::{ diff --git a/crates/superposition_provider/src/local_provider.rs b/crates/superposition_provider/src/local_provider.rs new file mode 100644 index 000000000..9602c131a --- /dev/null +++ b/crates/superposition_provider/src/local_provider.rs @@ -0,0 +1,798 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use log::{debug, error, info, warn}; +use open_feature::{ + provider::FeatureProvider, + provider::{ProviderMetadata, ProviderStatus, ResolutionDetails}, + EvaluationContext, EvaluationError, EvaluationErrorCode, EvaluationResult, StructValue, +}; +use serde_json::{Map, Value}; +use superposition_core::{eval_config, get_applicable_variants, MergeStrategy}; +use superposition_types::logic::{apply, partial_apply}; +use superposition_types::DimensionInfo; +use tokio::sync::RwLock; +use tokio::time::{sleep, Duration}; +use tokio_util::sync::CancellationToken; + +use crate::data_source::{ConfigData, ExperimentData, SuperpositionDataSource}; +use crate::traits::{AllFeatureProvider, FeatureExperimentMeta}; +use crate::types::*; +use crate::utils::ConversionUtils; + +pub struct LocalResolutionProvider { + primary: Arc, + fallback: Option>, + refresh_strategy: RefreshStrategy, + cached_config: Arc>>, + cached_experiments: Arc>>, + cancel_token: CancellationToken, + metadata: ProviderMetadata, + status: RwLock, +} + +impl LocalResolutionProvider { + pub fn new( + primary: Box, + fallback: Option>, + refresh_strategy: RefreshStrategy, + ) -> Self { + Self { + primary: Arc::from(primary), + fallback: fallback.map(Arc::from), + refresh_strategy, + cached_config: Arc::new(RwLock::new(None)), + cached_experiments: Arc::new(RwLock::new(None)), + cancel_token: CancellationToken::new(), + metadata: ProviderMetadata { + name: "LocalResolutionProvider".to_string(), + }, + status: RwLock::new(ProviderStatus::NotReady), + } + } + + pub async fn init(&self) -> Result<()> { + // Fetch initial config from primary, fall back if needed + let config_data = match self.primary.fetch_config().await { + Ok(data) => { + info!("LocalResolutionProvider: fetched config from primary source"); + data + } + Err(e) => { + warn!( + "LocalResolutionProvider: primary config fetch failed: {}", + e + ); + if let Some(fallback) = &self.fallback { + fallback.fetch_config().await.map_err(|fb_err| { + error!( + "LocalResolutionProvider: fallback config fetch also failed: {}", + fb_err + ); + SuperpositionError::ConfigError(format!( + "Both primary and fallback config fetch failed. Primary: {}. Fallback: {}", + e, fb_err + )) + })? + } else { + return Err(SuperpositionError::ConfigError(format!( + "Primary config fetch failed and no fallback configured: {}", + e + ))); + } + } + }; + + { + let mut cached = self.cached_config.write().await; + *cached = Some(config_data); + } + + // Fetch experiments best-effort: try primary, optionally fallback, but don't fail + let exp_data = match self.primary.fetch_active_experiments().await { + Ok(data) => data, + Err(e) => { + warn!( + "LocalResolutionProvider: primary experiment fetch failed (best-effort): {}", + e + ); + if let Some(fallback) = &self.fallback { + match fallback.fetch_active_experiments().await { + Ok(data) => data, + Err(fb_err) => { + warn!( + "LocalResolutionProvider: fallback experiment fetch also failed (best-effort): {}", + fb_err + ); + None + } + } + } else { + None + } + } + }; + + if let Some(data) = exp_data { + let mut cached = self.cached_experiments.write().await; + *cached = Some(data); + info!("LocalResolutionProvider: experiments cached"); + } + + // Start refresh strategy + match &self.refresh_strategy { + RefreshStrategy::Polling(polling_strategy) => { + info!( + "LocalResolutionProvider: starting polling with interval={}s", + polling_strategy.interval + ); + self.start_polling(polling_strategy.interval, self.cancel_token.clone()) + .await; + } + RefreshStrategy::OnDemand(on_demand_strategy) => { + info!( + "LocalResolutionProvider: using OnDemand strategy with ttl={}s", + on_demand_strategy.ttl + ); + } + RefreshStrategy::Watch(watch_strategy) => { + let debounce_ms = watch_strategy.debounce_ms.unwrap_or(500); + match self.primary.watch() { + Ok(Some(stream)) => { + info!( + "LocalResolutionProvider: starting watch with debounce={}ms", + debounce_ms + ); + self.start_watching(stream, debounce_ms, self.cancel_token.clone()) + .await; + } + Ok(None) => { + warn!("Watch strategy selected but data source does not support watching"); + } + Err(e) => { + warn!("Failed to start watch: {}", e); + } + } + } + RefreshStrategy::Manual => { + info!("LocalResolutionProvider: using Manual refresh strategy"); + } + } + + { + let mut status = self.status.write().await; + *status = ProviderStatus::Ready; + } + + Ok(()) + } + + pub async fn refresh(&self) -> Result<()> { + self.do_refresh().await + } + + pub async fn close(&self) -> Result<()> { + // Cancel background tasks cooperatively + self.cancel_token.cancel(); + + // Close data sources + if let Err(e) = self.primary.close().await { + warn!("LocalResolutionProvider: error closing primary source: {}", e); + } + if let Some(fallback) = &self.fallback { + if let Err(e) = fallback.close().await { + warn!( + "LocalResolutionProvider: error closing fallback source: {}", + e + ); + } + } + + // Clear caches + { + let mut cached = self.cached_config.write().await; + *cached = None; + } + { + let mut cached = self.cached_experiments.write().await; + *cached = None; + } + + // Set status to NotReady + { + let mut status = self.status.write().await; + *status = ProviderStatus::NotReady; + } + + Ok(()) + } + + async fn do_refresh(&self) -> Result<()> { + // Fetch config from primary; keep last known good on failure + let config_result = self.primary.fetch_config().await; + match &config_result { + Ok(data) => { + let mut cached = self.cached_config.write().await; + *cached = Some(data.clone()); + debug!("LocalResolutionProvider: config refreshed from primary"); + } + Err(e) => { + warn!( + "LocalResolutionProvider: config refresh failed, keeping last known good: {}", + e + ); + } + } + + // Experiments refresh is best-effort, don't propagate errors + if self.primary.supports_experiments() { + match self.primary.fetch_active_experiments().await { + Ok(Some(data)) => { + let mut cached = self.cached_experiments.write().await; + *cached = Some(data); + debug!("LocalResolutionProvider: experiments refreshed from primary"); + } + Ok(None) => { + debug!("LocalResolutionProvider: no experiments returned from primary"); + } + Err(e) => { + warn!( + "LocalResolutionProvider: experiment refresh failed, keeping last known good: {}", + e + ); + } + } + } + + config_result.map(|_| ()) + } + + async fn start_polling(&self, interval: u64, token: CancellationToken) { + let primary = self.primary.clone(); + let cached_config = self.cached_config.clone(); + let cached_experiments = self.cached_experiments.clone(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = token.cancelled() => { + info!("LocalResolutionProvider: polling cancelled"); + break; + } + _ = sleep(Duration::from_secs(interval)) => { + // Refresh config + match primary.fetch_config().await { + Ok(data) => { + let mut cached = cached_config.write().await; + *cached = Some(data); + debug!("LocalResolutionProvider: config updated via polling"); + } + Err(e) => { + error!("LocalResolutionProvider: polling config error: {}", e); + } + } + + // Refresh experiments + match primary.fetch_active_experiments().await { + Ok(Some(data)) => { + let mut cached = cached_experiments.write().await; + *cached = Some(data); + debug!("LocalResolutionProvider: experiments updated via polling"); + } + Ok(None) => {} + Err(e) => { + error!( + "LocalResolutionProvider: polling experiments error: {}", + e + ); + } + } + } + } + } + }); + } + + async fn start_watching( + &self, + mut watch_stream: crate::types::WatchStream, + debounce_ms: u64, + token: CancellationToken, + ) { + let primary = self.primary.clone(); + let cached_config = self.cached_config.clone(); + let cached_experiments = self.cached_experiments.clone(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = token.cancelled() => { + info!("LocalResolutionProvider: watch cancelled"); + break; + } + msg = watch_stream.receiver.recv() => { + match msg { + Some(()) => { + // Debounce: wait, then drain any queued events + sleep(Duration::from_millis(debounce_ms)).await; + while watch_stream.receiver.try_recv().is_ok() {} + + // Refresh config + match primary.fetch_config().await { + Ok(data) => { + let mut cached = cached_config.write().await; + *cached = Some(data); + debug!("LocalResolutionProvider: config updated via watch"); + } + Err(e) => { + error!( + "LocalResolutionProvider: watch config refresh error: {}", + e + ); + } + } + + // Refresh experiments + match primary.fetch_active_experiments().await { + Ok(Some(data)) => { + let mut cached = cached_experiments.write().await; + *cached = Some(data); + debug!( + "LocalResolutionProvider: experiments updated via watch" + ); + } + Ok(None) => {} + Err(e) => { + error!( + "LocalResolutionProvider: watch experiments refresh error: {}", + e + ); + } + } + } + None => { + info!("LocalResolutionProvider: watch channel closed, stopping"); + break; + } + } + } + } + } + }); + } + + async fn ensure_fresh_data(&self) -> Result<()> { + if let RefreshStrategy::OnDemand(on_demand) = &self.refresh_strategy { + let ttl = on_demand.ttl; + let use_stale_on_error = on_demand.use_stale_on_error.unwrap_or(false); + + let should_refresh = { + let cached = self.cached_config.read().await; + match cached.as_ref() { + Some(data) => { + let elapsed = + (chrono::Utc::now() - data.fetched_at).num_seconds(); + elapsed > ttl as i64 + } + None => true, + } + }; + + if should_refresh { + debug!("LocalResolutionProvider: TTL expired, refreshing on-demand"); + if let Err(e) = self.do_refresh().await { + if !use_stale_on_error { + return Err(e); + } + warn!( + "LocalResolutionProvider: on-demand refresh failed, using stale data: {}", + e + ); + } + } + } + Ok(()) + } + + async fn get_dimensions_info(&self) -> HashMap { + let cached = self.cached_config.read().await; + match cached.as_ref() { + Some(data) => data.config.dimensions.clone(), + None => HashMap::new(), + } + } + + async fn eval_with_context( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.ensure_fresh_data().await?; + + let (mut query_data, targeting_key) = + ConversionUtils::evaluation_context_to_query(context); + + let dimensions_info = self.get_dimensions_info().await; + + // If experiments are cached, get applicable variants and inject variantIds + { + let cached_exp = self.cached_experiments.read().await; + if let Some(exp_data) = cached_exp.as_ref() { + match get_applicable_variants( + &dimensions_info, + exp_data.experiments.clone(), + &exp_data.experiment_groups, + &query_data, + &targeting_key.clone().unwrap_or_default(), + None, + ) { + Ok(variant_ids) => { + query_data.insert( + "variantIds".to_string(), + Value::Array( + variant_ids.into_iter().map(Value::String).collect(), + ), + ); + } + Err(e) => { + warn!( + "LocalResolutionProvider: failed to get applicable variants: {}", + e + ); + } + } + } + } + + // Evaluate config using cached data + let cached = self.cached_config.read().await; + match cached.as_ref() { + Some(config_data) => eval_config( + (*config_data.config.default_configs).clone(), + &config_data.config.contexts, + &config_data.config.overrides, + &config_data.config.dimensions, + &query_data, + MergeStrategy::MERGE, + prefix_filter.map(|p| p.to_vec()), + ) + .map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to evaluate config: {}", + e + )) + }), + None => Err(SuperpositionError::ConfigError( + "No cached config available".into(), + )), + } + } +} + +#[async_trait] +impl SuperpositionDataSource for LocalResolutionProvider { + async fn fetch_config(&self) -> Result { + self.ensure_fresh_data().await?; + let cached = self.cached_config.read().await; + cached + .clone() + .ok_or_else(|| SuperpositionError::ConfigError("No cached config available".into())) + } + + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result { + let config_data = self.fetch_config().await?; + let mut config = config_data.config; + + if let Some(ctx) = context { + if !ctx.is_empty() { + config = config.filter_by_dimensions(ctx); + } + } + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + let prefix_set = HashSet::from_iter(prefixes.iter().cloned()); + config = config.filter_by_prefix(&prefix_set); + } + } + Ok(ConfigData::new(config)) + } + + async fn fetch_active_experiments(&self) -> Result> { + self.ensure_fresh_data().await?; + let cached = self.cached_experiments.read().await; + Ok(cached.clone()) + } + + async fn fetch_candidate_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.ensure_fresh_data().await?; + let cached = self.cached_experiments.read().await; + Ok(cached + .as_ref() + .map(|data| data.filter(context, prefix_filter, partial_apply))) + } + + async fn fetch_matching_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.ensure_fresh_data().await?; + let cached = self.cached_experiments.read().await; + Ok(cached + .as_ref() + .map(|data| data.filter(context, prefix_filter, apply))) + } + + fn supports_experiments(&self) -> bool { + true + } + + async fn close(&self) -> Result<()> { + LocalResolutionProvider::close(self).await + } +} + +#[async_trait] +impl AllFeatureProvider for LocalResolutionProvider { + async fn resolve_all_features( + &self, + context: &EvaluationContext, + ) -> Result> { + self.eval_with_context(context, None).await + } + + async fn resolve_all_features_with_filter( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.eval_with_context(context, prefix_filter).await + } +} + +#[async_trait] +impl FeatureExperimentMeta for LocalResolutionProvider { + async fn get_applicable_variants( + &self, + context: &EvaluationContext, + ) -> Result> { + self.ensure_fresh_data().await?; + + let (query_data, targeting_key) = + ConversionUtils::evaluation_context_to_query(context); + let dimensions_info = self.get_dimensions_info().await; + + let cached_exp = self.cached_experiments.read().await; + match cached_exp.as_ref() { + Some(exp_data) => { + get_applicable_variants( + &dimensions_info, + exp_data.experiments.clone(), + &exp_data.experiment_groups, + &query_data, + &targeting_key.unwrap_or_default(), + None, + ) + .map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to get applicable variants: {}", + e + )) + }) + } + None => Ok(vec![]), + } + } +} + +#[async_trait] +impl FeatureProvider for LocalResolutionProvider { + async fn initialize(&mut self, _context: &EvaluationContext) { + info!("Initializing LocalResolutionProvider..."); + { + let mut status = self.status.write().await; + *status = ProviderStatus::NotReady; + } + if (self.init().await).is_err() { + let mut status = self.status.write().await; + *status = ProviderStatus::Error; + return; + } + + let mut status = self.status.write().await; + *status = ProviderStatus::Ready; + + info!("LocalResolutionProvider initialized successfully"); + } + + async fn resolve_bool_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_bool() { + Some(bool_val) => Ok(ResolutionDetails::new(bool_val)), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not a boolean", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating boolean flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_string_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_str() { + Some(str_val) => Ok(ResolutionDetails::new(str_val.to_owned())), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not a string", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating String flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_int_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_i64() { + Some(int_val) => Ok(ResolutionDetails::new(int_val)), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not an integer", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating integer flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_float_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_f64() { + Some(float_val) => Ok(ResolutionDetails::new(float_val)), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not a float", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating float flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_struct_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match ConversionUtils::serde_value_to_struct_value(value) { + Ok(struct_value) => Ok(ResolutionDetails::new(struct_value)), + Err(e) => { + error!("Error converting value to StructValue: {}", e); + Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!( + "Flag '{}' is not a struct: {}", + flag_key, e + )), + }) + } + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating Object flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + fn metadata(&self) -> &ProviderMetadata { + &self.metadata + } + + fn status(&self) -> ProviderStatus { + match self.status.try_read() { + Ok(status) => match *status { + ProviderStatus::Ready => ProviderStatus::Ready, + ProviderStatus::Error => ProviderStatus::Error, + ProviderStatus::NotReady => ProviderStatus::NotReady, + ProviderStatus::STALE => ProviderStatus::STALE, + }, + Err(_) => ProviderStatus::NotReady, + } + } +} diff --git a/crates/superposition_provider/src/remote_provider.rs b/crates/superposition_provider/src/remote_provider.rs new file mode 100644 index 000000000..69d310562 --- /dev/null +++ b/crates/superposition_provider/src/remote_provider.rs @@ -0,0 +1,521 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use aws_smithy_types::Document; +use log::{debug, error, info, warn}; +use open_feature::{ + provider::FeatureProvider, + provider::{ProviderMetadata, ProviderStatus, ResolutionDetails}, + EvaluationContext, EvaluationError, EvaluationErrorCode, EvaluationResult, StructValue, +}; +use serde_json::{Map, Value}; +use superposition_sdk::{Client, Config as SdkConfig}; +use tokio::sync::RwLock; + +use crate::traits::{AllFeatureProvider, FeatureExperimentMeta}; +use crate::types::*; +use crate::utils::ConversionUtils; + +// --------------------------------------------------------------------------- +// ResponseCache internals +// --------------------------------------------------------------------------- + +struct CacheEntry { + value: Map, + created_at: Instant, +} + +struct ResponseCache { + entries: HashMap, + max_entries: usize, + ttl: Duration, +} + +impl ResponseCache { + fn new(max_entries: usize, ttl: Duration) -> Self { + Self { + entries: HashMap::new(), + max_entries, + ttl, + } + } + + fn get(&self, key: &str) -> Option<&Map> { + self.entries.get(key).and_then(|entry| { + if entry.created_at.elapsed() < self.ttl { + Some(&entry.value) + } else { + None + } + }) + } + + fn put(&mut self, key: String, value: Map) { + // If at capacity, evict expired entries first + if self.entries.len() >= self.max_entries { + let now = Instant::now(); + self.entries + .retain(|_, entry| now.duration_since(entry.created_at) < self.ttl); + } + + // If still at capacity, remove the oldest entry + if self.entries.len() >= self.max_entries { + if let Some(oldest_key) = self + .entries + .iter() + .min_by_key(|(_, entry)| entry.created_at) + .map(|(k, _)| k.clone()) + { + self.entries.remove(&oldest_key); + } + } + + self.entries.insert( + key, + CacheEntry { + value, + created_at: Instant::now(), + }, + ); + } + + fn cache_key(context: &EvaluationContext) -> String { + let mut parts: Vec = Vec::new(); + + // Include targeting_key + if let Some(ref tk) = context.targeting_key { + parts.push(format!("tk={}", tk)); + } + + // Include sorted custom_fields for deterministic keys + let mut field_keys: Vec<&String> = context.custom_fields.keys().collect(); + field_keys.sort(); + for k in field_keys { + if let Some(v) = context.custom_fields.get(k) { + let serde_val = + ConversionUtils::convert_evaluation_context_value_to_serde_value(v); + parts.push(format!("{}={}", k, serde_val)); + } + } + + parts.join("|") + } +} + +// --------------------------------------------------------------------------- +// SuperpositionAPIProvider +// --------------------------------------------------------------------------- + +pub struct SuperpositionAPIProvider { + options: SuperpositionOptions, + cache: Option>>, + metadata: ProviderMetadata, + status: RwLock, +} + +impl SuperpositionAPIProvider { + /// Create a new provider without response caching. + pub fn new(options: SuperpositionOptions) -> Self { + Self { + options, + cache: None, + metadata: ProviderMetadata { + name: "SuperpositionAPIProvider".to_string(), + }, + status: RwLock::new(ProviderStatus::NotReady), + } + } + + /// Create a new provider with response caching. + pub fn with_cache(options: SuperpositionOptions, cache_options: CacheOptions) -> Self { + let max_entries = cache_options.size.unwrap_or(1000); + let ttl = Duration::from_secs(cache_options.ttl.unwrap_or(300)); + let cache = ResponseCache::new(max_entries, ttl); + + Self { + options, + cache: Some(Arc::new(RwLock::new(cache))), + metadata: ProviderMetadata { + name: "SuperpositionAPIProvider".to_string(), + }, + status: RwLock::new(ProviderStatus::NotReady), + } + } + + fn create_client(&self) -> Client { + let sdk_config = SdkConfig::builder() + .endpoint_url(&self.options.endpoint) + .bearer_token(self.options.token.clone().into()) + .behavior_version_latest() + .build(); + + Client::from_conf(sdk_config) + } + + async fn resolve_remote( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + // 1. Check cache for the full (unfiltered) result + if let Some(ref cache_arc) = self.cache { + let cache_key = ResponseCache::cache_key(context); + let cache = cache_arc.read().await; + if let Some(cached_value) = cache.get(&cache_key) { + debug!("SuperpositionAPIProvider: cache hit for key"); + let result = if let Some(prefixes) = prefix_filter { + filter_by_prefix(cached_value, prefixes) + } else { + cached_value.clone() + }; + return Ok(result); + } + } + + // 2. Create SDK client + let client = self.create_client(); + + // 3. Extract context and targeting_key + let (query_data, _targeting_key) = + ConversionUtils::evaluation_context_to_query(context); + + // 4. Build and send the get_resolved_config request + // Always fetch WITHOUT prefix filter so we can cache the full result + let mut builder = client + .get_resolved_config() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id); + + // Set context dimensions from evaluation context + let sdk_context: HashMap = query_data + .iter() + .map(|(k, v)| (k.clone(), serde_value_to_document(v))) + .collect(); + builder = builder.set_context(Some(sdk_context)); + + // NOTE: We intentionally do NOT set prefix filter on the SDK request + // so we always get the full config and can cache it. Prefix filtering + // is applied locally after caching. + + let response = builder.send().await.map_err(|e| { + SuperpositionError::NetworkError(format!( + "Failed to get resolved config: {}", + e + )) + })?; + + // 5. Convert response Document to Map + let config_doc = response.config(); + let config_value = ConversionUtils::document_to_value(config_doc).map_err(|e| { + SuperpositionError::SerializationError(format!( + "Failed to convert resolved config response: {}", + e + )) + })?; + + let full_result = match config_value { + Value::Object(map) => map, + other => { + warn!( + "SuperpositionAPIProvider: resolved config is not an object, wrapping: {:?}", + other + ); + let mut map = Map::new(); + map.insert("_value".to_string(), other); + map + } + }; + + // Cache the full (unfiltered) result + if let Some(ref cache_arc) = self.cache { + let cache_key = ResponseCache::cache_key(context); + let mut cache = cache_arc.write().await; + cache.put(cache_key, full_result.clone()); + } + + // Apply prefix filtering locally + let result = if let Some(prefixes) = prefix_filter { + filter_by_prefix(&full_result, prefixes) + } else { + full_result + }; + + Ok(result) + } + +} + +// --------------------------------------------------------------------------- +// Helper functions +// --------------------------------------------------------------------------- + +/// Filter a config map to only include keys that start with one of the given prefixes. +fn filter_by_prefix(config: &Map, prefixes: &[String]) -> Map { + if prefixes.is_empty() { + return config.clone(); + } + config + .iter() + .filter(|(key, _)| prefixes.iter().any(|prefix| key.starts_with(prefix))) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() +} + +/// Convert a serde_json::Value to an aws_smithy_types::Document. +fn serde_value_to_document(value: &Value) -> Document { + match value { + Value::Null => Document::Null, + Value::Bool(b) => Document::Bool(*b), + Value::Number(n) => { + if let Some(u) = n.as_u64() { + Document::Number(aws_smithy_types::Number::PosInt(u)) + } else if let Some(i) = n.as_i64() { + Document::Number(aws_smithy_types::Number::NegInt(i)) + } else if let Some(f) = n.as_f64() { + Document::Number(aws_smithy_types::Number::Float(f)) + } else { + Document::Null + } + } + Value::String(s) => Document::String(s.clone()), + Value::Array(arr) => { + Document::Array(arr.iter().map(serde_value_to_document).collect()) + } + Value::Object(obj) => { + let map: HashMap = obj + .iter() + .map(|(k, v)| (k.clone(), serde_value_to_document(v))) + .collect(); + Document::Object(map) + } + } +} + +// --------------------------------------------------------------------------- +// Trait implementations +// --------------------------------------------------------------------------- + +#[async_trait] +impl AllFeatureProvider for SuperpositionAPIProvider { + async fn resolve_all_features( + &self, + context: &EvaluationContext, + ) -> Result> { + self.resolve_remote(context, None).await + } + + async fn resolve_all_features_with_filter( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.resolve_remote(context, prefix_filter).await + } +} + +#[async_trait] +impl FeatureExperimentMeta for SuperpositionAPIProvider { + async fn get_applicable_variants( + &self, + _context: &EvaluationContext, + ) -> Result> { + // Remote resolution handles experiments server-side + Ok(vec![]) + } +} + +#[async_trait] +impl FeatureProvider for SuperpositionAPIProvider { + async fn initialize(&mut self, _context: &EvaluationContext) { + info!("Initializing SuperpositionAPIProvider..."); + { + let mut status = self.status.write().await; + *status = ProviderStatus::Ready; + } + info!("SuperpositionAPIProvider initialized successfully"); + } + + async fn resolve_bool_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_bool() { + Some(bool_val) => Ok(ResolutionDetails::new(bool_val)), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not a boolean", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating boolean flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_string_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_str() { + Some(str_val) => Ok(ResolutionDetails::new(str_val.to_owned())), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not a string", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating String flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_int_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_i64() { + Some(int_val) => Ok(ResolutionDetails::new(int_val)), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not an integer", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating integer flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_float_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match value.as_f64() { + Some(float_val) => Ok(ResolutionDetails::new(float_val)), + None => Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!("Flag '{}' is not a float", flag_key)), + }), + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating float flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + async fn resolve_struct_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => match config.get(flag_key) { + Some(value) => match ConversionUtils::serde_value_to_struct_value(value) { + Ok(struct_value) => Ok(ResolutionDetails::new(struct_value)), + Err(e) => { + error!("Error converting value to StructValue: {}", e); + Err(EvaluationError { + code: EvaluationErrorCode::TypeMismatch, + message: Some(format!( + "Flag '{}' is not a struct: {}", + flag_key, e + )), + }) + } + }, + None => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Flag '{}' not found", flag_key)), + }), + }, + Err(e) => { + error!("Error evaluating Object flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::General(format!( + "Error evaluating flag '{}': {}", + flag_key, e + )), + message: Some(format!("Error evaluating flag '{}': {}", flag_key, e)), + }) + } + } + } + + fn metadata(&self) -> &ProviderMetadata { + &self.metadata + } + + fn status(&self) -> ProviderStatus { + match self.status.try_read() { + Ok(status) => match *status { + ProviderStatus::Ready => ProviderStatus::Ready, + ProviderStatus::Error => ProviderStatus::Error, + ProviderStatus::NotReady => ProviderStatus::NotReady, + ProviderStatus::STALE => ProviderStatus::STALE, + }, + Err(_) => ProviderStatus::NotReady, + } + } +} diff --git a/crates/superposition_provider/src/traits.rs b/crates/superposition_provider/src/traits.rs new file mode 100644 index 000000000..596379828 --- /dev/null +++ b/crates/superposition_provider/src/traits.rs @@ -0,0 +1,39 @@ +use async_trait::async_trait; +use open_feature::EvaluationContext; +use serde_json::{Map, Value}; + +use crate::types::Result; + +/// Trait for bulk configuration resolution. +/// +/// Implementors provide the ability to resolve all feature flags at once, +/// optionally filtered by key prefixes. +#[async_trait] +pub trait AllFeatureProvider: Send + Sync { + /// Resolve all features for the given evaluation context. + async fn resolve_all_features( + &self, + context: &EvaluationContext, + ) -> Result>; + + /// Resolve all features for the given evaluation context, optionally + /// filtered to only include keys matching the provided prefixes. + async fn resolve_all_features_with_filter( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result>; +} + +/// Trait for experiment variant resolution. +/// +/// Implementors provide the ability to determine which experiment variants +/// are applicable for a given evaluation context. +#[async_trait] +pub trait FeatureExperimentMeta: Send + Sync { + /// Get the list of applicable experiment variant IDs for the given context. + async fn get_applicable_variants( + &self, + context: &EvaluationContext, + ) -> Result>; +} diff --git a/crates/superposition_provider/src/types.rs b/crates/superposition_provider/src/types.rs index e407bcdfa..2fcafeb0a 100644 --- a/crates/superposition_provider/src/types.rs +++ b/crates/superposition_provider/src/types.rs @@ -105,10 +105,32 @@ impl Default for OnDemandStrategy { } } +/// Configuration for the watch refresh strategy. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WatchStrategy { + /// Debounce duration in milliseconds (default: 500). + pub debounce_ms: Option, +} + +impl Default for WatchStrategy { + fn default() -> Self { + Self { + debounce_ms: Some(500), + } + } +} + +/// A stream of change notifications from a data source. +pub struct WatchStream { + pub receiver: tokio::sync::mpsc::Receiver<()>, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum RefreshStrategy { Polling(PollingStrategy), OnDemand(OnDemandStrategy), + Watch(WatchStrategy), + Manual, } impl Default for RefreshStrategy { diff --git a/crates/superposition_provider/src/utils.rs b/crates/superposition_provider/src/utils.rs index 9f5d400d4..fef2e089a 100644 --- a/crates/superposition_provider/src/utils.rs +++ b/crates/superposition_provider/src/utils.rs @@ -577,6 +577,26 @@ impl ConversionUtils { dimension_data } + /// Convert an EvaluationContext into a (Map, Option) tuple + /// containing the custom fields as serde values and the targeting key. + /// This is used by both local and remote providers. + pub fn evaluation_context_to_query( + ctx: &open_feature::EvaluationContext, + ) -> (Map, Option) { + let context = ctx + .custom_fields + .iter() + .map(|(k, v)| { + ( + k.clone(), + Self::convert_evaluation_context_value_to_serde_value(v), + ) + }) + .collect(); + + (context, ctx.targeting_key.clone()) + } + /// Convert Config back to the legacy format for compatibility with existing provider logic pub fn config_to_legacy_format(config: &Config) -> HashMap { let mut result = HashMap::new(); diff --git a/docs/plans/2026-02-14-configuration-resolver-design.md b/docs/plans/2026-02-14-configuration-resolver-design.md new file mode 100644 index 000000000..dc8261df5 --- /dev/null +++ b/docs/plans/2026-02-14-configuration-resolver-design.md @@ -0,0 +1,226 @@ +# Configuration Resolver (Superposition Provider Refactor) + +## Overview + +Refactor the `superposition_provider` crate to introduce a trait-based architecture with pluggable data sources, supporting both local (in-process) and remote configuration resolution. The existing implementation (`provider.rs`, `client.rs`) is kept for comparison. + +## Goals + +1. Define clean trait abstractions: `AllFeatureProvider`, `FeatureExperimentMeta`, `SuperpositionDataSource` +2. Support pluggable data sources (HTTP, File/TOML) via `SuperpositionDataSource` +3. Implement `LocalResolutionProvider` with primary + fallback data sources and multiple refresh strategies +4. Implement `SuperpositionAPIProvider` for remote resolution +5. All providers implement OpenFeature's `FeatureProvider` trait +6. Provide examples for HTTP, File, and primary+fallback configurations + +## Module Layout + +``` +crates/superposition_provider/src/ +├── lib.rs # Re-exports +├── types.rs # Error types, options (existing, extended) +├── traits.rs # AllFeatureProvider + FeatureExperimentMeta traits +├── data_source.rs # SuperpositionDataSource trait, ConfigData, ExperimentData +├── data_source/ +│ ├── http.rs # HttpDataSource +│ └── file.rs # FileDataSource (TOML) +├── local_provider.rs # LocalResolutionProvider +├── remote_provider.rs # SuperpositionAPIProvider +├── utils.rs # Existing ConversionUtils (kept) +├── provider.rs # Existing provider (kept for comparison) +└── client.rs # Existing client (kept for comparison) +``` + +Uses Rust 2018 module style (`data_source.rs` + `data_source/` directory, no `mod.rs`). + +## Core Traits + +### AllFeatureProvider + +Bulk configuration resolution. Returns all (or filtered) resolved config values. + +```rust +#[async_trait] +pub trait AllFeatureProvider: Send + Sync { + async fn resolve_all_features( + &self, + context: &EvaluationContext, + ) -> Result>; + + async fn resolve_all_features_with_filter( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result>; +} +``` + +### FeatureExperimentMeta + +Experiment variant resolution. + +```rust +#[async_trait] +pub trait FeatureExperimentMeta: Send + Sync { + async fn get_applicable_variants( + &self, + context: &EvaluationContext, + ) -> Result>; +} +``` + +### SuperpositionDataSource + +Abstraction for raw data fetching. Each source implements its own filtering logic. + +```rust +pub struct ConfigData { + pub config: Config, + pub fetched_at: DateTime, +} + +pub struct ExperimentData { + pub experiments: Experiments, + pub experiment_groups: ExperimentGroups, + pub fetched_at: DateTime, +} + +#[async_trait] +pub trait SuperpositionDataSource: Send + Sync { + async fn fetch_config(&self) -> Result; + + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result; + + async fn fetch_active_experiments(&self) -> Result>; + + async fn fetch_candidate_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result>; + + async fn fetch_matching_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result>; + + fn supports_experiments(&self) -> bool; + + async fn close(&self) -> Result<()>; +} +``` + +- `fetch_candidate_active_experiments` - partial context matching (uses `Contextual::filter_by_eval`) +- `fetch_matching_active_experiments` - exact context matching (uses `Contextual::filter_exact_match`) + +## Data Source Implementations + +### HttpDataSource + +Wraps `superposition_sdk::Client`. Takes `SuperpositionOptions`. + +- `fetch_config` → `client.get_config()` +- `fetch_filtered_config` → fetches full config, filters locally via `Config::filter_by_prefix` / `Config::filter_by_dimensions` +- `fetch_active_experiments` → `client.list_experiment()` + `client.list_experiment_groups()` +- `fetch_candidate_active_experiments` → fetches all, filters via `Contextual::filter_by_eval` +- `fetch_matching_active_experiments` → fetches all, filters via `Contextual::filter_exact_match` +- `supports_experiments` → `true` +- `close` → no-op + +### FileDataSource + +Reads TOML config via `superposition_core::toml::parse_toml_config`. Takes `PathBuf`. + +- `fetch_config` → reads file, parses TOML +- `fetch_filtered_config` → parses full config, filters locally +- All experiment methods → `Ok(None)` +- `supports_experiments` → `false` +- `close` → no-op + +## LocalResolutionProvider + +In-process resolver with primary + optional fallback data source. + +```rust +pub struct LocalResolutionProvider { + primary: Box, + fallback: Option>, + refresh_strategy: RefreshStrategy, + cached_config: Arc>>, + cached_experiments: Arc>>, + polling_task: RwLock>>, +} +``` + +### Initialization + +1. Fetch from primary data source +2. If primary fails and fallback exists, fetch from fallback +3. If both fail, return error +4. Start refresh strategy + +### Refresh Strategies + +- **Polling** - spawns tokio task that periodically fetches from primary +- **OnDemand** - lazy fetch with TTL; uses stale data if fetch fails (configurable) +- **Manual** - no automatic refresh; caller invokes `refresh()` explicitly + +On refresh failure: keep last known good data (never overwrite cache with error). + +### Trait Implementations + +- **AllFeatureProvider**: reads cached config, calls `eval_config`. If experiments cached, calls `get_applicable_variants` and injects `variantIds` into context before evaluation. +- **FeatureExperimentMeta**: reads cached experiments/groups, calls `get_applicable_variants`. +- **FeatureProvider** (OpenFeature): delegates to `resolve_all_features`, extracts single key. + +### Public API + +```rust +impl LocalResolutionProvider { + pub fn new(primary, fallback, refresh_strategy) -> Self; + pub async fn init(&self) -> Result<()>; + pub async fn refresh(&self) -> Result<()>; + pub async fn close(&self) -> Result<()>; +} +``` + +## SuperpositionAPIProvider (Remote Resolution) + +Sends context over network, gets resolved config back. + +```rust +pub struct SuperpositionAPIProvider { + options: SuperpositionOptions, + cache: Option>, +} +``` + +- Uses `superposition_sdk::Client` to call resolve APIs +- Optional local response cache keyed on hash of (context + targeting_key) with TTL eviction +- Implements `AllFeatureProvider`, `FeatureExperimentMeta`, and OpenFeature `FeatureProvider` + +## EvaluationContext + +Re-uses `open_feature::EvaluationContext` directly. No custom wrapper type. All traits and providers accept `&EvaluationContext` from the OpenFeature SDK. + +## Examples + +Three example binaries in `crates/superposition_provider/examples/`: + +### `local_http_example.rs` +LocalResolutionProvider with HTTP source, polling refresh. + +### `local_file_example.rs` +LocalResolutionProvider with TOML file source, on-demand refresh. + +### `local_with_fallback_example.rs` +LocalResolutionProvider with HTTP primary + TOML fallback. Demonstrates: +- Bulk resolution (`resolve_all_features`) +- Filtered resolution (`resolve_all_features_with_filter`) +- Experiment variants (`get_applicable_variants`) +- Manual refresh (`refresh()`) diff --git a/docs/plans/2026-02-14-configuration-resolver-plan.md b/docs/plans/2026-02-14-configuration-resolver-plan.md new file mode 100644 index 000000000..e97031ad8 --- /dev/null +++ b/docs/plans/2026-02-14-configuration-resolver-plan.md @@ -0,0 +1,1987 @@ +# Configuration Resolver Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Refactor the `superposition_provider` crate to use a trait-based architecture with pluggable data sources (HTTP, File), supporting both local and remote configuration resolution. + +**Architecture:** Three core traits (`AllFeatureProvider`, `FeatureExperimentMeta`, `SuperpositionDataSource`) with two provider implementations (`LocalResolutionProvider`, `SuperpositionAPIProvider`). Data sources are pluggable via `SuperpositionDataSource` trait. All providers implement OpenFeature's `FeatureProvider`. + +**Tech Stack:** Rust, async-trait, tokio, serde_json, open-feature, superposition_core (eval_config, get_applicable_variants, parse_toml_config), superposition_sdk (HTTP client), superposition_types (Config, Contextual) + +**Design doc:** `docs/plans/2026-02-14-configuration-resolver-design.md` + +--- + +### Task 1: Define core traits (`traits.rs`) + +**Files:** +- Create: `crates/superposition_provider/src/traits.rs` +- Modify: `crates/superposition_provider/src/lib.rs` + +**Step 1: Create `traits.rs` with `AllFeatureProvider` and `FeatureExperimentMeta` traits** + +```rust +use async_trait::async_trait; +use open_feature::EvaluationContext; +use serde_json::{Map, Value}; + +use crate::types::Result; + +/// Trait for bulk configuration resolution +/// +/// Provides methods to resolve all feature flags at once, +/// which is more efficient than resolving them one by one. +#[async_trait] +pub trait AllFeatureProvider: Send + Sync { + /// Resolve all features for the given evaluation context + async fn resolve_all_features( + &self, + context: &EvaluationContext, + ) -> Result>; + + /// Resolve features matching prefix filters + /// + /// If prefix_filter is None, behaves like resolve_all_features + async fn resolve_all_features_with_filter( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result>; +} + +/// Trait for experiment metadata and variant resolution +#[async_trait] +pub trait FeatureExperimentMeta: Send + Sync { + /// Get applicable variant IDs for the given context + async fn get_applicable_variants( + &self, + context: &EvaluationContext, + ) -> Result>; +} +``` + +**Step 2: Add `traits` module to `lib.rs`** + +Add `pub mod traits;` to `crates/superposition_provider/src/lib.rs` and re-export: `pub use traits::*;` + +**Step 3: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles with no errors + +**Step 4: Commit** + +```bash +git add crates/superposition_provider/src/traits.rs crates/superposition_provider/src/lib.rs +git commit -m "feat: add AllFeatureProvider and FeatureExperimentMeta traits" +``` + +--- + +### Task 2: Define `SuperpositionDataSource` trait (`data_source.rs`) + +**Files:** +- Create: `crates/superposition_provider/src/data_source.rs` +- Modify: `crates/superposition_provider/src/lib.rs` + +**Step 1: Create `data_source.rs` with trait, `ConfigData`, and `ExperimentData`** + +```rust +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde_json::{Map, Value}; +use superposition_core::experiment::ExperimentGroups; +use superposition_core::Experiments; +use superposition_types::Config; + +use crate::types::Result; + +pub mod file; +pub mod http; + +/// Data fetched from a configuration source +#[derive(Debug, Clone)] +pub struct ConfigData { + pub config: Config, + pub fetched_at: DateTime, +} + +impl ConfigData { + pub fn new(config: Config) -> Self { + Self { + config, + fetched_at: Utc::now(), + } + } +} + +/// Experiment data fetched from a source +#[derive(Debug, Clone)] +pub struct ExperimentData { + pub experiments: Experiments, + pub experiment_groups: ExperimentGroups, + pub fetched_at: DateTime, +} + +impl ExperimentData { + pub fn new(experiments: Experiments, experiment_groups: ExperimentGroups) -> Self { + Self { + experiments, + experiment_groups, + fetched_at: Utc::now(), + } + } +} + +/// Trait for abstracting data sources for Superposition configuration and experiments +/// +/// Allows plugging different data sources (HTTP, File, Redis, etc.) +/// into the Superposition provider system. Each implementation handles +/// its own filtering logic. +#[async_trait] +pub trait SuperpositionDataSource: Send + Sync { + /// Fetch the latest configuration from the data source + async fn fetch_config(&self) -> Result; + + /// Fetch configuration with context/prefix filters + /// + /// Each data source implements its own filtering strategy. + /// HTTP may use server-side filtering; File filters after parse. + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result; + + /// Fetch all active experiment data + /// + /// Returns None if the data source doesn't support experiments + async fn fetch_active_experiments(&self) -> Result>; + + /// Fetch active experiments filtered with partial context matching + /// + /// Uses partial/candidate matching - experiments whose context + /// could potentially match the given dimensions. + /// Returns None if the data source doesn't support experiments. + async fn fetch_candidate_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result>; + + /// Fetch active experiments filtered with exact context matching + /// + /// Uses exact matching - only experiments whose context + /// exactly matches the given dimensions. + /// Returns None if the data source doesn't support experiments. + async fn fetch_matching_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result>; + + /// Check if this data source supports experiments + fn supports_experiments(&self) -> bool; + + /// Close and cleanup resources used by this data source + async fn close(&self) -> Result<()>; +} +``` + +**Step 2: Create placeholder files for submodules** + +Create empty `crates/superposition_provider/src/data_source/http.rs` and `crates/superposition_provider/src/data_source/file.rs` (just comments for now so it compiles). + +**Step 3: Add `data_source` module to `lib.rs`** + +Add `pub mod data_source;` and re-export: `pub use data_source::{ConfigData, ExperimentData, SuperpositionDataSource};` + +**Step 4: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles with no errors + +**Step 5: Commit** + +```bash +git add crates/superposition_provider/src/data_source.rs crates/superposition_provider/src/data_source/ crates/superposition_provider/src/lib.rs +git commit -m "feat: add SuperpositionDataSource trait with ConfigData and ExperimentData" +``` + +--- + +### Task 3: Implement `HttpDataSource` + +**Files:** +- Modify: `crates/superposition_provider/src/data_source/http.rs` + +**Reference files:** +- `crates/superposition_provider/src/client.rs` (existing HTTP fetch logic in `CacConfig::get_config_static`, `ExperimentationConfig::get_experiments_static`, `ExperimentationConfig::get_experiment_groups_static`) +- `crates/superposition_provider/src/utils.rs` (`ConversionUtils::convert_get_config_response`, `convert_experiments_response`, `convert_experiment_groups_response`) + +**Step 1: Implement `HttpDataSource` struct and `SuperpositionDataSource` trait** + +```rust +use std::collections::HashSet; + +use async_trait::async_trait; +use log::info; +use serde_json::{Map, Value}; +use superposition_types::logic::{apply, partial_apply}; + +use crate::data_source::{ConfigData, ExperimentData, SuperpositionDataSource}; +use crate::types::{Result, SuperpositionError, SuperpositionOptions}; +use crate::utils::ConversionUtils; + +/// HTTP-based data source using the Superposition SDK +/// +/// Fetches configuration and experiment data from the Superposition +/// service over HTTP. +pub struct HttpDataSource { + options: SuperpositionOptions, +} + +impl HttpDataSource { + pub fn new(options: SuperpositionOptions) -> Self { + Self { options } + } + + fn create_client(&self) -> superposition_sdk::Client { + use superposition_sdk::{Client, Config as SdkConfig}; + + let sdk_config = SdkConfig::builder() + .endpoint_url(&self.options.endpoint) + .bearer_token(self.options.token.clone().into()) + .behavior_version_latest() + .build(); + + Client::from_conf(sdk_config) + } + + async fn fetch_experiments_and_groups(&self) -> Result { + use superposition_sdk::types::ExperimentStatusType; + + let client = self.create_client(); + + let (experiments_result, groups_result) = tokio::join!( + client + .list_experiment() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id) + .all(true) + .status(ExperimentStatusType::Created) + .status(ExperimentStatusType::Inprogress) + .send(), + client + .list_experiment_groups() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id) + .all(true) + .send() + ); + + let experiments_response = experiments_result.map_err(|e| { + SuperpositionError::NetworkError(format!("Failed to list experiments: {}", e)) + })?; + let groups_response = groups_result.map_err(|e| { + SuperpositionError::NetworkError(format!("Failed to list experiment groups: {}", e)) + })?; + + let experiments = ConversionUtils::convert_experiments_response(&experiments_response)?; + let groups = ConversionUtils::convert_experiment_groups_response(&groups_response)?; + + Ok(ExperimentData::new(experiments, groups)) + } + + /// Filter experiments by partial context match (any dimension matches) + fn filter_experiments_candidate( + exp_data: &ExperimentData, + context: &Map, + ) -> ExperimentData { + let filtered_experiments = exp_data + .experiments + .iter() + .filter(|exp| partial_apply(&exp.context, context)) + .cloned() + .collect(); + + let filtered_groups = exp_data + .experiment_groups + .iter() + .filter(|group| partial_apply(&group.context, context)) + .cloned() + .collect(); + + ExperimentData::new(filtered_experiments, filtered_groups) + } + + /// Filter experiments by exact context match (all dimensions must match) + fn filter_experiments_matching( + exp_data: &ExperimentData, + context: &Map, + ) -> ExperimentData { + let filtered_experiments = exp_data + .experiments + .iter() + .filter(|exp| apply(&exp.context, context)) + .cloned() + .collect(); + + let filtered_groups = exp_data + .experiment_groups + .iter() + .filter(|group| apply(&group.context, context)) + .cloned() + .collect(); + + ExperimentData::new(filtered_experiments, filtered_groups) + } + + /// Filter experiments by prefix - keep experiments that have variants + /// with override keys matching any of the given prefixes + fn filter_experiments_by_prefix( + exp_data: &ExperimentData, + prefixes: &[String], + ) -> ExperimentData { + let filtered_experiments = exp_data + .experiments + .iter() + .filter(|exp| { + exp.variants.iter().any(|variant| { + let overrides: Map = variant.overrides.clone().into(); + overrides.keys().any(|key| { + prefixes.iter().any(|prefix| key.starts_with(prefix)) + }) + }) + }) + .cloned() + .collect(); + + // Keep all groups - they reference experiments by ID + ExperimentData::new(filtered_experiments, exp_data.experiment_groups.clone()) + } +} + +#[async_trait] +impl SuperpositionDataSource for HttpDataSource { + async fn fetch_config(&self) -> Result { + let client = self.create_client(); + + info!("HttpDataSource: fetching config"); + + let response = client + .get_config() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id) + .send() + .await + .map_err(|e| { + SuperpositionError::NetworkError(format!("Failed to get config: {}", e)) + })?; + + let config = ConversionUtils::convert_get_config_response(&response)?; + Ok(ConfigData::new(config)) + } + + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result { + let config_data = self.fetch_config().await?; + let mut config = config_data.config; + + if let Some(ctx) = context { + if !ctx.is_empty() { + config = config.filter_by_dimensions(ctx); + } + } + + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + config = config.filter_by_prefix(&HashSet::from_iter(prefixes.iter().cloned())); + } + } + + Ok(ConfigData::new(config)) + } + + async fn fetch_active_experiments(&self) -> Result> { + let data = self.fetch_experiments_and_groups().await?; + Ok(Some(data)) + } + + async fn fetch_candidate_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result> { + let mut data = self.fetch_experiments_and_groups().await?; + + if let Some(ctx) = context { + if !ctx.is_empty() { + data = Self::filter_experiments_candidate(&data, ctx); + } + } + + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + data = Self::filter_experiments_by_prefix(&data, prefixes); + } + } + + Ok(Some(data)) + } + + async fn fetch_matching_active_experiments( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result> { + let mut data = self.fetch_experiments_and_groups().await?; + + if let Some(ctx) = context { + if !ctx.is_empty() { + data = Self::filter_experiments_matching(&data, ctx); + } + } + + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + data = Self::filter_experiments_by_prefix(&data, prefixes); + } + } + + Ok(Some(data)) + } + + fn supports_experiments(&self) -> bool { + true + } + + async fn close(&self) -> Result<()> { + Ok(()) + } +} +``` + +**Step 2: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles with no errors (may need minor adjustments for `partial_apply` / `apply` import paths) + +**Step 3: Commit** + +```bash +git add crates/superposition_provider/src/data_source/http.rs +git commit -m "feat: implement HttpDataSource for SuperpositionDataSource trait" +``` + +--- + +### Task 4: Implement `FileDataSource` + +**Files:** +- Modify: `crates/superposition_provider/src/data_source/file.rs` + +**Reference files:** +- `crates/superposition_core/src/toml.rs` (`parse_toml_config`) + +**Step 1: Implement `FileDataSource`** + +```rust +use std::collections::HashSet; +use std::path::PathBuf; + +use async_trait::async_trait; +use log::info; +use serde_json::{Map, Value}; +use superposition_core::toml::parse_toml_config; + +use crate::data_source::{ConfigData, ExperimentData, SuperpositionDataSource}; +use crate::types::{Result, SuperpositionError}; + +/// File-based data source using TOML configuration files +/// +/// Reads configuration from a TOML file using `superposition_core::toml::parse_toml_config`. +/// Does not support experiments - all experiment methods return `Ok(None)`. +pub struct FileDataSource { + file_path: PathBuf, +} + +impl FileDataSource { + pub fn new(file_path: PathBuf) -> Self { + Self { file_path } + } +} + +#[async_trait] +impl SuperpositionDataSource for FileDataSource { + async fn fetch_config(&self) -> Result { + info!("FileDataSource: reading config from {:?}", self.file_path); + + let content = tokio::fs::read_to_string(&self.file_path) + .await + .map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to read config file {:?}: {}", + self.file_path, e + )) + })?; + + let config = parse_toml_config(&content).map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to parse TOML config from {:?}: {}", + self.file_path, e + )) + })?; + + Ok(ConfigData::new(config)) + } + + async fn fetch_filtered_config( + &self, + context: Option<&Map>, + prefix_filter: Option<&[String]>, + ) -> Result { + let config_data = self.fetch_config().await?; + let mut config = config_data.config; + + if let Some(ctx) = context { + if !ctx.is_empty() { + config = config.filter_by_dimensions(ctx); + } + } + + if let Some(prefixes) = prefix_filter { + if !prefixes.is_empty() { + config = config.filter_by_prefix(&HashSet::from_iter(prefixes.iter().cloned())); + } + } + + Ok(ConfigData::new(config)) + } + + async fn fetch_active_experiments(&self) -> Result> { + Ok(None) + } + + async fn fetch_candidate_active_experiments( + &self, + _context: Option<&Map>, + _prefix_filter: Option<&[String]>, + ) -> Result> { + Ok(None) + } + + async fn fetch_matching_active_experiments( + &self, + _context: Option<&Map>, + _prefix_filter: Option<&[String]>, + ) -> Result> { + Ok(None) + } + + fn supports_experiments(&self) -> bool { + false + } + + async fn close(&self) -> Result<()> { + Ok(()) + } +} +``` + +**Step 2: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles with no errors + +**Step 3: Commit** + +```bash +git add crates/superposition_provider/src/data_source/file.rs +git commit -m "feat: implement FileDataSource for SuperpositionDataSource trait" +``` + +--- + +### Task 5: Add `Manual` variant to `RefreshStrategy` + +**Files:** +- Modify: `crates/superposition_provider/src/types.rs` + +**Step 1: Add `Manual` variant to `RefreshStrategy` enum** + +Add to the existing `RefreshStrategy` enum in `types.rs`: + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum RefreshStrategy { + Polling(PollingStrategy), + OnDemand(OnDemandStrategy), + Manual, +} +``` + +**Step 2: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles (existing match statements in `client.rs` will need a `Manual` arm - add a no-op arm if needed) + +**Step 3: Commit** + +```bash +git add crates/superposition_provider/src/types.rs crates/superposition_provider/src/client.rs +git commit -m "feat: add Manual variant to RefreshStrategy" +``` + +--- + +### Task 6: Implement `LocalResolutionProvider` + +**Files:** +- Create: `crates/superposition_provider/src/local_provider.rs` +- Modify: `crates/superposition_provider/src/lib.rs` + +**Reference files:** +- `crates/superposition_provider/src/provider.rs` (existing `SuperpositionProvider` for pattern reference) +- `crates/superposition_provider/src/client.rs` (existing refresh/polling logic) +- `crates/superposition_core/src/config.rs` (`eval_config`) +- `crates/superposition_core/src/experiment.rs` (`get_applicable_variants`) + +**Step 1: Create `local_provider.rs` with struct and construction/init/refresh/close** + +```rust +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use log::{debug, error, info, warn}; +use open_feature::{ + provider::{FeatureProvider, ProviderMetadata, ProviderStatus, ResolutionDetails}, + EvaluationContext, EvaluationError, EvaluationErrorCode, EvaluationResult, StructValue, +}; +use serde_json::{Map, Value}; +use superposition_core::{eval_config, get_applicable_variants, MergeStrategy}; +use superposition_types::DimensionInfo; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tokio::time::{sleep, Duration}; + +use crate::data_source::{ConfigData, ExperimentData, SuperpositionDataSource}; +use crate::traits::{AllFeatureProvider, FeatureExperimentMeta}; +use crate::types::*; +use crate::utils::ConversionUtils; + +/// Local (in-process) resolution provider +/// +/// Caches raw configuration and experiment data from a primary data source +/// (with optional fallback), and resolves configuration locally using +/// `superposition_core::eval_config`. +pub struct LocalResolutionProvider { + primary: Arc, + fallback: Option>, + refresh_strategy: RefreshStrategy, + cached_config: Arc>>, + cached_experiments: Arc>>, + polling_task: RwLock>>, + metadata: ProviderMetadata, + status: RwLock, +} + +impl LocalResolutionProvider { + pub fn new( + primary: Box, + fallback: Option>, + refresh_strategy: RefreshStrategy, + ) -> Self { + Self { + primary: Arc::from(primary), + fallback: fallback.map(Arc::from), + refresh_strategy, + cached_config: Arc::new(RwLock::new(None)), + cached_experiments: Arc::new(RwLock::new(None)), + polling_task: RwLock::new(None), + metadata: ProviderMetadata { + name: "LocalResolutionProvider".to_string(), + }, + status: RwLock::new(ProviderStatus::NotReady), + } + } + + /// Initialize the provider: fetch initial data and start refresh strategy + pub async fn init(&self) -> Result<()> { + info!("Initializing LocalResolutionProvider..."); + + // Try primary first + let config_result = self.primary.fetch_config().await; + let exp_result = self.primary.fetch_active_experiments().await; + + match config_result { + Ok(config_data) => { + let mut cached = self.cached_config.write().await; + *cached = Some(config_data); + info!("Config fetched from primary data source"); + } + Err(primary_err) => { + warn!("Primary data source failed: {}", primary_err); + // Try fallback + if let Some(fallback) = &self.fallback { + match fallback.fetch_config().await { + Ok(config_data) => { + let mut cached = self.cached_config.write().await; + *cached = Some(config_data); + info!("Config fetched from fallback data source"); + } + Err(fallback_err) => { + error!("Fallback data source also failed: {}", fallback_err); + return Err(SuperpositionError::ConfigError(format!( + "Both primary ({}) and fallback ({}) data sources failed", + primary_err, fallback_err + ))); + } + } + } else { + return Err(primary_err); + } + } + } + + // Handle experiments (best-effort, only if primary supports them) + match exp_result { + Ok(Some(exp_data)) => { + let mut cached = self.cached_experiments.write().await; + *cached = Some(exp_data); + info!("Experiments fetched from primary data source"); + } + Ok(None) => { + debug!("Primary data source does not support experiments"); + } + Err(e) => { + warn!("Failed to fetch experiments from primary: {}", e); + // Try fallback for experiments + if let Some(fallback) = &self.fallback { + if fallback.supports_experiments() { + match fallback.fetch_active_experiments().await { + Ok(Some(exp_data)) => { + let mut cached = self.cached_experiments.write().await; + *cached = Some(exp_data); + info!("Experiments fetched from fallback data source"); + } + _ => { + warn!("Fallback also failed to fetch experiments"); + } + } + } + } + } + } + + // Start refresh strategy + match &self.refresh_strategy { + RefreshStrategy::Polling(strategy) => { + info!("Starting polling with interval={}s", strategy.interval); + let task = self.start_polling(strategy.interval).await; + let mut polling_task = self.polling_task.write().await; + *polling_task = Some(task); + } + RefreshStrategy::OnDemand(strategy) => { + info!("Using on-demand strategy with ttl={}s", strategy.ttl); + } + RefreshStrategy::Manual => { + info!("Using manual refresh strategy"); + } + } + + let mut status = self.status.write().await; + *status = ProviderStatus::Ready; + + info!("LocalResolutionProvider initialized successfully"); + Ok(()) + } + + /// Manually refresh data from the primary data source + pub async fn refresh(&self) -> Result<()> { + info!("Manual refresh triggered"); + self.do_refresh().await + } + + /// Close the provider and clean up resources + pub async fn close(&self) -> Result<()> { + let mut polling_task = self.polling_task.write().await; + if let Some(task) = polling_task.take() { + task.abort(); + } + + self.primary.close().await?; + if let Some(fallback) = &self.fallback { + fallback.close().await?; + } + + let mut cached_config = self.cached_config.write().await; + *cached_config = None; + let mut cached_experiments = self.cached_experiments.write().await; + *cached_experiments = None; + + let mut status = self.status.write().await; + *status = ProviderStatus::NotReady; + + Ok(()) + } + + async fn do_refresh(&self) -> Result<()> { + // Fetch config - keep last known good on failure + match self.primary.fetch_config().await { + Ok(config_data) => { + let mut cached = self.cached_config.write().await; + *cached = Some(config_data); + debug!("Config refreshed from primary"); + } + Err(e) => { + warn!("Refresh failed for config, keeping last known good: {}", e); + } + } + + // Fetch experiments - keep last known good on failure + if self.primary.supports_experiments() { + match self.primary.fetch_active_experiments().await { + Ok(Some(exp_data)) => { + let mut cached = self.cached_experiments.write().await; + *cached = Some(exp_data); + debug!("Experiments refreshed from primary"); + } + Ok(None) => {} + Err(e) => { + warn!( + "Refresh failed for experiments, keeping last known good: {}", + e + ); + } + } + } + + Ok(()) + } + + async fn start_polling(&self, interval: u64) -> JoinHandle<()> { + let primary = self.primary.clone(); + let cached_config = self.cached_config.clone(); + let cached_experiments = self.cached_experiments.clone(); + + tokio::spawn(async move { + loop { + sleep(Duration::from_secs(interval)).await; + + match primary.fetch_config().await { + Ok(config_data) => { + let mut cached = cached_config.write().await; + *cached = Some(config_data); + debug!("Config updated via polling"); + } + Err(e) => { + error!("Polling config refresh failed: {}", e); + } + } + + if primary.supports_experiments() { + match primary.fetch_active_experiments().await { + Ok(Some(exp_data)) => { + let mut cached = cached_experiments.write().await; + *cached = Some(exp_data); + debug!("Experiments updated via polling"); + } + Ok(None) => {} + Err(e) => { + error!("Polling experiments refresh failed: {}", e); + } + } + } + } + }) + } + + async fn ensure_fresh_data(&self) -> Result<()> { + if let RefreshStrategy::OnDemand(strategy) = &self.refresh_strategy { + let should_refresh = { + let cached = self.cached_config.read().await; + match cached.as_ref() { + Some(data) => { + let elapsed = chrono::Utc::now() - data.fetched_at; + elapsed.num_seconds() > strategy.ttl as i64 + } + None => true, + } + }; + + if should_refresh { + debug!("On-demand TTL expired, refreshing"); + if let Err(e) = self.do_refresh().await { + if !strategy.use_stale_on_error.unwrap_or(false) { + return Err(e); + } + warn!("On-demand refresh failed, using stale data: {}", e); + } + } + } + Ok(()) + } + + fn get_context_from_evaluation_context( + evaluation_context: &EvaluationContext, + ) -> (Map, Option) { + let context = evaluation_context + .custom_fields + .iter() + .map(|(k, v)| { + ( + k.clone(), + ConversionUtils::convert_evaluation_context_value_to_serde_value(v), + ) + }) + .collect(); + + (context, evaluation_context.targeting_key.clone()) + } + + async fn get_dimensions_info(&self) -> HashMap { + let cached = self.cached_config.read().await; + cached + .as_ref() + .map(|d| d.config.dimensions.clone()) + .unwrap_or_default() + } + + async fn eval_with_context( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.ensure_fresh_data().await?; + + let (mut query_data, targeting_key) = + Self::get_context_from_evaluation_context(context); + + let dimensions_info = self.get_dimensions_info().await; + + // Get experiment variants if available + let cached_experiments = self.cached_experiments.read().await; + if let Some(exp_data) = cached_experiments.as_ref() { + let variant_ids = get_applicable_variants( + &dimensions_info, + exp_data.experiments.clone(), + &exp_data.experiment_groups, + &query_data, + &targeting_key.unwrap_or_default(), + prefix_filter.map(|p| p.to_vec()), + ) + .map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to get applicable variants: {}", + e + )) + })?; + + query_data.insert( + "variantIds".to_string(), + Value::Array(variant_ids.into_iter().map(Value::String).collect()), + ); + } + drop(cached_experiments); + + // Evaluate config + let cached_config = self.cached_config.read().await; + match cached_config.as_ref() { + Some(config_data) => eval_config( + (*config_data.config.default_configs).clone(), + &config_data.config.contexts, + &config_data.config.overrides, + &config_data.config.dimensions, + &query_data, + MergeStrategy::MERGE, + prefix_filter.map(|p| p.to_vec()), + ) + .map_err(|e| { + SuperpositionError::ConfigError(format!("Failed to evaluate config: {}", e)) + }), + None => Err(SuperpositionError::ConfigError( + "No cached config available".into(), + )), + } + } +} +``` + +**Step 2: Implement `AllFeatureProvider` for `LocalResolutionProvider`** + +```rust +#[async_trait] +impl AllFeatureProvider for LocalResolutionProvider { + async fn resolve_all_features( + &self, + context: &EvaluationContext, + ) -> Result> { + self.eval_with_context(context, None).await + } + + async fn resolve_all_features_with_filter( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.eval_with_context(context, prefix_filter).await + } +} +``` + +**Step 3: Implement `FeatureExperimentMeta` for `LocalResolutionProvider`** + +```rust +#[async_trait] +impl FeatureExperimentMeta for LocalResolutionProvider { + async fn get_applicable_variants( + &self, + context: &EvaluationContext, + ) -> Result> { + self.ensure_fresh_data().await?; + + let (query_data, targeting_key) = + Self::get_context_from_evaluation_context(context); + let dimensions_info = self.get_dimensions_info().await; + + let cached_experiments = self.cached_experiments.read().await; + match cached_experiments.as_ref() { + Some(exp_data) => get_applicable_variants( + &dimensions_info, + exp_data.experiments.clone(), + &exp_data.experiment_groups, + &query_data, + &targeting_key.unwrap_or_default(), + None, + ) + .map_err(|e| { + SuperpositionError::ConfigError(format!( + "Failed to get applicable variants: {}", + e + )) + }), + None => Ok(vec![]), + } + } +} +``` + +**Step 4: Implement `FeatureProvider` (OpenFeature) for `LocalResolutionProvider`** + +Follow the same pattern as `crates/superposition_provider/src/provider.rs`: + +```rust +#[async_trait] +impl FeatureProvider for LocalResolutionProvider { + async fn initialize(&mut self, _context: &EvaluationContext) { + info!("Initializing LocalResolutionProvider via OpenFeature..."); + { + let mut status = self.status.write().await; + *status = ProviderStatus::NotReady; + } + if (self.init().await).is_err() { + let mut status = self.status.write().await; + *status = ProviderStatus::Error; + return; + } + let mut status = self.status.write().await; + *status = ProviderStatus::Ready; + } + + async fn resolve_bool_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(bool_val) = value.as_bool() { + return Ok(ResolutionDetails::new(bool_val)); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found in configuration".to_string()), + }) + } + Err(e) => { + error!("Error evaluating boolean flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error evaluating flag: {}", e)), + }) + } + } + } + + async fn resolve_string_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(str_val) = value.as_str() { + return Ok(ResolutionDetails::new(str_val.to_owned())); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found in configuration".to_string()), + }) + } + Err(e) => { + error!("Error evaluating string flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error evaluating flag: {}", e)), + }) + } + } + } + + async fn resolve_int_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(int_val) = value.as_i64() { + return Ok(ResolutionDetails::new(int_val)); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found in configuration".to_string()), + }) + } + Err(e) => { + error!("Error evaluating integer flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error evaluating flag: {}", e)), + }) + } + } + } + + async fn resolve_float_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(float_val) = value.as_f64() { + return Ok(ResolutionDetails::new(float_val)); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found in configuration".to_string()), + }) + } + Err(e) => { + error!("Error evaluating float flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error evaluating flag: {}", e)), + }) + } + } + } + + async fn resolve_struct_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + match ConversionUtils::serde_value_to_struct_value(value) { + Ok(struct_value) => { + return Ok(ResolutionDetails::new(struct_value)); + } + Err(e) => { + return Err(EvaluationError { + code: EvaluationErrorCode::ParseError, + message: Some(format!("Failed to parse struct value: {}", e)), + }); + } + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found in configuration".to_string()), + }) + } + Err(e) => { + error!("Error evaluating struct flag {}: {}", flag_key, e); + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error evaluating flag: {}", e)), + }) + } + } + } + + fn metadata(&self) -> &ProviderMetadata { + &self.metadata + } + + fn status(&self) -> ProviderStatus { + match self.status.try_read() { + Ok(status) => match *status { + ProviderStatus::Ready => ProviderStatus::Ready, + ProviderStatus::Error => ProviderStatus::Error, + ProviderStatus::NotReady => ProviderStatus::NotReady, + ProviderStatus::STALE => ProviderStatus::STALE, + }, + Err(_) => ProviderStatus::NotReady, + } + } +} +``` + +**Step 5: Add module to `lib.rs`** + +Add `pub mod local_provider;` and re-export: `pub use local_provider::LocalResolutionProvider;` + +**Step 6: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles with no errors + +**Step 7: Commit** + +```bash +git add crates/superposition_provider/src/local_provider.rs crates/superposition_provider/src/lib.rs +git commit -m "feat: implement LocalResolutionProvider with primary/fallback and refresh strategies" +``` + +--- + +### Task 7: Implement `SuperpositionAPIProvider` (Remote Resolution) + +**Files:** +- Create: `crates/superposition_provider/src/remote_provider.rs` +- Modify: `crates/superposition_provider/src/lib.rs` + +**Step 1: Create `remote_provider.rs`** + +Note: This provider calls a resolve API endpoint that accepts context and returns already-resolved config. The exact SDK method for remote resolution needs to be verified against the `superposition_sdk` crate. If a dedicated resolve endpoint exists, use it. Otherwise, this provider can use `get_config` + local eval as a fallback (similar to LocalResolutionProvider but without caching raw data). + +```rust +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use log::{debug, error, info}; +use open_feature::{ + provider::{FeatureProvider, ProviderMetadata, ProviderStatus, ResolutionDetails}, + EvaluationContext, EvaluationError, EvaluationErrorCode, EvaluationResult, StructValue, +}; +use serde_json::{Map, Value}; +use tokio::sync::RwLock; + +use crate::traits::{AllFeatureProvider, FeatureExperimentMeta}; +use crate::types::*; +use crate::utils::ConversionUtils; + +struct CacheEntry { + value: Map, + created_at: Instant, +} + +struct ResponseCache { + entries: HashMap, + max_entries: usize, + ttl: Duration, +} + +impl ResponseCache { + fn new(max_entries: usize, ttl: Duration) -> Self { + Self { + entries: HashMap::new(), + max_entries, + ttl, + } + } + + fn get(&self, key: &str) -> Option<&Map> { + self.entries.get(key).and_then(|entry| { + if entry.created_at.elapsed() < self.ttl { + Some(&entry.value) + } else { + None + } + }) + } + + fn put(&mut self, key: String, value: Map) { + // Evict expired entries if at capacity + if self.entries.len() >= self.max_entries { + self.entries + .retain(|_, entry| entry.created_at.elapsed() < self.ttl); + } + // If still at capacity, remove oldest + if self.entries.len() >= self.max_entries { + if let Some(oldest_key) = self + .entries + .iter() + .min_by_key(|(_, entry)| entry.created_at) + .map(|(k, _)| k.clone()) + { + self.entries.remove(&oldest_key); + } + } + self.entries.insert( + key, + CacheEntry { + value, + created_at: Instant::now(), + }, + ); + } + + fn cache_key(context: &EvaluationContext) -> String { + // Hash the context for cache key + let mut parts: Vec = Vec::new(); + if let Some(tk) = &context.targeting_key { + parts.push(format!("tk:{}", tk)); + } + let mut field_keys: Vec<&String> = context.custom_fields.keys().collect(); + field_keys.sort(); + for key in field_keys { + let value = &context.custom_fields[key]; + parts.push(format!( + "{}:{}", + key, + ConversionUtils::convert_evaluation_context_value_to_serde_value(value) + )); + } + parts.join("|") + } +} + +/// Remote resolution provider using the Superposition API +/// +/// Sends evaluation context over the network and receives resolved +/// configuration from the Superposition service. +pub struct SuperpositionAPIProvider { + options: SuperpositionOptions, + cache: Option>>, + metadata: ProviderMetadata, + status: RwLock, +} + +impl SuperpositionAPIProvider { + pub fn new(options: SuperpositionOptions) -> Self { + Self { + options, + cache: None, + metadata: ProviderMetadata { + name: "SuperpositionAPIProvider".to_string(), + }, + status: RwLock::new(ProviderStatus::Ready), + } + } + + pub fn with_cache(options: SuperpositionOptions, cache_options: CacheOptions) -> Self { + let cache = ResponseCache::new( + cache_options.size.unwrap_or(1000), + Duration::from_secs(cache_options.ttl.unwrap_or(300)), + ); + Self { + options, + cache: Some(Arc::new(RwLock::new(cache))), + metadata: ProviderMetadata { + name: "SuperpositionAPIProvider".to_string(), + }, + status: RwLock::new(ProviderStatus::Ready), + } + } + + fn create_client(&self) -> superposition_sdk::Client { + use superposition_sdk::{Client, Config as SdkConfig}; + + let sdk_config = SdkConfig::builder() + .endpoint_url(&self.options.endpoint) + .bearer_token(self.options.token.clone().into()) + .behavior_version_latest() + .build(); + + Client::from_conf(sdk_config) + } + + async fn resolve_remote( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + // Check cache first + if let Some(cache) = &self.cache { + let cache_key = ResponseCache::cache_key(context); + let cache_read = cache.read().await; + if let Some(cached) = cache_read.get(&cache_key) { + debug!("Cache hit for context"); + let result = if let Some(prefixes) = prefix_filter { + cached + .iter() + .filter(|(k, _)| prefixes.iter().any(|p| k.starts_with(p))) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() + } else { + cached.clone() + }; + return Ok(result); + } + drop(cache_read); + } + + let client = self.create_client(); + + let (query_data, targeting_key) = Self::get_context_from_evaluation_context(context); + + // Use the SDK resolve API + // Build query parameters from the context + let mut request = client + .get_resolved_config() + .workspace_id(&self.options.workspace_id) + .org_id(&self.options.org_id); + + // Add targeting key if present + if let Some(tk) = &targeting_key { + request = request.targeting_key(tk.as_str()); + } + + // Add context fields as query parameters + // The resolve API typically accepts context as a JSON body or query params + // Using the SDK's built-in method for passing context + for (key, value) in &query_data { + request = request.context(key.clone(), value.to_string()); + } + + if let Some(prefixes) = prefix_filter { + for prefix in prefixes { + request = request.filter_keys(prefix.clone()); + } + } + + let response = request.send().await.map_err(|e| { + SuperpositionError::NetworkError(format!("Failed to resolve config: {}", e)) + })?; + + // Convert response to Map + let config = ConversionUtils::convert_get_config_response(&response)?; + + // For remote resolution, the response should already be resolved + // We return the default_configs as the resolved values + let result: Map = (*config.default_configs).clone(); + + // Cache the result + if let Some(cache) = &self.cache { + let cache_key = ResponseCache::cache_key(context); + let mut cache_write = cache.write().await; + cache_write.put(cache_key, result.clone()); + } + + Ok(result) + } + + fn get_context_from_evaluation_context( + evaluation_context: &EvaluationContext, + ) -> (Map, Option) { + let context = evaluation_context + .custom_fields + .iter() + .map(|(k, v)| { + ( + k.clone(), + ConversionUtils::convert_evaluation_context_value_to_serde_value(v), + ) + }) + .collect(); + + (context, evaluation_context.targeting_key.clone()) + } +} + +#[async_trait] +impl AllFeatureProvider for SuperpositionAPIProvider { + async fn resolve_all_features( + &self, + context: &EvaluationContext, + ) -> Result> { + self.resolve_remote(context, None).await + } + + async fn resolve_all_features_with_filter( + &self, + context: &EvaluationContext, + prefix_filter: Option<&[String]>, + ) -> Result> { + self.resolve_remote(context, prefix_filter).await + } +} + +#[async_trait] +impl FeatureExperimentMeta for SuperpositionAPIProvider { + async fn get_applicable_variants( + &self, + _context: &EvaluationContext, + ) -> Result> { + // Remote resolution handles experiments server-side + // The resolved config already includes experiment results + // Return empty - variants are already applied in the resolved config + Ok(vec![]) + } +} + +#[async_trait] +impl FeatureProvider for SuperpositionAPIProvider { + async fn initialize(&mut self, _context: &EvaluationContext) { + let mut status = self.status.write().await; + *status = ProviderStatus::Ready; + } + + async fn resolve_bool_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(bool_val) = value.as_bool() { + return Ok(ResolutionDetails::new(bool_val)); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found".to_string()), + }) + } + Err(e) => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error: {}", e)), + }), + } + } + + async fn resolve_string_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(str_val) = value.as_str() { + return Ok(ResolutionDetails::new(str_val.to_owned())); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found".to_string()), + }) + } + Err(e) => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error: {}", e)), + }), + } + } + + async fn resolve_int_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(int_val) = value.as_i64() { + return Ok(ResolutionDetails::new(int_val)); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found".to_string()), + }) + } + Err(e) => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error: {}", e)), + }), + } + } + + async fn resolve_float_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + if let Some(float_val) = value.as_f64() { + return Ok(ResolutionDetails::new(float_val)); + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found".to_string()), + }) + } + Err(e) => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error: {}", e)), + }), + } + } + + async fn resolve_struct_value( + &self, + flag_key: &str, + evaluation_context: &EvaluationContext, + ) -> EvaluationResult> { + match self.resolve_all_features(evaluation_context).await { + Ok(config) => { + if let Some(value) = config.get(flag_key) { + match ConversionUtils::serde_value_to_struct_value(value) { + Ok(struct_value) => { + return Ok(ResolutionDetails::new(struct_value)); + } + Err(e) => { + return Err(EvaluationError { + code: EvaluationErrorCode::ParseError, + message: Some(format!("Failed to parse struct value: {}", e)), + }); + } + } + } + Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some("Flag not found".to_string()), + }) + } + Err(e) => Err(EvaluationError { + code: EvaluationErrorCode::FlagNotFound, + message: Some(format!("Error: {}", e)), + }), + } + } + + fn metadata(&self) -> &ProviderMetadata { + &self.metadata + } + + fn status(&self) -> ProviderStatus { + match self.status.try_read() { + Ok(status) => match *status { + ProviderStatus::Ready => ProviderStatus::Ready, + ProviderStatus::Error => ProviderStatus::Error, + ProviderStatus::NotReady => ProviderStatus::NotReady, + ProviderStatus::STALE => ProviderStatus::STALE, + }, + Err(_) => ProviderStatus::NotReady, + } + } +} +``` + +**Step 2: Add module to `lib.rs`** + +Add `pub mod remote_provider;` and re-export: `pub use remote_provider::SuperpositionAPIProvider;` + +**Step 3: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles (the `get_resolved_config` SDK method may need adjustment based on actual SDK API - check `superposition_sdk` for the correct method name and parameters) + +**Step 4: Commit** + +```bash +git add crates/superposition_provider/src/remote_provider.rs crates/superposition_provider/src/lib.rs +git commit -m "feat: implement SuperpositionAPIProvider for remote resolution" +``` + +--- + +### Task 8: Update `lib.rs` with final re-exports + +**Files:** +- Modify: `crates/superposition_provider/src/lib.rs` + +**Step 1: Update `lib.rs` to export all new modules** + +```rust +// Existing modules (kept for comparison) +pub mod client; +pub mod provider; +pub mod types; +pub mod utils; + +// New trait-based architecture +pub mod data_source; +pub mod local_provider; +pub mod remote_provider; +pub mod traits; + +// Re-exports - existing +pub use client::*; +pub use provider::*; +pub use types::*; + +// Re-exports - new +pub use data_source::{ConfigData, ExperimentData, SuperpositionDataSource}; +pub use local_provider::LocalResolutionProvider; +pub use remote_provider::SuperpositionAPIProvider; +pub use traits::*; + +pub use open_feature::{ + provider::{ProviderMetadata, ProviderStatus, ResolutionDetails}, + EvaluationContext, +}; +``` + +**Step 2: Verify it compiles** + +Run: `cargo check -p superposition_provider` +Expected: compiles with no errors + +**Step 3: Commit** + +```bash +git add crates/superposition_provider/src/lib.rs +git commit -m "feat: update lib.rs with re-exports for new trait-based architecture" +``` + +--- + +### Task 9: Create examples + +**Files:** +- Create: `crates/superposition_provider/examples/local_http_example.rs` +- Create: `crates/superposition_provider/examples/local_file_example.rs` +- Create: `crates/superposition_provider/examples/local_with_fallback_example.rs` + +**Step 1: Create `local_http_example.rs`** + +```rust +use open_feature::EvaluationContext; +use superposition_provider::{ + data_source::http::HttpDataSource, + local_provider::LocalResolutionProvider, + traits::{AllFeatureProvider, FeatureExperimentMeta}, + PollingStrategy, RefreshStrategy, SuperpositionOptions, +}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let http_source = HttpDataSource::new(SuperpositionOptions::new( + "http://localhost:8080".to_string(), + "token".to_string(), + "org1".to_string(), + "workspace1".to_string(), + )); + + let provider = LocalResolutionProvider::new( + Box::new(http_source), + None, + RefreshStrategy::Polling(PollingStrategy { + interval: 30, + timeout: Some(10), + }), + ); + provider.init().await.unwrap(); + + let context = EvaluationContext::default() + .with_targeting_key("user-123") + .with_custom_field("os", "android"); + + // AllFeatureProvider usage + let all_config = provider.resolve_all_features(&context).await.unwrap(); + println!("All config: {:?}", all_config); + + // FeatureExperimentMeta usage + let variants = provider.get_applicable_variants(&context).await.unwrap(); + println!("Variants: {:?}", variants); + + provider.close().await.unwrap(); +} +``` + +**Step 2: Create `local_file_example.rs`** + +```rust +use std::path::PathBuf; + +use open_feature::EvaluationContext; +use superposition_provider::{ + data_source::file::FileDataSource, + local_provider::LocalResolutionProvider, + traits::AllFeatureProvider, + OnDemandStrategy, RefreshStrategy, +}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let file_source = FileDataSource::new(PathBuf::from("./config.toml")); + + let provider = LocalResolutionProvider::new( + Box::new(file_source), + None, + RefreshStrategy::OnDemand(OnDemandStrategy { + ttl: 60, + ..Default::default() + }), + ); + provider.init().await.unwrap(); + + let context = EvaluationContext::default() + .with_custom_field("os", "linux"); + + let config = provider.resolve_all_features(&context).await.unwrap(); + println!("Config: {:?}", config); + + provider.close().await.unwrap(); +} +``` + +**Step 3: Create `local_with_fallback_example.rs`** + +```rust +use std::path::PathBuf; + +use open_feature::EvaluationContext; +use superposition_provider::{ + data_source::file::FileDataSource, + data_source::http::HttpDataSource, + local_provider::LocalResolutionProvider, + traits::{AllFeatureProvider, FeatureExperimentMeta}, + PollingStrategy, RefreshStrategy, SuperpositionOptions, +}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + // Primary: HTTP data source + let http_source = HttpDataSource::new(SuperpositionOptions::new( + "http://localhost:8080".to_string(), + "token".to_string(), + "org1".to_string(), + "workspace1".to_string(), + )); + + // Fallback: TOML file data source + let file_source = FileDataSource::new(PathBuf::from("./config.toml")); + + let provider = LocalResolutionProvider::new( + Box::new(http_source), + Some(Box::new(file_source)), + RefreshStrategy::Polling(PollingStrategy { + interval: 30, + timeout: Some(10), + }), + ); + provider.init().await.unwrap(); + + let context = EvaluationContext::default() + .with_targeting_key("user-456") + .with_custom_field("os", "android") + .with_custom_field("app_version", "3.2.1"); + + // Bulk resolution + let all_config = provider.resolve_all_features(&context).await.unwrap(); + println!("All config: {:?}", all_config); + + // Filtered resolution + let filtered = provider + .resolve_all_features_with_filter( + &context, + Some(&["payment.".to_string(), "ui.".to_string()]), + ) + .await + .unwrap(); + println!("Filtered config: {:?}", filtered); + + // Experiment variants + let variants = provider.get_applicable_variants(&context).await.unwrap(); + println!("Variants: {:?}", variants); + + // Manual refresh + provider.refresh().await.unwrap(); + + provider.close().await.unwrap(); +} +``` + +**Step 4: Add `env_logger` dev-dependency to `Cargo.toml`** + +Add to `[dev-dependencies]` in `crates/superposition_provider/Cargo.toml`: +```toml +[dev-dependencies] +env_logger = "0.11" +``` + +**Step 5: Verify examples compile** + +Run: `cargo check -p superposition_provider --examples` +Expected: compiles with no errors + +**Step 6: Commit** + +```bash +git add crates/superposition_provider/examples/ crates/superposition_provider/Cargo.toml +git commit -m "feat: add examples for LocalResolutionProvider with HTTP, File, and fallback" +``` + +--- + +### Task 10: Final verification + +**Step 1: Run full crate check** + +Run: `cargo check -p superposition_provider` +Expected: compiles with no errors and no warnings + +**Step 2: Run existing tests** + +Run: `cargo test -p superposition_provider -- --skip test_rust_provider_integration` +Expected: existing unit tests pass + +**Step 3: Check examples compile** + +Run: `cargo check -p superposition_provider --examples` +Expected: compiles + +**Step 4: Run workspace check to ensure no regressions** + +Run: `cargo check --workspace` +Expected: no compilation errors in any crate + +**Step 5: Commit any final fixes** + +```bash +git add -A +git commit -m "fix: final adjustments for configuration resolver implementation" +```