From d5534bb498502eb0acd5422b6046d2999e617acc Mon Sep 17 00:00:00 2001
From: Liquan Pei
Date: Fri, 3 May 2024 14:58:17 -0700
Subject: [PATCH] [ENH] Use the log position from sysdb to pull log

---
 .../src/compactor/compaction_manager.rs      |   4 +-
 rust/worker/src/compactor/scheduler.rs       | 139 +++++++++++++++++-
 .../execution/operators/brute_force_knn.rs   |   2 +-
 .../execution/operators/merge_knn_results.rs |   2 -
 .../src/execution/operators/pull_log.rs      |   1 -
 .../src/execution/operators/register.rs      |   4 +
 .../src/execution/orchestration/compact.rs   |   5 +-
 rust/worker/src/log/log.rs                   |   1 +
 8 files changed, 148 insertions(+), 10 deletions(-)

diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs
index bea1dcfd4b7..14bbbc5de31 100644
--- a/rust/worker/src/compactor/compaction_manager.rs
+++ b/rust/worker/src/compactor/compaction_manager.rs
@@ -354,7 +354,7 @@ mod tests {
             dimension: Some(1),
             tenant: tenant_1.clone(),
             database: "database_1".to_string(),
-            log_position: 0,
+            log_position: -1,
             version: 0,
         };
 
@@ -366,7 +366,7 @@ mod tests {
             dimension: Some(1),
             tenant: tenant_2.clone(),
             database: "database_2".to_string(),
-            log_position: 0,
+            log_position: -1,
             version: 0,
         };
         sysdb.add_collection(collection_1);
diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs
index 6fa1bd95308..343db7ed8fc 100644
--- a/rust/worker/src/compactor/scheduler.rs
+++ b/rust/worker/src/compactor/scheduler.rs
@@ -82,6 +82,7 @@ impl Scheduler {
             }
 
             // TODO: make querying the last compaction time in batch
+            let log_position_in_collection = collection[0].log_position;
             let tenant_ids = vec![collection[0].tenant.clone()];
             let tenant = self.sysdb.get_last_compaction_time(tenant_ids).await;
 
@@ -96,12 +97,26 @@ impl Scheduler {
                 }
             };
 
+            let mut offset = collection_info.first_log_offset;
+            // first_log_offset is the offset of the first log entry that has not yet been
+            // compacted, as reported by the log service; by default it is the offset from
+            // which we would start pulling data for this collection.
+            if log_position_in_collection + 1 < offset {
+                panic!(
+                    "offset in sysdb is less than offset in log, this should not happen!"
+                )
+            } else {
+                // The offset in sysdb is the last offset that has been compacted.
+                // We need to start from the next offset.
+                offset = log_position_in_collection + 1;
+            }
+
             collection_records.push(CollectionRecord {
                 id: collection[0].id,
                 tenant_id: collection[0].tenant.clone(),
                 last_compaction_time,
                 first_record_time: collection_info.first_log_ts,
-                offset: collection_info.first_log_offset,
+                offset,
                 collection_version: collection[0].version,
             });
         }
@@ -316,4 +331,126 @@ mod tests {
         let jobs = scheduler.get_jobs();
         assert_eq!(jobs.count(), 1);
     }
+
+    #[tokio::test]
+    #[should_panic(
+        expected = "offset in sysdb is less than offset in log, this should not happen!"
+    )]
+    async fn test_scheduler_panic() {
+        let mut log = Box::new(InMemoryLog::new());
+
+        let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
+        log.add_log(
+            collection_uuid_1.clone(),
+            Box::new(InternalLogRecord {
+                collection_id: collection_uuid_1.clone(),
+                log_offset: 0,
+                log_ts: 1,
+                record: LogRecord {
+                    log_offset: 0,
+                    record: OperationRecord {
+                        id: "embedding_id_1".to_string(),
+                        embedding: None,
+                        encoding: None,
+                        metadata: None,
+                        operation: Operation::Add,
+                    },
+                },
+            }),
+        );
+        log.add_log(
+            collection_uuid_1.clone(),
+            Box::new(InternalLogRecord {
+                collection_id: collection_uuid_1.clone(),
+                log_offset: 1,
+                log_ts: 2,
+                record: LogRecord {
+                    log_offset: 1,
+                    record: OperationRecord {
+                        id: "embedding_id_1".to_string(),
+                        embedding: None,
+                        encoding: None,
+                        metadata: None,
+                        operation: Operation::Add,
+                    },
+                },
+            }),
+        );
+        log.add_log(
+            collection_uuid_1.clone(),
+            Box::new(InternalLogRecord {
+                collection_id: collection_uuid_1.clone(),
+                log_offset: 2,
+                log_ts: 3,
+                record: LogRecord {
+                    log_offset: 2,
+                    record: OperationRecord {
+                        id: "embedding_id_1".to_string(),
+                        embedding: None,
+                        encoding: None,
+                        metadata: None,
+                        operation: Operation::Add,
+                    },
+                },
+            }),
+        );
+        log.add_log(
+            collection_uuid_1.clone(),
+            Box::new(InternalLogRecord {
+                collection_id: collection_uuid_1.clone(),
+                log_offset: 3,
+                log_ts: 4,
+                record: LogRecord {
+                    log_offset: 3,
+                    record: OperationRecord {
+                        id: "embedding_id_1".to_string(),
+                        embedding: None,
+                        encoding: None,
+                        metadata: None,
+                        operation: Operation::Add,
+                    },
+                },
+            }),
+        );
+        let _ = log.update_collection_log_offset(collection_uuid_1, 2).await;
+
+        let mut sysdb = Box::new(TestSysDb::new());
+
+        let tenant_1 = "tenant_1".to_string();
+        let collection_1 = Collection {
+            id: collection_uuid_1,
+            name: "collection_1".to_string(),
+            metadata: None,
+            dimension: Some(1),
+            tenant: tenant_1.clone(),
+            database: "database_1".to_string(),
+            log_position: 0,
+            version: 0,
+        };
+
+        sysdb.add_collection(collection_1);
+
+        let last_compaction_time_1 = 2;
+        sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
+
+        let my_ip = "0.0.0.1".to_string();
+        let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {});
+        let max_concurrent_jobs = 1000;
+
+        // Set assignment policy
+        let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new());
+        assignment_policy.set_members(vec![my_ip.clone()]);
+
+        let mut scheduler = Scheduler::new(
+            my_ip.clone(),
+            log,
+            sysdb.clone(),
+            scheduler_policy,
+            max_concurrent_jobs,
+            assignment_policy,
+        );
+
+        scheduler.set_memberlist(vec![my_ip.clone()]);
+        scheduler.schedule().await;
+    }
 }
diff --git a/rust/worker/src/execution/operators/brute_force_knn.rs b/rust/worker/src/execution/operators/brute_force_knn.rs
index 091fad6440e..7b2c15566cc 100644
--- a/rust/worker/src/execution/operators/brute_force_knn.rs
+++ b/rust/worker/src/execution/operators/brute_force_knn.rs
@@ -4,7 +4,7 @@ use crate::{distance::DistanceFunction, execution::operator::Operator};
 use async_trait::async_trait;
 use std::cmp::Ordering;
 use std::collections::BinaryHeap;
-use tracing::{debug, trace};
+use tracing::trace;
 
 /// The brute force k-nearest neighbors operator is responsible for computing the k-nearest neighbors
 /// of a given query vector against a set of vectors using brute force calculation.
diff --git a/rust/worker/src/execution/operators/merge_knn_results.rs b/rust/worker/src/execution/operators/merge_knn_results.rs
index 31ce80622a4..c23054a6080 100644
--- a/rust/worker/src/execution/operators/merge_knn_results.rs
+++ b/rust/worker/src/execution/operators/merge_knn_results.rs
@@ -1,5 +1,3 @@
-use std::f64::consts::E;
-
 use crate::{
     blockstore::provider::BlockfileProvider,
     errors::ChromaError,
diff --git a/rust/worker/src/execution/operators/pull_log.rs b/rust/worker/src/execution/operators/pull_log.rs
index f62e861f7e4..750d394607d 100644
--- a/rust/worker/src/execution/operators/pull_log.rs
+++ b/rust/worker/src/execution/operators/pull_log.rs
@@ -4,7 +4,6 @@ use crate::log::log::Log;
 use crate::log::log::PullLogsError;
 use crate::types::LogRecord;
 use async_trait::async_trait;
-use tracing::debug;
 use tracing::trace;
 use uuid::Uuid;
 
diff --git a/rust/worker/src/execution/operators/register.rs b/rust/worker/src/execution/operators/register.rs
index 5d97508a29e..e43b75c7984 100644
--- a/rust/worker/src/execution/operators/register.rs
+++ b/rust/worker/src/execution/operators/register.rs
@@ -115,6 +115,10 @@ impl Operator for RegisterOperator {
                 input.segment_flush_info.clone(),
             )
             .await;
+
+        // We must make sure that the log position in sysdb is always greater than or equal to the log position
+        // in the log service. If the log position in sysdb is less than the log position in the log service,
+        // then we may lose data in compaction.
         let sysdb_registration_result = match result {
             Ok(response) => response,
             Err(error) => return Err(RegisterError::FlushCompactionError(error)),
diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs
index da2cd3d497a..f292eeaf856 100644
--- a/rust/worker/src/execution/orchestration/compact.rs
+++ b/rust/worker/src/execution/orchestration/compact.rs
@@ -162,7 +162,6 @@ impl CompactOrchestrator {
     // TODO: It is possible that the offset_id from the compaction job is wrong since the log service
     // can have an outdated view of the offset. We should filter out entries from the log based on the start offset
     // of the segment, and not fully respect the offset_id from the compaction job
-
     async fn pull_logs(&mut self, self_address: Box>) {
         self.state = ExecutionState::PullLogs;
         let operator = PullLogsOperator::new(self.log.clone());
@@ -268,7 +267,7 @@ impl CompactOrchestrator {
         }
     }
 
-    async fn flush_sysdb(
+    async fn register(
         &mut self,
         log_position: i64,
         segment_flush_info: Arc<[SegmentFlushInfo]>,
@@ -522,7 +521,7 @@ impl Handler for CompactOrchestrator {
         match message {
             Ok(msg) => {
                 // Unwrap should be safe here as we are guaranteed to have a value by construction
-                self.flush_sysdb(
+                self.register(
                     self.pulled_log_offset.unwrap(),
                     msg.segment_flush_info,
                     _ctx.sender.as_receiver(),
diff --git a/rust/worker/src/log/log.rs b/rust/worker/src/log/log.rs
index 525537515bf..927e740cbf6 100644
--- a/rust/worker/src/log/log.rs
+++ b/rust/worker/src/log/log.rs
@@ -18,6 +18,7 @@ use uuid::Uuid;
 /// - collection_id: the id of the collection that needs to be compacted
 /// - first_log_offset: the offset of the first log entry in the collection that needs to be compacted
 /// - first_log_ts: the timestamp of the first log entry in the collection that needs to be compacted
+#[derive(Debug)]
 pub(crate) struct CollectionInfo {
     pub(crate) collection_id: String,
     pub(crate) first_log_offset: i64,
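
Note on the offset-selection rule: the scheduler hunk above decides where to resume pulling logs from the last compacted offset recorded in sysdb rather than from the log service's view. The standalone sketch below is illustrative only and is not part of the patch; the function name, parameter names, and the main driver are hypothetical, but the arithmetic and the panic condition mirror the added code. Here, log_position is the last compacted offset stored in sysdb (-1 when nothing has been compacted yet, which is why the tests now initialize collections with log_position: -1), and first_log_offset is the first uncompacted offset reported by the log service.

// Illustrative sketch of the scheduler's new start-offset selection; names are hypothetical.
fn resolve_start_offset(log_position_in_sysdb: i64, first_log_offset_in_log: i64) -> i64 {
    if log_position_in_sysdb + 1 < first_log_offset_in_log {
        // sysdb lags behind the log service's compaction view; pulling from the log
        // service's offset would silently skip records, so fail loudly instead.
        panic!("offset in sysdb is less than offset in log, this should not happen!");
    }
    // sysdb records the last compacted offset, so compaction resumes at the next one,
    // even if the log service still reports older, already-compacted entries.
    log_position_in_sysdb + 1
}

fn main() {
    // A fresh collection: nothing compacted yet (-1), log starts at offset 0.
    assert_eq!(resolve_start_offset(-1, 0), 0);
    // The log service has a stale view (still reports offset 3 as uncompacted),
    // but sysdb says offset 5 was already compacted: start pulling at 6.
    assert_eq!(resolve_start_offset(5, 3), 6);
}

This is also the invariant the new comment in register.rs states from the write side: registration must keep the log position in sysdb at or ahead of the log service's view, otherwise a later compaction could lose data.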