diff --git a/Cargo.lock b/Cargo.lock index 169c4cb7..9dfe9bec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,6 +160,18 @@ dependencies = [ "password-hash", ] +[[package]] +name = "arrayref" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" + +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "askama" version = "0.15.4" @@ -517,6 +529,20 @@ dependencies = [ "digest", ] +[[package]] +name = "blake3" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468ef7d57b3fb7e16b576e8377cdbde2320c60e1491e961d11da40fc4f02a2d" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", + "cpufeatures", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -790,6 +816,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "constant_time_eq" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b" + [[package]] name = "convert_case" version = "0.10.0" @@ -864,6 +896,7 @@ dependencies = [ "askama", "async-trait", "axum", + "blake3", "bytes", "chrono", "chrono-tz", @@ -880,6 +913,7 @@ dependencies = [ "fake", "fantoccini", "form_urlencoded", + "fs4", "futures-core", "futures-util", "grass", @@ -1658,6 +1692,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" +[[package]] +name = "fs4" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4" +dependencies = [ + "rustix", + "tokio", + "windows-sys 0.59.0", +] + [[package]] name = "futures" version = "0.3.31" diff --git a/Cargo.toml b/Cargo.toml index 7d5b93e9..1151b348 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ resolver = "2" [workspace.package] edition = "2024" -rust-version = "1.88" +rust-version = "1.89" license = "MIT OR Apache-2.0" homepage = "https://cot.rs" repository = "https://github.com/cot-rs/cot" @@ -67,6 +67,7 @@ async-stream = "0.3" async-trait = "0.1" axum = { version = "0.8", default-features = false } backtrace = "0.3.76" +blake3 = "1.8.3" bytes = "1.11" cargo_toml = "0.22" chrono = { version = "0.4.43", default-features = false } @@ -90,6 +91,7 @@ email_address = "0.2.9" fake = "4" fantoccini = "0.22" form_urlencoded = "1" +fs4 = { version = "0.13.1", features = ["tokio", "sync"] } futures = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } diff --git a/cot-macros/src/cache.rs b/cot-macros/src/cache.rs index 7226580e..ff95608b 100644 --- a/cot-macros/src/cache.rs +++ b/cot-macros/src/cache.rs @@ -6,6 +6,7 @@ pub(super) fn fn_to_cache_test(test_fn: &ItemFn) -> TokenStream { let test_fn_name = &test_fn.sig.ident; let memory_ident = format_ident!("{}_memory", test_fn_name); let redis_ident = format_ident!("{}_redis", test_fn_name); + let file_ident = format_ident!("{}_file", test_fn_name); let result = 
quote! { #[::cot::test] @@ -28,7 +29,17 @@ pub(super) fn fn_to_cache_test(test_fn: &ItemFn) -> TokenStream { cache.cleanup().await.unwrap_or_else(|err| panic!("Failed to cleanup: {err:?}")); #test_fn - } + } + + #[::cot::test] + async fn #file_ident() { + let mut cache = cot::test::TestCache::new_file().unwrap(); + #test_fn_name(&mut cache).await; + + cache.cleanup().await.unwrap_or_else(|err| panic!("Failed to cleanup file cache: {err:?}")); + + #test_fn + } }; result } diff --git a/cot/Cargo.toml b/cot/Cargo.toml index f12f86d3..d39ad3a1 100644 --- a/cot/Cargo.toml +++ b/cot/Cargo.toml @@ -20,6 +20,7 @@ aide = { workspace = true, optional = true } askama = { workspace = true, features = ["std"] } async-trait.workspace = true axum = { workspace = true, features = ["http1", "tokio"] } +blake3.workspace = true bytes.workspace = true chrono = { workspace = true, features = ["alloc", "serde", "clock"] } chrono-tz.workspace = true @@ -34,6 +35,7 @@ digest.workspace = true email_address.workspace = true fake = { workspace = true, optional = true, features = ["derive", "chrono"] } form_urlencoded.workspace = true +fs4 = { workspace = true, optional = true } futures-core.workspace = true futures-util.workspace = true hex.workspace = true @@ -116,7 +118,7 @@ json = ["dep:serde_json", "cot_core/json"] openapi = ["json", "dep:aide", "dep:schemars"] swagger-ui = ["openapi", "dep:swagger-ui-redist"] live-reload = ["dep:tower-livereload"] -cache = ["json"] +cache = ["json", "fs4"] test = [] [lib] diff --git a/cot/src/cache.rs b/cot/src/cache.rs index a51f84fc..e452aa20 100644 --- a/cot/src/cache.rs +++ b/cot/src/cache.rs @@ -122,6 +122,7 @@ use serde::Serialize; use serde::de::DeserializeOwned; use thiserror::Error; +use crate::cache::store::file::FileStore; use crate::cache::store::memory::Memory; #[cfg(feature = "redis")] use crate::cache::store::redis::Redis; @@ -786,9 +787,12 @@ impl Cache { let redis_store = Redis::new(url, pool_size)?; Self::new(redis_store, config.prefix.clone(), config.timeout) } - _ => { - unimplemented!(); + CacheStoreTypeConfig::File { ref path } => { + let file_store = FileStore::new(path.clone())?; + Self::new(file_store, config.prefix.clone(), config.timeout) } + #[expect(unused)] + _ => unimplemented!(), } }; @@ -981,4 +985,36 @@ mod tests { let result = Cache::from_config(&config).await; assert!(result.is_ok()); } + + #[cot::test] + async fn test_cache_from_config_file() { + use std::env; + + use crate::config::{CacheConfig, CacheStoreConfig}; + + let temp_path = + env::temp_dir().join(format!("test_cot_cache_file_store_{}", std::process::id())); + let cloned_path = temp_path.clone(); + + let config = CacheConfig::builder() + .store(CacheStoreConfig { + store_type: CacheStoreTypeConfig::File { path: temp_path }, + }) + .prefix("test_file") + .timeout(Timeout::After(Duration::from_secs(60))) + .build(); + + let cache = Cache::from_config(&config) + .await + .expect("Failed to create cache from config"); + + cache + .insert("test_key", "test_value".to_string()) + .await + .unwrap(); + let retrieved = cache.get("test_key").await.unwrap(); + assert_eq!(retrieved, Some("test_value".to_string())); + + let _ = tokio::fs::remove_dir_all(&cloned_path).await; + } } diff --git a/cot/src/cache/store.rs b/cot/src/cache/store.rs index 10f4c406..468e389e 100644 --- a/cot/src/cache/store.rs +++ b/cot/src/cache/store.rs @@ -5,6 +5,7 @@ //! provide a simple asynchronous interface for storing, retrieving, and //! managing cached values, optionally with expiration policies. 
+pub mod file;
 pub mod memory;
 #[cfg(feature = "redis")]
 pub mod redis;
diff --git a/cot/src/cache/store/file.rs b/cot/src/cache/store/file.rs
new file mode 100644
index 00000000..dcb4e9d5
--- /dev/null
+++ b/cot/src/cache/store/file.rs
@@ -0,0 +1,892 @@
+//! File-based cache store implementation.
+//!
+//! This store uses the local file system as the backend for caching. It
+//! provides atomic writes via sync-then-rename and active validation for
+//! TTL-based expiration.
+//!
+//! # Examples
+//!
+//! ```no_run
+//! # use cot::cache::store::file::FileStore;
+//! # use cot::cache::store::CacheStore;
+//! # use cot::config::Timeout;
+//! # use std::path::PathBuf;
+//! # #[tokio::main]
+//! # async fn main() {
+//!
+//! let path = PathBuf::from("./cache_data");
+//! let store = FileStore::new(path).expect("Failed to initialize store");
+//!
+//! let key = "example_key".to_string();
+//! let value = serde_json::json!({"data": "example_value"});
+//!
+//! store.insert(key.clone(), value.clone(), Timeout::default()).await.unwrap();
+//!
+//! let retrieved = store.get(&key).await.unwrap();
+//! assert_eq!(retrieved, Some(value));
+//! # }
+//! ```
+//!
+//! # Expiration Policy
+//!
+//! Expired cache files are evicted lazily, on `contains_key` and `get`.
+//! No background collector is implemented.
+//!
+//! # Cache File Format
+//!
+//! Each cache file starts with a timestamp header: a single i64
+//! (8 bytes, little-endian) holding the expiry as a Unix timestamp in
+//! seconds, followed by the serialized cache data.
+//!
+//! | Section       | Start-Index | End-Index | Size           |
+//! |---------------|-------------|-----------|----------------|
+//! | Expiry header | 0           | 7         | i64 (8 bytes)  |
+//! | Cache data    | 8           | EOF       | length of data |
+use std::borrow::Cow;
+use std::path::Path;
+
+use blake3::hash;
+use chrono::{DateTime, Utc};
+use cot_core::error::impl_into_cot_error;
+use fs4::tokio::AsyncFileExt;
+use serde_json::Value;
+use thiserror::Error;
+use tokio::fs::OpenOptions;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
+use tokio::task::spawn_blocking;
+
+use crate::cache::store::{CacheStore, CacheStoreError, CacheStoreResult};
+use crate::config::Timeout;
+
+const ERROR_PREFIX: &str = "file-based cache store error:";
+const TEMPFILE_SUFFIX: &str = "tmp";
+
+// Retry limit for the edge case where a file has been created but
+// repeatedly fails to be locked immediately afterwards.
+const INTERNAL_MAX_RETRIES: i32 = 5;
+
+// This header offset skips exactly one i64 integer, which stores
+// the expiry timestamp.
+const EXPIRY_HEADER_OFFSET: usize = size_of::<i64>();
+
+/// Errors specific to the file-based cache store.
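+///
+/// Each variant is converted into a [`CacheStoreError`] by the `From`
+/// implementation below: serialization and deserialization failures map to
+/// the corresponding `CacheStoreError` variants, and everything else maps
+/// to `CacheStoreError::Backend`.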
+#[derive(Debug, Error)]
+#[non_exhaustive]
+pub enum FileCacheStoreError {
+    /// An error occurred during directory creation
+    #[error("{ERROR_PREFIX} directory creation error: {0}")]
+    DirCreation(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred during temporary file creation
+    #[error("{ERROR_PREFIX} temporary file creation error: {0}")]
+    TempFileCreation(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred during file I/O
+    #[error("{ERROR_PREFIX} I/O error: {0}")]
+    Io(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred during data serialization
+    #[error("{ERROR_PREFIX} serialization error: {0}")]
+    Serialize(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred during data deserialization
+    #[error("{ERROR_PREFIX} deserialization error: {0}")]
+    Deserialize(Box<dyn std::error::Error + Send + Sync>),
+}
+
+impl_into_cot_error!(FileCacheStoreError);
+
+impl From<FileCacheStoreError> for CacheStoreError {
+    fn from(err: FileCacheStoreError) -> Self {
+        let full = err.to_string();
+
+        match err {
+            FileCacheStoreError::Serialize(_) => CacheStoreError::Serialize(full),
+            FileCacheStoreError::Deserialize(_) => CacheStoreError::Deserialize(full),
+            _ => CacheStoreError::Backend(full),
+        }
+    }
+}
+
+/// A file-backed cache store implementation.
+///
+/// This store uses the local file system for caching.
+///
+/// # Examples
+/// ```no_run
+/// use std::path::Path;
+///
+/// use cot::cache::store::file::FileStore;
+///
+/// let store = FileStore::new(Path::new("./cache_dir")).unwrap();
+/// ```
+#[derive(Debug, Clone)]
+pub struct FileStore {
+    dir_path: Cow<'static, Path>,
+}
+
+impl FileStore {
+    /// Creates a new `FileStore` at the specified directory.
+    ///
+    /// This will attempt to create the directory and its parents if they do
+    /// not exist.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`FileCacheStoreError::DirCreation`] if the directory cannot be
+    /// created due to permissions or other I/O issues.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use std::path::PathBuf;
+    ///
+    /// use cot::cache::store::file::FileStore;
+    ///
+    /// // Using a relative path
+    /// let path = PathBuf::from("./cache");
+    /// let store = FileStore::new(path).unwrap();
+    ///
+    /// // Using an absolute path
+    /// let path = PathBuf::from("/var/lib/myapp/cache");
+    /// let store = FileStore::new(path).unwrap();
+    /// ```
+    pub fn new(dir: impl Into<Cow<'static, Path>>) -> CacheStoreResult<Self> {
+        let dir_path = dir.into();
+
+        let store = Self { dir_path };
+        store.create_dir_root_sync()?;
+
+        Ok(store)
+    }
+
+    fn create_dir_root_sync(&self) -> CacheStoreResult<()> {
+        std::fs::create_dir_all(&self.dir_path)
+            .map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?;
+
+        // When a crash happens mid-flight, we may be left with some .tmp
+        // files. This ensures that no .tmp files linger around on startup.
+        if let Ok(entries) = std::fs::read_dir(&self.dir_path) {
+            for entry in entries.flatten() {
+                let path = entry.path();
+
+                if path.extension().is_some_and(|ext| ext == TEMPFILE_SUFFIX)
+                    && let Ok(file) = std::fs::OpenOptions::new()
+                        .write(true)
+                        .create(true)
+                        .truncate(false)
+                        .open(&path)
+                    && file.try_lock().is_ok()
+                {
+                    let _ = std::fs::remove_file(path);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn create_dir_root(&self) -> CacheStoreResult<()> {
+        tokio::fs::create_dir_all(&self.dir_path)
+            .await
+            .map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?;
+
+        Ok(())
+    }
+
+    async fn write(&self, key: String, value: Value, expiry: Timeout) -> CacheStoreResult<()> {
+        let key_hash = FileStore::create_key_hash(&key);
+        let (mut file, file_path) = self.create_file_temp(&key_hash).await?;
+
+        self.serialize_data(value, expiry, &mut file, &file_path)
+            .await?;
+
+        // Here's the flow:
+        // We sync the data after writing so that the correctly written data
+        // survives even if the later operations fail. We unlock before
+        // renaming to let others make progress; this is free when there are
+        // no concurrent accessors. If we renamed first and unlocked after,
+        // subsequent readers would contend on the final file instead,
+        // defeating the purpose of this method. We then create a new file
+        // handle to check whether the temp file still exists; if it doesn't,
+        // we move on, as it has most likely been deleted by
+        // `create_dir_root_sync` or `clear`. Finally, we rename only if this
+        // thread reasonably holds the "last" lock on the .tmp file. This
+        // doesn't guarantee that no .tmp file will be renamed by another
+        // thread, but it pushes the guarantees towards "writers write to and
+        // hold the .tmp file, not contesting the final file".
+        file.sync_data()
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        file.unlock_async()
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        // The `Ok(())` returns in the two blocks below anticipate the
+        // "unlocked file" window mentioned in `create_file_temp()`.
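+        // Concretely: a NotFound on the reopen below means another writer or
+        // a cleanup pass won the race, so the write is treated as superseded
+        // rather than as an error; the same applies to a NotFound from the
+        // rename itself.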
+
+        let new_file_handle = match OpenOptions::new().read(true).open(&file_path).await {
+            Ok(handle) => handle,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
+            Err(e) => return Err(FileCacheStoreError::Io(Box::new(e)))?,
+        };
+
+        if new_file_handle.try_lock_shared().is_ok_and(|lock| lock)
+            && let Err(e) = tokio::fs::rename(&file_path, self.dir_path.join(&key_hash)).await
+        {
+            if e.kind() == std::io::ErrorKind::NotFound {
+                return Ok(());
+            }
+
+            return Err(FileCacheStoreError::Io(Box::new(e)))?;
+        }
+
+        Ok(())
+    }
+
+    async fn read(&self, key: &str) -> CacheStoreResult<Option<Value>> {
+        let Some((mut file, file_path)) = self.open_file_for_reading(key).await? else {
+            return Ok(None);
+        };
+
+        self.deserialize_data(&mut file, &file_path).await
+    }
+
+    fn create_key_hash(key: &str) -> String {
+        let key_hash_hex = hash(key.as_bytes());
+        format!("{key_hash_hex}")
+    }
+
+    async fn serialize_data(
+        &self,
+        value: Value,
+        expiry: Timeout,
+        file: &mut tokio::fs::File,
+        file_path: &Path,
+    ) -> CacheStoreResult<()> {
+        let result = async {
+            let timeout = expiry.canonicalize();
+            let seconds: i64 = match timeout {
+                Timeout::Never => i64::MAX,
+                Timeout::AtDateTime(date_time) => date_time.timestamp(),
+                Timeout::After(_) => unreachable!("should've been converted by canonicalize"),
+            };
+
+            let data = serde_json::to_vec(&value)
+                .map_err(|e| FileCacheStoreError::Serialize(Box::new(e)))?;
+
+            file.write_all(&seconds.to_le_bytes())
+                .await
+                .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+            file.write_all(&data)
+                .await
+                .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+            Ok(())
+        }
+        .await;
+
+        if result.is_err() {
+            let _ = tokio::fs::remove_file(file_path).await;
+        }
+
+        result
+    }
+
+    // Checking expiry also removes the file when it has expired.
+    // This makes the read path more efficient and reduces error
+    // propagation.
+    async fn check_expiry(
+        &self,
+        file: &mut tokio::fs::File,
+        file_path: &Path,
+    ) -> CacheStoreResult<bool> {
+        let mut header: [u8; EXPIRY_HEADER_OFFSET] = [0; EXPIRY_HEADER_OFFSET];
+
+        let _ = file
+            .read_exact(&mut header)
+            .await
+            .map_err(|e| FileCacheStoreError::Deserialize(Box::new(e)))?;
+        let seconds = i64::from_le_bytes(header);
+        // This may look inefficient, but it keeps the method self-contained:
+        // by resetting its own cursor, the logic stays reusable without the
+        // risk of a caller forgetting to reset the cursor.
+        file.seek(SeekFrom::Start(0))
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        let expiry = if seconds == i64::MAX {
+            Timeout::Never
+        } else {
+            let date_time = DateTime::from_timestamp(seconds, 0)
+                .ok_or_else(|| FileCacheStoreError::Deserialize("date time corrupted".into()))?
+                .with_timezone(&Utc)
+                .fixed_offset();
+            Timeout::AtDateTime(date_time)
+        };
+
+        if expiry.is_expired(None) {
+            tokio::fs::remove_file(file_path)
+                .await
+                .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
+
+    async fn deserialize_data(
+        &self,
+        file: &mut tokio::fs::File,
+        file_path: &Path,
+    ) -> CacheStoreResult<Option<Value>> {
+        if !self.check_expiry(file, file_path).await? {
+            return Ok(None);
+        }
+
+        let mut buffer = Vec::new();
+
+        // Advance the cursor past the expiry header.
+        // EXPIRY_HEADER_OFFSET is the size of an i64 (8 bytes),
+        // so the direct cast to u64 below cannot overflow and needs no
+        // wrapping addition or fallback conversion.
+        file.seek(SeekFrom::Start(EXPIRY_HEADER_OFFSET as u64))
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+        file.read_to_end(&mut buffer)
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        let value: Value = serde_json::from_slice(&buffer)
+            .map_err(|e| FileCacheStoreError::Deserialize(Box::new(e)))?;
+
+        Ok(Some(value))
+    }
+
+    async fn create_file_temp(
+        &self,
+        key_hash: &str,
+    ) -> CacheStoreResult<(tokio::fs::File, std::path::PathBuf)> {
+        let temp_path = self.dir_path.join(format!("{key_hash}.{TEMPFILE_SUFFIX}"));
+
+        // We must let errors propagate out of the loop to catch a suddenly
+        // missing cache directory. This also makes it easier to wait for file
+        // creation, offloading one lock check to the OS via `create_new()`.
+        // The flow looks like this:
+        // 1. `create_new()` fails, and we check whether the error is
+        //    AlreadyExists.
+        // 2. When (1) is triggered, we park the task on a blocking thread,
+        //    waiting for the lock to move.
+        // 3. The blocking thread only waits for the existing file at
+        //    `temp_path`.
+        //
+        // This approach was chosen because we can't (at least for now) create
+        // a file AND lock it atomically. A window in which the existing file
+        // may get renamed or deleted is expected; the blocking task is
+        // therefore a pessimistic write.
+
+        let mut retry_count = 0;
+        let temp_file = loop {
+            match OpenOptions::new()
+                .write(true)
+                .read(true)
+                .create_new(true)
+                .truncate(true)
+                .open(&temp_path)
+                .await
+            {
+                Ok(handle) => {
+                    if let Ok(lock_acquired) = handle.try_lock_exclusive()
+                        && lock_acquired
+                    {
+                        break handle;
+                    }
+
+                    // We try again when we can't acquire the lock, possibly
+                    // due to external processes. However, if the failure
+                    // keeps happening here, we must abort.
+                    retry_count += 1;
+                    if retry_count > INTERNAL_MAX_RETRIES {
+                        return Err(FileCacheStoreError::TempFileCreation(Box::new(
+                            std::io::Error::new(
+                                std::io::ErrorKind::TimedOut,
+                                "permission error or directory is busy",
+                            ),
+                        )))?;
+                    }
+                    continue;
+                }
+                // Trigger to enter the task handoff
+                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
+                    let cloned_temp_path = temp_path.clone();
+                    let new_temp_file: Result<tokio::fs::File, FileCacheStoreError> =
+                        spawn_blocking(move || {
+                            let Ok(file_handle) = std::fs::OpenOptions::new()
+                                .write(true)
+                                .read(true)
+                                .create(true)
+                                .truncate(false)
+                                .open(cloned_temp_path)
+                            else {
+                                return Err(FileCacheStoreError::Io(Box::new(e)));
+                            };
+
+                            let Ok(()) = file_handle.lock() else {
+                                return Err(FileCacheStoreError::TempFileCreation(Box::new(e)));
+                            };
+
+                            let Ok(()) = file_handle.set_len(0) else {
+                                return Err(FileCacheStoreError::TempFileCreation(Box::new(e)));
+                            };
+
+                            Ok(tokio::fs::File::from_std(file_handle))
+                        })
+                        .await
+                        .map_err(|e| FileCacheStoreError::TempFileCreation(Box::new(e)))?;
+                    break new_temp_file?;
+                }
+                // Trigger to create the new directory
+                Err(e) if e.kind() == std::io::ErrorKind::NotFound => self
+                    .create_dir_root()
+                    .await
+                    .map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?,
+                Err(e) => {
+                    return Err(FileCacheStoreError::TempFileCreation(Box::new(e)))?;
+                }
+            }
+        };
+
+        Ok((temp_file, temp_path))
+    }
+
+    async fn open_file_for_reading(
+        &self,
+        key: &str,
+    ) -> CacheStoreResult<Option<(tokio::fs::File, std::path::PathBuf)>> {
+        let key_hash = FileStore::create_key_hash(key);
+        let path = self.dir_path.join(&key_hash);
+        match OpenOptions::new().read(true).open(&path).await {
+            Ok(f) => Ok(Some((f, path))),
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
+            Err(e) => Err(FileCacheStoreError::Io(Box::new(e)).into()),
+        }
+    }
+}
+
+impl CacheStore for FileStore {
+    async fn get(&self, key: &str) -> CacheStoreResult<Option<Value>> {
+        self.read(key).await
+    }
+
+    async fn insert(&self, key: String, value: Value, expiry: Timeout) -> CacheStoreResult<()> {
+        self.write(key, value, expiry).await?;
+        Ok(())
+    }
+
+    async fn remove(&self, key: &str) -> CacheStoreResult<()> {
+        if let Some((_file, file_path)) = self.open_file_for_reading(key).await? {
+            tokio::fs::remove_file(file_path)
+                .await
+                .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+        }
+
+        Ok(())
+    }
+
+    async fn clear(&self) -> CacheStoreResult<()> {
+        // First, try to remove the whole directory.
+        // If that fails, we fall back to iterative removal.
+        if tokio::fs::remove_dir_all(&self.dir_path).await.is_ok() {
+            tokio::fs::create_dir_all(&self.dir_path)
+                .await
+                .map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?;
+            return Ok(());
+        }
+
+        if let Ok(entries) = std::fs::read_dir(&self.dir_path) {
+            for entry in entries.flatten() {
+                let path = entry.path();
+                if let Ok(file) = std::fs::OpenOptions::new()
+                    .write(true)
+                    .truncate(false)
+                    .open(&path)
+                    && file.try_lock().is_ok()
+                {
+                    let _ = std::fs::remove_file(&path);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn approx_size(&self) -> CacheStoreResult<usize> {
+        let mut entries = match tokio::fs::read_dir(&self.dir_path).await {
+            Ok(e) => e,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
+            Err(e) => return Err(FileCacheStoreError::Io(Box::new(e)).into()),
+        };
+
+        let mut entry_count: usize = 0;
+
+        while let Ok(Some(entry)) = entries.next_entry().await {
+            let path = entry.path();
+            let is_temp = path.extension().is_some_and(|ext| ext == TEMPFILE_SUFFIX);
+
+            if let Ok(meta) = entry.metadata().await
+                && meta.is_file()
+                && !is_temp
+            {
+                entry_count += 1;
+            }
+        }
+
+        Ok(entry_count)
+    }
+
+    async fn contains_key(&self, key: &str) -> CacheStoreResult<bool> {
+        let Ok(Some((mut file, file_path))) = self.open_file_for_reading(key).await else {
+            return Ok(false);
+        };
+
+        // TTL-based cache eviction also happens on contains_key()
+        self.check_expiry(&mut file, &file_path).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use chrono::Utc;
+    use tempfile::tempdir;
+
+    use crate::cache::store::file::{FileCacheStoreError, FileStore};
+    use crate::cache::store::{CacheStore, CacheStoreError};
+    use crate::config::Timeout;
+
+    fn make_store_path() -> std::path::PathBuf {
+        tempdir().expect("failed to create dir").keep()
+    }
+
+    #[cot::test]
+    async fn test_create_dir() {
+        let path = make_store_path();
+        let _ = FileStore::new(path.clone()).expect("failed to init store");
+
+        assert!(path.exists());
+        assert!(path.is_dir());
+
+        tokio::fs::remove_dir_all(path)
+            .await
+            .expect("failed to cleanup tempdir");
+    }
+
+    #[cot::test]
+    async fn test_create_dir_on_existing() {
+        let path = make_store_path();
+        let _ = FileStore::new(path.clone()).expect("failed to init store");
+        let _ = FileStore::new(path.clone()).expect("failed to init second store");
+
+        assert!(path.exists());
+        assert!(path.is_dir());
+
+        tokio::fs::remove_dir_all(path)
+            .await
+            .expect("failed to cleanup tempdir");
+    }
+
+    #[cot::test]
+    async fn test_insert_and_read_single() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        let retrieved = store.read(&key).await.expect("failed to read from store");
+
+        assert!(retrieved.is_some(), "retrieved value should not be None");
+        assert_eq!(
+            retrieved.unwrap(),
+            value,
+            "retrieved value does not match inserted value"
+        );
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_insert_and_read_after_delete_single() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        store.remove(&key).await.expect("failed to delete entry");
+
+        let retrieved = store.read(&key).await.expect("failed to read from store");
+        assert!(retrieved.is_none(), "retrieved value should not be Some");
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_clear_double_free() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        store.clear().await.expect("failed to clear");
+        store
+            .clear()
+            .await
+            .expect("failed to clear the second time");
+
+        let retrieved = store.read(&key).await.expect("failed to read from store");
+
+        assert!(path.is_dir(), "path must be dir");
+        assert!(retrieved.is_none(), "retrieved value should not be Some");
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_approx_size() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let key_2 = "test_key_2".to_string();
+
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+        store
+            .insert(key_2.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        let data_length: usize = 2;
+
+        let entry_length = store
+            .approx_size()
+            .await
+            .expect("failed to get approx size");
+
+        assert_eq!(data_length, entry_length);
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_contains_key() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        let exist = store
+            .contains_key(&key)
+            .await
+            .expect("failed to check key existence");
+
+        assert!(exist);
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_expiration_integrity() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        let past = Utc::now() - Duration::from_secs(1);
+        let past_fixed = past.fixed_offset();
+        let expiry = Timeout::AtDateTime(past_fixed);
+
+        store
+            .insert(key.clone(), value.clone(), expiry)
+            .await
+            .expect("failed to insert data to store");
+
+        // test file is None
+        let retrieved = store.get(&key).await.expect("failed to read from store");
+        assert!(retrieved.is_none());
+
+        // test file doesn't exist
+        let exist = store
+            .contains_key(&key)
+            .await
+            .expect("failed to check key existence");
+        assert!(!exist);
+
+        // test size is 0
+        let size = store.approx_size().await.expect("failed to check size");
+        assert_eq!(size, 0);
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    // Ignored under Miri since it currently doesn't support blocking
+    // file lock operations.
+    #[cfg_attr(miri, ignore)]
+    #[cot::test]
+    async fn test_interference_during_write() {
+        let path = make_store_path();
+        let store = FileStore::new(path.clone()).expect("failed to init");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1 });
+
+        let handles: Vec<_> = (0..10)
+            .map(|_| {
+                let (s, k, v) = (store.clone(), key.clone(), value.clone());
+                tokio::spawn(async move { s.insert(k, v, Timeout::Never).await })
+            })
+            .collect();
+
+        let _store_2 = FileStore::new(path.clone()).expect("failed to init interference");
+
+        for h in handles {
+            h.await.unwrap().expect("insert failed");
+        }
+
+        let res = store.read(&key).await.expect("read failed");
+        assert_eq!(res.unwrap(), value);
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cfg_attr(miri, ignore)]
+    #[cot::test]
+    async fn test_clear_during_write() {
+        let path = make_store_path();
+        let store = FileStore::new(path.clone()).expect("failed to init");
+        let _ = tokio::fs::remove_dir_all(&path).await;
+
+        let mut handles = Vec::new();
+        for i in 0..10 {
+            let s = store.clone();
+            handles.push(tokio::spawn(async move {
+                if i % 2 == 0 {
+                    let _ = s
+                        .insert("k".into(), serde_json::json!({"i": i}), Timeout::Never)
+                        .await;
+                } else {
+                    let _ = s.clear().await;
+                }
+            }));
+        }
+
+        for h in handles {
+            h.await.unwrap();
+        }
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cfg_attr(miri, ignore)]
+    #[cot::test]
+    async fn test_thundering_write() {
+        let path = make_store_path();
+        let store = FileStore::new(path.clone()).expect("failed to init");
+        let value = serde_json::json!({ "id": 1 });
+
+        let tasks: Vec<_> = (0..10)
+            .map(|_| {
+                let s = store.clone();
+                let v = value.clone();
+                tokio::spawn(async move { s.insert("key".into(), v, Timeout::Never).await })
+            })
+            .collect();
+
+        for h in tasks {
+            h.await.unwrap().expect("insert failed");
+        }
+
+        let retrieved = store.read("key").await.expect("failed to read from store");
+        assert_eq!(retrieved.unwrap(), value);
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_from_file_cache_store_error_to_cache_store_error() {
+        let file_error = FileCacheStoreError::Io(Box::new(std::io::Error::other("disk failure")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: backend error: file-based cache store error: I/O error: disk failure"
+        );
+
+        let file_error =
+            FileCacheStoreError::Serialize(Box::new(std::io::Error::other("json fail")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: serialization error: file-based cache store error: serialization error: json fail"
+        );
+
+        let file_error =
+            FileCacheStoreError::Deserialize(Box::new(std::io::Error::other("corrupt header")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: deserialization error: file-based cache store error: deserialization error: corrupt header"
+        );
+
+        let file_error =
+            FileCacheStoreError::DirCreation(Box::new(std::io::Error::other("permission denied")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: backend error: file-based cache store error: directory creation error: permission denied"
+        );
+
+        let file_error =
+            FileCacheStoreError::TempFileCreation(Box::new(std::io::Error::other("no space left")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: backend error: file-based cache store error: temporary file creation error: no space left"
+        );
+    }
+}
diff --git a/cot/src/test.rs b/cot/src/test.rs
index 2beee985..997b2037 100644
--- a/cot/src/test.rs
+++ b/cot/src/test.rs
@@ -4,6 +4,8 @@ use std::any::Any;
 use std::future::poll_fn;
 use std::marker::PhantomData;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+#[cfg(feature = "cache")]
+use std::path::PathBuf;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -27,6 +29,10 @@ use crate::auth::{Auth, AuthBackend, NoAuthBackend, User, UserId};
 #[cfg(feature = "cache")]
 use crate::cache::Cache;
 #[cfg(feature = "cache")]
+use crate::cache::store::file::FileCacheStoreError;
+#[cfg(feature = "cache")]
+use crate::cache::store::file::FileStore;
+#[cfg(feature = "cache")]
 use crate::cache::store::memory::Memory;
 #[cfg(feature = "redis")]
 use crate::cache::store::redis::Redis;
@@ -1763,6 +1769,9 @@ enum CacheKind {
         #[expect(unused)]
         allocator: RedisDbAllocator,
     },
+    File {
+        path: PathBuf,
+    },
 }
 
 /// A test cache.
@@ -1897,6 +1906,59 @@ impl TestCache {
         Ok(this)
     }
 
+    /// Create a new file-based test cache.
+    ///
+    /// This will create a new directory for the test cache instance at
+    /// `{tmp_dir}/cot_test_cache_{pid}/run_{i}`, where `tmp_dir` is the path
+    /// returned by `std::env::temp_dir()`, `{pid}` is the current process ID,
+    /// and `i` is the instance number. The constructor increments `i` until
+    /// it finds a `run_{i}` directory that does not exist yet, and creates
+    /// the cache directory there.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use cot::test::TestCache;
+    ///
+    /// # #[tokio::main]
+    /// # async fn main() -> cot::Result<()> {
+    /// let test_cache = TestCache::new_file()?;
+    /// let cache = test_cache.cache();
+    ///
+    /// // do something with the cache
+    ///
+    /// # Ok(())
+    /// # }
+    /// ```
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the cache directory could not be created.
+    pub fn new_file() -> Result<Self> {
+        let base_path = std::env::temp_dir().join(format!("cot_test_cache_{}", std::process::id()));
+
+        let _ = std::fs::create_dir_all(&base_path);
+
+        let mut i = 0;
+        let path = loop {
+            let candidate = base_path.join(format!("run_{i}"));
+
+            match std::fs::create_dir(&candidate) {
+                Ok(()) => break candidate,
+                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
+                    i += 1;
+                }
+                Err(e) => return Err(FileCacheStoreError::DirCreation(Box::new(e)))?,
+            }
+        };
+
+        let file_store = FileStore::new(path.clone())?;
+
+        let cache = Cache::new(file_store, Some("dev_test".to_string()), Timeout::default());
+
+        Ok(Self::new(cache, CacheKind::File { path }))
+    }
+
     /// Get the cache.
     ///
     /// # Examples
@@ -1946,6 +2008,9 @@ impl TestCache {
         if let CacheKind::Redis { allocator: _ } = &self.kind {
             self.cache.clear().await?;
         }
+        if let CacheKind::File { path } = &self.kind {
+            let _ = tokio::fs::remove_dir_all(path).await;
+        }
         Ok(())
     }
 }
diff --git a/deny.toml b/deny.toml
index d1a9a769..e4fab4ac 100644
--- a/deny.toml
+++ b/deny.toml
@@ -23,4 +23,5 @@ allow = [
     "0BSD",
     "BSD-3-Clause",
     "Zlib",
+    "BSD-2-Clause",
 ]
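For downstream users, enabling the new store boils down to selecting the `File` variant in the cache configuration, which `Cache::from_config` dispatches to `FileStore::new` (see the `cot/src/cache.rs` hunk above). A minimal sketch, mirroring the `test_cache_from_config_file` test in this diff; it assumes `CacheStoreTypeConfig` is exported from `cot::config` alongside the other config types and that the cache error converts into `cot::Error`, and the path and prefix are illustrative:

```rust
use std::path::PathBuf;
use std::time::Duration;

use cot::cache::Cache;
use cot::config::{CacheConfig, CacheStoreConfig, CacheStoreTypeConfig, Timeout};

async fn build_file_cache() -> cot::Result<Cache> {
    let config = CacheConfig::builder()
        .store(CacheStoreConfig {
            // Cache entries are stored as individual files under this path.
            store_type: CacheStoreTypeConfig::File {
                path: PathBuf::from("/var/cache/myapp"),
            },
        })
        .prefix("myapp")
        .timeout(Timeout::After(Duration::from_secs(60)))
        .build();

    // Dispatches to `FileStore::new` for the `File` variant.
    let cache = Cache::from_config(&config).await?;
    Ok(cache)
}
```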