Skip to content

Commit

Permalink
[ENH] Use the log position from sysdb to pull log
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed May 6, 2024
1 parent 73c23c6 commit 24fb408
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 10 deletions.
4 changes: 2 additions & 2 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ mod tests {
dimension: Some(1),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
log_position: -1,
version: 0,
};

Expand All @@ -366,7 +366,7 @@ mod tests {
dimension: Some(1),
tenant: tenant_2.clone(),
database: "database_2".to_string(),
log_position: 0,
log_position: -1,
version: 0,
};
sysdb.add_collection(collection_1);
Expand Down
139 changes: 138 additions & 1 deletion rust/worker/src/compactor/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl Scheduler {
}

// TODO: make querying the last compaction time in batch
let log_position_in_collecion = collection[0].log_position;
let tenant_ids = vec![collection[0].tenant.clone()];
let tenant = self.sysdb.get_last_compaction_time(tenant_ids).await;

Expand All @@ -96,12 +97,26 @@ impl Scheduler {
}
};

let mut offset = collection_info.first_log_offset;
// ofsset in log is the first offset in the log that has not been compacted. Note that
// since the offset is the first offset of log we get from the log service, we should
// use this offset to pull data from the log service.
if log_position_in_collecion + 1 < offset {
panic!(
"offset in sysdb is less than offset in log, this should not happen!"
)
} else {
// The offset in sysdb is the last offset that has been compacted.
// We need to start from the next offset.
offset = log_position_in_collecion + 1;
}

collection_records.push(CollectionRecord {
id: collection[0].id,
tenant_id: collection[0].tenant.clone(),
last_compaction_time,
first_record_time: collection_info.first_log_ts,
offset: collection_info.first_log_offset,
offset,
collection_version: collection[0].version,
});
}
Expand Down Expand Up @@ -316,4 +331,126 @@ mod tests {
let jobs = scheduler.get_jobs();
assert_eq!(jobs.count(), 1);
}

#[tokio::test]
#[should_panic(
expected = "offset in sysdb is less than offset in log, this should not happen!"
)]
async fn test_scheduler_panic() {
let mut log = Box::new(InMemoryLog::new());

let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 0,
log_ts: 1,
record: LogRecord {
log_offset: 0,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 1,
log_ts: 2,
record: LogRecord {
log_offset: 1,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 2,
log_ts: 3,
record: LogRecord {
log_offset: 2,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 3,
log_ts: 4,
record: LogRecord {
log_offset: 3,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
let _ = log.update_collection_log_offset(collection_uuid_1, 2).await;

let mut sysdb = Box::new(TestSysDb::new());

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
id: collection_uuid_1,
name: "collection_1".to_string(),
metadata: None,
dimension: Some(1),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
version: 0,
};

sysdb.add_collection(collection_1);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);

let my_ip = "0.0.0.1".to_string();
let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {});
let max_concurrent_jobs = 1000;

// Set assignment policy
let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new());
assignment_policy.set_members(vec![my_ip.clone()]);

let mut scheduler = Scheduler::new(
my_ip.clone(),
log,
sysdb.clone(),
scheduler_policy,
max_concurrent_jobs,
assignment_policy,
);

scheduler.set_memberlist(vec![my_ip.clone()]);
scheduler.schedule().await;
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/brute_force_knn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{distance::DistanceFunction, execution::operator::Operator};
use async_trait::async_trait;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use tracing::{debug, trace};
use tracing::trace;

/// The brute force k-nearest neighbors operator is responsible for computing the k-nearest neighbors
/// of a given query vector against a set of vectors using brute force calculation.
Expand Down
2 changes: 0 additions & 2 deletions rust/worker/src/execution/operators/merge_knn_results.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::f64::consts::E;

use crate::{
blockstore::provider::BlockfileProvider,
errors::ChromaError,
Expand Down
1 change: 0 additions & 1 deletion rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::log::log::Log;
use crate::log::log::PullLogsError;
use crate::types::LogRecord;
use async_trait::async_trait;
use tracing::debug;
use tracing::trace;
use uuid::Uuid;

Expand Down
4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ impl Operator<RegisterInput, RegisterOutput> for RegisterOperator {
input.segment_flush_info.clone(),
)
.await;

// We must make sure that the log postion in sysdb is always greater than or equal to the log position
// in the log service. If the log position in sysdb is less than the log position in the log service,
// the we may lose data in compaction.
let sysdb_registration_result = match result {
Ok(response) => response,
Err(error) => return Err(RegisterError::FlushCompactionError(error)),
Expand Down
5 changes: 2 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ impl CompactOrchestrator {
// TODO: It is possible that the offset_id from the compaction job is wrong since the log service
// can have an outdated view of the offset. We should filter out entries from the log based on the start offset
// of the segment, and not fully respect the offset_id from the compaction job

async fn pull_logs(&mut self, self_address: Box<dyn Receiver<PullLogsResult>>) {
self.state = ExecutionState::PullLogs;
let operator = PullLogsOperator::new(self.log.clone());
Expand Down Expand Up @@ -268,7 +267,7 @@ impl CompactOrchestrator {
}
}

async fn flush_sysdb(
async fn register(
&mut self,
log_position: i64,
segment_flush_info: Arc<[SegmentFlushInfo]>,
Expand Down Expand Up @@ -522,7 +521,7 @@ impl Handler<FlushS3Result> for CompactOrchestrator {
match message {
Ok(msg) => {
// Unwrap should be safe here as we are guaranteed to have a value by construction
self.flush_sysdb(
self.register(
self.pulled_log_offset.unwrap(),
msg.segment_flush_info,
_ctx.sender.as_receiver(),
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/log/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use uuid::Uuid;
/// - collection_id: the id of the collection that needs to be compacted
/// - first_log_offset: the offset of the first log entry in the collection that needs to be compacted
/// - first_log_ts: the timestamp of the first log entry in the collection that needs to be compacted
#[derive(Debug)]
pub(crate) struct CollectionInfo {
pub(crate) collection_id: String,
pub(crate) first_log_offset: i64,
Expand Down

0 comments on commit 24fb408

Please sign in to comment.