diff --git a/Cargo.lock b/Cargo.lock index 47363319..af67b12e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,7 +428,7 @@ dependencies = [ [[package]] name = "bothan-api" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-band", @@ -456,7 +456,7 @@ dependencies = [ [[package]] name = "bothan-api-cli" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "bothan-api", @@ -493,7 +493,7 @@ dependencies = [ [[package]] name = "bothan-band" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -514,7 +514,7 @@ dependencies = [ [[package]] name = "bothan-binance" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -534,7 +534,7 @@ dependencies = [ [[package]] name = "bothan-bitfinex" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -553,7 +553,7 @@ dependencies = [ [[package]] name = "bothan-bybit" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -571,7 +571,7 @@ dependencies = [ [[package]] name = "bothan-client" -version = "0.1.0" +version = "0.1.1" dependencies = [ "pbjson", "prost", @@ -585,7 +585,7 @@ dependencies = [ [[package]] name = "bothan-coinbase" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -605,7 +605,7 @@ dependencies = [ [[package]] name = "bothan-coingecko" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -624,7 +624,7 @@ dependencies = [ [[package]] name = "bothan-coinmarketcap" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -645,7 +645,7 @@ dependencies = [ [[package]] name = "bothan-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "axum 0.8.4", @@ -687,7 +687,7 @@ dependencies = [ [[package]] name = "bothan-htx" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -706,7 +706,7 @@ dependencies = [ [[package]] name = "bothan-kraken" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -725,7 +725,7 @@ dependencies = [ [[package]] name = "bothan-lib" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bincode", @@ -747,7 +747,7 @@ dependencies = [ [[package]] name = "bothan-okx" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "bothan-lib", @@ -3531,14 +3531,13 @@ checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" [[package]] name = "strum_macros" -version = "0.27.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" dependencies = [ "heck", "proc-macro2", "quote", - "rustversion", "syn 2.0.104", ] diff --git a/Cargo.toml b/Cargo.toml index 2e9336be..89c685aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,22 +17,20 @@ resolver = "2" [workspace.dependencies] bothan-api = { path = "bothan-api/server" } -bothan-core = { path = "bothan-core", version = "0.1.0" } -bothan-client = { path = "bothan-api/client/rust-client", version = "0.1.0" } -bothan-lib = { path = "bothan-lib", version = "0.1.0" } +bothan-core = { path = "bothan-core", version = "0.1.1" } +bothan-client = { path = "bothan-api/client/rust-client", version = "0.1.1" } +bothan-lib = { path = "bothan-lib", version = "0.1.1" } -bothan-binance = { path = "bothan-binance", version = "0.1.0" } -bothan-bitfinex = { path = "bothan-bitfinex", version = "0.1.0" } -bothan-bybit = { path = "bothan-bybit", version = "0.1.0" } -bothan-coinbase = { path = "bothan-coinbase", version = "0.1.0" } -bothan-coingecko = { path = "bothan-coingecko", version = "0.1.0" } -bothan-coinmarketcap = { path = "bothan-coinmarketcap", version = "0.1.0" } -bothan-htx = { path = "bothan-htx", version = "0.1.0" } -bothan-kraken = { path = "bothan-kraken", version = "0.1.0" } -bothan-okx = { path = "bothan-okx", version = "0.1.0" } -bothan-band = { path = "bothan-band", version = "0.1.0" } - -anyhow = "1.0.86" +bothan-binance = { path = "bothan-binance", version = "0.1.1" } +bothan-bitfinex = { path = "bothan-bitfinex", version = "0.1.1" } +bothan-bybit = { path = "bothan-bybit", version = "0.1.1" } +bothan-coinbase = { path = "bothan-coinbase", version = "0.1.1" } +bothan-coingecko = { path = "bothan-coingecko", version = "0.1.1" } +bothan-coinmarketcap = { path = "bothan-coinmarketcap", version = "0.1.1" } +bothan-htx = { path = "bothan-htx", version = "0.1.1" } +bothan-kraken = { path = "bothan-kraken", version = "0.1.1" } +bothan-okx = { path = "bothan-okx", version = "0.1.1" } +bothan-band = { path = "bothan-band", version = "0.1.1" } async-trait = "0.1.77" bincode = "2.0.1" chrono = "0.4.39" @@ -46,8 +44,6 @@ mockito = "1.4.0" num-traits = "0.2.19" opentelemetry = { version = "0.28.0", features = ["metrics"] } prost = "0.13.1" -protoc-gen-prost = "0.4.0" -protoc-gen-tonic = "0.4.1" rand = "0.8.5" reqwest = { version = "0.12.3", features = ["json"] } rust_decimal = "1.10.2" diff --git a/README.md b/README.md index 4238f903..2c1c01a1 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ This project comprises primarily of 6 main components: - `bothan-{exchange}` - Exchange-specific implementations - [`proto`](proto/) - Protocol buffer definitions -## Supported Data Sources +## Supported Crypto Data Sources - [Binance](bothan-binance) - [Bitfinex](bothan-bitfinex) @@ -38,6 +38,12 @@ This project comprises primarily of 6 main components: - [Band/kiwi](bothan-band) - [Band/macaw](bothan-band) +## Supported Forex Data Sources + +- [Band/owlet](bothan-band) +- [Band/fieldfare](bothan-band) +- [Band/xenops](bothan-band) + ## Features - **Unified API**: Consistent interface across all supported exchanges diff --git a/bothan-api/client/rust-client/Cargo.toml b/bothan-api/client/rust-client/Cargo.toml index 5c629a70..1810e5fa 100644 --- a/bothan-api/client/rust-client/Cargo.toml +++ b/bothan-api/client/rust-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-client" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the Bothan API" authors.workspace = true edition.workspace = true diff --git a/bothan-api/server-cli/Cargo.toml b/bothan-api/server-cli/Cargo.toml index e4041850..3d95b790 100644 --- a/bothan-api/server-cli/Cargo.toml +++ b/bothan-api/server-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-api-cli" -version = "0.1.0" +version = "0.1.1" edition.workspace = true license.workspace = true repository.workspace = true diff --git a/bothan-api/server-cli/config.example.toml b/bothan-api/server-cli/config.example.toml index 51c3c3be..5f53f266 100644 --- a/bothan-api/server-cli/config.example.toml +++ b/bothan-api/server-cli/config.example.toml @@ -41,6 +41,11 @@ enabled = false # The threshold in seconds after which data is considered stale. stale_threshold = 300 +# Manager configuration for handling forex data sources +[manager.forex] +# The threshold in seconds after which data is considered stale. +stale_threshold = 7200 + # Configuration for the data sources that the manager will use. # If any of these [manager.crypto.source] sections (e.g., section manager.crypto.source.binance) are removed, # that specific source will not be used in bothan. @@ -114,6 +119,24 @@ url = "https://macaw.bandchain.org" # Update interval for Band Macaw source update_interval = "1m" +[manager.forex.source.band_owlet] +# URL for Band Owlet source +url = "https://owlet.bandchain.org" +# Update interval for Band Owlet source +update_interval = "1m" + +[manager.forex.source.band_fieldfare] +# URL for Band Fieldfare source +url = "https://fieldfare.bandchain.org" +# Update interval for Band Fieldfare source +update_interval = "1m" + +[manager.forex.source.band_xenops] +# URL for Band Xenops source +url = "https://xenops.bandchain.org" +# Update interval for Band Xenops source +update_interval = "1m" + # Telemetry configuration [telemetry] # Enable or disable telemetry. diff --git a/bothan-api/server-cli/src/commands/query.rs b/bothan-api/server-cli/src/commands/query.rs index 9072f824..4d21404c 100644 --- a/bothan-api/server-cli/src/commands/query.rs +++ b/bothan-api/server-cli/src/commands/query.rs @@ -6,7 +6,7 @@ //! //! ## Features //! -//! - Query prices from Binance, Bitfinex, Bybit, Coinbase, CoinGecko, CoinMarketCap, HTX, Kraken, OKX +//! - Query prices from Binance, Bitfinex, Bybit, Coinbase, CoinGecko, CoinMarketCap, HTX, Kraken, OKX, Band (Kiwi, Macaw, Owlet, Fieldfare, Xenops) //! - Customizable timeout and query IDs //! - Pretty-printed table output //! @@ -124,55 +124,90 @@ pub enum QuerySubCommand { #[clap(flatten)] args: QueryArgs, }, + /// Query Band/owlet prices + #[clap(name = "band/owlet")] + BandOwlet { + #[clap(flatten)] + args: QueryArgs, + }, + /// Query Band/fieldfare prices + #[clap(name = "band/fieldfare")] + BandFieldfare { + #[clap(flatten)] + args: QueryArgs, + }, + /// Query Band/xenops prices + #[clap(name = "band/xenops")] + BandXenops { + #[clap(flatten)] + args: QueryArgs, + }, } impl QueryCli { pub async fn run(&self, app_config: AppConfig) -> anyhow::Result<()> { - let source_config = app_config.manager.crypto.source; + let crypto_config = app_config.manager.crypto.and_then(|c| c.source); + let forex_config = app_config.manager.forex.and_then(|f| f.source); let config_err = anyhow!("Config is missing. Please check your config.toml."); match &self.subcommand { QuerySubCommand::Binance { args } => { - let opts = source_config.binance.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.binance).ok_or(config_err)?; query_binance(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Bitfinex { args } => { - let opts = source_config.bitfinex.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.bitfinex).ok_or(config_err)?; query_bitfinex(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Bybit { args } => { - let opts = source_config.bybit.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.bybit).ok_or(config_err)?; query_bybit(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Coinbase { args } => { - let opts = source_config.coinbase.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.coinbase).ok_or(config_err)?; query_coinbase(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::CoinGecko { args } => { - let opts = source_config.coingecko.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.coingecko).ok_or(config_err)?; query_coingecko(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::CoinMarketCap { args } => { - let opts = source_config.coinmarketcap.ok_or(config_err)?; + let opts = crypto_config + .and_then(|c| c.coinmarketcap) + .ok_or(config_err)?; query_coinmarketcap(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Htx { args } => { - let opts = source_config.htx.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.htx).ok_or(config_err)?; query_htx(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Kraken { args } => { - let opts = source_config.kraken.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.kraken).ok_or(config_err)?; query_kraken(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Okx { args } => { - let opts = source_config.okx.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.okx).ok_or(config_err)?; query_okx(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::BandKiwi { args } => { - let opts = source_config.band_kiwi.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.band_kiwi).ok_or(config_err)?; query_band(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::BandMacaw { args } => { - let opts = source_config.band_macaw.ok_or(config_err)?; + let opts = crypto_config.and_then(|c| c.band_macaw).ok_or(config_err)?; + query_band(opts, &args.query_ids, args.timeout).await?; + } + QuerySubCommand::BandOwlet { args } => { + let opts = forex_config.and_then(|f| f.band_owlet).ok_or(config_err)?; + query_band(opts, &args.query_ids, args.timeout).await?; + } + QuerySubCommand::BandFieldfare { args } => { + let opts = forex_config + .and_then(|f| f.band_fieldfare) + .ok_or(config_err)?; + query_band(opts, &args.query_ids, args.timeout).await?; + } + QuerySubCommand::BandXenops { args } => { + let opts = forex_config.and_then(|f| f.band_xenops).ok_or(config_err)?; query_band(opts, &args.query_ids, args.timeout).await?; } } diff --git a/bothan-api/server-cli/src/commands/start.rs b/bothan-api/server-cli/src/commands/start.rs index afe33027..3c4d6b0a 100644 --- a/bothan-api/server-cli/src/commands/start.rs +++ b/bothan-api/server-cli/src/commands/start.rs @@ -15,11 +15,12 @@ use bothan_api::api::BothanServer; use bothan_api::config::AppConfig; use bothan_api::config::ipfs::IpfsAuthentication; use bothan_api::config::manager::crypto_info::sources::CryptoSourceConfigs; +use bothan_api::config::manager::forex_info::sources::ForexSourceConfigs; use bothan_api::proto::bothan::v1::{BothanServiceServer, FILE_DESCRIPTOR_SET}; use bothan_api::{REGISTRY_REQUIREMENT, VERSION}; use bothan_core::ipfs::{IpfsClient, IpfsClientBuilder}; -use bothan_core::manager::CryptoAssetInfoManager; -use bothan_core::manager::crypto_asset_info::CryptoAssetWorkerOpts; +use bothan_core::manager::AssetInfoManager; +use bothan_core::manager::asset_info::AssetWorkerOpts; use bothan_core::monitoring::{Client as MonitoringClient, Signer}; use bothan_core::store::rocksdb::RocksDbStore; use bothan_core::telemetry; @@ -187,23 +188,47 @@ async fn init_bothan_server( ipfs_client: IpfsClient, monitoring_client: Option>, ) -> anyhow::Result>> { - let stale_threshold = config.manager.crypto.stale_threshold; + let prefix_stale_thresholds = init_prefix_stale_thresholds( + config.manager.crypto.as_ref().map(|c| c.stale_threshold), + config.manager.forex.as_ref().map(|f| f.stale_threshold), + ); let bothan_version = Version::from_str(VERSION).with_context(|| "Failed to parse bothan version")?; let registry_version_requirement = VersionReq::from_str(REGISTRY_REQUIREMENT) .with_context(|| "Failed to parse registry version requirement")?; - let opts = match init_crypto_opts(&config.manager.crypto.source).await { + let crypto_opts = match init_crypto_opts( + config + .manager + .crypto + .as_ref() + .and_then(|c| c.source.as_ref()), + ) + .await + { Ok(workers) => workers, Err(e) => { bail!("failed to initialize workers: {:?}", e); } }; - let manager = match CryptoAssetInfoManager::build( + + let forex_opts = match &config.manager.forex { + Some(forex) => match init_forex_opts(forex.source.as_ref()).await { + Ok(workers) => workers, + Err(e) => { + bail!("failed to initialize workers: {:?}", e); + } + }, + None => HashMap::new(), + }; + + let worker_opts = crypto_opts.into_iter().chain(forex_opts).collect(); + + let manager = match AssetInfoManager::build( store, - opts, + worker_opts, ipfs_client, - stale_threshold, + prefix_stale_thresholds, bothan_version, registry_version_requirement, monitoring_client, @@ -237,28 +262,58 @@ async fn init_bothan_server( Ok(Arc::new(BothanServer::new(manager, metrics))) } +fn init_prefix_stale_thresholds( + crypto_stale_threshold: Option, + forex_stale_threshold: Option, +) -> HashMap { + let mut map = HashMap::new(); + if let Some(threshold) = crypto_stale_threshold { + map.insert('C', threshold); + } + if let Some(threshold) = forex_stale_threshold { + map.insert('F', threshold); + } + map +} + async fn init_crypto_opts( - source: &CryptoSourceConfigs, -) -> Result, AssetWorkerError> { + source: Option<&CryptoSourceConfigs>, +) -> Result, AssetWorkerError> { + let mut worker_opts = HashMap::new(); + + if let Some(source) = source { + add_worker_opts(&mut worker_opts, &source.binance).await?; + add_worker_opts(&mut worker_opts, &source.bitfinex).await?; + add_worker_opts(&mut worker_opts, &source.bybit).await?; + add_worker_opts(&mut worker_opts, &source.coinbase).await?; + add_worker_opts(&mut worker_opts, &source.coingecko).await?; + add_worker_opts(&mut worker_opts, &source.coinmarketcap).await?; + add_worker_opts(&mut worker_opts, &source.htx).await?; + add_worker_opts(&mut worker_opts, &source.kraken).await?; + add_worker_opts(&mut worker_opts, &source.okx).await?; + add_worker_opts(&mut worker_opts, &source.band_kiwi).await?; + add_worker_opts(&mut worker_opts, &source.band_macaw).await?; + } + + Ok(worker_opts) +} + +async fn init_forex_opts( + source: Option<&ForexSourceConfigs>, +) -> Result, AssetWorkerError> { let mut worker_opts = HashMap::new(); - add_worker_opts(&mut worker_opts, &source.binance).await?; - add_worker_opts(&mut worker_opts, &source.bitfinex).await?; - add_worker_opts(&mut worker_opts, &source.bybit).await?; - add_worker_opts(&mut worker_opts, &source.coinbase).await?; - add_worker_opts(&mut worker_opts, &source.coingecko).await?; - add_worker_opts(&mut worker_opts, &source.coinmarketcap).await?; - add_worker_opts(&mut worker_opts, &source.htx).await?; - add_worker_opts(&mut worker_opts, &source.kraken).await?; - add_worker_opts(&mut worker_opts, &source.okx).await?; - add_worker_opts(&mut worker_opts, &source.band_kiwi).await?; - add_worker_opts(&mut worker_opts, &source.band_macaw).await?; + if let Some(source) = source { + add_worker_opts(&mut worker_opts, &source.band_owlet).await?; + add_worker_opts(&mut worker_opts, &source.band_fieldfare).await?; + add_worker_opts(&mut worker_opts, &source.band_xenops).await?; + } Ok(worker_opts) } -async fn add_worker_opts>( - workers_opts: &mut HashMap, +async fn add_worker_opts>( + workers_opts: &mut HashMap, opts: &Option, ) -> Result<(), AssetWorkerError> { if let Some(opts) = opts { diff --git a/bothan-api/server/Cargo.toml b/bothan-api/server/Cargo.toml index f820df49..be696899 100644 --- a/bothan-api/server/Cargo.toml +++ b/bothan-api/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-api" -version = "0.1.0" +version = "0.1.1" edition.workspace = true license.workspace = true repository.workspace = true diff --git a/bothan-api/server/src/api/server.rs b/bothan-api/server/src/api/server.rs index 1432f9c5..b5680af6 100644 --- a/bothan-api/server/src/api/server.rs +++ b/bothan-api/server/src/api/server.rs @@ -16,8 +16,8 @@ use std::sync::Arc; use std::time::Instant; -use bothan_core::manager::CryptoAssetInfoManager; -use bothan_core::manager::crypto_asset_info::error::{PushMonitoringRecordError, SetRegistryError}; +use bothan_core::manager::AssetInfoManager; +use bothan_core::manager::asset_info::error::{PushMonitoringRecordError, SetRegistryError}; use bothan_lib::metrics::server::{Metrics, ServiceName}; use bothan_lib::store::Store; use semver::Version; @@ -35,13 +35,13 @@ pub const PRECISION: u32 = 9; /// The `BothanServer` struct represents a server that implements the `BothanService` trait. pub struct BothanServer { - manager: Arc>, + manager: Arc>, metrics: Metrics, } impl BothanServer { /// Creates a new `BothanServer` instance. - pub fn new(manager: Arc>, metrics: Metrics) -> Self { + pub fn new(manager: Arc>, metrics: Metrics) -> Self { BothanServer { manager, metrics } } } diff --git a/bothan-api/server/src/api/utils.rs b/bothan-api/server/src/api/utils.rs index f80aca2b..0b20b1e0 100644 --- a/bothan-api/server/src/api/utils.rs +++ b/bothan-api/server/src/api/utils.rs @@ -7,7 +7,7 @@ //! //! - `parse_price_state`: Converts a `PriceState` to a `Price` API response. -use bothan_core::manager::crypto_asset_info::types::PriceState; +use bothan_core::manager::asset_info::types::PriceState; use rust_decimal::prelude::Zero; use tracing::error; diff --git a/bothan-api/server/src/config/manager.rs b/bothan-api/server/src/config/manager.rs index 2f01cfd6..c2fb6985 100644 --- a/bothan-api/server/src/config/manager.rs +++ b/bothan-api/server/src/config/manager.rs @@ -10,14 +10,30 @@ //! ``` use crypto_info::CryptoInfoManagerConfig; +use forex_info::ForexInfoManagerConfig; use serde::{Deserialize, Serialize}; +/// Shared Band worker serde helpers. +pub(crate) mod band_serde; /// Crypto info manager configuration module. pub mod crypto_info; +/// Forex info manager configuration module. +pub mod forex_info; /// The configuration for all bothan-api's manager. -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ManagerConfig { /// The configuration for the crypto info manager. - pub crypto: CryptoInfoManagerConfig, + pub crypto: Option, + /// The configuration for the forex info manager. + pub forex: Option, +} + +impl Default for ManagerConfig { + fn default() -> Self { + ManagerConfig { + crypto: Some(CryptoInfoManagerConfig::default()), + forex: Some(ForexInfoManagerConfig::default()), + } + } } diff --git a/bothan-api/server/src/config/manager/band_serde.rs b/bothan-api/server/src/config/manager/band_serde.rs new file mode 100644 index 00000000..1008476a --- /dev/null +++ b/bothan-api/server/src/config/manager/band_serde.rs @@ -0,0 +1,19 @@ +// Macro to generate deserialization functions for Band workers with preset names. +// This macro defines a function that: +// - Deserializes an Option, +// - If present, creates a new WorkerOpts with the given name and original URL/update_interval. +macro_rules! de_band_named { + ($fn_name:ident, $name:expr) => { + fn $fn_name<'de, D>(d: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + use serde::Deserialize; + let v = Option::::deserialize(d)?; + let v = v.map(|w| bothan_band::WorkerOpts::new($name, &w.url, Some(w.update_interval))); + Ok(v) + } + }; +} + +pub(crate) use de_band_named; diff --git a/bothan-api/server/src/config/manager/crypto_info.rs b/bothan-api/server/src/config/manager/crypto_info.rs index 4db5bb6f..03354101 100644 --- a/bothan-api/server/src/config/manager/crypto_info.rs +++ b/bothan-api/server/src/config/manager/crypto_info.rs @@ -20,7 +20,7 @@ pub mod sources; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CryptoInfoManagerConfig { /// The source configuration for the crypto asset info manager. - pub source: CryptoSourceConfigs, + pub source: Option, /// The stale threshold for the crypto asset info (in seconds). /// Any source that has not been updated in this amount of time /// relative to the call will be considered stale. @@ -37,7 +37,7 @@ impl Default for CryptoInfoManagerConfig { /// Creates a new `CryptoInfoManagerConfig` with default values. fn default() -> Self { CryptoInfoManagerConfig { - source: CryptoSourceConfigs::default(), + source: Some(CryptoSourceConfigs::default()), stale_threshold: default_stale_threshold(), } } diff --git a/bothan-api/server/src/config/manager/crypto_info/sources.rs b/bothan-api/server/src/config/manager/crypto_info/sources.rs index 85c8ab83..52163c64 100644 --- a/bothan-api/server/src/config/manager/crypto_info/sources.rs +++ b/bothan-api/server/src/config/manager/crypto_info/sources.rs @@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize}; +use crate::config::manager::band_serde::de_band_named; + /// Configuration for the worker sources for crypto asset info. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CryptoSourceConfigs { @@ -46,25 +48,9 @@ pub struct CryptoSourceConfigs { pub band_macaw: Option, } -// Macro to generate deserialization functions for Band workers with preset names. -// This macro defines a function that: -// - Deserializes an Option, -// - If present, creates a new WorkerOpts with the given name and original URL/update_interval. -macro_rules! de_band_named { - ($fn_name:ident, $name:expr) => { - fn $fn_name<'de, D>(d: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - let v = Option::::deserialize(d)?; - let v = v.map(|w| bothan_band::WorkerOpts::new($name, &w.url, Some(w.update_interval))); - Ok(v) - } - }; -} - const BAND1_WORKER_NAME: &str = "band/kiwi"; de_band_named!(de_kiwi, BAND1_WORKER_NAME); + const BAND2_WORKER_NAME: &str = "band/macaw"; de_band_named!(de_macaw, BAND2_WORKER_NAME); diff --git a/bothan-api/server/src/config/manager/forex_info.rs b/bothan-api/server/src/config/manager/forex_info.rs new file mode 100644 index 00000000..bce30b61 --- /dev/null +++ b/bothan-api/server/src/config/manager/forex_info.rs @@ -0,0 +1,37 @@ +//! Bothan API server forex info manager configuration. +//! +//! Settings for forex asset info sources and staleness threshold. + +use serde::{Deserialize, Serialize}; + +use crate::config::manager::forex_info::sources::ForexSourceConfigs; + +/// Forex info source configuration module. +pub mod sources; + +/// Configuration for the Bothan API Server's forex asset info manager. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ForexInfoManagerConfig { + /// The source configuration for the forex asset info manager. + pub source: Option, + /// The stale threshold for the forex asset info (in seconds). + /// Any source that has not been updated in this amount of time + /// relative to the call will be considered stale. + #[serde(default = "default_stale_threshold")] + pub stale_threshold: i64, +} + +/// Returns the default stale threshold (in seconds). +fn default_stale_threshold() -> i64 { + 7200 +} + +impl Default for ForexInfoManagerConfig { + /// Creates a new `ForexInfoManagerConfig` with default values. + fn default() -> Self { + ForexInfoManagerConfig { + source: Some(ForexSourceConfigs::default()), + stale_threshold: default_stale_threshold(), + } + } +} diff --git a/bothan-api/server/src/config/manager/forex_info/sources.rs b/bothan-api/server/src/config/manager/forex_info/sources.rs new file mode 100644 index 00000000..a0a331e8 --- /dev/null +++ b/bothan-api/server/src/config/manager/forex_info/sources.rs @@ -0,0 +1,61 @@ +//! Bothan API server forex source configuration. +//! +//! Worker options for supported forex data sources. + +use serde::{Deserialize, Serialize}; + +use crate::config::manager::band_serde::de_band_named; + +/// Configuration for the worker sources for forex asset info. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ForexSourceConfigs { + /// Band/owlet worker options. + /// + /// NOTE: The `name` field in `WorkerOpts` is marked with `#[serde(skip)]`, so deserialized instances + /// will have an empty/default name. The custom deserializer `de_owlet` reconstructs the options + #[serde(default, deserialize_with = "de_owlet")] + pub band_owlet: Option, + /// Band/fieldfare worker options. + /// + /// NOTE: The `name` field in `WorkerOpts` is marked with `#[serde(skip)]`, so deserialized instances + /// will have an empty/default name. The custom deserializer `de_fieldfare` reconstructs the options + #[serde(default, deserialize_with = "de_fieldfare")] + pub band_fieldfare: Option, + /// Band/xenops worker options. + /// + /// NOTE: The `name` field in `WorkerOpts` is marked with `#[serde(skip)]`, so deserialized instances + /// will have an empty/default name. The custom deserializer `de_xenops` reconstructs the options + #[serde(default, deserialize_with = "de_xenops")] + pub band_xenops: Option, +} + +const BAND1_WORKER_NAME: &str = "band/owlet"; +de_band_named!(de_owlet, BAND1_WORKER_NAME); + +const BAND2_WORKER_NAME: &str = "band/fieldfare"; +de_band_named!(de_fieldfare, BAND2_WORKER_NAME); + +const BAND3_WORKER_NAME: &str = "band/xenops"; +de_band_named!(de_xenops, BAND3_WORKER_NAME); + +impl Default for ForexSourceConfigs { + fn default() -> Self { + ForexSourceConfigs { + band_owlet: Some(bothan_band::WorkerOpts::new( + "band/owlet", + "https://owlet.bandchain.org", + None, + )), + band_fieldfare: Some(bothan_band::WorkerOpts::new( + "band/fieldfare", + "https://fieldfare.bandchain.org", + None, + )), + band_xenops: Some(bothan_band::WorkerOpts::new( + "band/xenops", + "https://xenops.bandchain.org", + None, + )), + } + } +} diff --git a/bothan-band/Cargo.toml b/bothan-band/Cargo.toml index e2b90c5c..ad7dd1d8 100644 --- a/bothan-band/Cargo.toml +++ b/bothan-band/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-band" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the Band source with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-binance/Cargo.toml b/bothan-binance/Cargo.toml index 6e79322d..713a0674 100644 --- a/bothan-binance/Cargo.toml +++ b/bothan-binance/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-binance" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the Binance exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-bitfinex/Cargo.toml b/bothan-bitfinex/Cargo.toml index 41dd2bcf..203eba9d 100644 --- a/bothan-bitfinex/Cargo.toml +++ b/bothan-bitfinex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-bitfinex" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the Bitfinex exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-bitfinex/src/api/msg/ticker.rs b/bothan-bitfinex/src/api/msg/ticker.rs index f5f5a674..8e940903 100644 --- a/bothan-bitfinex/src/api/msg/ticker.rs +++ b/bothan-bitfinex/src/api/msg/ticker.rs @@ -61,7 +61,7 @@ mod test { #[test] fn test_parse_tickers_from_array() { - let json = r#"[["tBTCUSD",101530,39.76548266,101540,32.24226311,2680,0.0271063,101550,661.88869229,102760,98740],["fUSD",0.000180427397260274,0.0002,120,35441993.51575242,0.00008219,2,39208.22419296,-0.00005519,-0.5017,0.00005481,406448929.8255126,0.000137,0.000024,null,null,5863426.35928275]]"#; + let json = r#"[["tBTCUSD",101530,39.76548266,101540,32.24226311,2680,0.0271063,101550,661.88869229,102760,98740,1457539475000],["fUSD",0.000180427397260274,0.0002,120,35441993.51575242,0.00008219,2,39208.22419296,-0.00005519,-0.5017,0.00005481,406448929.8255126,0.000137,0.000024,null,null,5863426.35928275,1457539475000]]"#; let ticker: Vec = serde_json::from_str(json).unwrap(); let expected = vec![ @@ -77,6 +77,7 @@ mod test { volume: 661.88869229, high: 102760.0, low: 98740.0, + first_trade: 1457539475000, }), Ticker::Funding(funding::Ticker { symbol: "fUSD".to_string(), @@ -94,6 +95,7 @@ mod test { high: 0.000137, low: 0.000024, frr_amount_available: 5863426.35928275, + first_trade: 1457539475000, }), ]; assert_eq!(ticker, expected); diff --git a/bothan-bitfinex/src/api/msg/ticker/funding.rs b/bothan-bitfinex/src/api/msg/ticker/funding.rs index 6d2a18b0..ddbef5f9 100644 --- a/bothan-bitfinex/src/api/msg/ticker/funding.rs +++ b/bothan-bitfinex/src/api/msg/ticker/funding.rs @@ -40,6 +40,8 @@ pub struct Ticker { pub low: f64, /// The amount of funding that is available at the Flash Return Rate. pub frr_amount_available: f64, + /// Timestamp of the first trade + pub first_trade: i64, } impl<'de> Deserialize<'de> for Ticker { @@ -65,6 +67,7 @@ impl<'de> Deserialize<'de> for Ticker { High, Low, FrrAmountAvailable, + FirstTrade, } struct FundingTickerVisitor {} @@ -72,7 +75,7 @@ impl<'de> Deserialize<'de> for Ticker { type Value = Ticker; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("tuple with length 15") + formatter.write_str("tuple with length 18") } fn visit_seq(self, mut seq: V) -> Result @@ -131,6 +134,9 @@ impl<'de> Deserialize<'de> for Ticker { let frr_amount_available = seq .next_element()? .ok_or_else(|| de::Error::invalid_length(16, &self))?; + let first_trade = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(17, &self))?; let funding_ticker = Ticker { symbol, @@ -148,6 +154,7 @@ impl<'de> Deserialize<'de> for Ticker { high, low, frr_amount_available, + first_trade, }; Ok(funding_ticker) } @@ -171,6 +178,7 @@ impl<'de> Deserialize<'de> for Ticker { let mut high = None; let mut low = None; let mut frr_amount_available = None; + let mut first_trade = None; while let Some(key) = map.next_key()? { match key { @@ -264,6 +272,12 @@ impl<'de> Deserialize<'de> for Ticker { } frr_amount_available = Some(map.next_value()?); } + Field::FirstTrade => { + if first_trade.is_some() { + return Err(de::Error::duplicate_field("first_trade")); + } + first_trade = Some(map.next_value()?); + } } } @@ -288,6 +302,8 @@ impl<'de> Deserialize<'de> for Ticker { let low = low.ok_or_else(|| de::Error::missing_field("low"))?; let frr_amount_available = frr_amount_available .ok_or_else(|| de::Error::missing_field("frr_amount_available"))?; + let first_trade = + first_trade.ok_or_else(|| de::Error::missing_field("first_trade"))?; let ticker = Ticker { symbol, @@ -305,6 +321,7 @@ impl<'de> Deserialize<'de> for Ticker { high, low, frr_amount_available, + first_trade, }; Ok(ticker) @@ -327,6 +344,7 @@ impl<'de> Deserialize<'de> for Ticker { "high", "low", "frr_amount_available", + "first_trade", ]; deserializer.deserialize_struct("FundingTicker", FIELDS, FundingTickerVisitor {}) } @@ -338,7 +356,7 @@ mod tests { #[test] fn test_deserialize_funding_ticker_from_array() { - let json = r#"["fUSD",0.00018055342465753425,0.0002,120,35545399.51575242,0.00008219178082191781,2,28117235.06098758,-0.0000278,-0.2528,0.00008219,413386933.358769,0.000137,0.000025,null,null,5817583.43063814]"#; + let json = r#"["fUSD",0.00018055342465753425,0.0002,120,35545399.51575242,0.00008219178082191781,2,28117235.06098758,-0.0000278,-0.2528,0.00008219,413386933.358769,0.000137,0.000025,null,null,5817583.43063814,1457539475000]"#; let funding_ticker: Ticker = serde_json::from_str(json).unwrap(); let expected = Ticker { @@ -357,6 +375,7 @@ mod tests { high: 0.000137, low: 0.000025, frr_amount_available: 5817583.43063814, + first_trade: 1457539475000, }; assert_eq!(funding_ticker, expected); @@ -369,13 +388,13 @@ mod tests { assert_eq!( funding_ticker.err().unwrap().to_string(), - "invalid length 16, expected tuple with length 15 at line 1 column 178" + "invalid length 16, expected tuple with length 18 at line 1 column 178" ); } #[test] fn test_deserialize_funding_ticker_from_array_with_invalid_value() { - let json = r#"[1000000,0.00018055342465753425,0.0002,120,35545399.51575242,0.00008219178082191781,2,28117235.06098758,-0.0000278,-0.2528,0.00008219,413386933.358769,0.000137,0.000025,null,null,5817583.43063814]"#; + let json = r#"[1000000,0.00018055342465753425,0.0002,120,35545399.51575242,0.00008219178082191781,2,28117235.06098758,-0.0000278,-0.2528,0.00008219,413386933.358769,0.000137,0.000025,null,null,5817583.43063814,1457539475000]"#; let funding_ticker: Result = serde_json::from_str(json); assert_eq!( @@ -386,7 +405,7 @@ mod tests { #[test] fn test_deserialize_funding_ticker_from_map() { - let json = r#"{"symbol":"fUSD","frr":0.00018055342465753425,"bid":0.0002,"bid_period":120,"bid_size":35545399.51575242,"ask":0.00008219178082191781,"ask_period":2,"ask_size":28117235.06098758,"daily_change":-0.0000278,"daily_change_relative":-0.2528,"last_price":0.00008219,"volume":413386933.358769,"high":0.000137,"low":0.000025,"frr_amount_available":5817583.43063814}"#; + let json = r#"{"symbol":"fUSD","frr":0.00018055342465753425,"bid":0.0002,"bid_period":120,"bid_size":35545399.51575242,"ask":0.00008219178082191781,"ask_period":2,"ask_size":28117235.06098758,"daily_change":-0.0000278,"daily_change_relative":-0.2528,"last_price":0.00008219,"volume":413386933.358769,"high":0.000137,"low":0.000025,"frr_amount_available":5817583.43063814,"first_trade":1457539475000}"#; let funding_ticker: Ticker = serde_json::from_str(json).unwrap(); let expected = Ticker { @@ -405,6 +424,7 @@ mod tests { high: 0.000137, low: 0.000025, frr_amount_available: 5817583.43063814, + first_trade: 1457539475000, }; assert_eq!(funding_ticker, expected); @@ -428,7 +448,7 @@ mod tests { assert_eq!( funding_ticker.err().unwrap().to_string(), - "unknown field `abc`, expected one of `symbol`, `frr`, `bid`, `bid_period`, `bid_size`, `ask`, `ask_period`, `ask_size`, `daily_change`, `daily_change_relative`, `last_price`, `volume`, `high`, `low`, `frr_amount_available` at line 1 column 6" + "unknown field `abc`, expected one of `symbol`, `frr`, `bid`, `bid_period`, `bid_size`, `ask`, `ask_period`, `ask_size`, `daily_change`, `daily_change_relative`, `last_price`, `volume`, `high`, `low`, `frr_amount_available`, `first_trade` at line 1 column 6" ); } diff --git a/bothan-bitfinex/src/api/msg/ticker/spot.rs b/bothan-bitfinex/src/api/msg/ticker/spot.rs index 54b34408..049f25a0 100644 --- a/bothan-bitfinex/src/api/msg/ticker/spot.rs +++ b/bothan-bitfinex/src/api/msg/ticker/spot.rs @@ -32,6 +32,8 @@ pub struct Ticker { pub high: f64, /// Daily low. pub low: f64, + /// Timestamp of the first trade. + pub first_trade: i64, } impl<'de> Deserialize<'de> for Ticker { @@ -53,6 +55,7 @@ impl<'de> Deserialize<'de> for Ticker { Volume, High, Low, + FirstTrade, } struct SpotTickerVisitor {} @@ -60,7 +63,7 @@ impl<'de> Deserialize<'de> for Ticker { type Value = Ticker; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("tuple with length 11") + formatter.write_str("tuple with length 12") } fn visit_seq(self, mut seq: V) -> Result @@ -100,6 +103,11 @@ impl<'de> Deserialize<'de> for Ticker { let low = seq .next_element()? .ok_or_else(|| de::Error::invalid_length(10, &self))?; + let first_trade = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(11, &self))?; + + while seq.next_element::()?.is_some() {} let spot_ticker = Ticker { symbol, @@ -113,6 +121,7 @@ impl<'de> Deserialize<'de> for Ticker { volume, high, low, + first_trade, }; Ok(spot_ticker) } @@ -132,6 +141,7 @@ impl<'de> Deserialize<'de> for Ticker { let mut volume = None; let mut high = None; let mut low = None; + let mut first_trade = None; while let Some(key) = map.next_key()? { match key { @@ -201,6 +211,12 @@ impl<'de> Deserialize<'de> for Ticker { } low = Some(map.next_value()?); } + Field::FirstTrade => { + if first_trade.is_some() { + return Err(de::Error::duplicate_field("first_trade")); + } + first_trade = Some(map.next_value()?); + } } } @@ -218,6 +234,8 @@ impl<'de> Deserialize<'de> for Ticker { let volume = volume.ok_or_else(|| de::Error::missing_field("volume"))?; let high = high.ok_or_else(|| de::Error::missing_field("high"))?; let low = low.ok_or_else(|| de::Error::missing_field("low"))?; + let first_trade = + first_trade.ok_or_else(|| de::Error::missing_field("first_trade"))?; let ticker = Ticker { symbol, @@ -231,6 +249,7 @@ impl<'de> Deserialize<'de> for Ticker { volume, high, low, + first_trade, }; Ok(ticker) } @@ -248,6 +267,7 @@ impl<'de> Deserialize<'de> for Ticker { "volume", "high", "low", + "first_trade", ]; deserializer.deserialize_struct("SpotTicker", FIELDS, SpotTickerVisitor {}) } @@ -259,7 +279,7 @@ mod tests { #[test] fn test_deserialize_spot_ticker_from_array() { - let json = r#"["tBTCUSD",101740,93.86022424,101750,38.06413103,2132,0.02140175,101750,663.27534767,102760,98740]"#; + let json = r#"["tBTCUSD",101740,93.86022424,101750,38.06413103,2132,0.02140175,101750,663.27534767,102760,98740,1457539475000]"#; let spot_ticker: Ticker = serde_json::from_str(json).unwrap(); let expected = Ticker { @@ -274,6 +294,7 @@ mod tests { volume: 663.27534767, high: 102760.0, low: 98740.0, + first_trade: 1457539475000, }; assert_eq!(spot_ticker, expected); @@ -286,7 +307,7 @@ mod tests { assert_eq!( spot_ticker.err().unwrap().to_string(), - "invalid length 9, expected tuple with length 11 at line 1 column 85" + "invalid length 9, expected tuple with length 12 at line 1 column 85" ); } @@ -304,7 +325,7 @@ mod tests { #[test] fn test_deserialize_spot_ticker_from_map() { - let json = r#"{"symbol":"tBTCUSD","bid":101740,"bid_size":93.86022424,"ask":101750,"ask_size":38.06413103,"daily_change":2132,"daily_change_relative":0.02140175,"last_price":101750,"volume":663.27534767,"high":102760,"low":98740.0}"#; + let json = r#"{"symbol":"tBTCUSD","bid":101740,"bid_size":93.86022424,"ask":101750,"ask_size":38.06413103,"daily_change":2132,"daily_change_relative":0.02140175,"last_price":101750,"volume":663.27534767,"high":102760,"low":98740.0,"first_trade":1457539475000}"#; let spot_ticker: Ticker = serde_json::from_str(json).unwrap(); let expected = Ticker { @@ -319,6 +340,7 @@ mod tests { volume: 663.27534767, high: 102760.0, low: 98740.0, + first_trade: 1457539475000, }; assert_eq!(spot_ticker, expected); @@ -342,7 +364,7 @@ mod tests { assert_eq!( spot_ticker.err().unwrap().to_string(), - "unknown field `abc`, expected one of `symbol`, `bid`, `bid_size`, `ask`, `ask_size`, `daily_change`, `daily_change_relative`, `last_price`, `volume`, `high`, `low` at line 1 column 6" + "unknown field `abc`, expected one of `symbol`, `bid`, `bid_size`, `ask`, `ask_size`, `daily_change`, `daily_change_relative`, `last_price`, `volume`, `high`, `low`, `first_trade` at line 1 column 6" ); } diff --git a/bothan-bitfinex/src/api/rest.rs b/bothan-bitfinex/src/api/rest.rs index 2cf87c99..b00b6311 100644 --- a/bothan-bitfinex/src/api/rest.rs +++ b/bothan-bitfinex/src/api/rest.rs @@ -86,7 +86,7 @@ impl RestApi { /// /// # Examples /// - /// ```rust + /// ```rust,no_run /// use bothan_bitfinex::api::rest::RestApi; /// use reqwest::Client; /// use url::Url; diff --git a/bothan-bybit/Cargo.toml b/bothan-bybit/Cargo.toml index 768be99d..661cf0fc 100644 --- a/bothan-bybit/Cargo.toml +++ b/bothan-bybit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-bybit" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the Bybit exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-coinbase/Cargo.toml b/bothan-coinbase/Cargo.toml index 9a188983..814f23c1 100644 --- a/bothan-coinbase/Cargo.toml +++ b/bothan-coinbase/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-coinbase" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the Coinbase exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-coingecko/Cargo.toml b/bothan-coingecko/Cargo.toml index b033007d..e1a4f089 100644 --- a/bothan-coingecko/Cargo.toml +++ b/bothan-coingecko/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-coingecko" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the CoinGecko exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-coinmarketcap/Cargo.toml b/bothan-coinmarketcap/Cargo.toml index 61888470..77381edb 100644 --- a/bothan-coinmarketcap/Cargo.toml +++ b/bothan-coinmarketcap/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-coinmarketcap" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the CoinMarketCap exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-core/Cargo.toml b/bothan-core/Cargo.toml index d7e6d514..e333ba77 100644 --- a/bothan-core/Cargo.toml +++ b/bothan-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-core" -version = "0.1.0" +version = "0.1.1" description = "Core library for Bothan" authors.workspace = true edition.workspace = true diff --git a/bothan-core/src/ipfs/client.rs b/bothan-core/src/ipfs/client.rs index 6a5b698c..cd4fb6d5 100644 --- a/bothan-core/src/ipfs/client.rs +++ b/bothan-core/src/ipfs/client.rs @@ -6,6 +6,7 @@ use reqwest::{Client, StatusCode}; use crate::ipfs::error::Error; +#[derive(Clone)] pub struct IpfsClient { url: String, client: Client, diff --git a/bothan-core/src/manager.rs b/bothan-core/src/manager.rs index 4908c9f1..c226a301 100644 --- a/bothan-core/src/manager.rs +++ b/bothan-core/src/manager.rs @@ -1,7 +1,7 @@ -//! Manager module for crypto asset information and workers. +//! Manager module for crypto and forex asset information and workers. //! -//! Provides the crypto asset info manager and related types. +//! Provides the crypto and forex asset info manager and related types. -pub use crypto_asset_info::CryptoAssetInfoManager; +pub use asset_info::AssetInfoManager; -pub mod crypto_asset_info; +pub mod asset_info; diff --git a/bothan-core/src/manager/crypto_asset_info.rs b/bothan-core/src/manager/asset_info.rs similarity index 67% rename from bothan-core/src/manager/crypto_asset_info.rs rename to bothan-core/src/manager/asset_info.rs index 18099013..d9adfa2e 100644 --- a/bothan-core/src/manager/crypto_asset_info.rs +++ b/bothan-core/src/manager/asset_info.rs @@ -2,9 +2,9 @@ //! //! Provides types and logic for managing crypto asset information. -pub use manager::CryptoAssetInfoManager; -pub use worker::CryptoAssetWorker; -pub use worker::opts::CryptoAssetWorkerOpts; +pub use manager::AssetInfoManager; +pub use worker::AnyAssetWorker; +pub use worker::opts::AssetWorkerOpts; pub mod error; pub(super) mod manager; diff --git a/bothan-core/src/manager/crypto_asset_info/error.rs b/bothan-core/src/manager/asset_info/error.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/error.rs rename to bothan-core/src/manager/asset_info/error.rs diff --git a/bothan-core/src/manager/crypto_asset_info/manager.rs b/bothan-core/src/manager/asset_info/manager.rs similarity index 83% rename from bothan-core/src/manager/crypto_asset_info/manager.rs rename to bothan-core/src/manager/asset_info/manager.rs index a8b6d6a2..a1f1191f 100644 --- a/bothan-core/src/manager/crypto_asset_info/manager.rs +++ b/bothan-core/src/manager/asset_info/manager.rs @@ -10,7 +10,7 @@ use std::time::Duration; use bothan_lib::metrics::store::Metrics; use bothan_lib::registry::{Invalid, Registry}; use bothan_lib::store::Store; -use bothan_lib::worker::AssetWorker; +use bothan_lib::worker::AssetWorker as AssetWorkerTrait; use mini_moka::sync::Cache; use semver::{Version, VersionReq}; use serde_json::from_str; @@ -19,22 +19,22 @@ use tokio::time::sleep; use crate::ipfs::IpfsClient; use crate::ipfs::error::Error as IpfsError; -use crate::manager::crypto_asset_info::error::{ +use crate::manager::asset_info::error::{ PostHeartbeatError, PushMonitoringRecordError, SetRegistryError, }; -use crate::manager::crypto_asset_info::price::tasks::get_signal_price_states; -use crate::manager::crypto_asset_info::types::{ - CryptoAssetManagerInfo, MONITORING_TTL, PriceSignalComputationRecord, PriceState, +use crate::manager::asset_info::price::tasks::get_signal_price_states; +use crate::manager::asset_info::types::{ + AssetManagerInfo, MONITORING_TTL, PriceSignalComputationRecord, PriceState, }; -use crate::manager::crypto_asset_info::worker::opts::CryptoAssetWorkerOpts; -use crate::manager::crypto_asset_info::worker::{CryptoAssetWorker, build_workers}; +use crate::manager::asset_info::worker::opts::AssetWorkerOpts; +use crate::manager::asset_info::worker::{AnyAssetWorker, build_workers}; use crate::monitoring::{Client as MonitoringClient, create_uuid}; -pub struct CryptoAssetInfoManager { +pub struct AssetInfoManager { store: S, - opts: HashMap, - workers: Mutex>, - stale_threshold: i64, + worker_opts: HashMap, + workers: Mutex>, + prefix_stale_thresholds: HashMap, ipfs_client: IpfsClient, bothan_version: Version, registry_version_requirement: VersionReq, @@ -43,13 +43,14 @@ pub struct CryptoAssetInfoManager { metrics: Metrics, } -impl CryptoAssetInfoManager { - /// builds a new `CryptoAssetInfoManager`. +impl AssetInfoManager { + /// builds a new `AssetInfoManager`. + #[allow(clippy::too_many_arguments)] pub async fn build( store: S, - opts: HashMap, + worker_opts: HashMap, ipfs_client: IpfsClient, - stale_threshold: i64, + prefix_stale_thresholds: HashMap, bothan_version: Version, registry_version_requirement: VersionReq, monitoring_client: Option>, @@ -60,15 +61,15 @@ impl CryptoAssetInfoManager { let registry = store.get_registry().await?; - let workers = Mutex::new(build_workers(®istry, &opts, store.clone()).await); + let workers = Mutex::new(build_workers(®istry, &worker_opts, store.clone()).await); let metrics = Metrics::new(); - let manager = CryptoAssetInfoManager { + let manager = AssetInfoManager { store, - opts, + worker_opts, workers, - stale_threshold, + prefix_stale_thresholds, ipfs_client, bothan_version, registry_version_requirement, @@ -80,8 +81,8 @@ impl CryptoAssetInfoManager { Ok(manager) } - /// Gets the `CryptoAssetManagerInfo`. - pub async fn get_info(&self) -> Result { + /// Gets the `AssetManagerInfo`. + pub async fn get_info(&self) -> Result { let bothan_version = self.bothan_version.to_string(); let registry_hash = self .store @@ -95,9 +96,9 @@ impl CryptoAssetInfoManager { .await .iter() .map(|w| w.name().to_string()) - .collect(); + .collect::>(); - Ok(CryptoAssetManagerInfo::new( + Ok(AssetManagerInfo::new( bothan_version, registry_hash, registry_version_requirement, @@ -121,7 +122,8 @@ impl CryptoAssetInfoManager { .await .iter() .map(|w| w.name().to_string()) - .collect(); + .collect::>(); + let bothan_version = self.bothan_version.clone(); let registry_hash = self .store @@ -145,16 +147,13 @@ impl CryptoAssetInfoManager { ) -> Result<(String, Vec), S::Error> { let registry = self.store.get_registry().await?; - let current_time = chrono::Utc::now().timestamp(); - let stale_cutoff = current_time - self.stale_threshold; - let mut records = Vec::new(); let price_states = get_signal_price_states( ids, &self.store, ®istry, - stale_cutoff, + &self.prefix_stale_thresholds, &mut records, &self.metrics, ) @@ -250,7 +249,7 @@ impl CryptoAssetInfoManager { // TODO: find method to wait for connections to clear up thats better than sleeping for 1 second sleep(Duration::from_secs(1)).await; - let workers = build_workers(®istry, &self.opts, self.store.clone()).await; + let workers = build_workers(®istry, &self.worker_opts, self.store.clone()).await; *locked_workers = workers; Ok(()) diff --git a/bothan-core/src/manager/crypto_asset_info/price.rs b/bothan-core/src/manager/asset_info/price.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/price.rs rename to bothan-core/src/manager/asset_info/price.rs diff --git a/bothan-core/src/manager/crypto_asset_info/price/cache.rs b/bothan-core/src/manager/asset_info/price/cache.rs similarity index 97% rename from bothan-core/src/manager/crypto_asset_info/price/cache.rs rename to bothan-core/src/manager/asset_info/price/cache.rs index b3aac3c8..967c3863 100644 --- a/bothan-core/src/manager/crypto_asset_info/price/cache.rs +++ b/bothan-core/src/manager/asset_info/price/cache.rs @@ -8,7 +8,7 @@ use std::hash::Hash; use rust_decimal::Decimal; -use crate::manager::crypto_asset_info::types::PriceState; +use crate::manager::asset_info::types::PriceState; /// In-memory cache for storing `PriceState` values keyed by asset or signal ID. pub struct PriceCache { diff --git a/bothan-core/src/manager/crypto_asset_info/price/error.rs b/bothan-core/src/manager/asset_info/price/error.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/price/error.rs rename to bothan-core/src/manager/asset_info/price/error.rs diff --git a/bothan-core/src/manager/crypto_asset_info/price/tasks.rs b/bothan-core/src/manager/asset_info/price/tasks.rs similarity index 95% rename from bothan-core/src/manager/crypto_asset_info/price/tasks.rs rename to bothan-core/src/manager/asset_info/price/tasks.rs index e89ec102..b9ede272 100644 --- a/bothan-core/src/manager/crypto_asset_info/price/tasks.rs +++ b/bothan-core/src/manager/asset_info/price/tasks.rs @@ -5,7 +5,7 @@ //! processor and post-processors, and handles any missing prerequisites by recursively //! fetching the required data. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::time::Instant; use bothan_lib::metrics::store::{Metrics, Operation, OperationStatus}; @@ -18,23 +18,27 @@ use num_traits::Zero; use rust_decimal::Decimal; use tracing::{debug, error, info, warn}; -use crate::manager::crypto_asset_info::price::cache::PriceCache; -use crate::manager::crypto_asset_info::price::error::{Error, MissingPrerequisiteError}; -use crate::manager::crypto_asset_info::types::{PriceSignalComputationRecord, PriceState}; +use crate::manager::asset_info::price::cache::PriceCache; +use crate::manager::asset_info::price::error::{Error, MissingPrerequisiteError}; +use crate::manager::asset_info::types::{PriceSignalComputationRecord, PriceState}; use crate::monitoring::types::{OperationRecord, ProcessRecord, SourceRecord}; +const DEFAULT_STALE_TIME: i64 = 600; // 10 minutes + // TODO: Allow records to be Option /// Computes the price states for a list of signal ids. pub async fn get_signal_price_states( ids: Vec, store: &S, registry: &Registry, - stale_cutoff: i64, + prefix_stale_thresholds: &HashMap, records: &mut Vec, metrics: &Metrics, ) -> Vec { let mut cache = PriceCache::new(); + let current_time = chrono::Utc::now().timestamp(); + let mut queue = ids.clone(); while let Some(id) = queue.pop() { if cache.contains(&id) { @@ -46,7 +50,8 @@ pub async fn get_signal_price_states( &id, store, registry, - stale_cutoff, + current_time, + prefix_stale_thresholds, &cache, &mut record, metrics, @@ -91,17 +96,20 @@ pub async fn get_signal_price_states( .collect() } +#[allow(clippy::too_many_arguments)] async fn compute_signal_result( id: &str, store: &S, registry: &Registry, - stale_cutoff: i64, + current_time: i64, + prefix_stale_thresholds: &HashMap, cache: &PriceCache, record: &mut PriceSignalComputationRecord, metrics: &Metrics, ) -> Result { match registry.get(id) { Some(signal) => { + let stale_cutoff = current_time - get_stale_time(id, prefix_stale_thresholds); let source_results = compute_source_result(signal, store, cache, stale_cutoff, record, metrics).await?; @@ -136,7 +144,20 @@ async fn compute_signal_result( Ok(processed_signal) } - None => Err(Error::InvalidSignal), + _ => Err(Error::InvalidSignal), + } +} + +fn get_stale_time(id: &str, prefix_stale_thresholds: &HashMap) -> i64 { + match id.split(':').next() { + Some(prefix) => match prefix.chars().next() { + Some(first_letter) => prefix_stale_thresholds + .get(&first_letter) + .cloned() + .unwrap_or(DEFAULT_STALE_TIME), + None => DEFAULT_STALE_TIME, + }, + None => DEFAULT_STALE_TIME, } } @@ -400,7 +421,9 @@ mod tests { ); let registry = mock_registry(); - let stale_cutoff = 0; + let current_timestamp = chrono::Utc::now().timestamp(); + let mut prefix_stale_thresholds = HashMap::new(); + prefix_stale_thresholds.insert('C', current_timestamp); let mut records = Vec::new(); let metrics = Metrics::new(); @@ -408,7 +431,7 @@ mod tests { ids, &mock_store, ®istry, - stale_cutoff, + &prefix_stale_thresholds, &mut records, &metrics, ) @@ -484,14 +507,16 @@ mod tests { let ids = vec!["CS:BTC-USD".to_string(), "CS:USDT-USD".to_string()]; let mock_store = MockStore::default(); let registry = mock_registry(); - let stale_cutoff = 0; + let current_timestamp = chrono::Utc::now().timestamp(); + let mut prefix_stale_thresholds = HashMap::new(); + prefix_stale_thresholds.insert('C', current_timestamp); let mut records = Vec::new(); let metrics = Metrics::new(); let res = get_signal_price_states( ids, &mock_store, ®istry, - stale_cutoff, + &prefix_stale_thresholds, &mut records, &metrics, ) @@ -569,7 +594,9 @@ mod tests { ); let registry = mock_registry(); - let stale_cutoff = 10000; + let current_timestamp = chrono::Utc::now().timestamp(); + let mut prefix_stale_thresholds = HashMap::new(); + prefix_stale_thresholds.insert('C', current_timestamp - 10000); let mut records = Vec::new(); let metrics = Metrics::new(); @@ -577,7 +604,7 @@ mod tests { ids, &mock_store, ®istry, - stale_cutoff, + &prefix_stale_thresholds, &mut records, &metrics, ) diff --git a/bothan-core/src/manager/crypto_asset_info/signal_ids.rs b/bothan-core/src/manager/asset_info/signal_ids.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/signal_ids.rs rename to bothan-core/src/manager/asset_info/signal_ids.rs diff --git a/bothan-core/src/manager/crypto_asset_info/types.rs b/bothan-core/src/manager/asset_info/types.rs similarity index 91% rename from bothan-core/src/manager/crypto_asset_info/types.rs rename to bothan-core/src/manager/asset_info/types.rs index 00025df8..4b298e9b 100644 --- a/bothan-core/src/manager/crypto_asset_info/types.rs +++ b/bothan-core/src/manager/asset_info/types.rs @@ -19,7 +19,7 @@ pub enum PriceState { Unsupported, } -pub struct CryptoAssetManagerInfo { +pub struct AssetManagerInfo { pub bothan_version: String, pub registry_hash: String, pub registry_version_requirement: String, @@ -27,7 +27,7 @@ pub struct CryptoAssetManagerInfo { pub monitoring_enabled: bool, } -impl CryptoAssetManagerInfo { +impl AssetManagerInfo { pub fn new( bothan_version: String, registry_hash: String, @@ -35,7 +35,7 @@ impl CryptoAssetManagerInfo { active_sources: Vec, monitoring_enabled: bool, ) -> Self { - CryptoAssetManagerInfo { + AssetManagerInfo { bothan_version, registry_hash, registry_version_requirement, diff --git a/bothan-core/src/manager/asset_info/worker.rs b/bothan-core/src/manager/asset_info/worker.rs new file mode 100644 index 00000000..b59158e7 --- /dev/null +++ b/bothan-core/src/manager/asset_info/worker.rs @@ -0,0 +1,115 @@ +//! Worker trait and implementations for asset sources. + +pub mod opts; + +use std::collections::HashMap; + +use bothan_lib::registry::{Registry, Valid}; +use bothan_lib::store::Store; +use bothan_lib::worker::AssetWorker; +use bothan_lib::worker::error::AssetWorkerError; +use derive_more::From; +use tracing::{error, info}; + +use crate::manager::asset_info::signal_ids::get_source_batched_query_ids; +use crate::manager::asset_info::worker::opts::AssetWorkerOpts; + +#[derive(From)] +pub enum AnyAssetWorker { + Binance(bothan_binance::Worker), + Bitfinex(bothan_bitfinex::Worker), + Bybit(bothan_bybit::Worker), + Coinbase(bothan_coinbase::Worker), + CoinGecko(bothan_coingecko::Worker), + CoinMarketCap(bothan_coinmarketcap::Worker), + Htx(bothan_htx::Worker), + Kraken(bothan_kraken::Worker), + Okx(bothan_okx::Worker), + Band(bothan_band::Worker), +} + +#[async_trait::async_trait] +impl AssetWorker for AnyAssetWorker { + type Opts = AssetWorkerOpts; + + fn name(&self) -> &'static str { + match self { + AnyAssetWorker::Binance(w) => w.name(), + AnyAssetWorker::Bitfinex(w) => w.name(), + AnyAssetWorker::Bybit(w) => w.name(), + AnyAssetWorker::Coinbase(w) => w.name(), + AnyAssetWorker::CoinGecko(w) => w.name(), + AnyAssetWorker::CoinMarketCap(w) => w.name(), + AnyAssetWorker::Htx(w) => w.name(), + AnyAssetWorker::Kraken(w) => w.name(), + AnyAssetWorker::Okx(w) => w.name(), + AnyAssetWorker::Band(w) => w.name(), + } + } + + async fn build( + opts: Self::Opts, + store: &S, + ids: Vec, + ) -> Result { + Ok(match opts { + AssetWorkerOpts::Binance(opts) => { + AnyAssetWorker::from(bothan_binance::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Bitfinex(opts) => { + AnyAssetWorker::from(bothan_bitfinex::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Bybit(opts) => { + AnyAssetWorker::from(bothan_bybit::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Coinbase(opts) => { + AnyAssetWorker::from(bothan_coinbase::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::CoinGecko(opts) => { + AnyAssetWorker::from(bothan_coingecko::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::CoinMarketCap(opts) => { + AnyAssetWorker::from(bothan_coinmarketcap::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Htx(opts) => { + AnyAssetWorker::from(bothan_htx::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Kraken(opts) => { + AnyAssetWorker::from(bothan_kraken::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Okx(opts) => { + AnyAssetWorker::from(bothan_okx::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Band(opts) => { + AnyAssetWorker::from(bothan_band::Worker::build(opts, store, ids).await?) + } + }) + } +} + +pub async fn build_workers( + registry: &Registry, + opts: &HashMap, + store: S, +) -> Vec { + let mut workers = Vec::with_capacity(opts.len()); + for (source_id, query_id) in get_source_batched_query_ids(registry).drain() { + match opts.get(&source_id) { + Some(opts) => { + let ids = query_id.into_iter().collect(); + let builder_callable = AnyAssetWorker::build(opts.clone(), &store, ids); + let worker = match builder_callable.await { + Ok(worker) => worker, + Err(e) => { + error!("failed to build worker {}: {}", source_id, e); + continue; + } + }; + workers.push(worker); + } + None => info!("worker {} not activated", source_id), + } + } + + workers +} diff --git a/bothan-core/src/manager/asset_info/worker/opts.rs b/bothan-core/src/manager/asset_info/worker/opts.rs new file mode 100644 index 00000000..ec0df44c --- /dev/null +++ b/bothan-core/src/manager/asset_info/worker/opts.rs @@ -0,0 +1,97 @@ +//! Worker options for configuring asset source workers. + +use { + bothan_band, bothan_binance, bothan_bitfinex, bothan_bybit, bothan_coinbase, bothan_coingecko, + bothan_coinmarketcap, bothan_htx, bothan_kraken, bothan_okx, +}; + +#[derive(Clone)] +pub enum AssetWorkerOpts { + Binance(bothan_binance::WorkerOpts), + Bitfinex(bothan_bitfinex::WorkerOpts), + Bybit(bothan_bybit::WorkerOpts), + Coinbase(bothan_coinbase::WorkerOpts), + CoinGecko(bothan_coingecko::WorkerOpts), + CoinMarketCap(bothan_coinmarketcap::WorkerOpts), + Htx(bothan_htx::WorkerOpts), + Kraken(bothan_kraken::WorkerOpts), + Okx(bothan_okx::WorkerOpts), + Band(bothan_band::WorkerOpts), +} + +impl AssetWorkerOpts { + pub fn name(&self) -> &str { + match self { + AssetWorkerOpts::Binance(_) => "binance", + AssetWorkerOpts::Bitfinex(_) => "bitfinex", + AssetWorkerOpts::Bybit(_) => "bybit", + AssetWorkerOpts::Coinbase(_) => "coinbase", + AssetWorkerOpts::CoinGecko(_) => "coingecko", + AssetWorkerOpts::CoinMarketCap(_) => "coinmarketcap", + AssetWorkerOpts::Htx(_) => "htx", + AssetWorkerOpts::Kraken(_) => "kraken", + AssetWorkerOpts::Okx(_) => "okx", + AssetWorkerOpts::Band(opts) => opts.name(), + } + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_binance::WorkerOpts) -> Self { + AssetWorkerOpts::Binance(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_bitfinex::WorkerOpts) -> Self { + AssetWorkerOpts::Bitfinex(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_bybit::WorkerOpts) -> Self { + AssetWorkerOpts::Bybit(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_coinbase::WorkerOpts) -> Self { + AssetWorkerOpts::Coinbase(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_coingecko::WorkerOpts) -> Self { + AssetWorkerOpts::CoinGecko(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_coinmarketcap::WorkerOpts) -> Self { + AssetWorkerOpts::CoinMarketCap(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_htx::WorkerOpts) -> Self { + AssetWorkerOpts::Htx(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_kraken::WorkerOpts) -> Self { + AssetWorkerOpts::Kraken(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_okx::WorkerOpts) -> Self { + AssetWorkerOpts::Okx(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_band::WorkerOpts) -> Self { + AssetWorkerOpts::Band(value) + } +} diff --git a/bothan-core/src/manager/crypto_asset_info/worker.rs b/bothan-core/src/manager/crypto_asset_info/worker.rs deleted file mode 100644 index 252b449b..00000000 --- a/bothan-core/src/manager/crypto_asset_info/worker.rs +++ /dev/null @@ -1,115 +0,0 @@ -//! Worker trait and implementations for crypto asset sources. - -pub mod opts; - -use std::collections::HashMap; - -use bothan_lib::registry::{Registry, Valid}; -use bothan_lib::store::Store; -use bothan_lib::worker::AssetWorker; -use bothan_lib::worker::error::AssetWorkerError; -use derive_more::From; -use tracing::{error, info}; - -use crate::manager::crypto_asset_info::signal_ids::get_source_batched_query_ids; -use crate::manager::crypto_asset_info::worker::opts::CryptoAssetWorkerOpts; - -#[derive(From)] -pub enum CryptoAssetWorker { - Binance(bothan_binance::Worker), - Bitfinex(bothan_bitfinex::Worker), - Bybit(bothan_bybit::Worker), - Coinbase(bothan_coinbase::Worker), - CoinGecko(bothan_coingecko::Worker), - CoinMarketCap(bothan_coinmarketcap::Worker), - Htx(bothan_htx::Worker), - Kraken(bothan_kraken::Worker), - Okx(bothan_okx::Worker), - Band(bothan_band::Worker), -} - -#[async_trait::async_trait] -impl AssetWorker for CryptoAssetWorker { - type Opts = CryptoAssetWorkerOpts; - - fn name(&self) -> &'static str { - match self { - CryptoAssetWorker::Binance(w) => w.name(), - CryptoAssetWorker::Bitfinex(w) => w.name(), - CryptoAssetWorker::Bybit(w) => w.name(), - CryptoAssetWorker::Coinbase(w) => w.name(), - CryptoAssetWorker::CoinGecko(w) => w.name(), - CryptoAssetWorker::CoinMarketCap(w) => w.name(), - CryptoAssetWorker::Htx(w) => w.name(), - CryptoAssetWorker::Kraken(w) => w.name(), - CryptoAssetWorker::Okx(w) => w.name(), - CryptoAssetWorker::Band(w) => w.name(), - } - } - - async fn build( - opts: Self::Opts, - store: &S, - ids: Vec, - ) -> Result { - Ok(match opts { - CryptoAssetWorkerOpts::Binance(opts) => { - CryptoAssetWorker::from(bothan_binance::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Bitfinex(opts) => { - CryptoAssetWorker::from(bothan_bitfinex::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Bybit(opts) => { - CryptoAssetWorker::from(bothan_bybit::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Coinbase(opts) => { - CryptoAssetWorker::from(bothan_coinbase::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::CoinGecko(opts) => { - CryptoAssetWorker::from(bothan_coingecko::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::CoinMarketCap(opts) => CryptoAssetWorker::from( - bothan_coinmarketcap::Worker::build(opts, store, ids).await?, - ), - CryptoAssetWorkerOpts::Htx(opts) => { - CryptoAssetWorker::from(bothan_htx::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Kraken(opts) => { - CryptoAssetWorker::from(bothan_kraken::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Okx(opts) => { - CryptoAssetWorker::from(bothan_okx::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Band(opts) => { - CryptoAssetWorker::from(bothan_band::Worker::build(opts, store, ids).await?) - } - }) - } -} - -pub async fn build_workers( - registry: &Registry, - opts: &HashMap, - store: S, -) -> Vec { - let mut workers = Vec::with_capacity(opts.len()); - for (source_id, query_id) in get_source_batched_query_ids(registry).drain() { - match opts.get(&source_id) { - Some(opts) => { - let ids = query_id.into_iter().collect(); - let builder_callable = CryptoAssetWorker::build(opts.clone(), &store, ids); - let worker = match builder_callable.await { - Ok(worker) => worker, - Err(e) => { - error!("failed to build worker {}: {}", source_id, e); - continue; - } - }; - workers.push(worker); - } - None => info!("worker {} not activated", source_id), - } - } - - workers -} diff --git a/bothan-core/src/manager/crypto_asset_info/worker/opts.rs b/bothan-core/src/manager/crypto_asset_info/worker/opts.rs deleted file mode 100644 index 8f36016f..00000000 --- a/bothan-core/src/manager/crypto_asset_info/worker/opts.rs +++ /dev/null @@ -1,92 +0,0 @@ -//! Worker options for configuring crypto asset source workers. - -#[derive(Clone)] -pub enum CryptoAssetWorkerOpts { - Binance(bothan_binance::WorkerOpts), - Bitfinex(bothan_bitfinex::WorkerOpts), - Bybit(bothan_bybit::WorkerOpts), - Coinbase(bothan_coinbase::WorkerOpts), - CoinGecko(bothan_coingecko::WorkerOpts), - CoinMarketCap(bothan_coinmarketcap::WorkerOpts), - Htx(bothan_htx::WorkerOpts), - Kraken(bothan_kraken::WorkerOpts), - Okx(bothan_okx::WorkerOpts), - Band(bothan_band::WorkerOpts), -} - -impl CryptoAssetWorkerOpts { - pub fn name(&self) -> &str { - match self { - CryptoAssetWorkerOpts::Binance(_) => "binance", - CryptoAssetWorkerOpts::Bitfinex(_) => "bitfinex", - CryptoAssetWorkerOpts::Bybit(_) => "bybit", - CryptoAssetWorkerOpts::Coinbase(_) => "coinbase", - CryptoAssetWorkerOpts::CoinGecko(_) => "coingecko", - CryptoAssetWorkerOpts::CoinMarketCap(_) => "coinmarketcap", - CryptoAssetWorkerOpts::Htx(_) => "htx", - CryptoAssetWorkerOpts::Kraken(_) => "kraken", - CryptoAssetWorkerOpts::Okx(_) => "okx", - CryptoAssetWorkerOpts::Band(opts) => opts.name(), - } - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_binance::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Binance(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_bitfinex::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Bitfinex(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_bybit::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Bybit(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_coinbase::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Coinbase(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_coingecko::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::CoinGecko(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_coinmarketcap::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::CoinMarketCap(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_htx::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Htx(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_kraken::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Kraken(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_okx::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Okx(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_band::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Band(value) - } -} diff --git a/bothan-htx/Cargo.toml b/bothan-htx/Cargo.toml index 516ad857..fb38265c 100644 --- a/bothan-htx/Cargo.toml +++ b/bothan-htx/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-htx" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the HTX exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-htx/src/api/websocket.rs b/bothan-htx/src/api/websocket.rs index 09be7416..d04a2c2d 100644 --- a/bothan-htx/src/api/websocket.rs +++ b/bothan-htx/src/api/websocket.rs @@ -375,7 +375,6 @@ impl AssetInfoProvider for WebSocketConnection { } /// Parses market data from the HTX WebSocket API into `AssetInfo`. -/// /// This function extracts the asset identifier from the channel name and creates /// an `AssetInfo` instance with the last price and timestamp from the ticker data. /// @@ -393,6 +392,8 @@ impl AssetInfoProvider for WebSocketConnection { /// Returns a `ListeningError` if: /// - The channel ID cannot be extracted from the channel name /// - The price data contains invalid values (NaN) +/// +#[allow(clippy::result_large_err)] fn parse_data(data: super::types::Data) -> Result { let ch = data.ch; let id = ch diff --git a/bothan-kraken/Cargo.toml b/bothan-kraken/Cargo.toml index 222b4855..ad0f6787 100644 --- a/bothan-kraken/Cargo.toml +++ b/bothan-kraken/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-kraken" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the Kraken exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/bothan-lib/Cargo.toml b/bothan-lib/Cargo.toml index 093b9676..56e2cea7 100644 --- a/bothan-lib/Cargo.toml +++ b/bothan-lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-lib" -version = "0.1.0" +version = "0.1.1" description = "Library contain base functionality and types for Bothan" authors.workspace = true edition.workspace = true diff --git a/bothan-okx/Cargo.toml b/bothan-okx/Cargo.toml index 6d78662c..22a7dc7a 100644 --- a/bothan-okx/Cargo.toml +++ b/bothan-okx/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bothan-okx" -version = "0.1.0" +version = "0.1.1" description = "Rust client for the OKX exchange with Bothan integration" edition.workspace = true license.workspace = true diff --git a/docs/architecture.md b/docs/architecture.md index ae658c3f..4da88634 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -82,7 +82,7 @@ Each provider has its own dedicated module: - `bothan-htx`: Integration with HTX - `bothan-kraken`: Integration with Kraken - `bothan-okx`: Integration with OKX -- `bothan-band`: Integration with Band sources (e.g. band/kiwi, band/macaw) +- `bothan-band`: Integration with Band sources (e.g. band/kiwi, band/macaw, band/owlet, band/fieldfare, band/xenops) These modules implement provider-specific logic while conforming to common interfaces defined in the core components.