[ENH] Use log position from sysdb as the offset to pull from log #2132

Merged
merged 1 commit on May 6, 2024
4 changes: 2 additions & 2 deletions rust/worker/src/compactor/compaction_manager.rs
@@ -354,7 +354,7 @@ mod tests {
dimension: Some(1),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
log_position: -1,
Collaborator: Does this need to be -1?

version: 0,
};

@@ -366,7 +366,7 @@
dimension: Some(1),
tenant: tenant_2.clone(),
database: "database_2".to_string(),
log_position: 0,
log_position: -1,
version: 0,
};
sysdb.add_collection(collection_1);
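A possible reading of the reviewer's question above, based on how the scheduler change below uses log_position: sysdb records the last compacted offset, and pulling starts at log_position + 1, so -1 means "nothing compacted yet". A minimal sketch of that arithmetic:

// Sketch: with log_position = -1 (nothing compacted yet), the scheduler's
// rule offset = log_position + 1 starts the first pull at offset 0.
let log_position: i64 = -1;
let offset = log_position + 1; // 0: the beginning of the log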
139 changes: 138 additions & 1 deletion rust/worker/src/compactor/scheduler.rs
@@ -82,6 +82,7 @@ impl Scheduler {
}

// TODO: make querying the last compaction time in batch
let log_position_in_collection = collection[0].log_position;
let tenant_ids = vec![collection[0].tenant.clone()];
let tenant = self.sysdb.get_last_compaction_time(tenant_ids).await;

@@ -96,12 +97,26 @@
}
};

let mut offset = collection_info.first_log_offset;
// first_log_offset from the log service is the first offset that has not been
// compacted, so by default it is the offset from which we pull data from the
// log service.
if log_position_in_collection + 1 < offset {
panic!(
"offset in sysdb is less than offset in log, this should not happen!"
)
} else {
// The log position in sysdb is the last offset that has been compacted,
// so compaction must resume from the next offset.
offset = log_position_in_collection + 1;
}

collection_records.push(CollectionRecord {
id: collection[0].id,
tenant_id: collection[0].tenant.clone(),
last_compaction_time,
first_record_time: collection_info.first_log_ts,
offset: collection_info.first_log_offset,
offset,
collection_version: collection[0].version,
});
}
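A minimal sketch of the invariant this hunk enforces, extracted into a standalone function; the name next_pull_offset and its parameters are illustrative, not part of the patch:

// sysdb records the last compacted offset; the log service reports the first
// offset that has not been compacted, so sysdb + 1 can never be behind it.
fn next_pull_offset(last_compacted_in_sysdb: i64, first_uncompacted_in_log: i64) -> i64 {
    assert!(
        last_compacted_in_sysdb + 1 >= first_uncompacted_in_log,
        "offset in sysdb is less than offset in log, this should not happen!"
    );
    // Compaction resumes immediately after the last compacted offset.
    last_compacted_in_sysdb + 1
}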
@@ -316,4 +331,126 @@ mod tests {
let jobs = scheduler.get_jobs();
assert_eq!(jobs.count(), 1);
}

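// This test exercises the new invariant: sysdb's log_position stays at 0 while
// the log service's compacted offset is moved ahead, so schedule() must panic.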
#[tokio::test]
#[should_panic(
expected = "offset in sysdb is less than offset in log, this should not happen!"
)]
async fn test_scheduler_panic() {
let mut log = Box::new(InMemoryLog::new());

let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 0,
log_ts: 1,
record: LogRecord {
log_offset: 0,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 1,
log_ts: 2,
record: LogRecord {
log_offset: 1,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 2,
log_ts: 3,
record: LogRecord {
log_offset: 2,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 3,
log_ts: 4,
record: LogRecord {
log_offset: 3,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
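// Advance the log service's view of the compacted offset to 2 while sysdb's
// log_position for this collection remains 0, violating the scheduler invariant.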
let _ = log.update_collection_log_offset(collection_uuid_1, 2).await;

let mut sysdb = Box::new(TestSysDb::new());

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
id: collection_uuid_1,
name: "collection_1".to_string(),
metadata: None,
dimension: Some(1),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
version: 0,
};

sysdb.add_collection(collection_1);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);

let my_ip = "0.0.0.1".to_string();
let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {});
let max_concurrent_jobs = 1000;

// Set assignment policy
let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new());
assignment_policy.set_members(vec![my_ip.clone()]);

let mut scheduler = Scheduler::new(
my_ip.clone(),
log,
sysdb.clone(),
scheduler_policy,
max_concurrent_jobs,
assignment_policy,
);

scheduler.set_memberlist(vec![my_ip.clone()]);
scheduler.schedule().await;
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/brute_force_knn.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use crate::{distance::DistanceFunction, execution::operator::Operator};
use async_trait::async_trait;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use tracing::{debug, trace};
use tracing::trace;

/// The brute force k-nearest neighbors operator is responsible for computing the k-nearest neighbors
/// of a given query vector against a set of vectors using brute force calculation.
2 changes: 0 additions & 2 deletions rust/worker/src/execution/operators/merge_knn_results.rs
@@ -1,5 +1,3 @@
use std::f64::consts::E;

use crate::{
blockstore::provider::BlockfileProvider,
errors::ChromaError,
1 change: 0 additions & 1 deletion rust/worker/src/execution/operators/pull_log.rs
@@ -4,7 +4,6 @@ use crate::log::log::Log;
use crate::log::log::PullLogsError;
use crate::types::LogRecord;
use async_trait::async_trait;
use tracing::debug;
use tracing::trace;
use uuid::Uuid;

4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/register.rs
@@ -115,6 +115,10 @@ impl Operator<RegisterInput, RegisterOutput> for RegisterOperator {
input.segment_flush_info.clone(),
)
.await;

// We must make sure that the log position in sysdb is always greater than or equal to the log
// position in the log service. If the log position in sysdb is less than the log position in
// the log service, then we may lose data in compaction.
let sysdb_registration_result = match result {
Ok(response) => response,
Err(error) => return Err(RegisterError::FlushCompactionError(error)),
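A hedged sketch of the ordering this comment requires, assuming registration persists the new position in sysdb before the log service advances; the function and parameter names are illustrative, not the real API:

// Illustrative only: commit the compacted position to sysdb first, then let
// the log service advance. If the log service ran ahead and a crash occurred
// in between, records between the two positions would be lost to compaction.
fn commit_compaction(sysdb_position: &mut i64, log_service_position: &mut i64, compacted_up_to: i64) {
    *sysdb_position = compacted_up_to; // step 1: sysdb first
    *log_service_position = compacted_up_to; // step 2: log service second
    debug_assert!(*sysdb_position >= *log_service_position);
}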
5 changes: 2 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
@@ -162,7 +162,6 @@ impl CompactOrchestrator {
// TODO: It is possible that the offset_id from the compaction job is wrong since the log service
// can have an outdated view of the offset. We should filter out entries from the log based on the start offset
// of the segment, and not fully respect the offset_id from the compaction job

async fn pull_logs(&mut self, self_address: Box<dyn Receiver<PullLogsResult>>) {
self.state = ExecutionState::PullLogs;
let operator = PullLogsOperator::new(self.log.clone());
@@ -268,7 +267,7 @@ impl CompactOrchestrator {
}
}

async fn flush_sysdb(
async fn register(
&mut self,
log_position: i64,
segment_flush_info: Arc<[SegmentFlushInfo]>,
@@ -522,7 +521,7 @@ impl Handler<FlushS3Result> for CompactOrchestrator {
match message {
Ok(msg) => {
// Unwrap should be safe here as we are guaranteed to have a value by construction
self.flush_sysdb(
self.register(
self.pulled_log_offset.unwrap(),
msg.segment_flush_info,
_ctx.sender.as_receiver(),
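The TODO at the top of this file proposes filtering pulled log entries against the segment's start offset instead of fully trusting the compaction job's offset_id; a minimal sketch of such a filter, where segment_start_offset is an assumed input:

// Hypothetical filter for the TODO above: keep only records the segment has
// not yet absorbed, i.e. offsets at or beyond the segment's start offset.
fn drop_already_compacted(records: Vec<LogRecord>, segment_start_offset: i64) -> Vec<LogRecord> {
    records
        .into_iter()
        .filter(|record| record.log_offset >= segment_start_offset)
        .collect()
}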
1 change: 1 addition & 0 deletions rust/worker/src/log/log.rs
@@ -18,6 +18,7 @@ use uuid::Uuid;
/// - collection_id: the id of the collection that needs to be compacted
/// - first_log_offset: the offset of the first log entry in the collection that needs to be compacted
/// - first_log_ts: the timestamp of the first log entry in the collection that needs to be compacted
#[derive(Debug)]
pub(crate) struct CollectionInfo {
pub(crate) collection_id: String,
pub(crate) first_log_offset: i64,
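Since this hunk only adds #[derive(Debug)], the practical effect is that CollectionInfo values can now be formatted for diagnostics; a small sketch with illustrative field values (first_log_ts is assumed to complete the struct as its doc comment describes):

let info = CollectionInfo {
    collection_id: "00000000-0000-0000-0000-000000000001".to_string(),
    first_log_offset: 3,
    first_log_ts: 4,
};
println!("{:?}", info); // enabled by the new #[derive(Debug)]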