From f672cca471b9f6ad274e46e753174f1bf0d1f80a Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 30 Sep 2024 15:42:22 -0400 Subject: [PATCH 1/2] feat: enable parquet cache config through CLI --- influxdb3/src/commands/serve.rs | 91 ++++++++++++++++++++---- influxdb3_write/src/parquet_cache/mod.rs | 26 ++++--- 2 files changed, 96 insertions(+), 21 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index cdf51e33447..0742075da79 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -1,5 +1,6 @@ //! Entrypoint for InfluxDB 3.0 Edge Server +use anyhow::{bail, Context}; use clap_blocks::{ memory_size::MemorySize, object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType}, @@ -22,11 +23,11 @@ use influxdb3_write::{ }; use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig}; use iox_time::SystemProvider; -use object_store::DynObjectStore; +use object_store::ObjectStore; use observability_deps::tracing::*; use panic_logging::SendPanicsToTracing; use parquet_file::storage::{ParquetStorage, StorageId}; -use std::{collections::HashMap, path::Path}; +use std::{collections::HashMap, path::Path, str::FromStr}; use std::{num::NonZeroUsize, sync::Arc}; use thiserror::Error; use tokio::net::TcpListener; @@ -216,6 +217,67 @@ pub struct Config { /// for any hosts that share the same object store configuration, i.e., the same bucket. #[clap(long = "host-id", env = "INFLUXDB3_HOST_IDENTIFIER_PREFIX", action)] pub host_identifier_prefix: String, + + /// The size of the in-memory Parquet cache in bytes. + #[clap( + long = "parquet-mem-cache-size", + env = "INFLUXDB3_PARQUET_MEM_CACHE_SIZE", + default_value_t = 1024 * 1024 * 1024, + action + )] + pub parquet_mem_cache_size: usize, + + /// The percentage of entries to prune during a prune operation on the in-memory Parquet cache. + /// + /// This must be a number strictly between 0 and 1 (both endpoints are rejected). 
+ #[clap( + long = "parquet-mem-cache-prune-percentage", + env = "INFLUXDB3_PARQUET_MEM_CACHE_PRUNE_PERCENTAGE", + default_value = "0.1", + action + )] + pub parquet_mem_cache_prune_percentage: PrunePercent, + + /// The interval on which to check if the in-memory Parquet cache needs to be pruned. + #[clap( + long = "parquet-mem-cache-prune-interval", + env = "INFLUXDB3_PARQUET_MEM_CACHE_PRUNE_INTERVAL", + default_value = "10ms", + action + )] + pub parquet_mem_cache_prune_interval: humantime::Duration, + + /// Disable the in-memory Parquet cache. + #[clap( + long = "disable-parquet-mem-cache", + env = "INFLUXDB3_DISABLE_PARQUET_MEM_CACHE", + default_value_t = false, + action + )] + pub disable_parquet_mem_cache: bool, +} + +#[derive(Debug, Clone, Copy)] +pub struct PrunePercent(f64); + +impl From<PrunePercent> for f64 { + fn from(value: PrunePercent) -> Self { + value.0 + } +} + +impl FromStr for PrunePercent { + type Err = anyhow::Error; + + fn from_str(s: &str) -> std::prelude::v1::Result<Self, Self::Err> { + let p = s + .parse::<f64>() + .context("failed to parse prune percent as f64")?; + if p.is_nan() || p <= 0.0 || p >= 1.0 { + bail!("prune percent must be between 0 and 1"); + } + Ok(Self(p)) + } } /// If `p` does not exist, try to create it as a directory. 
@@ -258,19 +320,22 @@ pub async fn command(config: Config) -> Result<()> { // Construct a token to trigger clean shutdown let frontend_shutdown = CancellationToken::new(); - let object_store: Arc = + let object_store: Arc = make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?; let time_provider = Arc::new(SystemProvider::new()); - // TODO(trevor): make the cache capacity and prune percent configurable/optional: - let cache_capacity = 1024 * 1024 * 1024; - let prune_percent = 0.1; - let (object_store, parquet_cache) = create_cached_obj_store_and_oracle( - object_store, - Arc::clone(&time_provider) as _, - cache_capacity, - prune_percent, - ); + let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache { + let (object_store, parquet_cache) = create_cached_obj_store_and_oracle( + object_store, + Arc::clone(&time_provider) as _, + config.parquet_mem_cache_size, + config.parquet_mem_cache_prune_percentage.into(), + config.parquet_mem_cache_prune_interval.into(), + ); + (object_store, Some(parquet_cache)) + } else { + (object_store, None) + }; let trace_exporter = config.tracing_config.build()?; @@ -349,7 +414,7 @@ pub async fn command(config: Config) -> Result<()> { Arc::::clone(&time_provider), Arc::clone(&exec), wal_config, - Some(parquet_cache), + parquet_cache, ) .await .map_err(|e| Error::WriteBufferInit(e.into()))?, diff --git a/influxdb3_write/src/parquet_cache/mod.rs b/influxdb3_write/src/parquet_cache/mod.rs index d033717d946..cc60bfec4c5 100644 --- a/influxdb3_write/src/parquet_cache/mod.rs +++ b/influxdb3_write/src/parquet_cache/mod.rs @@ -77,11 +77,10 @@ impl MemCacheOracle { /// This spawns two background tasks: /// * one to handle registered [`CacheRequest`]s /// * one to prune deleted and un-needed cache entries on an interval - // TODO(trevor): this should be more configurable, e.g., channel size, prune interval - fn new(mem_cached_store: Arc) -> Self { + fn new(mem_cached_store: Arc, prune_interval: Duration) 
-> Self { let (cache_request_tx, cache_request_rx) = channel(CACHE_REQUEST_BUFFER_SIZE); background_cache_request_handler(Arc::clone(&mem_cached_store), cache_request_rx); - background_cache_pruner(mem_cached_store); + background_cache_pruner(mem_cached_store, prune_interval); Self { cache_request_tx } } } @@ -104,6 +103,7 @@ pub fn create_cached_obj_store_and_oracle( time_provider: Arc, cache_capacity: usize, prune_percent: f64, + prune_interval: Duration, ) -> (Arc, Arc) { let store = Arc::new(MemCachedObjectStore::new( object_store, @@ -111,7 +111,7 @@ pub fn create_cached_obj_store_and_oracle( time_provider, prune_percent, )); - let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store))); + let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store), prune_interval)); (store, oracle) } @@ -120,7 +120,13 @@ pub fn test_cached_obj_store_and_oracle( object_store: Arc, time_provider: Arc, ) -> (Arc, Arc) { - create_cached_obj_store_and_oracle(object_store, time_provider, 1024 * 1024 * 1024, 0.1) + create_cached_obj_store_and_oracle( + object_store, + time_provider, + 1024 * 1024 * 1024, + 0.1, + Duration::from_millis(10), + ) } /// A value in the cache, containing the actual bytes as well as object store metadata @@ -655,10 +661,12 @@ fn background_cache_request_handler( } /// A background task for pruning un-needed entries in the cache -// TODO(trevor): the interval could be configurable -fn background_cache_pruner(mem_store: Arc) -> tokio::task::JoinHandle<()> { +fn background_cache_pruner( + mem_store: Arc, + interval_duration: Duration, +) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(10)); + let mut interval = tokio::time::interval(interval_duration); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; @@ -755,11 +763,13 @@ pub(crate) mod tests { // these are magic numbers that will make it so the third entry exceeds the cache 
capacity: let cache_capacity_bytes = 60; let cache_prune_percent = 0.4; + let cache_prune_interval = Duration::from_millis(10); let (cached_store, oracle) = create_cached_obj_store_and_oracle( Arc::clone(&inner_store) as _, Arc::clone(&time_provider) as _, cache_capacity_bytes, cache_prune_percent, + cache_prune_interval, ); // PUT an entry into the store: let path_1 = Path::from("0.parquet"); From ed00ced0aae26d6a793f34f245934b17bacc1616 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 30 Sep 2024 19:26:24 -0400 Subject: [PATCH 2/2] refactor: PR feedback --- influxdb3/src/commands/serve.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 0742075da79..b1eff68a711 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -218,11 +218,11 @@ pub struct Config { #[clap(long = "host-id", env = "INFLUXDB3_HOST_IDENTIFIER_PREFIX", action)] pub host_identifier_prefix: String, - /// The size of the in-memory Parquet cache in bytes. + /// The size of the in-memory Parquet cache in megabytes (MB). #[clap( - long = "parquet-mem-cache-size", - env = "INFLUXDB3_PARQUET_MEM_CACHE_SIZE", - default_value_t = 1024 * 1024 * 1024, + long = "parquet-mem-cache-size-mb", + env = "INFLUXDB3_PARQUET_MEM_CACHE_SIZE_MB", + default_value = "1000", action )] pub parquet_mem_cache_size: usize, @@ -239,15 +239,17 @@ pub struct Config { pub parquet_mem_cache_prune_percentage: PrunePercent, /// The interval on which to check if the in-memory Parquet cache needs to be pruned. + /// + /// Enter as a human-readable time, e.g., "1s", "100ms", "1m", etc. #[clap( long = "parquet-mem-cache-prune-interval", env = "INFLUXDB3_PARQUET_MEM_CACHE_PRUNE_INTERVAL", - default_value = "10ms", + default_value = "1s", action )] pub parquet_mem_cache_prune_interval: humantime::Duration, - /// Disable the in-memory Parquet cache. + /// Disable the in-memory Parquet cache. 
By default, the cache is enabled. #[clap( long = "disable-parquet-mem-cache", env = "INFLUXDB3_DISABLE_PARQUET_MEM_CACHE", @@ -328,7 +330,7 @@ pub async fn command(config: Config) -> Result<()> { let (object_store, parquet_cache) = create_cached_obj_store_and_oracle( object_store, Arc::clone(&time_provider) as _, - config.parquet_mem_cache_size, + config.parquet_mem_cache_size * 1_000_000, config.parquet_mem_cache_prune_percentage.into(), config.parquet_mem_cache_prune_interval.into(), );