Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] HNSW should fork #2124

Merged
merged 54 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
7b77590
wip
HammadB Apr 8, 2024
cf1cc76
wip
HammadB Apr 9, 2024
2c34676
wip
HammadB Apr 19, 2024
ea17419
wip
HammadB Apr 20, 2024
4efd1fd
wip
HammadB Apr 21, 2024
af40efc
wup
HammadB Apr 23, 2024
cd28833
static
HammadB Apr 23, 2024
03f043a
wip
HammadB Apr 24, 2024
387b321
Passing
HammadB Apr 25, 2024
9713f77
fix
HammadB Apr 26, 2024
ba781d6
merge
HammadB Apr 26, 2024
233a63d
cleanup
HammadB Apr 26, 2024
74ff663
Mark as xfail
HammadB Apr 27, 2024
fb13240
[ENH] Query merging
HammadB Apr 26, 2024
f8a4f62
lint
HammadB Apr 26, 2024
cbd83ca
cleanup
HammadB Apr 26, 2024
b2bf926
ignore protogen
HammadB Apr 26, 2024
1479970
merge fix
HammadB Apr 27, 2024
7acf546
sydb svc port
HammadB Apr 29, 2024
2f2adba
wip
HammadB Apr 8, 2024
1215d5d
wip
HammadB Apr 9, 2024
64a2225
wip
HammadB Apr 19, 2024
22cb1b6
wip
HammadB Apr 20, 2024
2d0d3e2
wip
HammadB Apr 21, 2024
56e7573
wup
HammadB Apr 23, 2024
20a8d5e
static
HammadB Apr 23, 2024
ee096e5
wip
HammadB Apr 24, 2024
440b2f0
Passing
HammadB Apr 25, 2024
0d738d6
fix
HammadB Apr 26, 2024
a73ab43
merge
HammadB Apr 26, 2024
7b3158f
cleanup
HammadB Apr 26, 2024
658cc1d
Mark as xfail
HammadB Apr 27, 2024
3de8d20
sydb svc port
HammadB Apr 29, 2024
a318337
merged
HammadB May 1, 2024
9915511
[ENH] Query merging
HammadB Apr 26, 2024
2ba28a5
lint
HammadB Apr 26, 2024
c07e449
cleanup
HammadB Apr 26, 2024
0e1adf8
ignore protogen
HammadB Apr 26, 2024
00e7a69
merge fix
HammadB Apr 27, 2024
515af61
Merge branch 'hammad/query_hnsw' of github.com:chroma-core/chroma int…
HammadB May 2, 2024
11bcef9
merge main
HammadB May 2, 2024
6603455
cleanup unwraps
HammadB May 2, 2024
ff52cfe
cleanup
HammadB May 2, 2024
8d212cb
cleanup
HammadB May 2, 2024
82e6a15
query
HammadB May 2, 2024
31c5879
wip[]
HammadB May 2, 2024
bb101e1
query
HammadB May 2, 2024
f0d6c9b
fork
HammadB May 3, 2024
85ad3c2
Merge branch 'main' into hammad/hnsw_fork
HammadB May 3, 2024
b91bb90
merge main
HammadB May 3, 2024
0e37888
wip
HammadB May 6, 2024
2f68175
Merge branch 'main' into hammad/hnsw_fork
HammadB May 6, 2024
77f3955
done?
HammadB May 6, 2024
fa33f62
flush error handling
HammadB May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/worker/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ pub(crate) enum ErrorCodes {
pub(crate) trait ChromaError: Error + Send {
fn code(&self) -> ErrorCodes;
}

impl Error for Box<dyn ChromaError> {}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used to allow #[from] on Box

8 changes: 4 additions & 4 deletions rust/worker/src/execution/operators/flush_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::types::SegmentFlushInfo;
use crate::{
execution::operator::Operator,
segment::{
distributed_hnsw_segment::DistributedHNSWSegment, record_segment::RecordSegmentWriter,
SegmentWriter,
distributed_hnsw_segment::DistributedHNSWSegmentWriter,
record_segment::RecordSegmentWriter, SegmentWriter,
},
};
use async_trait::async_trait;
Expand All @@ -24,13 +24,13 @@ impl FlushS3Operator {
#[derive(Debug)]
pub struct FlushS3Input {
record_segment_writer: RecordSegmentWriter,
hnsw_segment_writer: Box<DistributedHNSWSegment>,
hnsw_segment_writer: Box<DistributedHNSWSegmentWriter>,
}

impl FlushS3Input {
pub fn new(
record_segment_writer: RecordSegmentWriter,
hnsw_segment_writer: Box<DistributedHNSWSegment>,
hnsw_segment_writer: Box<DistributedHNSWSegmentWriter>,
) -> Self {
Self {
record_segment_writer,
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/execution/operators/hnsw_knn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
errors::ChromaError, execution::operator::Operator,
segment::distributed_hnsw_segment::DistributedHNSWSegment,
segment::distributed_hnsw_segment::DistributedHNSWSegmentReader,
};
use async_trait::async_trait;

Expand All @@ -9,7 +9,7 @@ pub struct HnswKnnOperator {}

#[derive(Debug)]
pub struct HnswKnnOperatorInput {
pub segment: Box<DistributedHNSWSegment>,
pub segment: Box<DistributedHNSWSegmentReader>,
pub query: Vec<f32>,
pub k: usize,
}
Expand Down
8 changes: 4 additions & 4 deletions rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::segment::SegmentWriter;
use crate::{
execution::{data::data_chunk::Chunk, operator::Operator},
segment::{
distributed_hnsw_segment::DistributedHNSWSegment, record_segment::RecordSegmentWriter,
distributed_hnsw_segment::DistributedHNSWSegmentWriter, record_segment::RecordSegmentWriter,
},
types::LogRecord,
};
Expand All @@ -21,14 +21,14 @@ impl WriteSegmentsOperator {
#[derive(Debug)]
pub struct WriteSegmentsInput {
record_segment_writer: RecordSegmentWriter,
hnsw_segment_writer: Box<DistributedHNSWSegment>,
hnsw_segment_writer: Box<DistributedHNSWSegmentWriter>,
chunk: Chunk<LogRecord>,
}

impl<'me> WriteSegmentsInput {
pub fn new(
record_segment_writer: RecordSegmentWriter,
hnsw_segment_writer: Box<DistributedHNSWSegment>,
hnsw_segment_writer: Box<DistributedHNSWSegmentWriter>,
chunk: Chunk<LogRecord>,
) -> Self {
WriteSegmentsInput {
Expand All @@ -42,7 +42,7 @@ impl<'me> WriteSegmentsInput {
#[derive(Debug)]
pub struct WriteSegmentsOutput {
pub(crate) record_segment_writer: RecordSegmentWriter,
pub(crate) hnsw_segment_writer: Box<DistributedHNSWSegment>,
pub(crate) hnsw_segment_writer: Box<DistributedHNSWSegmentWriter>,
}

pub type WriteSegmentsResult = Result<WriteSegmentsOutput, ()>;
Expand Down
9 changes: 5 additions & 4 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::execution::operators::write_segments::WriteSegmentsOperator;
use crate::execution::operators::write_segments::WriteSegmentsResult;
use crate::index::hnsw_provider::HnswIndexProvider;
use crate::log::log::Log;
use crate::segment::distributed_hnsw_segment::DistributedHNSWSegment;
use crate::segment::distributed_hnsw_segment::DistributedHNSWSegmentWriter;
use crate::segment::record_segment::RecordSegmentWriter;
use crate::segment::LogMaterializer;
use crate::segment::SegmentFlusher;
Expand Down Expand Up @@ -250,7 +250,7 @@ impl CompactOrchestrator {
async fn flush_s3(
&mut self,
record_segment_writer: RecordSegmentWriter,
hnsw_segment_writer: Box<DistributedHNSWSegment>,
hnsw_segment_writer: Box<DistributedHNSWSegmentWriter>,
self_address: Box<dyn Receiver<FlushS3Result>>,
) {
self.state = ExecutionState::Flush;
Expand Down Expand Up @@ -296,7 +296,8 @@ impl CompactOrchestrator {

async fn get_segment_writers(
&mut self,
) -> Result<(RecordSegmentWriter, Box<DistributedHNSWSegment>), Box<dyn ChromaError>> {
) -> Result<(RecordSegmentWriter, Box<DistributedHNSWSegmentWriter>), Box<dyn ChromaError>>
{
// Care should be taken to use the same writers across the compaction process
// Since the segment writers are stateful, we should not create new writers for each partition
// Nor should we create new writers across different tasks
Expand Down Expand Up @@ -376,7 +377,7 @@ impl CompactOrchestrator {
.dimension
.expect("Dimension is required in the compactor");

let hnsw_segment_writer = match DistributedHNSWSegment::from_segment(
let hnsw_segment_writer = match DistributedHNSWSegmentWriter::from_segment(
hnsw_segment,
dimension as usize,
self.hnsw_index_provider.clone(),
Expand Down
24 changes: 18 additions & 6 deletions rust/worker/src/execution/orchestration/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use crate::execution::operators::merge_knn_results::{
};
use crate::execution::operators::pull_log::PullLogsResult;
use crate::index::hnsw_provider::HnswIndexProvider;
use crate::segment::distributed_hnsw_segment::DistributedHNSWSegment;
use crate::segment::distributed_hnsw_segment::{
DistributedHNSWSegmentFromSegmentError, DistributedHNSWSegmentReader,
DistributedHNSWSegmentWriter,
};
use crate::sysdb::sysdb::{GetCollectionsError, GetSegmentsError, SysDb};
use crate::system::{ComponentContext, System};
use crate::types::{Collection, LogRecord, Segment, SegmentType, VectorQueryResult};
Expand Down Expand Up @@ -233,7 +236,7 @@ impl HnswQueryOrchestrator {
.expect("Invariant violation. Collection dimension is not set");

// Fetch the data needed for the duration of the query - The HNSW Segment, The record Segment and the Collection
let hnsw_segment_reader = match DistributedHNSWSegment::from_segment(
let hnsw_segment_reader = match DistributedHNSWSegmentReader::from_segment(
// These unwraps are safe because we have already checked that the segments are set in the orchestrator on_start
hnsw_segment,
dimensionality as usize,
Expand All @@ -242,10 +245,19 @@ impl HnswQueryOrchestrator {
.await
{
Ok(reader) => reader,
Err(e) => {
self.terminate_with_error(e, ctx);
return;
}
Err(e) => match *e {
DistributedHNSWSegmentFromSegmentError::Uninitialized => {
// no task, decrement the merge dependency count and return
self.hnsw_result_distances = Some(Vec::new());
self.hnsw_result_offset_ids = Some(Vec::new());
self.merge_dependency_count -= 1;
return;
}
_ => {
self.terminate_with_error(e, ctx);
return;
}
},
};

println!("Created HNSW Segment Reader: {:?}", hnsw_segment_reader);
Expand Down
8 changes: 5 additions & 3 deletions rust/worker/src/index/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub(crate) struct HnswIndexConfig {
pub(crate) enum HnswIndexFromSegmentError {
#[error("Missing config `{0}`")]
MissingConfig(String),
#[error("Invalid metadata value")]
MetadataValueError(#[from] MetadataValueConversionError),
}

impl ChromaError for HnswIndexFromSegmentError {
Expand All @@ -47,7 +49,7 @@ impl HnswIndexConfig {
pub(crate) fn from_segment(
segment: &Segment,
persist_path: &std::path::Path,
) -> Result<HnswIndexConfig, Box<dyn ChromaError>> {
) -> Result<HnswIndexConfig, Box<HnswIndexFromSegmentError>> {
let persist_path = match persist_path.to_str() {
Some(persist_path) => persist_path,
None => {
Expand Down Expand Up @@ -78,7 +80,7 @@ impl HnswIndexConfig {
fn get_metadata_value_as<'a, T>(
metadata: &'a Metadata,
key: &str,
) -> Result<T, Box<dyn ChromaError>>
) -> Result<T, Box<HnswIndexFromSegmentError>>
where
T: TryFrom<&'a MetadataValue, Error = MetadataValueConversionError>,
{
Expand All @@ -92,7 +94,7 @@ impl HnswIndexConfig {
};
match res {
Ok(value) => Ok(value),
Err(e) => Err(Box::new(e)),
Err(e) => Err(Box::new(HnswIndexFromSegmentError::MetadataValueError(e))),
}
}

Expand Down
Loading
Loading