From 8e9dbc93f39d5c99a4c7fcb326c842287ca73c38 Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Fri, 4 Oct 2024 16:27:28 -0700
Subject: [PATCH] [ENH] Disk and memory-backed cache with Foyer 0.10. (#2890)

## Description of changes

The lfu/lru/weighted_lru caches didn't initialize a hybrid cache. This PR rewrites the chroma_cache crate to hide implementation details of the cache as much as possible.

Noteworthy callouts from this PR:
- We don't currently support cleaning up tempfiles evicted from the cache. This is the status quo.
- cops-disk-cache-config-writer and cops-memory-cache-config-writer provide command-line arguments to generate the YAML for the Foyer cache.
- Cache is a trait. PersistentCache allows callers to require persistence. Every PersistentCache can be used where a Cache is expected (usage sketches follow the diff below).

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*
---
 Cargo.lock | 221 +++++++---
 .../blockstore/src/arrow/block/delta/delta.rs | 20 +-
 rust/blockstore/src/arrow/block/types.rs | 113 +++++-
 rust/blockstore/src/arrow/blockfile.rs | 71 ++--
 rust/blockstore/src/arrow/concurrency_test.rs | 17 +-
 rust/blockstore/src/arrow/config.rs | 2 +-
 rust/blockstore/src/arrow/provider.rs | 47 ++-
 rust/blockstore/src/arrow/sparse_index.rs | 9 +-
 rust/blockstore/src/config.rs | 2 +-
 rust/blockstore/src/provider.rs | 17 +-
 rust/cache/Cargo.toml | 8 +-
 .../src/bin/cops-disk-cache-config-writer.rs | 14 +
 .../bin/cops-memory-cache-config-writer.rs | 12 +
 rust/cache/src/cache.rs | 263 ------------
 rust/cache/src/config.rs | 32 --
 rust/cache/src/foyer.rs | 377 ++++++++++++++++++
 rust/cache/src/lib.rs | 128 ++++--
 rust/cache/src/nop.rs | 25 ++
 rust/cache/src/unbounded.rs | 69 ++++
 rust/index/benches/full_text.rs | 9 +-
 rust/index/src/config.rs | 2 +-
 rust/index/src/hnsw_provider.rs | 52 +--
 .../src/compactor/compaction_manager.rs | 12 +-
 rust/worker/src/config.rs | 197 +++++--
 .../operators/hydrate_metadata_results.rs | 6 +-
 .../execution/operators/metadata_filtering.rs | 18 +-
 .../src/segment/distributed_hnsw_segment.rs | 5 +-
 rust/worker/src/segment/metadata_segment.rs | 21 +-
 rust/worker/src/segment/types.rs | 21 +-
 rust/worker/src/server.rs | 9 +-
 30 files changed, 1205 insertions(+), 594 deletions(-)
 create mode 100644 rust/cache/src/bin/cops-disk-cache-config-writer.rs
 create mode 100644 rust/cache/src/bin/cops-memory-cache-config-writer.rs
 delete mode 100644 rust/cache/src/cache.rs
 delete mode 100644 rust/cache/src/config.rs
 create mode 100644 rust/cache/src/foyer.rs
 create mode 100644 rust/cache/src/nop.rs
 create mode 100644 rust/cache/src/unbounded.rs

diff --git a/Cargo.lock b/Cargo.lock
index dc159776e25..5e5efad13ab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -134,6 +134,21 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "array-util" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e509844de8f09b90a2c3444684a2b6695f4071360e13d2fda0af9f749cc2ed6" +dependencies = [ + "arrayvec", +] + +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +
[[package]] name = "arrow" version = "52.0.0" @@ -986,12 +1001,6 @@ version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" -[[package]] -name = "bitmaps" -version = "3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6" - [[package]] name = "bitpacking" version = "0.8.4" @@ -1086,6 +1095,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cache-padded" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21" + [[package]] name = "cast" version = "0.3.0" @@ -1116,9 +1131,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "cfg_aliases" -version = "0.1.1" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chroma" @@ -1160,14 +1175,19 @@ dependencies = [ name = "chroma-cache" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "chroma-config", "chroma-error", "chroma-types", + "clap", "foyer", "parking_lot", "serde", + "serde_yaml", "thiserror", + "tracing", + "uuid", ] [[package]] @@ -1626,6 +1646,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "darling" version = "0.14.4" @@ -1970,55 +2000,59 @@ dependencies = [ [[package]] name = "foyer" -version = "0.8.9" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e7321845edd6be7f1d505413409d119cddf1fc110197d1d0606fb28cbec6d28" +checksum = "ca1a3a9bf97a9d592d69c2210c5524668788f72ef1b7ea3054cfc1f8a3fb1257" dependencies = [ "ahash", "anyhow", "foyer-common", - "foyer-intrusive", "foyer-memory", "foyer-storage", + "futures", + "madsim-tokio", + "minitrace", + "pin-project", + "tracing", ] [[package]] name = "foyer-common" -version = "0.6.4" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deef44aeda48784c3ec1316fe64ed8d9cacbd50b37afcfd1aa211d4eb6360b61" +checksum = "e109f1fd012cc2f0785db5711c73c388fca2d2ac6e2aabb4eaa3a8af98b4dcdb" dependencies = [ - "ahash", "bytes", "cfg-if", "crossbeam", + "futures", "hashbrown 0.14.3", - "itertools 0.12.1", + "itertools 0.13.0", "madsim-tokio", + "metrics", + "minitrace", "nix", "parking_lot", + "pin-project", "rustversion", "serde", ] [[package]] name = "foyer-intrusive" -version = "0.5.3" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7145b59837d77f0090582f5631a6e36f01f09e451ab82685fdba6c9bdac7bd0d" +checksum = "2e7ad91ad11f2c94bb4b3b88acd447f3145224415d776f163d2b06a9ed24e63a" dependencies = [ - "bytes", "foyer-common", - "itertools 0.12.1", - "memoffset", - "parking_lot", + "itertools 0.13.0", ] [[package]] name = "foyer-memory" -version = "0.3.6" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28baff47d3b1e4a4d416186066ec7bd590b7b50b6332a25e62fab1686904570" +checksum = 
"29273589433f89d61347b196754bfa22357420d1adbf815d354c23512106a194" dependencies = [ "ahash", "bitflags 2.4.2", @@ -2027,39 +2061,42 @@ dependencies = [ "foyer-intrusive", "futures", "hashbrown 0.14.3", - "itertools 0.12.1", + "itertools 0.13.0", "libc", "madsim-tokio", + "minitrace", "parking_lot", + "pin-project", "serde", + "tracing", ] [[package]] name = "foyer-storage" -version = "0.7.6" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ec3638bdc66bc1c83aba27805770d2cafd18ec553e337e2563d69a5676898b" +checksum = "19f533cf09e39963cc3886f9f165d235c7d4f937f4b09b48c09827ac158876f4" dependencies = [ "ahash", "allocator-api2", "anyhow", + "array-util", + "async-channel", "bincode", "bitflags 2.4.2", - "bitmaps", "bytes", "either", "foyer-common", "foyer-memory", "futures", - "itertools 0.12.1", + "itertools 0.13.0", "lazy_static", "libc", "lz4", "madsim-tokio", - "memoffset", + "minitrace", "parking_lot", - "paste", - "prometheus", + "pin-project", "rand", "serde", "thiserror", @@ -2699,9 +2736,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" dependencies = [ "either", ] @@ -2953,9 +2990,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.153" +version = "0.2.159" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" [[package]] name = "libm" @@ -3159,12 +3196,13 @@ dependencies = [ ] [[package]] -name = "memoffset" -version = "0.9.1" +name = "metrics" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" dependencies = [ - "autocfg", + "ahash", + "portable-atomic", ] [[package]] @@ -3179,6 +3217,33 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "minitrace" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "197d538cd69839d49a593c8c72df44291b0ea3296ecc0c85529002c53c8fbc6f" +dependencies = [ + "minitrace-macro", + "minstant", + "once_cell", + "parking_lot", + "pin-project", + "rand", + "rtrb", +] + +[[package]] +name = "minitrace-macro" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14efd4b574325fcb981bce1ac700b9ccf071ec2eb94f7a6a6b583a84f228ba47" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -3197,6 +3262,16 @@ dependencies = [ "adler2", ] +[[package]] +name = "minstant" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db" +dependencies = [ + "ctor", + "web-time", +] + [[package]] name = "mio" version = "1.0.2" @@ -3264,9 +3339,9 @@ dependencies = [ [[package]] name = "nix" -version = "0.28.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.4.2", "cfg-if", @@ -3835,6 +3910,30 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.79" @@ -3857,21 +3956,6 @@ dependencies = [ "yansi", ] -[[package]] -name = "prometheus" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" -dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", - "parking_lot", - "protobuf", - "thiserror", -] - [[package]] name = "proptest" version = "1.5.0" @@ -3978,12 +4062,6 @@ dependencies = [ "prost 0.12.3", ] -[[package]] -name = "protobuf" -version = "2.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" - [[package]] name = "quick-error" version = "1.2.3" @@ -4244,6 +4322,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "rtrb" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e704dd104faf2326a320140f70f0b736d607c1caa1b1748a6c568a79819109" +dependencies = [ + "cache-padded", +] + [[package]] name = "rust-stemmers" version = "1.2.0" @@ -5781,6 +5868,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "which" version = "4.4.2" diff --git a/rust/blockstore/src/arrow/block/delta/delta.rs b/rust/blockstore/src/arrow/block/delta/delta.rs index 7927ebf388d..7fcdf961221 100644 --- a/rust/blockstore/src/arrow/block/delta/delta.rs +++ b/rust/blockstore/src/arrow/block/delta/delta.rs @@ -127,10 +127,8 @@ impl BlockDelta { #[cfg(test)] mod test { use crate::arrow::{block::Block, config::TEST_MAX_BLOCK_SIZE_BYTES, provider::BlockManager}; - use chroma_cache::{ - cache::Cache, - config::{CacheConfig, UnboundedCacheConfig}, - }; + #[cfg(test)] + use chroma_cache::new_cache_for_test; use chroma_storage::{local::LocalStorage, Storage}; use chroma_types::{DataRecord, MetadataValue}; use rand::{random, Rng}; @@ -157,7 +155,7 @@ mod test { let tmp_dir = tempfile::tempdir().unwrap(); let path = tmp_dir.path().to_str().unwrap(); let storage = Storage::Local(LocalStorage::new(path)); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache); let delta = block_manager.create::<&str, Vec>(); @@ -199,7 +197,7 @@ mod test { let tmp_dir = tempfile::tempdir().unwrap(); let path = tmp_dir.path().to_str().unwrap(); let storage 
= Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache); let delta = block_manager.create::<&str, String>(); let delta_id = delta.id; @@ -260,7 +258,7 @@ mod test { let tmp_dir = tempfile::tempdir().unwrap(); let path = tmp_dir.path().to_str().unwrap(); let storage = Storage::Local(LocalStorage::new(path)); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache); let delta = block_manager.create::(); @@ -299,7 +297,7 @@ mod test { let tmp_dir = tempfile::tempdir().unwrap(); let path = tmp_dir.path().to_str().unwrap(); let storage = Storage::Local(LocalStorage::new(path)); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache); let delta = block_manager.create::<&str, RoaringBitmap>(); @@ -335,7 +333,7 @@ mod test { let tmp_dir = tempfile::tempdir().unwrap(); let path = tmp_dir.path().to_str().unwrap(); let storage = Storage::Local(LocalStorage::new(path)); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache); let ids = ["embedding_id_2", "embedding_id_0", "embedding_id_1"]; let embeddings = [ @@ -399,7 +397,7 @@ mod test { let tmp_dir = tempfile::tempdir().unwrap(); let path = tmp_dir.path().to_str().unwrap(); let storage = Storage::Local(LocalStorage::new(path)); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache); let delta = block_manager.create::(); @@ -427,7 +425,7 @@ mod test { let tmp_dir = tempfile::tempdir().unwrap(); let path = tmp_dir.path().to_str().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache); let delta = block_manager.create::(); let delta_id = delta.id; diff --git a/rust/blockstore/src/arrow/block/types.rs b/rust/blockstore/src/arrow/block/types.rs index 4fceea86f6f..776effb9eaf 100644 --- a/rust/blockstore/src/arrow/block/types.rs +++ b/rust/blockstore/src/arrow/block/types.rs @@ -1,4 +1,6 @@ -use super::delta::BlockDelta; +use std::cmp::Ordering::{Equal, Greater, Less}; +use std::io::SeekFrom; + use crate::arrow::types::{ArrowReadableKey, ArrowReadableValue}; use arrow::array::ArrayData; use arrow::buffer::Buffer; @@ -9,15 +11,65 @@ use arrow::{ array::{Array, StringArray}, record_batch::RecordBatch, }; -use chroma_cache::cache::Cacheable; use chroma_error::{ChromaError, ErrorCodes}; -use std::cmp::Ordering::{Equal, Greater, Less}; -use std::io::SeekFrom; +use serde::de::Error as DeError; +use serde::ser::Error as SerError; +use serde::{Deserialize, Serialize}; use thiserror::Error; use uuid::Uuid; +use super::delta::BlockDelta; + const ARROW_ALIGNMENT: usize = 64; +/// A RecordBatchWrapper looks like a record batch, but also implements serde's Serialize and +/// Deserialize. 
+#[derive(Clone, Debug)] +#[repr(transparent)] +pub struct RecordBatchWrapper(pub RecordBatch); + +impl std::ops::Deref for RecordBatchWrapper { + type Target = RecordBatch; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for RecordBatchWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From for RecordBatchWrapper { + fn from(rb: RecordBatch) -> Self { + Self(rb) + } +} + +impl Serialize for RecordBatchWrapper { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let data = Block::record_batch_to_bytes(self).map_err(S::Error::custom)?; + serializer.serialize_bytes(&data) + } +} + +impl<'de> Deserialize<'de> for RecordBatchWrapper { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let data = Vec::::deserialize(deserializer)?; + let reader = std::io::Cursor::new(data); + let rb = Block::load_record_batch(reader, false).map_err(D::Error::custom)?; + Ok(RecordBatchWrapper(rb)) + } +} + /// A block in a blockfile. A block is a sorted collection of data that is immutable once it has been committed. /// Blocks are the fundamental unit of storage in the blockstore and are used to store data in the form of (key, value) pairs. /// These pairs are stored in an Arrow record batch with the schema (prefix, key, value). @@ -29,19 +81,18 @@ const ARROW_ALIGNMENT: usize = 64; /// A Block holds BlockData via its Inner. Conceptually, the BlockData being loaded into memory is an optimization. The Block interface /// could also support out of core operations where the BlockData is loaded from disk on demand. Currently we force operations to be in-core /// but could expand to out-of-core in the future. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct Block { // The data is stored in an Arrow record batch with the column schema (prefix, key, value). // These are stored in sorted order by prefix and key for efficient lookups. 
- pub data: RecordBatch, + pub data: RecordBatchWrapper, pub id: Uuid, } -impl Cacheable for Block {} - impl Block { /// Create a concrete block from an id and the underlying record batch of data pub fn from_record_batch(id: Uuid, data: RecordBatch) -> Self { + let data = data.into(); Self { id, data } } @@ -369,17 +420,22 @@ impl Block { /// Convert the block to bytes in Arrow IPC format pub fn to_bytes(&self) -> Result, BlockToBytesError> { + Self::record_batch_to_bytes(&self.data) + } + + /// Convert the record batch to bytes in Arrow IPC format + fn record_batch_to_bytes(rb: &RecordBatch) -> Result, BlockToBytesError> { let mut bytes = Vec::new(); // Scope the writer so that it is dropped before we return the bytes { - let mut writer = - match arrow::ipc::writer::FileWriter::try_new(&mut bytes, &self.data.schema()) { - Ok(writer) => writer, - Err(e) => { - return Err(BlockToBytesError::ArrowError(e)); - } - }; - match writer.write(&self.data) { + let mut writer = match arrow::ipc::writer::FileWriter::try_new(&mut bytes, &rb.schema()) + { + Ok(writer) => writer, + Err(e) => { + return Err(BlockToBytesError::ArrowError(e)); + } + }; + match writer.write(rb) { Ok(_) => {} Err(e) => { return Err(BlockToBytesError::ArrowError(e)); @@ -438,7 +494,16 @@ impl Block { Self::load_with_reader(reader, id, validate) } - fn load_with_reader(mut reader: R, id: Uuid, validate: bool) -> Result + fn load_with_reader(reader: R, id: Uuid, validate: bool) -> Result + where + R: std::io::Read + std::io::Seek, + { + let batch = Self::load_record_batch(reader, validate)?; + // TODO: how to store / hydrate id? + Ok(Self::from_record_batch(id, batch)) + } + + fn load_record_batch(mut reader: R, validate: bool) -> Result where R: std::io::Read + std::io::Seek, { @@ -459,9 +524,13 @@ impl Block { return Err(BlockLoadError::NoRecordBatches); } }; + Ok(batch) + } +} - // TODO: how to store / hydrate id? - Ok(Self::from_record_batch(id, batch)) +impl chroma_cache::Weighted for Block { + fn weight(&self) -> usize { + 1 } } @@ -550,6 +619,10 @@ pub enum BlockLoadError { ArrowLayoutVerificationError(#[from] ArrowLayoutVerificationError), #[error("No record batches in IPC file")] NoRecordBatches, + #[error(transparent)] + BlockToBytesError(#[from] crate::arrow::block::types::BlockToBytesError), + #[error(transparent)] + CacheError(#[from] chroma_cache::CacheError), } impl ChromaError for BlockLoadError { @@ -559,6 +632,8 @@ impl ChromaError for BlockLoadError { BlockLoadError::ArrowError(_) => ErrorCodes::Internal, BlockLoadError::ArrowLayoutVerificationError(_) => ErrorCodes::Internal, BlockLoadError::NoRecordBatches => ErrorCodes::Internal, + BlockLoadError::BlockToBytesError(_) => ErrorCodes::Internal, + BlockLoadError::CacheError(_) => ErrorCodes::Internal, } } } diff --git a/rust/blockstore/src/arrow/blockfile.rs b/rust/blockstore/src/arrow/blockfile.rs index 9f44fc92c2d..0e6ded1373d 100644 --- a/rust/blockstore/src/arrow/blockfile.rs +++ b/rust/blockstore/src/arrow/blockfile.rs @@ -318,7 +318,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into, V: ArrowReadableValue<'me // We do not dispatch if block is present in the block manager's cache // but not present in the reader's cache (i.e. loaded_blocks). The // next read for this block using this reader instance will populate it. 
- if !self.block_manager.cached(block_id) + if !self.block_manager.cached(block_id).await && !self.loaded_blocks.lock().contains_key(block_id) { futures.push(self.get_block(*block_id)); @@ -671,10 +671,7 @@ mod tests { use crate::{ arrow::config::TEST_MAX_BLOCK_SIZE_BYTES, arrow::provider::ArrowBlockfileProvider, }; - use chroma_cache::{ - cache::Cache, - config::{CacheConfig, UnboundedCacheConfig}, - }; + use chroma_cache::new_cache_for_test; use chroma_storage::{local::LocalStorage, Storage}; use chroma_types::{DataRecord, MetadataValue}; use proptest::prelude::*; @@ -687,8 +684,8 @@ mod tests { async fn test_count() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -724,8 +721,8 @@ mod tests { Runtime::new().unwrap().block_on(async { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -786,8 +783,8 @@ mod tests { Runtime::new().unwrap().block_on(async { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -894,8 +891,8 @@ mod tests { async fn test_blockfile() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -931,8 +928,8 @@ mod tests { async fn test_splitting() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1045,8 +1042,8 @@ mod tests { async fn test_splitting_boundary() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = 
Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1087,8 +1084,8 @@ mod tests { async fn test_string_value() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1124,8 +1121,8 @@ mod tests { async fn test_float_key() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1158,8 +1155,8 @@ mod tests { async fn test_roaring_bitmap_value() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1204,8 +1201,8 @@ mod tests { async fn test_uint_key_val() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1238,8 +1235,8 @@ mod tests { async fn test_data_record_val() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1290,8 +1287,8 @@ mod tests { // Tests the case where a value is larger than half the block size let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let 
blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1328,8 +1325,8 @@ mod tests { async fn test_delete() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1392,8 +1389,8 @@ mod tests { async fn test_get_at_index() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1431,8 +1428,8 @@ mod tests { async fn test_first_block_removal() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1523,8 +1520,8 @@ mod tests { async fn test_write_to_same_key_many_times() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, diff --git a/rust/blockstore/src/arrow/concurrency_test.rs b/rust/blockstore/src/arrow/concurrency_test.rs index 9b7e0d675c5..9f74048fa2f 100644 --- a/rust/blockstore/src/arrow/concurrency_test.rs +++ b/rust/blockstore/src/arrow/concurrency_test.rs @@ -1,28 +1,21 @@ #[cfg(test)] mod tests { use crate::arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider}; - use chroma_cache::{ - cache::Cache, - config::{CacheConfig, LruConfig}, - }; + use chroma_cache::new_cache_for_test; use chroma_storage::{local::LocalStorage, Storage}; use rand::Rng; use shuttle::{future, thread}; #[test] fn test_blockfile_shuttle() { - const BLOCK_CACHE_CAPACITY: usize = 1000; - const SPARSE_INDEX_CACHE_CAPACITY: usize = 1000; shuttle::check_random( || { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Lru(LruConfig { - capacity: BLOCK_CACHE_CAPACITY, - })); - let sparse_index_cache = Cache::new(&CacheConfig::Lru(LruConfig { - capacity: SPARSE_INDEX_CACHE_CAPACITY, - })); + // NOTE(rescrv): I chose to use non-persistent caches here to maximize chance of a + // race condition outside the cache. 
+ let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, diff --git a/rust/blockstore/src/arrow/config.rs b/rust/blockstore/src/arrow/config.rs index 6cbcdc1ea5f..ac010ad7184 100644 --- a/rust/blockstore/src/arrow/config.rs +++ b/rust/blockstore/src/arrow/config.rs @@ -1,4 +1,4 @@ -use chroma_cache::config::CacheConfig; +use chroma_cache::CacheConfig; use serde::Deserialize; // A small block size for testing, so that triggering splits etc is easier diff --git a/rust/blockstore/src/arrow/provider.rs b/rust/blockstore/src/arrow/provider.rs index ff3c559e41c..0d3cfe01241 100644 --- a/rust/blockstore/src/arrow/provider.rs +++ b/rust/blockstore/src/arrow/provider.rs @@ -12,7 +12,7 @@ use crate::{ BlockfileReader, BlockfileWriter, Key, Value, }; use async_trait::async_trait; -use chroma_cache::cache::Cache; +use chroma_cache::{Cache, CacheError}; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; use chroma_storage::Storage; @@ -34,8 +34,8 @@ impl ArrowBlockfileProvider { pub fn new( storage: Storage, max_block_size_bytes: usize, - block_cache: Cache, - sparse_index_cache: Cache, + block_cache: Box>, + sparse_index_cache: Box>, ) -> Self { Self { block_manager: BlockManager::new(storage.clone(), max_block_size_bytes, block_cache), @@ -78,9 +78,10 @@ impl ArrowBlockfileProvider { Ok(BlockfileWriter::ArrowBlockfileWriter(file)) } - pub fn clear(&self) { - self.block_manager.block_cache.clear(); - self.sparse_index_manager.cache.clear(); + pub async fn clear(&self) -> Result<(), CacheError> { + self.block_manager.block_cache.clear().await?; + self.sparse_index_manager.cache.clear().await?; + Ok(()) } pub async fn fork( @@ -186,7 +187,7 @@ impl ChromaError for ForkError { /// is a placeholder for that. 
#[derive(Clone)] pub(super) struct BlockManager { - block_cache: Cache, + block_cache: Arc>, storage: Storage, max_block_size_bytes: usize, write_mutex: Arc>, @@ -196,8 +197,9 @@ impl BlockManager { pub(super) fn new( storage: Storage, max_block_size_bytes: usize, - block_cache: Cache, + block_cache: Box>, ) -> Self { + let block_cache: Arc> = block_cache.into(); Self { block_cache, storage, @@ -252,14 +254,14 @@ impl BlockManager { Block::from_record_batch(delta_id, record_batch) } - pub(super) fn cached(&self, id: &Uuid) -> bool { - self.block_cache.get(id).is_some() + pub(super) async fn cached(&self, id: &Uuid) -> bool { + self.block_cache.get(id).await.ok().is_some() } pub(super) async fn get(&self, id: &Uuid) -> Result, GetError> { - let block = self.block_cache.get(id); + let block = self.block_cache.get(id).await.ok().flatten(); match block { - Some(block) => Ok(Some(block.clone())), + Some(block) => Ok(Some(block)), None => async { let key = format!("block/{}", id); let bytes_res = self @@ -277,14 +279,18 @@ impl BlockManager { match block { Ok(block) => { let _guard = self.write_mutex.lock().await; - match self.block_cache.get(id) { - Some(b) => { + match self.block_cache.get(id).await { + Ok(Some(b)) => { Ok(Some(b)) } - None => { - self.block_cache.insert(*id, block.clone()); + Ok(None) => { + self.block_cache.insert(*id, block.clone()).await; Ok(Some(block)) } + Err(e) => { + tracing::error!("Error getting block from cache {:?}", e); + Err(GetError::BlockLoadError(e.into())) + } } } Err(e) => { @@ -372,12 +378,13 @@ impl ChromaError for SparseIndexManagerError { #[derive(Clone)] pub(super) struct SparseIndexManager { - cache: Cache, + cache: Arc>, storage: Storage, } impl SparseIndexManager { - pub fn new(storage: Storage, cache: Cache) -> Self { + pub fn new(storage: Storage, cache: Box>) -> Self { + let cache: Arc> = cache.into(); Self { cache, storage } } @@ -385,7 +392,7 @@ impl SparseIndexManager { &self, id: &Uuid, ) -> Result, SparseIndexManagerError> { - let index = self.cache.get(id); + let index = self.cache.get(id).await.ok().flatten(); match index { Some(index) => Ok(Some(index)), None => { @@ -423,7 +430,7 @@ impl SparseIndexManager { let index = SparseIndex::from_block::(promoted_block); match index { Ok(index) => { - self.cache.insert(*id, index.clone()); + self.cache.insert(*id, index.clone()).await; Ok(Some(index)) } Err(e) => { diff --git a/rust/blockstore/src/arrow/sparse_index.rs b/rust/blockstore/src/arrow/sparse_index.rs index d4b319e3b4f..3c2de14ae66 100644 --- a/rust/blockstore/src/arrow/sparse_index.rs +++ b/rust/blockstore/src/arrow/sparse_index.rs @@ -1,5 +1,4 @@ use crate::key::{CompositeKey, KeyWrapper}; -use chroma_cache::cache::Cacheable; use chroma_error::ChromaError; use core::panic; use parking_lot::Mutex; @@ -84,8 +83,6 @@ impl SparseIndexData { } } -impl Cacheable for SparseIndex {} - impl SparseIndex { pub(super) fn new(id: Uuid) -> Self { let forward = BTreeMap::new(); @@ -560,6 +557,12 @@ impl Debug for SparseIndex { } } +impl chroma_cache::Weighted for SparseIndex { + fn weight(&self) -> usize { + 1 + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/blockstore/src/config.rs b/rust/blockstore/src/config.rs index cf876b8ef9a..4fdfa59641b 100644 --- a/rust/blockstore/src/config.rs +++ b/rust/blockstore/src/config.rs @@ -2,6 +2,6 @@ use serde::Deserialize; #[derive(Deserialize, Debug, Clone)] pub enum BlockfileProviderConfig { - Arrow(super::arrow::config::ArrowBlockfileProviderConfig), + Arrow(Box), Memory, } diff --git 
a/rust/blockstore/src/provider.rs b/rust/blockstore/src/provider.rs index f0b77ad98e3..4d2bcdfc018 100644 --- a/rust/blockstore/src/provider.rs +++ b/rust/blockstore/src/provider.rs @@ -11,7 +11,7 @@ use super::memory::storage::{Readable, Writeable}; use super::types::BlockfileWriter; use super::{BlockfileReader, Key, Value}; use async_trait::async_trait; -use chroma_cache::cache::Cache; +use chroma_cache::Cache; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; use chroma_storage::Storage; @@ -47,8 +47,8 @@ impl BlockfileProvider { pub fn new_arrow( storage: Storage, max_block_size_bytes: usize, - block_cache: Cache, - sparse_index_cache: Cache, + block_cache: Box>, + sparse_index_cache: Box>, ) -> Self { BlockfileProvider::ArrowBlockfileProvider(ArrowBlockfileProvider::new( storage, @@ -89,11 +89,14 @@ impl BlockfileProvider { } } - pub fn clear(&self) { + pub async fn clear(&self) -> Result<(), Box> { match self { BlockfileProvider::HashMapBlockfileProvider(provider) => provider.clear(), - BlockfileProvider::ArrowBlockfileProvider(provider) => provider.clear(), - } + BlockfileProvider::ArrowBlockfileProvider(provider) => { + provider.clear().await.map_err(|e| Box::new(e) as _)? + } + }; + Ok(()) } pub async fn fork( @@ -119,7 +122,7 @@ impl Configurable<(BlockfileProviderConfig, Storage)> for BlockfileProvider { BlockfileProviderConfig::Arrow(blockfile_config) => { Ok(BlockfileProvider::ArrowBlockfileProvider( ArrowBlockfileProvider::try_from_config(&( - blockfile_config.clone(), + *blockfile_config.clone(), storage.clone(), )) .await?, diff --git a/rust/cache/Cargo.toml b/rust/cache/Cargo.toml index 4c21dc92f25..7f27a9df8e3 100644 --- a/rust/cache/Cargo.toml +++ b/rust/cache/Cargo.toml @@ -7,12 +7,18 @@ edition = "2021" path = "src/lib.rs" [dependencies] -foyer = "0.8" +clap = { version = "4", features = ["derive"] } +foyer = "0.10" +anyhow = "1.0" +# TODO(rescrv): Deprecated. Find a suitable replacement for such things. +serde_yaml = "0.9" thiserror = { workspace = true } serde = { workspace = true } async-trait = { workspace = true } parking_lot = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } chroma-config = { workspace = true } chroma-error = { workspace = true } diff --git a/rust/cache/src/bin/cops-disk-cache-config-writer.rs b/rust/cache/src/bin/cops-disk-cache-config-writer.rs new file mode 100644 index 00000000000..edf2616c9b3 --- /dev/null +++ b/rust/cache/src/bin/cops-disk-cache-config-writer.rs @@ -0,0 +1,14 @@ +//! Writes a disk cache config to stdout. + +use clap::Parser; + +use chroma_cache::{CacheConfig, FoyerCacheConfig}; + +fn main() { + let config = FoyerCacheConfig::parse(); + if config.dir.is_none() { + panic!("Disk cache is required for disk cache config writer"); + } + let out = serde_yaml::to_string(&CacheConfig::Disk(config)).unwrap(); + print!("{out}"); +} diff --git a/rust/cache/src/bin/cops-memory-cache-config-writer.rs b/rust/cache/src/bin/cops-memory-cache-config-writer.rs new file mode 100644 index 00000000000..8d879210a6f --- /dev/null +++ b/rust/cache/src/bin/cops-memory-cache-config-writer.rs @@ -0,0 +1,12 @@ +//! Writes a memory cache config to stdout. 
+ +use clap::Parser; + +use chroma_cache::{CacheConfig, FoyerCacheConfig}; + +fn main() { + let mut config = FoyerCacheConfig::parse(); + config.dir = None; + let out = serde_yaml::to_string(&CacheConfig::Memory(config)).unwrap(); + print!("{out}"); +} diff --git a/rust/cache/src/cache.rs b/rust/cache/src/cache.rs deleted file mode 100644 index c08920cd9be..00000000000 --- a/rust/cache/src/cache.rs +++ /dev/null @@ -1,263 +0,0 @@ -use crate::config::CacheConfig; -use async_trait::async_trait; -use chroma_config::Configurable; -use chroma_error::{ChromaError, ErrorCodes}; -use core::hash::Hash; -use foyer::{Cache as FoyerCache, CacheBuilder, LfuConfig, LruConfig}; -use parking_lot::RwLock; -use std::collections::HashMap; -use std::sync::Arc; -use thiserror::Error; - -#[derive(Clone)] -pub enum Cache -where - K: Send + Sync + Clone + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, -{ - Unbounded(UnboundedCache), - Foyer(FoyerCacheWrapper), -} - -impl< - K: Send + Sync + Clone + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, - > Cache -{ - pub fn new(config: &CacheConfig) -> Self { - match config { - CacheConfig::Unbounded(_) => Cache::Unbounded(UnboundedCache::new(config)), - CacheConfig::Lru(_) => Cache::Foyer(FoyerCacheWrapper::new(config)), - CacheConfig::Lfu(_) => Cache::Foyer(FoyerCacheWrapper::new(config)), - CacheConfig::WeightedLru(_) => Cache::Foyer(FoyerCacheWrapper::new(config)), - } - } - - pub fn insert(&self, key: K, value: V) { - match self { - Cache::Unbounded(cache) => cache.insert(key, value), - Cache::Foyer(cache) => { - cache.insert(key, value); - } - } - } - - pub fn get(&self, key: &K) -> Option { - match self { - Cache::Unbounded(cache) => cache.get(key), - Cache::Foyer(cache) => { - let entry = cache.get(key); - match entry { - Some(v) => { - let value = v.to_owned(); - Some(value) - } - None => None, - } - } - } - } - - pub fn pop(&self) -> Option<(K, V)> { - match self { - Cache::Unbounded(cache) => cache.pop(), - Cache::Foyer(cache) => cache - .cache - .pop() - .map(|entry| (entry.key().clone(), entry.value().clone())), - } - } - - pub fn remove(&self, key: &K) { - match self { - Cache::Unbounded(cache) => { - cache.cache.write().remove(key); - } - Cache::Foyer(cache) => { - cache.cache.remove(key); - } - } - } - - pub fn clear(&self) { - match self { - Cache::Unbounded(cache) => { - let mut write_guard = cache.cache.write(); - write_guard.clear(); - write_guard.shrink_to_fit(); - } - Cache::Foyer(cache) => { - cache.clear(); - } - } - } -} - -#[derive(Clone)] -pub struct UnboundedCache -where - K: Send + Sync + Clone + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, -{ - cache: Arc>>, -} - -impl UnboundedCache -where - K: Send + Sync + Clone + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, -{ - pub fn new(config: &CacheConfig) -> Self { - match config { - CacheConfig::Unbounded(_) => UnboundedCache { - cache: Arc::new(RwLock::new(HashMap::new())), - }, - CacheConfig::Lru(_) => panic!("Invalid cache configuration"), - CacheConfig::Lfu(_) => panic!("Invalid cache configuration"), - CacheConfig::WeightedLru(_) => panic!("Invalid cache configuration"), - } - } - - pub fn insert(&self, key: K, value: V) { - self.cache.write().insert(key, value); - } - - pub fn get(&self, key: &K) -> Option { - let read_guard = self.cache.read(); - let value = read_guard.get(key); - value.cloned() - } - - pub fn pop(&self) -> Option<(K, V)> { - let read_guard = self.cache.read(); - let mut write_guard 
= self.cache.write(); - if let Some(first_key) = read_guard.keys().next() { - if let Some(value) = write_guard.remove(first_key) { - return Some((first_key.clone(), value)); - } - } - - None - } -} - -#[derive(Clone)] -pub struct FoyerCacheWrapper -where - K: Send + Sync + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, -{ - cache: FoyerCache, -} - -impl FoyerCacheWrapper -where - K: Send + Sync + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, -{ - pub fn new(config: &CacheConfig) -> Self { - match config { - CacheConfig::Lru(lru) => { - // TODO: add more eviction config - let eviction_config = LruConfig::default(); - let cache_builder = - CacheBuilder::new(lru.capacity).with_eviction_config(eviction_config); - FoyerCacheWrapper { - cache: cache_builder.build(), - } - } - CacheConfig::Lfu(lfu) => { - // TODO: add more eviction config - let eviction_config = LfuConfig::default(); - let cache_builder = - CacheBuilder::new(lfu.capacity).with_eviction_config(eviction_config); - FoyerCacheWrapper { - cache: cache_builder.build(), - } - } - CacheConfig::WeightedLru(weighted_lru) => { - // TODO: add more eviction config - let eviction_config = LruConfig::default(); - let cache_builder = CacheBuilder::new(weighted_lru.capacity) - .with_eviction_config(eviction_config) - .with_weighter(|_key: &_, value: &V| value.weight()); - FoyerCacheWrapper { - cache: cache_builder.build(), - } - } - CacheConfig::Unbounded(_) => panic!("Invalid cache configuration"), - } - } - - pub fn insert(&self, key: K, value: V) { - self.cache.insert(key, value); - } - - pub fn get(&self, key: &K) -> Option { - let entry = self.cache.get(key); - match entry { - Some(v) => { - let value = v.value().to_owned(); - Some(value) - } - None => None, - } - } - - pub fn clear(&self) { - self.cache.clear(); - } -} - -#[async_trait] -impl Configurable for UnboundedCache -where - K: Send + Sync + Clone + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, -{ - async fn try_from_config(config: &CacheConfig) -> Result> { - match config { - CacheConfig::Unbounded(_) => Ok(UnboundedCache::new(config)), - CacheConfig::Lfu(_) => Err(Box::new(CacheConfigError::InvalidCacheConfig)), - CacheConfig::Lru(_) => Err(Box::new(CacheConfigError::InvalidCacheConfig)), - CacheConfig::WeightedLru(_) => Err(Box::new(CacheConfigError::InvalidCacheConfig)), - } - } -} - -#[async_trait] -impl Configurable for FoyerCacheWrapper -where - K: Send + Sync + Hash + Eq + 'static, - V: Send + Sync + Clone + Cacheable + 'static, -{ - async fn try_from_config(config: &CacheConfig) -> Result> { - match config { - CacheConfig::Lru(_lru) => Ok(FoyerCacheWrapper::new(config)), - CacheConfig::Lfu(_lfu) => Ok(FoyerCacheWrapper::new(config)), - CacheConfig::WeightedLru(_weighted_lru) => Ok(FoyerCacheWrapper::new(config)), - CacheConfig::Unbounded(_) => Err(Box::new(CacheConfigError::InvalidCacheConfig)), - } - } -} - -#[derive(Error, Debug)] -pub enum CacheConfigError { - #[error("Invalid cache config")] - InvalidCacheConfig, -} - -impl ChromaError for CacheConfigError { - fn code(&self) -> ErrorCodes { - match self { - CacheConfigError::InvalidCacheConfig => ErrorCodes::InvalidArgument, - } - } -} - -pub trait Cacheable { - // By default the weight of a type that is cacheable is 1. 
- fn weight(&self) -> usize { - 1 - } -} diff --git a/rust/cache/src/config.rs b/rust/cache/src/config.rs deleted file mode 100644 index 998688b750b..00000000000 --- a/rust/cache/src/config.rs +++ /dev/null @@ -1,32 +0,0 @@ -use serde::Deserialize; - -#[derive(Deserialize, Debug, Clone)] -pub enum CacheConfig { - // case-insensitive - #[serde(alias = "unbounded")] - Unbounded(UnboundedCacheConfig), - #[serde(alias = "lru")] - Lru(LruConfig), - #[serde(alias = "lfu")] - Lfu(LfuConfig), - #[serde(alias = "weighted_lru")] - WeightedLru(WeightedLruConfig), -} - -#[derive(Deserialize, Debug, Clone)] -pub struct UnboundedCacheConfig {} - -#[derive(Deserialize, Debug, Clone)] -pub struct LruConfig { - pub capacity: usize, -} - -#[derive(Deserialize, Debug, Clone)] -pub struct LfuConfig { - pub capacity: usize, -} - -#[derive(Deserialize, Debug, Clone)] -pub struct WeightedLruConfig { - pub capacity: usize, -} diff --git a/rust/cache/src/foyer.rs b/rust/cache/src/foyer.rs new file mode 100644 index 00000000000..f610179945b --- /dev/null +++ b/rust/cache/src/foyer.rs @@ -0,0 +1,377 @@ +use std::hash::Hash; +use std::sync::Arc; +use std::time::Duration; + +use chroma_error::ChromaError; +use clap::Parser; +use foyer::{ + CacheBuilder, DirectFsDeviceOptionsBuilder, FifoConfig, HybridCacheBuilder, InvalidRatioPicker, + LfuConfig, LruConfig, RateLimitPicker, S3FifoConfig, StorageKey, StorageValue, TracingConfig, +}; +use serde::{Deserialize, Serialize}; + +use super::{CacheError, Weighted}; + +const MIB: usize = 1024 * 1024; + +const fn default_capacity() -> usize { + 1048576 +} + +const fn default_mem() -> usize { + 1024 +} + +const fn default_disk() -> usize { + 1024 +} + +const fn default_file_size() -> usize { + 64 +} + +const fn default_flushers() -> usize { + 64 +} + +const fn default_flush() -> bool { + false +} + +const fn default_reclaimers() -> usize { + 4 +} + +const fn default_recover_concurrency() -> usize { + 16 +} + +const fn default_admission_rate_limit() -> usize { + 50 +} + +const fn default_shards() -> usize { + 64 +} + +fn default_eviction() -> String { + "lfu".to_string() +} + +const fn default_invalid_ratio() -> f64 { + 0.8 +} + +const fn default_trace_insert_us() -> usize { + 1000 * 1000 +} + +const fn default_trace_get_us() -> usize { + 1000 * 1000 +} + +const fn default_trace_obtain_us() -> usize { + 1000 * 1000 +} + +const fn default_trace_remove_us() -> usize { + 1000 * 1000 +} + +const fn default_trace_fetch_us() -> usize { + 1000 * 1000 +} + +#[derive(Deserialize, Debug, Clone, Serialize, Parser)] +pub struct FoyerCacheConfig { + /// Directory for disk cache data. + #[arg(short, long)] + pub dir: Option, + + /// In-memory cache capacity. (items) + #[arg(long, default_value_t = 1048576)] + #[serde(default = "default_capacity")] + pub capacity: usize, + + /// In-memory cache capacity. (MiB) + #[arg(long, default_value_t = 1024)] + #[serde(default = "default_mem")] + pub mem: usize, + + /// Disk cache capacity. (MiB) + #[arg(long, default_value_t = 1024)] + #[serde(default = "default_disk")] + pub disk: usize, + + /// Disk cache file size. (MiB) + #[arg(long, default_value_t = 64)] + #[serde(default = "default_file_size")] + pub file_size: usize, + + /// Flusher count. + #[arg(long, default_value_t = 4)] + #[serde(default = "default_flushers")] + pub flushers: usize, + + /// AKA fsync + #[arg(long, default_value_t = false)] + #[serde(default = "default_flush")] + pub flush: bool, + + /// Reclaimer count. 
+ #[arg(long, default_value_t = 4)] + #[serde(default = "default_reclaimers")] + pub reclaimers: usize, + + /// Recover concurrency. + #[arg(long, default_value_t = 16)] + #[serde(default = "default_recover_concurrency")] + pub recover_concurrency: usize, + + /// Enable rated ticket admission picker if `admission_rate_limit > 0`. (MiB/s) + #[arg(long, default_value_t = 50)] + #[serde(default = "default_admission_rate_limit")] + pub admission_rate_limit: usize, + + /// Shards of both in-memory cache and disk cache indexer. + #[arg(long, default_value_t = 64)] + #[serde(default = "default_shards")] + pub shards: usize, + + /// Eviction algorithm to use + #[arg(long, default_value = "lfu")] + #[serde(default = "default_eviction")] + pub eviction: String, + + /// Ratio of invalid entries to be evicted. + #[arg(long, default_value_t = 0.8)] + #[serde(default = "default_invalid_ratio")] + pub invalid_ratio: f64, + + /// Record insert trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000)] + #[serde(default = "default_trace_insert_us")] + pub trace_insert_us: usize, + + /// Record get trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000)] + #[serde(default = "default_trace_get_us")] + pub trace_get_us: usize, + + /// Record obtain trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000)] + #[serde(default = "default_trace_obtain_us")] + pub trace_obtain_us: usize, + + /// Record remove trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000)] + #[serde(default = "default_trace_remove_us")] + pub trace_remove_us: usize, + + /// Record fetch trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000)] + #[serde(default = "default_trace_fetch_us")] + pub trace_fetch_us: usize, +} + +impl FoyerCacheConfig { + /// Build a hybrid disk and memory cache. + pub async fn build_hybrid( + &self, + ) -> Result>, Box> + where + K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + StorageValue + Weighted + 'static, + { + FoyerHybridCache::hybrid(self).await + } + + /// Build an in-memory-only cache. + pub async fn build_memory( + &self, + ) -> Result>, Box> + where + K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + Weighted + 'static, + { + FoyerPlainCache::memory(self).await + } +} + +#[derive(Clone)] +pub struct FoyerHybridCache +where + K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + StorageValue + Weighted + 'static, +{ + cache: foyer::HybridCache, +} + +impl FoyerHybridCache +where + K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + StorageValue + Weighted + 'static, +{ + /// Build a hybrid disk and memory cache. 
+ pub async fn hybrid( + config: &FoyerCacheConfig, + ) -> Result>, Box> { + let tracing_config = TracingConfig::default(); + tracing_config + .set_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _)); + tracing_config + .set_record_hybrid_get_threshold(Duration::from_micros(config.trace_get_us as _)); + tracing_config + .set_record_hybrid_obtain_threshold(Duration::from_micros(config.trace_obtain_us as _)); + tracing_config + .set_record_hybrid_remove_threshold(Duration::from_micros(config.trace_remove_us as _)); + tracing_config + .set_record_hybrid_fetch_threshold(Duration::from_micros(config.trace_fetch_us as _)); + + let builder = HybridCacheBuilder::::new() + .with_tracing_config(tracing_config) + .memory(config.mem * MIB) + .with_shards(config.shards); + + let builder = match config.eviction.as_str() { + "lru" => builder.with_eviction_config(LruConfig::default()), + "lfu" => builder.with_eviction_config(LfuConfig::default()), + "fifo" => builder.with_eviction_config(FifoConfig::default()), + "s3fifo" => builder.with_eviction_config(S3FifoConfig::default()), + _ => { + return Err(Box::new(CacheError::InvalidCacheConfig(format!( + "eviction: {}", + config.eviction + )))); + } + }; + + let Some(dir) = config.dir.as_ref() else { + return Err(Box::new(CacheError::InvalidCacheConfig( + "missing dir".to_string(), + ))); + }; + + let mut builder = builder + .with_weighter(|_, v| v.weight()) + .storage() + .with_device_config( + DirectFsDeviceOptionsBuilder::new(dir) + .with_capacity(config.disk * MIB) + .with_file_size(config.file_size * MIB) + .build(), + ) + .with_flush(config.flush) + .with_indexer_shards(config.shards) + .with_recover_concurrency(config.recover_concurrency) + .with_flushers(config.flushers) + .with_reclaimers(config.reclaimers) + .with_eviction_pickers(vec![Box::new(InvalidRatioPicker::new( + config.invalid_ratio, + ))]); + + if config.admission_rate_limit > 0 { + builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new( + config.admission_rate_limit * MIB, + ))); + } + let cache = builder.build().await.map_err(|e| { + Box::new(CacheError::InvalidCacheConfig(format!( + "builder failed: {:?}", + e + ))) as _ + })?; + Ok(Box::new(FoyerHybridCache { cache })) + } +} + +#[async_trait::async_trait] +impl super::Cache for FoyerHybridCache +where + K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + StorageValue + Weighted + 'static, +{ + async fn get(&self, key: &K) -> Result, CacheError> { + Ok(self.cache.get(key).await?.map(|v| v.value().clone())) + } + + async fn insert(&self, key: K, value: V) { + self.cache.insert(key, value); + } + + async fn remove(&self, key: &K) { + self.cache.remove(key); + } + + async fn clear(&self) -> Result<(), CacheError> { + Ok(self.cache.clear().await?) + } +} + +impl super::PersistentCache for FoyerHybridCache +where + K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + StorageValue + Weighted + 'static, +{ +} + +#[derive(Clone)] +pub struct FoyerPlainCache +where + K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + Weighted + 'static, +{ + cache: foyer::Cache, +} + +impl FoyerPlainCache +where + K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static, + V: Clone + Send + Sync + Weighted + 'static, +{ + /// Build an in-memory cache. 
+    pub async fn memory(
+        config: &FoyerCacheConfig,
+    ) -> Result<Box<dyn super::Cache<K, V>>, Box<dyn ChromaError>> {
+        let tracing_config = TracingConfig::default();
+        tracing_config
+            .set_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _));
+        tracing_config
+            .set_record_hybrid_get_threshold(Duration::from_micros(config.trace_get_us as _));
+        tracing_config
+            .set_record_hybrid_obtain_threshold(Duration::from_micros(config.trace_obtain_us as _));
+        tracing_config
+            .set_record_hybrid_remove_threshold(Duration::from_micros(config.trace_remove_us as _));
+        tracing_config
+            .set_record_hybrid_fetch_threshold(Duration::from_micros(config.trace_fetch_us as _));
+
+        let cache = CacheBuilder::new(config.capacity)
+            .with_shards(config.shards)
+            .build();
+        Ok(Box::new(FoyerPlainCache { cache }))
+    }
+}
+
+#[async_trait::async_trait]
+impl<K, V> super::Cache<K, V> for FoyerPlainCache<K, V>
+where
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+    V: Clone + Send + Sync + Weighted + 'static,
+{
+    async fn get(&self, key: &K) -> Result<Option<V>, CacheError> {
+        Ok(self.cache.get(key).map(|v| v.value().clone()))
+    }
+
+    async fn insert(&self, key: K, value: V) {
+        self.cache.insert(key, value);
+    }
+
+    async fn remove(&self, key: &K) {
+        self.cache.remove(key);
+    }
+
+    async fn clear(&self) -> Result<(), CacheError> {
+        self.cache.clear();
+        Ok(())
+    }
+}
diff --git a/rust/cache/src/lib.rs b/rust/cache/src/lib.rs
index ad3a8488041..45553141da5 100644
--- a/rust/cache/src/lib.rs
+++ b/rust/cache/src/lib.rs
@@ -1,30 +1,112 @@
-pub mod cache;
-pub mod config;
-
-use crate::cache::Cache;
-use crate::config::CacheConfig;
-use cache::Cacheable;
-use chroma_config::Configurable;
-use chroma_error::ChromaError;
 use std::hash::Hash;
 
-pub async fn from_config<K, V>(config: &CacheConfig) -> Result<Cache<K, V>, Box<dyn ChromaError>>
+use ::foyer::{StorageKey, StorageValue};
+use chroma_error::{ChromaError, ErrorCodes};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+
+mod foyer;
+mod nop;
+mod unbounded;
+
+use crate::nop::NopCache;
+use crate::unbounded::UnboundedCache;
+
+pub use foyer::FoyerCacheConfig;
+pub use unbounded::UnboundedCacheConfig;
+
+/// A CacheError represents an error that occurred while interacting with a cache.
+///
+/// InvalidCacheConfig is returned at configuration time; it should not be seen in steady state.
+/// DiskError captures I/O errors that occur while serving from the cache.
+#[derive(Error, Debug)]
+pub enum CacheError {
+    #[error("Invalid cache config")]
+    InvalidCacheConfig(String),
+    #[error("I/O error when serving from cache")]
+    DiskError(#[from] anyhow::Error),
+}
+
+impl ChromaError for CacheError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            CacheError::InvalidCacheConfig(_) => ErrorCodes::InvalidArgument,
+            CacheError::DiskError(_) => ErrorCodes::Unavailable,
+        }
+    }
+}
+
+/// A cache configuration.
+/// "unbounded" is a cache that doesn't evict.
+/// "disk" is a foyer-backed cache that lives on disk.
+/// "memory" is a foyer-backed cache that lives in memory.
+#[derive(Deserialize, Debug, Clone, Serialize)]
+pub enum CacheConfig {
+    // case-insensitive
+    #[serde(rename = "unbounded")]
+    Unbounded(UnboundedCacheConfig),
+    #[serde(rename = "disk")]
+    Disk(FoyerCacheConfig),
+    #[serde(rename = "memory")]
+    #[serde(alias = "lru")]
+    #[serde(alias = "lfu")]
+    #[serde(alias = "weighted_lru")]
+    Memory(FoyerCacheConfig),
+    #[serde(rename = "nop")]
+    Nop,
+}
+
+/// A cache offers async access. It's unspecified whether this cache is persistent or not.
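+///
+/// A minimal usage sketch (here `Value` stands in for any caller-defined type
+/// that implements `Weighted`; it is not part of this crate):
+///
+/// ```ignore
+/// let cache = chroma_cache::new_cache_for_test::<u64, Value>();
+/// cache.insert(42, value).await;
+/// assert!(cache.get(&42).await?.is_some());
+/// cache.clear().await?;
+/// ```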
+#[async_trait::async_trait]
+pub trait Cache<K, V>: Send + Sync
 where
-    K: Send + Sync + Clone + Hash + Eq + 'static,
-    V: Send + Sync + Clone + Cacheable + 'static,
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+    V: Clone + Send + Sync + Weighted + 'static,
+{
+    async fn insert(&self, key: K, value: V);
+    async fn get(&self, key: &K) -> Result<Option<V>, CacheError>;
+    async fn remove(&self, key: &K);
+    async fn clear(&self) -> Result<(), CacheError>;
+}
+
+/// A persistent cache extends Cache by additionally requiring foyer's StorageKey and StorageValue bounds.
+pub trait PersistentCache<K, V>: Cache<K, V>
+where
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static,
+    V: Clone + Send + Sync + StorageValue + Weighted + 'static,
+{
+}
+
+/// A trait to capture the weight of objects in the system.
+pub trait Weighted {
+    fn weight(&self) -> usize;
+}
+
+/// Create a new cache from the provided config.
+pub async fn from_config<K, V>(
+    config: &CacheConfig,
+) -> Result<Box<dyn Cache<K, V>>, Box<dyn ChromaError>>
+where
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+    V: Clone + Send + Sync + Weighted + 'static,
 {
     match config {
-        CacheConfig::Unbounded(_) => Ok(Cache::Unbounded(
-            crate::cache::UnboundedCache::try_from_config(config).await?,
-        )),
-        CacheConfig::Lru(_) => Ok(Cache::Foyer(
-            crate::cache::FoyerCacheWrapper::try_from_config(config).await?,
-        )),
-        CacheConfig::Lfu(_) => Ok(Cache::Foyer(
-            crate::cache::FoyerCacheWrapper::try_from_config(config).await?,
-        )),
-        CacheConfig::WeightedLru(_) => Ok(Cache::Foyer(
-            crate::cache::FoyerCacheWrapper::try_from_config(config).await?,
-        )),
+        CacheConfig::Unbounded(unbounded_config) => {
+            Ok(Box::new(UnboundedCache::new(unbounded_config)))
+        }
+        CacheConfig::Memory(c) => Ok(c.build_memory().await? as _),
+        CacheConfig::Disk(_) => Err(Box::new(CacheError::InvalidCacheConfig(
+            "from_config was called with disk".to_string(),
+        ))),
+        CacheConfig::Nop => Ok(Box::new(NopCache)),
     }
 }
+
+/// Create a new cache for testing purposes.
+pub fn new_cache_for_test<K, V>() -> Box<dyn Cache<K, V>>
+where
+    K: Send + Sync + Clone + Eq + PartialEq + Hash + 'static,
+    V: Send + Sync + Clone + Weighted + 'static,
+{
+    Box::new(UnboundedCache::new(&UnboundedCacheConfig::default()))
+}
diff --git a/rust/cache/src/nop.rs b/rust/cache/src/nop.rs
new file mode 100644
index 00000000000..1e703c20b45
--- /dev/null
+++ b/rust/cache/src/nop.rs
@@ -0,0 +1,25 @@
+use std::hash::Hash;
+
+use super::{CacheError, Weighted};
+
+/// A zero-configuration cache that doesn't evict.
+pub struct NopCache;
+
+#[async_trait::async_trait]
+impl<K, V> super::Cache<K, V> for NopCache
+where
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+    V: Clone + Send + Sync + Weighted + 'static,
+{
+    async fn get(&self, _: &K) -> Result<Option<V>, CacheError> {
+        Ok(None)
+    }
+
+    async fn insert(&self, _: K, _: V) {}
+
+    async fn remove(&self, _: &K) {}
+
+    async fn clear(&self) -> Result<(), CacheError> {
+        Ok(())
+    }
+}
diff --git a/rust/cache/src/unbounded.rs b/rust/cache/src/unbounded.rs
new file mode 100644
index 00000000000..49a3a30e15e
--- /dev/null
+++ b/rust/cache/src/unbounded.rs
@@ -0,0 +1,69 @@
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::sync::Arc;
+
+use parking_lot::RwLock;
+
+use super::{CacheError, Weighted};
+
+/// A zero-configuration cache that doesn't evict.
+/// Mostly useful for testing.
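+///
+/// A short sketch of how this config is typically built in tests (the key and
+/// value types are placeholders for anything satisfying the `Cache` bounds):
+///
+/// ```ignore
+/// let cache = UnboundedCacheConfig::default().build::<u64, Value>();
+/// ```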
+#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
+pub struct UnboundedCacheConfig {}
+
+impl UnboundedCacheConfig {
+    pub fn build<K, V>(&self) -> Box<dyn super::Cache<K, V>>
+    where
+        K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+        V: Clone + Send + Sync + Clone + Weighted + 'static,
+    {
+        Box::new(UnboundedCache::new(self))
+    }
+}
+
+/// A zero-configuration cache that doesn't evict.
+pub struct UnboundedCache<K, V>
+where
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+    V: Clone + Send + Sync + Clone + Weighted + 'static,
+{
+    cache: Arc<RwLock<HashMap<K, V>>>,
+}
+
+impl<K, V> UnboundedCache<K, V>
+where
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+    V: Clone + Send + Sync + Clone + Weighted + 'static,
+{
+    pub fn new(_: &UnboundedCacheConfig) -> Self {
+        Self {
+            cache: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<K, V> super::Cache<K, V> for UnboundedCache<K, V>
+where
+    K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
+    V: Clone + Send + Sync + Weighted + 'static,
+{
+    async fn get(&self, key: &K) -> Result<Option<V>, CacheError> {
+        let read_guard = self.cache.read();
+        let value = read_guard.get(key);
+        Ok(value.cloned())
+    }
+
+    async fn insert(&self, key: K, value: V) {
+        self.cache.write().insert(key, value);
+    }
+
+    async fn remove(&self, key: &K) {
+        self.cache.write().remove(key);
+    }
+
+    async fn clear(&self) -> Result<(), CacheError> {
+        self.cache.write().clear();
+        Ok(())
+    }
+}
diff --git a/rust/index/benches/full_text.rs b/rust/index/benches/full_text.rs
index c249214bd7d..a7a9e4d9cc3 100644
--- a/rust/index/benches/full_text.rs
+++ b/rust/index/benches/full_text.rs
@@ -2,10 +2,7 @@ use std::sync::Arc;
 
 use anyhow::Result;
 use chroma_blockstore::{arrow::provider::ArrowBlockfileProvider, provider::BlockfileProvider};
-use chroma_cache::{
-    cache::Cache,
-    config::{CacheConfig, UnboundedCacheConfig},
-};
+use chroma_cache::UnboundedCacheConfig;
 use chroma_index::fulltext::{
     tokenizer::TantivyChromaTokenizer,
     types::{FullTextIndexReader, FullTextIndexWriter},
@@ -85,8 +82,8 @@ const BLOCK_SIZE: usize = 8 * 1024 * 1024; // 8MB
 
 fn create_blockfile_provider(storage_dir: &str) -> BlockfileProvider {
     let storage = Storage::Local(LocalStorage::new(storage_dir));
-    let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
-    let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
+    let block_cache = UnboundedCacheConfig {}.build();
+    let sparse_index_cache = UnboundedCacheConfig {}.build();
     let arrow_blockfile_provider =
         ArrowBlockfileProvider::new(storage.clone(), BLOCK_SIZE, block_cache, sparse_index_cache);
     BlockfileProvider::ArrowBlockfileProvider(arrow_blockfile_provider)
diff --git a/rust/index/src/config.rs b/rust/index/src/config.rs
index 485d9189a13..a8b77d37b2b 100644
--- a/rust/index/src/config.rs
+++ b/rust/index/src/config.rs
@@ -1,4 +1,4 @@
-use chroma_cache::config::CacheConfig;
+use chroma_cache::CacheConfig;
 use serde::Deserialize;
 
 #[derive(Deserialize, Debug, Clone)]
diff --git a/rust/index/src/hnsw_provider.rs b/rust/index/src/hnsw_provider.rs
index 2c5e927a818..09aa1cacc1f 100644
--- a/rust/index/src/hnsw_provider.rs
+++ b/rust/index/src/hnsw_provider.rs
@@ -7,7 +7,7 @@ use super::{
 };
 
 use async_trait::async_trait;
-use chroma_cache::cache::{Cache, Cacheable};
+use chroma_cache::Cache;
 use chroma_config::Configurable;
 use chroma_error::ChromaError;
 use chroma_error::ErrorCodes;
@@ -47,7 +47,7 @@ const FILES: [&str; 4] = [
 // their ref count goes to 0.
 #[derive(Clone)]
 pub struct HnswIndexProvider {
-    cache: Cache<Uuid, HnswIndexRef>,
+    cache: Arc<dyn Cache<Uuid, HnswIndexRef>>,
     pub temporary_storage_path: PathBuf,
     storage: Storage,
     write_mutex: Arc<tokio::sync::Mutex<()>>,
@@ -58,16 +58,6 @@ pub struct HnswIndexRef {
     pub inner: Arc<RwLock<HnswIndex>>,
 }
 
-impl Cacheable for HnswIndexRef {
-    fn weight(&self) -> usize {
-        let index = self.inner.read();
-        if index.len() == 0 {
-            return 1;
-        }
-        index.len() * std::mem::size_of::<f32>() * index.dimensionality() as usize
-    }
-}
-
 impl Debug for HnswIndexProvider {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("HnswIndexProvider")
@@ -83,6 +73,7 @@ impl Configurable<(HnswProviderConfig, Storage)> for HnswIndexProvider {
     ) -> Result<Self, Box<dyn ChromaError>> {
         let (hnsw_config, storage) = config;
         let cache = chroma_cache::from_config(&hnsw_config.hnsw_cache_config).await?;
+        let cache: Arc<dyn Cache<Uuid, HnswIndexRef>> = cache.into();
         Ok(Self {
             cache,
             storage: storage.clone(),
@@ -92,8 +83,23 @@ impl Configurable<(HnswProviderConfig, Storage)> for HnswIndexProvider {
     }
 }
 
+impl chroma_cache::Weighted for HnswIndexRef {
+    fn weight(&self) -> usize {
+        let index = self.inner.read();
+        if index.len() == 0 {
+            return 1;
+        }
+        index.len() * std::mem::size_of::<f32>() * index.dimensionality() as usize
+    }
+}
+
 impl HnswIndexProvider {
-    pub fn new(storage: Storage, storage_path: PathBuf, cache: Cache<Uuid, HnswIndexRef>) -> Self {
+    pub fn new(
+        storage: Storage,
+        storage_path: PathBuf,
+        cache: Box<dyn Cache<Uuid, HnswIndexRef>>,
+    ) -> Self {
+        let cache: Arc<dyn Cache<Uuid, HnswIndexRef>> = cache.into();
         Self {
             cache,
             storage,
@@ -102,8 +108,8 @@ impl HnswIndexProvider {
         }
     }
 
-    pub fn get(&self, index_id: &Uuid, collection_id: &Uuid) -> Option<HnswIndexRef> {
-        match self.cache.get(collection_id) {
+    pub async fn get(&self, index_id: &Uuid, collection_id: &Uuid) -> Option<HnswIndexRef> {
+        match self.cache.get(collection_id).await.ok().flatten() {
             Some(index) => {
                 let index_with_lock = index.inner.read();
                 if index_with_lock.id == *index_id {
@@ -177,13 +183,13 @@ impl HnswIndexProvider {
         match HnswIndex::load(storage_path_str, &index_config, new_id) {
             Ok(index) => {
                 let _guard = self.write_mutex.lock().await;
-                match self.get(&new_id, &segment.collection) {
+                match self.get(&new_id, &segment.collection).await {
                     Some(index) => Ok(index.clone()),
                     None => {
                         let index = HnswIndexRef {
                             inner: Arc::new(RwLock::new(index)),
                         };
-                        self.cache.insert(segment.collection, index.clone());
+                        self.cache.insert(segment.collection, index.clone()).await;
                         Ok(index)
                     }
                 }
@@ -310,13 +316,13 @@ impl HnswIndexProvider {
         match HnswIndex::load(index_storage_path_str, &index_config, *id) {
             Ok(index) => {
                 let _guard = self.write_mutex.lock().await;
-                match self.get(id, &segment.collection) {
+                match self.get(id, &segment.collection).await {
                     Some(index) => Ok(index.clone()),
                     None => {
                         let index = HnswIndexRef {
                             inner: Arc::new(RwLock::new(index)),
                         };
-                        self.cache.insert(segment.collection, index.clone());
+                        self.cache.insert(segment.collection, index.clone()).await;
                         Ok(index)
                     }
                 }
@@ -369,13 +375,13 @@ impl HnswIndexProvider {
             .map_err(|e| Box::new(HnswIndexProviderCreateError::IndexInitError(e)))?;
 
         let _guard = self.write_mutex.lock().await;
-        match self.get(&id, &segment.collection) {
+        match self.get(&id, &segment.collection).await {
            Some(index) => Ok(index.clone()),
             None => {
                 let index = HnswIndexRef {
                     inner: Arc::new(RwLock::new(index)),
                 };
-                self.cache.insert(segment.collection, index.clone());
+                self.cache.insert(segment.collection, index.clone()).await;
                 Ok(index)
             }
         }
@@ -562,7 +568,7 @@ pub enum HnswIndexProviderFileError {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use chroma_cache::config::{CacheConfig, UnboundedCacheConfig};
+    use
chroma_cache::new_cache_for_test; use chroma_storage::local::LocalStorage; use chroma_types::SegmentType; use std::collections::HashMap; @@ -576,7 +582,7 @@ mod tests { tokio::fs::create_dir_all(&hnsw_tmp_path).await.unwrap(); let storage = Storage::Local(LocalStorage::new(storage_dir.to_str().unwrap())); - let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let cache = new_cache_for_test(); let provider = HnswIndexProvider::new(storage, hnsw_tmp_path, cache); let segment = Segment { id: Uuid::new_v4(), diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index cc0701c1d39..210d43edb4a 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -307,7 +307,9 @@ impl Handler for CompactionManager { self.compact_batch(&mut ids).await; self.hnsw_index_provider.purge_by_id(&ids).await; - self.blockfile_provider.clear(); + if let Err(err) = self.blockfile_provider.clear().await { + tracing::error!("Failed to clear blockfile provider: {:?}", err); + } // Compaction is done, schedule the next compaction ctx.scheduler @@ -336,7 +338,7 @@ mod tests { use crate::log::log::InternalLogRecord; use crate::sysdb::test_sysdb::TestSysDb; use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES; - use chroma_cache::{cache::Cache, config::CacheConfig, config::UnboundedCacheConfig}; + use chroma_cache::new_cache_for_test; use chroma_storage::local::LocalStorage; use chroma_types::{Collection, LogRecord, Operation, OperationRecord, Segment}; use std::collections::HashMap; @@ -523,9 +525,9 @@ mod tests { // Set memberlist scheduler.set_memberlist(vec![my_member_id.clone()]); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let hnsw_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); + let hnsw_cache = new_cache_for_test(); let mut manager = CompactionManager::new( scheduler, log, diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index c5cb9bb48ce..55f3751395e 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -193,17 +193,18 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: - capacity: 1073741824 + disk: + capacity: 8589934592 # 8GB + eviction: lru compaction_service: service_name: "compaction-service" @@ -258,17 +259,18 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: - capacity: 1073741824 + disk: + capacity: 8589934592 # 8GB + eviction: lru "#, ); let config = RootConfig::load(); @@ -337,17 +339,18 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: + disk: capacity: 1073741824 + eviction: lru 
compaction_service: service_name: "compaction-service" @@ -402,17 +405,18 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: + disk: capacity: 1073741824 + eviction: lru "#, ); let config = RootConfig::load_from_path("random_path.yaml"); @@ -499,17 +503,18 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: + disk: capacity: 1073741824 + eviction: lru compaction_service: service_name: "compaction-service" @@ -564,17 +569,18 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: + disk: capacity: 1073741824 + eviction: lru "#, ); let config = RootConfig::load(); @@ -663,16 +669,16 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: + memory: capacity: 1073741824 compaction_service: @@ -714,17 +720,18 @@ mod tests { block_manager_config: max_block_size_bytes: 16384 block_cache_config: - lru: + memory: capacity: 1000 sparse_index_manager_config: sparse_index_cache_config: - lru: + memory: capacity: 1000 hnsw_provider: hnsw_temporary_path: "~/tmp" hnsw_cache_config: - weighted_lru: + disk: capacity: 1073741824 + eviction: lru "#, ); let config = RootConfig::load(); @@ -759,4 +766,142 @@ mod tests { // Sanity check that root config loads from default path correctly let _ = RootConfig::load(); } + + #[test] + #[serial] + fn test_config_without_cache_directive() { + Jail::expect_with(|jail| { + let _ = jail.create_file( + "random_path.yaml", + r#" + query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" + my_member_id: "query-service-0" + my_port: 50051 + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + kube_namespace: "chroma" + memberlist_name: "query-service-memberlist" + queue_size: 100 + sysdb: + Grpc: + host: "localhost" + port: 50051 + connect_timeout_ms: 5000 + request_timeout_ms: 1000 + storage: + AdmissionControlledS3: + s3_config: + bucket: "chroma" + credentials: Minio + connect_timeout_ms: 5000 + request_timeout_ms: 1000 + upload_part_size_bytes: 8388608 + download_part_size_bytes: 8388608 + rate_limiting_policy: + CountBasedPolicy: + max_concurrent_requests: 15 + log: + Grpc: + host: "localhost" + port: 50051 + connect_timeout_ms: 5000 + request_timeout_ms: 1000 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 + blockfile_provider: + Arrow: + block_manager_config: + max_block_size_bytes: 16384 + block_cache_config: + nop + sparse_index_manager_config: + sparse_index_cache_config: + nop + hnsw_provider: + hnsw_temporary_path: "~/tmp" + hnsw_cache_config: + nop + + compaction_service: + service_name: "compaction-service" + otel_endpoint: 
"http://jaeger:4317" + my_member_id: "compaction-service-0" + my_port: 50051 + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + kube_namespace: "chroma" + memberlist_name: "compaction-service-memberlist" + queue_size: 100 + sysdb: + Grpc: + host: "localhost" + port: 50051 + connect_timeout_ms: 5000 + request_timeout_ms: 1000 + storage: + AdmissionControlledS3: + s3_config: + bucket: "chroma" + credentials: Minio + connect_timeout_ms: 5000 + request_timeout_ms: 1000 + upload_part_size_bytes: 8388608 + download_part_size_bytes: 8388608 + rate_limiting_policy: + CountBasedPolicy: + max_concurrent_requests: 15 + log: + Grpc: + host: "localhost" + port: 50051 + connect_timeout_ms: 5000 + request_timeout_ms: 1000 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 + compactor: + compaction_manager_queue_size: 1000 + max_concurrent_jobs: 100 + compaction_interval_sec: 60 + min_compaction_size: 10 + max_compaction_size: 10000 + max_partition_size: 5000 + blockfile_provider: + Arrow: + block_manager_config: + max_block_size_bytes: 16384 + block_cache_config: + nop + sparse_index_manager_config: + sparse_index_cache_config: + nop + hnsw_provider: + hnsw_temporary_path: "~/tmp" + hnsw_cache_config: + nop + "#, + ); + let config = RootConfig::load_from_path("random_path.yaml"); + assert_eq!(config.query_service.my_member_id, "query-service-0"); + assert_eq!(config.query_service.my_port, 50051); + + assert_eq!( + config.compaction_service.my_member_id, + "compaction-service-0" + ); + assert_eq!(config.compaction_service.my_port, 50051); + Ok(()) + }); + } } diff --git a/rust/worker/src/execution/operators/hydrate_metadata_results.rs b/rust/worker/src/execution/operators/hydrate_metadata_results.rs index 71a467af2d8..bd2a2e95b48 100644 --- a/rust/worker/src/execution/operators/hydrate_metadata_results.rs +++ b/rust/worker/src/execution/operators/hydrate_metadata_results.rs @@ -229,7 +229,7 @@ mod test { arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider}, provider::BlockfileProvider, }; - use chroma_cache::{cache::Cache, config::CacheConfig, config::UnboundedCacheConfig}; + use chroma_cache::new_cache_for_test; use chroma_storage::{local::LocalStorage, Storage}; use chroma_types::{ Chunk, LogRecord, MetadataValue, Operation, OperationRecord, UpdateMetadataValue, @@ -242,8 +242,8 @@ mod test { async fn test_merge_and_hydrate() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, diff --git a/rust/worker/src/execution/operators/metadata_filtering.rs b/rust/worker/src/execution/operators/metadata_filtering.rs index ff0220e6314..1fbbba705d8 100644 --- a/rust/worker/src/execution/operators/metadata_filtering.rs +++ b/rust/worker/src/execution/operators/metadata_filtering.rs @@ -580,7 +580,7 @@ mod test { arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider}, provider::BlockfileProvider, }; - use chroma_cache::{cache::Cache, config::CacheConfig, config::UnboundedCacheConfig}; + use chroma_cache::new_cache_for_test; use 
chroma_storage::{local::LocalStorage, Storage}; use chroma_types::{ BooleanOperator, Chunk, DirectDocumentComparison, DirectWhereComparison, DocumentOperator, @@ -595,8 +595,8 @@ mod test { async fn where_and_where_document_from_log() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -803,8 +803,8 @@ mod test { async fn where_from_metadata_segment() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -987,8 +987,8 @@ mod test { async fn query_ids_only() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1180,8 +1180,8 @@ mod test { async fn test_composite_filter() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, diff --git a/rust/worker/src/segment/distributed_hnsw_segment.rs b/rust/worker/src/segment/distributed_hnsw_segment.rs index cd8ccb2aaf9..7c159dd771f 100644 --- a/rust/worker/src/segment/distributed_hnsw_segment.rs +++ b/rust/worker/src/segment/distributed_hnsw_segment.rs @@ -321,7 +321,10 @@ impl DistributedHNSWSegmentReader { }; let index = - match hnsw_index_provider.get(&index_uuid, &segment.collection) { + match hnsw_index_provider + .get(&index_uuid, &segment.collection) + .await + { Some(index) => index, None => { match hnsw_index_provider diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs index 1bf157f6e3c..3205b3b5b3e 100644 --- a/rust/worker/src/segment/metadata_segment.rs +++ b/rust/worker/src/segment/metadata_segment.rs @@ -1269,10 +1269,7 @@ mod test { arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider}, provider::BlockfileProvider, }; - use chroma_cache::{ - cache::Cache, - config::{CacheConfig, UnboundedCacheConfig}, - }; + use chroma_cache::new_cache_for_test; use chroma_storage::{local::LocalStorage, Storage}; use chroma_types::{ Chunk, DirectDocumentComparison, 
DirectWhereComparison, LogRecord, MetadataValue, @@ -1285,8 +1282,8 @@ mod test { async fn empty_blocks() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1571,8 +1568,8 @@ mod test { async fn metadata_update_same_key_different_type() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1823,8 +1820,8 @@ mod test { async fn metadata_deletes() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -2044,8 +2041,8 @@ mod test { async fn document_updates() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index b28473bfba4..0a76b929d55 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -830,10 +830,7 @@ mod tests { arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider}, provider::BlockfileProvider, }; - use chroma_cache::{ - cache::Cache, - config::{CacheConfig, UnboundedCacheConfig}, - }; + use chroma_cache::new_cache_for_test; use chroma_storage::{local::LocalStorage, Storage}; use chroma_types::{ DirectDocumentComparison, DirectWhereComparison, PrimitiveOperator, Where, WhereComparison, @@ -845,8 +842,8 @@ mod tests { async fn test_materializer_add_delete_upsert() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1134,8 +1131,8 @@ mod tests { async fn test_materializer_add_upsert() { let tmp_dir = 
tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1415,8 +1412,8 @@ mod tests { async fn test_materializer_add_delete_upsert_update() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, @@ -1715,8 +1712,8 @@ mod tests { async fn test_materializer_basic() { let tmp_dir = tempfile::tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); let arrow_blockfile_provider = ArrowBlockfileProvider::new( storage, TEST_MAX_BLOCK_SIZE_BYTES, diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index c4c85703be1..2e9731254ff 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -648,7 +648,8 @@ mod tests { use crate::sysdb::test_sysdb::TestSysDb; use crate::system; use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES; - use chroma_cache::{cache::Cache, config::CacheConfig, config::UnboundedCacheConfig}; + #[cfg(test)] + use chroma_cache::new_cache_for_test; use chroma_proto::debug_client::DebugClient; use chroma_storage::{local::LocalStorage, Storage}; use tempfile::tempdir; @@ -659,9 +660,9 @@ mod tests { let log = InMemoryLog::new(); let tmp_dir = tempdir().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); - let hnsw_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let block_cache = new_cache_for_test(); + let sparse_index_cache = new_cache_for_test(); + let hnsw_index_cache = new_cache_for_test(); let port = random_port::PortPicker::new().pick().unwrap(); let mut server = WorkerServer { dispatcher: None,