refactor: improvements to the catalog API
Updated methods on the catalog API to:
* use Copy types where possible,
* keep the original get-by-name methods, while adding new _id variants
  for fetching schema etc.,
* add some other convenience methods for IDs and schemas.

Refactored code to use the new methods as appropriate. This makes the
API more flexible and reduces the amount of locking (see the sketch
below).
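
To make the locking point concrete, here is a minimal, self-contained
sketch of the new lookup pattern. This is not code from the commit: it
models the catalog with std HashMap/RwLock (the real crate uses a
bidirectional map and a different lock type, and the schema type here
is a stand-in), but the method names mirror the diff below, and
db_schema_and_id shows the old two-step lookup collapsing into a single
read-lock acquisition.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// DbId is Copy, so lookups can take it by value instead of by reference.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct DbId(u32);

struct DatabaseSchema; // stand-in for the real schema type

#[derive(Default)]
struct InnerCatalog {
    db_map: HashMap<String, DbId>, // the real crate uses a bidirectional map
    databases: HashMap<DbId, Arc<DatabaseSchema>>,
}

struct Catalog {
    inner: RwLock<InnerCatalog>,
}

impl Catalog {
    // Name-based lookup, kept for callers that only have a name.
    fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
        self.db_schema_and_id(db_name).map(|(_, schema)| schema)
    }

    // New by-id variant: skips the name map entirely.
    fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
        self.inner.read().unwrap().databases.get(&db_id).cloned()
    }

    // Resolves both the id and the schema under a single read-lock
    // acquisition, replacing the old db_name_to_id-then-db_schema two-step.
    fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
        let inner = self.inner.read().unwrap();
        let db_id = *inner.db_map.get(db_name)?;
        inner.databases.get(&db_id).map(|db| (db_id, Arc::clone(db)))
    }
}

fn main() {
    let catalog = Catalog {
        inner: RwLock::new(InnerCatalog::default()),
    };
    {
        let mut inner = catalog.inner.write().unwrap();
        inner.db_map.insert("mydb".to_string(), DbId(0));
        inner.databases.insert(DbId(0), Arc::new(DatabaseSchema));
    }

    let (db_id, _schema) = catalog.db_schema_and_id("mydb").expect("db exists");
    assert_eq!(db_id, DbId(0));
    assert!(catalog.db_schema("mydb").is_some());
    assert!(catalog.db_schema_by_id(db_id).is_some());
}
```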
hiltontj committed Oct 11, 2024
1 parent eb24b3b commit bafce4c
Showing 9 changed files with 146 additions and 127 deletions.
67 changes: 54 additions & 13 deletions influxdb3_catalog/src/catalog.rs
@@ -140,11 +140,7 @@ impl Catalog {
     }
 
     pub fn db_or_create(&self, db_name: &str) -> Result<Arc<DatabaseSchema>> {
-        let db = self
-            .db_name_to_id(db_name.into())
-            .and_then(|db_id| self.inner.read().databases.get(&db_id).map(Arc::clone));
-
-        let db = match db {
+        let db = match self.db_schema(db_name) {
             Some(db) => db,
             None => {
                 let mut inner = self.inner.write();
@@ -168,16 +164,36 @@ impl Catalog {
         Ok(db)
     }
 
-    pub fn db_name_to_id(&self, db_name: Arc<str>) -> Option<DbId> {
-        self.inner.read().db_map.get_by_right(&db_name).copied()
+    pub fn db_name_to_id(&self, db_name: impl Into<Arc<str>>) -> Option<DbId> {
+        self.inner
+            .read()
+            .db_map
+            .get_by_right(&db_name.into())
+            .copied()
     }
 
     pub fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
         self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
     }
 
-    pub fn db_schema(&self, id: &DbId) -> Option<Arc<DatabaseSchema>> {
-        self.inner.read().databases.get(id).cloned()
+    pub fn db_schema(&self, db_name: impl Into<Arc<str>>) -> Option<Arc<DatabaseSchema>> {
+        self.db_schema_and_id(db_name).map(|(_, schema)| schema)
+    }
+
+    pub fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
+        self.inner.read().databases.get(&db_id).cloned()
+    }
+
+    pub fn db_schema_and_id(
+        &self,
+        db_name: impl Into<Arc<str>>,
+    ) -> Option<(DbId, Arc<DatabaseSchema>)> {
+        let inner = self.inner.read();
+        let db_id = inner.db_map.get_by_right(&db_name.into())?;
+        inner
+            .databases
+            .get(db_id)
+            .map(|db| (*db_id, Arc::clone(db)))
     }
 
     pub fn sequence_number(&self) -> SequenceNumber {
@@ -591,13 +607,38 @@ impl DatabaseSchema {
         Ok(new_db)
     }
 
-    pub fn get_table_schema(&self, table_id: TableId) -> Option<&Schema> {
+    pub fn table_schema(&self, table_name: impl Into<Arc<str>>) -> Option<Schema> {
+        self.table_schema_and_id(table_name)
+            .map(|(_, schema)| schema.clone())
+    }
+
+    pub fn table_schema_by_id(&self, table_id: TableId) -> Option<Schema> {
         self.tables
             .get(&table_id)
             .map(|table| table.influx_schema())
+            .cloned()
     }
 
+    pub fn table_schema_and_id(
+        &self,
+        table_name: impl Into<Arc<str>>,
+    ) -> Option<(TableId, Schema)> {
+        self.table_map
+            .get_by_right(&table_name.into())
+            .and_then(|table_id| {
+                self.tables
+                    .get(table_id)
+                    .map(|table_def| (*table_id, table_def.influx_schema().clone()))
+            })
+    }
+
+    pub fn table_definition(&self, table_name: impl Into<Arc<str>>) -> Option<&TableDefinition> {
+        self.table_map
+            .get_by_right(&table_name.into())
+            .and_then(|table_id| self.tables.get(table_id))
+    }
+
-    pub fn get_table(&self, table_id: TableId) -> Option<&TableDefinition> {
+    pub fn table_definition_by_id(&self, table_id: TableId) -> Option<&TableDefinition> {
         self.tables.get(&table_id)
     }
 
@@ -620,8 +661,8 @@ impl DatabaseSchema {
         self.tables.values()
     }
 
-    pub fn table_name_to_id(&self, table_name: Arc<str>) -> Option<TableId> {
-        self.table_map.get_by_right(&table_name).copied()
+    pub fn table_name_to_id(&self, table_name: impl Into<Arc<str>>) -> Option<TableId> {
+        self.table_map.get_by_right(&table_name.into()).copied()
     }
 
     pub fn table_id_to_name(&self, table_id: TableId) -> Option<Arc<str>> {
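The same name/id split applies to table lookups on DatabaseSchema. As a
hedged illustration of how the new table methods relate, here is a
compilable sketch with simplified stand-in types; the commit wires the
methods together slightly differently, as the diff above shows.

```rust
use std::collections::HashMap;

// TableId is Copy, so the by-id path takes it by value.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TableId(u32);

#[derive(Clone)]
struct Schema; // stand-in for the influx/Arrow schema type

struct TableDefinition {
    schema: Schema,
}

impl TableDefinition {
    fn influx_schema(&self) -> &Schema {
        &self.schema
    }
}

struct DatabaseSchema {
    table_map: HashMap<String, TableId>, // the real code uses a bidirectional map
    tables: HashMap<TableId, TableDefinition>,
}

impl DatabaseSchema {
    // By-name lookup resolves the id once, then hands both back to the
    // caller so later calls can use the cheaper by-id path.
    fn table_schema_and_id(&self, table_name: &str) -> Option<(TableId, Schema)> {
        let table_id = *self.table_map.get(table_name)?;
        self.table_schema_by_id(table_id)
            .map(|schema| (table_id, schema))
    }

    // By-id lookup skips the name map entirely.
    fn table_schema_by_id(&self, table_id: TableId) -> Option<Schema> {
        self.tables
            .get(&table_id)
            .map(|def| def.influx_schema().clone())
    }
}

fn main() {
    let mut db = DatabaseSchema {
        table_map: HashMap::new(),
        tables: HashMap::new(),
    };
    db.table_map.insert("cpu".to_string(), TableId(0));
    db.tables.insert(TableId(0), TableDefinition { schema: Schema });

    let (table_id, _schema) = db.table_schema_and_id("cpu").expect("table exists");
    assert!(db.table_schema_by_id(table_id).is_some());
}
```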
25 changes: 9 additions & 16 deletions influxdb3_server/src/http.rs
@@ -692,18 +692,15 @@ where
             ttl,
         } = self.read_body_json(req).await?;
 
-        let db_id = self
+        let (db_id, db_schema) = self
             .write_buffer
             .catalog()
-            .db_name_to_id(db.as_str().into())
+            .db_schema_and_id(db)
             .ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
-        let table_id = self
-            .write_buffer
-            .catalog()
-            .db_schema(&db_id)
-            .expect("db should exist")
-            .table_name_to_id(table.as_str().into())
+        let table_id = db_schema
+            .table_name_to_id(table.as_str())
             .ok_or_else(|| WriteBufferError::TableDoesNotExist)?;
 
         match self
             .write_buffer
             .create_last_cache(
@@ -742,17 +739,13 @@ where
             self.read_body_json(req).await?
         };
 
-        let db_id = self
+        let (db_id, db_schema) = self
            .write_buffer
            .catalog()
-            .db_name_to_id(db.into())
+            .db_schema_and_id(db)
            .ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
-        let table_id = self
-            .write_buffer
-            .catalog()
-            .db_schema(&db_id)
-            .expect("db should exist")
-            .table_name_to_id(table.into())
+        let table_id = db_schema
+            .table_name_to_id(table)
            .ok_or_else(|| WriteBufferError::TableDoesNotExist)?;
         self.write_buffer
             .delete_last_cache(db_id, table_id, &name)
24 changes: 12 additions & 12 deletions influxdb3_server/src/query_executor.rs
@@ -304,12 +304,11 @@ impl QueryDatabase for QueryExecutorImpl {
     ) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
         let _span_recorder = SpanRecorder::new(span);
 
-        let db_id = self.catalog.db_name_to_id(name.into()).ok_or_else(|| {
+        let db_schema = self.catalog.db_schema(name).ok_or_else(|| {
             DataFusionError::External(Box::new(Error::DatabaseNotFound {
                 db_name: name.into(),
             }))
         })?;
-        let db_schema = self.catalog.db_schema(&db_id).expect("database exists");
         Ok(Some(Arc::new(Database::new(
             db_schema,
             Arc::clone(&self.write_buffer),
@@ -376,16 +375,17 @@ impl Database {
     }
 
     async fn query_table(&self, table_name: &str) -> Option<Arc<QueryTable>> {
-        let table_name = table_name.into();
-        let table_id = self.db_schema.table_name_to_id(Arc::clone(&table_name))?;
-        self.db_schema.get_table_schema(table_id).map(|schema| {
-            Arc::new(QueryTable {
-                db_schema: Arc::clone(&self.db_schema),
-                table_name,
-                schema: schema.clone(),
-                write_buffer: Arc::clone(&self.write_buffer),
+        let table_name: Arc<str> = table_name.into();
+        self.db_schema
+            .table_schema(Arc::clone(&table_name))
+            .map(|schema| {
+                Arc::new(QueryTable {
+                    db_schema: Arc::clone(&self.db_schema),
+                    table_name,
+                    schema: schema.clone(),
+                    write_buffer: Arc::clone(&self.write_buffer),
+                })
             })
-        })
     }
 }

@@ -512,7 +512,7 @@ impl SchemaProvider for Database {
     }
 
     fn table_exist(&self, name: &str) -> bool {
-        self.db_schema.table_name_to_id(name.into()).is_some()
+        self.db_schema.table_name_to_id(name).is_some()
     }
 }
 
4 changes: 2 additions & 2 deletions influxdb3_server/src/system_tables/parquet_files.rs
@@ -95,9 +95,9 @@ impl IoxSystemTable for ParquetFilesTable {
             self.db_id,
             self.buffer
                 .catalog()
-                .db_schema(&self.db_id)
+                .db_schema_by_id(self.db_id)
                 .expect("db exists")
-                .table_name_to_id(table_name.as_str().into())
+                .table_name_to_id(table_name.as_str())
                 .expect("table exists"),
         );
 
19 changes: 8 additions & 11 deletions influxdb3_write/src/last_cache/mod.rs
@@ -196,17 +196,14 @@ impl LastCacheProvider {
                 table
                     .iter()
                     .flat_map(|(table_id, table_map)| {
-                        table_map.iter().map(|(lc_name, lc)| {
-                            lc.to_definition(
-                                *table_id,
-                                self.catalog
-                                    .db_schema(&db)
-                                    .expect("db exists")
-                                    .table_id_to_name(*table_id)
-                                    .expect("table exists")
-                                    .to_string(),
-                                lc_name,
-                            )
+                        let table_name = self
+                            .catalog
+                            .db_schema_by_id(db)
+                            .expect("db exists")
+                            .table_id_to_name(*table_id)
+                            .expect("table exists");
+                        table_map.iter().map(move |(lc_name, lc)| {
+                            lc.to_definition(*table_id, table_name.as_ref(), lc_name)
                         })
                     })
                     .collect()
4 changes: 2 additions & 2 deletions influxdb3_write/src/last_cache/table_function.rs
@@ -100,9 +100,9 @@ impl TableFunctionImpl for LastCacheFunction {
         let table_id = self
             .provider
             .catalog
-            .db_schema(&self.db_id)
+            .db_schema_by_id(self.db_id)
             .expect("db exists")
-            .table_name_to_id(table_name.as_str().into())
+            .table_name_to_id(table_name.as_str())
             .expect("table exists");
 
         match self.provider.get_cache_name_and_schema(
45 changes: 16 additions & 29 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -294,47 +294,34 @@ impl WriteBufferImpl {
         projection: Option<&Vec<usize>>,
         ctx: &dyn Session,
     ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
-        let db_id = self
-            .catalog
-            .db_name_to_id(database_name.into())
-            .ok_or_else(|| DataFusionError::Execution(format!("db {} not found", database_name)))?;
-
-        let db_schema = self
-            .catalog
-            .db_schema(&db_id)
-            .expect("Already checked db exists");
-
-        let table_id = db_schema
-            .table_name_to_id(table_name.into())
-            .ok_or_else(|| {
+        let db_schema = self.catalog.db_schema(database_name).ok_or_else(|| {
+            DataFusionError::Execution(format!("database {} not found", database_name))
+        })?;
+
+        let (table_id, table_schema) =
+            db_schema.table_schema_and_id(table_name).ok_or_else(|| {
                 DataFusionError::Execution(format!(
                     "table {} not found in db {}",
                     table_name, database_name
                 ))
             })?;
 
-        let table_schema = db_schema
-            .tables
-            .get(&table_id)
-            .expect("Already checked id exists")
-            .schema
-            .clone();
-
         let mut chunks = self.buffer.get_table_chunks(
             Arc::clone(&db_schema),
             table_name,
             filters,
             projection,
             ctx,
         )?;
 
         let parquet_files = self.persisted_files.get_files(db_schema.id, table_id);
 
         let mut chunk_order = chunks.len() as i64;
 
         for parquet_file in parquet_files {
             let parquet_chunk = parquet_chunk_from_file(
                 &parquet_file,
-                table_schema.schema(),
+                &table_schema,
                 self.persister.object_store_url().clone(),
                 self.persister.object_store(),
                 chunk_order,
@@ -470,12 +457,12 @@ impl LastCacheManager for WriteBufferImpl {
     ) -> Result<Option<LastCacheDefinition>, Error> {
         let cache_name = cache_name.map(Into::into);
         let catalog = self.catalog();
-        let db_schema = catalog.db_schema(&db_id).ok_or(Error::DbDoesNotExist)?;
+        let db_schema = catalog
+            .db_schema_by_id(db_id)
+            .ok_or(Error::DbDoesNotExist)?;
         let schema = db_schema
-            .get_table(table_id)
-            .ok_or(Error::TableDoesNotExist)?
-            .schema()
-            .clone();
+            .table_schema_by_id(table_id)
+            .ok_or(Error::TableDoesNotExist)?;
 
         if let Some(info) = self.last_cache.create_cache(CreateCacheArguments {
             db_id,
@@ -485,7 +472,7 @@
                 .table_id_to_name(table_id)
                 .expect("table exists")
                 .to_string(),
-            schema: schema.schema().clone(),
+            schema,
             cache_name,
             count,
             ttl,
@@ -514,7 +501,7 @@
         cache_name: &str,
     ) -> crate::Result<(), self::Error> {
         let catalog = self.catalog();
-        let db_schema = catalog.db_schema(&db_id).expect("db should exist");
+        let db_schema = catalog.db_schema_by_id(db_id).expect("db should exist");
         self.last_cache.delete_cache(db_id, tbl_id, cache_name)?;
         catalog.delete_last_cache(db_id, tbl_id, cache_name);
 
@@ -582,7 +569,7 @@ mod tests {
             Precision::Nanosecond,
         );
 
-        let db = catalog.db_schema(&DbId::from(0)).unwrap();
+        let db = catalog.db_schema_by_id(DbId::from(0)).unwrap();
 
         assert_eq!(db.tables.len(), 2);
         // cpu table
