Skip to content

Commit

Permalink
[CHORE] Remove the need to pop from the disk cache during compaction (#…
Browse files Browse the repository at this point in the history
…2865)

## Description of changes

*Summarize the changes made by this PR.*
- Compaction doesn't need to use the cache.pop. We know exactly which
compactions we trigger.
 - This will unblock upgrading foyer within chroma.

## Test plan
*How are these changes tested?*

- [ ] `cargo check` locally + ci

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*

---------

Co-authored-by: Max Isom <[email protected]>
  • Loading branch information
rescrv and codetheweb authored Sep 26, 2024
1 parent 5fb6085 commit bed6a22
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
7 changes: 3 additions & 4 deletions rust/index/src/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,9 @@ impl HnswIndexProvider {
Ok(())
}

/// Purge all entries from the cache and remove temporary files from disk.
pub async fn purge_all_entries(&mut self) {
while let Some((_, index)) = self.cache.pop() {
let index_id = index.inner.read().id;
/// Purge entries from the cache by index ID and remove temporary files from disk.
pub async fn purge_by_id(&mut self, index_ids: &[Uuid]) {
for index_id in index_ids {
match self.remove_temporary_files(&index_id).await {
Ok(_) => {}
Err(e) => {
Expand Down
16 changes: 12 additions & 4 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tracing::instrument;
use tracing::span;
use tracing::Instrument;
use tracing::Span;
use uuid::Uuid;

pub(crate) struct CompactionManager {
system: Option<System>,
Expand Down Expand Up @@ -145,7 +146,7 @@ impl CompactionManager {

// TODO: make the return type more informative
#[instrument(name = "CompactionManager::compact_batch")]
pub(crate) async fn compact_batch(&mut self) -> (u32, u32) {
pub(crate) async fn compact_batch(&mut self, compacted: &mut Vec<Uuid>) -> (u32, u32) {
self.scheduler.schedule().await;
let mut jobs = FuturesUnordered::new();
for job in self.scheduler.get_jobs() {
Expand All @@ -161,6 +162,7 @@ impl CompactionManager {
match job {
Ok(result) => {
println!("Compaction completed: {:?}", result);
compacted.push(result.compaction_job.collection_id);
num_completed_jobs += 1;
}
Err(e) => {
Expand Down Expand Up @@ -298,9 +300,10 @@ impl Handler<ScheduleMessage> for CompactionManager {
ctx: &ComponentContext<CompactionManager>,
) {
println!("CompactionManager: Performing compaction");
self.compact_batch().await;
let mut ids = Vec::new();
self.compact_batch(&mut ids).await;

self.hnsw_index_provider.purge_all_entries().await;
self.hnsw_index_provider.purge_by_id(&ids).await;
self.blockfile_provider.clear();

// Compaction is done, schedule the next compaction
Expand Down Expand Up @@ -549,8 +552,13 @@ mod tests {
let dispatcher_handle = system.start_component(dispatcher);
manager.set_dispatcher(dispatcher_handle);
manager.set_system(system);
let (num_completed, number_failed) = manager.compact_batch().await;
let mut compacted = vec![];
let (num_completed, number_failed) = manager.compact_batch(&mut compacted).await;
assert_eq!(num_completed, 2);
assert_eq!(number_failed, 0);
assert!(
(compacted == vec![collection_uuid_1, collection_uuid_2])
|| (compacted == vec![collection_uuid_2, collection_uuid_1])
);
}
}
6 changes: 3 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ impl ChromaError for CompactionError {
// TODO: we need to improve this response
#[derive(Debug)]
pub struct CompactionResponse {
id: Uuid,
compaction_job: CompactionJob,
message: String,
pub(crate) id: Uuid,
pub(crate) compaction_job: CompactionJob,
pub(crate) message: String,
}

impl CompactOrchestrator {
Expand Down

0 comments on commit bed6a22

Please sign in to comment.