Skip to content

Commit

Permalink
[ENH] Disk and memory-backed cache with Foyer 0.10. (#2890)
Browse files Browse the repository at this point in the history
## Description of changes

The lfu/lru/weighted_lfu caches didn't initialize a hybrid cache. This
PR rewrites the chroma_cache crate to hide implementation details of the
cache as much as possible. Noteworthy callouts from this PR:

- We don't currently support cleaning up tempfiles evicted from the
cache; this matches the status quo.
- cops-disk-cache-config-writer and cops-memory-cache-config-writer
provide command-line arguments to generate the YAML for the Foyer cache.
- Cache<K, V> is a trait. PersistentCache<K, V> allows for requiring
persistence. All PersistentCache<K, V> can be cast to Cache<K, V>.

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for Python, `yarn test` for JS,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
rescrv authored Oct 4, 2024
1 parent 200c0c4 commit 8e9dbc9
Show file tree
Hide file tree
Showing 30 changed files with 1,205 additions and 594 deletions.
221 changes: 159 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 9 additions & 11 deletions rust/blockstore/src/arrow/block/delta/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,8 @@ impl BlockDelta {
#[cfg(test)]
mod test {
use crate::arrow::{block::Block, config::TEST_MAX_BLOCK_SIZE_BYTES, provider::BlockManager};
use chroma_cache::{
cache::Cache,
config::{CacheConfig, UnboundedCacheConfig},
};
#[cfg(test)]
use chroma_cache::new_cache_for_test;
use chroma_storage::{local::LocalStorage, Storage};
use chroma_types::{DataRecord, MetadataValue};
use rand::{random, Rng};
Expand All @@ -157,7 +155,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let cache = new_cache_for_test();
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, Vec<u32>>();

Expand Down Expand Up @@ -199,7 +197,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let cache = new_cache_for_test();
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, String>();
let delta_id = delta.id;
Expand Down Expand Up @@ -260,7 +258,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let cache = new_cache_for_test();
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<f32, String>();

Expand Down Expand Up @@ -299,7 +297,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let cache = new_cache_for_test();
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, RoaringBitmap>();

Expand Down Expand Up @@ -335,7 +333,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let cache = new_cache_for_test();
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let ids = ["embedding_id_2", "embedding_id_0", "embedding_id_1"];
let embeddings = [
Expand Down Expand Up @@ -399,7 +397,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let cache = new_cache_for_test();
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<u32, String>();

Expand Down Expand Up @@ -427,7 +425,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let cache = new_cache_for_test();
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<u32, u32>();
let delta_id = delta.id;
Expand Down
113 changes: 94 additions & 19 deletions rust/blockstore/src/arrow/block/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::delta::BlockDelta;
use std::cmp::Ordering::{Equal, Greater, Less};
use std::io::SeekFrom;

use crate::arrow::types::{ArrowReadableKey, ArrowReadableValue};
use arrow::array::ArrayData;
use arrow::buffer::Buffer;
Expand All @@ -9,15 +11,65 @@ use arrow::{
array::{Array, StringArray},
record_batch::RecordBatch,
};
use chroma_cache::cache::Cacheable;
use chroma_error::{ChromaError, ErrorCodes};
use std::cmp::Ordering::{Equal, Greater, Less};
use std::io::SeekFrom;
use serde::de::Error as DeError;
use serde::ser::Error as SerError;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;

use super::delta::BlockDelta;

const ARROW_ALIGNMENT: usize = 64;

/// A RecordBatchWrapper looks like a record batch, but also implements serde's Serialize and
/// Deserialize.
///
/// The wrapper round-trips through Arrow IPC bytes under serde, which lets a `Block` (whose
/// payload is a `RecordBatch`) be stored in caches that require serializable values.
/// `#[repr(transparent)]` guarantees the wrapper has the same layout as the inner `RecordBatch`.
#[derive(Clone, Debug)]
#[repr(transparent)]
pub struct RecordBatchWrapper(pub RecordBatch);

// Deref to the inner RecordBatch so the wrapper can be used anywhere a
// `&RecordBatch` is expected without an explicit `.0`.
impl std::ops::Deref for RecordBatchWrapper {
type Target = RecordBatch;

fn deref(&self) -> &Self::Target {
&self.0
}
}

// Mutable counterpart of the Deref impl above: expose the inner RecordBatch
// for in-place mutation.
impl std::ops::DerefMut for RecordBatchWrapper {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

// Cheap conversion from a bare RecordBatch; used by Block::from_record_batch.
impl From<RecordBatch> for RecordBatchWrapper {
fn from(rb: RecordBatch) -> Self {
Self(rb)
}
}

/// Serialize the wrapped record batch as a byte string in Arrow IPC format.
impl Serialize for RecordBatchWrapper {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Reuse Block's IPC encoder; any Arrow-level failure becomes a
        // custom serde error so callers see it through the serializer.
        match Block::record_batch_to_bytes(&self.0) {
            Ok(bytes) => serializer.serialize_bytes(&bytes),
            Err(err) => Err(S::Error::custom(err)),
        }
    }
}

impl<'de> Deserialize<'de> for RecordBatchWrapper {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let data = Vec::<u8>::deserialize(deserializer)?;
let reader = std::io::Cursor::new(data);
let rb = Block::load_record_batch(reader, false).map_err(D::Error::custom)?;
Ok(RecordBatchWrapper(rb))
}
}

/// A block in a blockfile. A block is a sorted collection of data that is immutable once it has been committed.
/// Blocks are the fundamental unit of storage in the blockstore and are used to store data in the form of (key, value) pairs.
/// These pairs are stored in an Arrow record batch with the schema (prefix, key, value).
Expand All @@ -29,19 +81,18 @@ const ARROW_ALIGNMENT: usize = 64;
/// A Block holds BlockData via its Inner. Conceptually, the BlockData being loaded into memory is an optimization. The Block interface
/// could also support out of core operations where the BlockData is loaded from disk on demand. Currently we force operations to be in-core
/// but could expand to out-of-core in the future.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Block {
// The data is stored in an Arrow record batch with the column schema (prefix, key, value).
// These are stored in sorted order by prefix and key for efficient lookups.
pub data: RecordBatch,
pub data: RecordBatchWrapper,
pub id: Uuid,
}

impl Cacheable for Block {}

impl Block {
/// Create a concrete block from an id and the underlying record batch of data
pub fn from_record_batch(id: Uuid, data: RecordBatch) -> Self {
let data = data.into();
Self { id, data }
}

Expand Down Expand Up @@ -369,17 +420,22 @@ impl Block {

/// Convert the block to bytes in Arrow IPC format
///
/// Only the record batch payload is encoded; the block `id` is not part of
/// the byte stream (see the TODO in `load_with_reader` about hydrating it).
///
/// # Errors
///
/// Returns `BlockToBytesError` if the Arrow IPC writer fails.
pub fn to_bytes(&self) -> Result<Vec<u8>, BlockToBytesError> {
Self::record_batch_to_bytes(&self.data)
}

/// Convert the record batch to bytes in Arrow IPC format
fn record_batch_to_bytes(rb: &RecordBatch) -> Result<Vec<u8>, BlockToBytesError> {
let mut bytes = Vec::new();
// Scope the writer so that it is dropped before we return the bytes
{
let mut writer =
match arrow::ipc::writer::FileWriter::try_new(&mut bytes, &self.data.schema()) {
Ok(writer) => writer,
Err(e) => {
return Err(BlockToBytesError::ArrowError(e));
}
};
match writer.write(&self.data) {
let mut writer = match arrow::ipc::writer::FileWriter::try_new(&mut bytes, &rb.schema())
{
Ok(writer) => writer,
Err(e) => {
return Err(BlockToBytesError::ArrowError(e));
}
};
match writer.write(rb) {
Ok(_) => {}
Err(e) => {
return Err(BlockToBytesError::ArrowError(e));
Expand Down Expand Up @@ -438,7 +494,16 @@ impl Block {
Self::load_with_reader(reader, id, validate)
}

fn load_with_reader<R>(mut reader: R, id: Uuid, validate: bool) -> Result<Self, BlockLoadError>
/// Load a block from an already-open reader, attaching the caller-supplied `id`.
///
/// `validate` is forwarded to `load_record_batch`; when set it presumably
/// enables Arrow layout verification (cf. `BlockLoadError::ArrowLayoutVerificationError`)
/// — confirm against `load_record_batch`.
fn load_with_reader<R>(reader: R, id: Uuid, validate: bool) -> Result<Self, BlockLoadError>
where
R: std::io::Read + std::io::Seek,
{
let batch = Self::load_record_batch(reader, validate)?;
// TODO: how to store / hydrate id?
Ok(Self::from_record_batch(id, batch))
}

fn load_record_batch<R>(mut reader: R, validate: bool) -> Result<RecordBatch, BlockLoadError>
where
R: std::io::Read + std::io::Seek,
{
Expand All @@ -459,9 +524,13 @@ impl Block {
return Err(BlockLoadError::NoRecordBatches);
}
};
Ok(batch)
}
}

// TODO: how to store / hydrate id?
Ok(Self::from_record_batch(id, batch))
// Cache weight for a Block.
// NOTE(review): a constant weight of 1 makes cache capacity effectively
// count-based rather than byte-based — confirm this is intended given that
// blocks can hold record batches of varying size.
impl chroma_cache::Weighted for Block {
fn weight(&self) -> usize {
1
}
}

Expand Down Expand Up @@ -550,6 +619,10 @@ pub enum BlockLoadError {
ArrowLayoutVerificationError(#[from] ArrowLayoutVerificationError),
#[error("No record batches in IPC file")]
NoRecordBatches,
#[error(transparent)]
BlockToBytesError(#[from] crate::arrow::block::types::BlockToBytesError),
#[error(transparent)]
CacheError(#[from] chroma_cache::CacheError),
}

impl ChromaError for BlockLoadError {
Expand All @@ -559,6 +632,8 @@ impl ChromaError for BlockLoadError {
BlockLoadError::ArrowError(_) => ErrorCodes::Internal,
BlockLoadError::ArrowLayoutVerificationError(_) => ErrorCodes::Internal,
BlockLoadError::NoRecordBatches => ErrorCodes::Internal,
BlockLoadError::BlockToBytesError(_) => ErrorCodes::Internal,
BlockLoadError::CacheError(_) => ErrorCodes::Internal,
}
}
}
Expand Down
Loading

0 comments on commit 8e9dbc9

Please sign in to comment.