refactor: use i32 for hits and augment prune
hiltontj committed Sep 25, 2024
1 parent fba6d6b · commit b98ca29
Showing 1 changed file with 45 additions and 28 deletions.
influxdb3_write/src/parquet_cache/mod.rs
@@ -3,7 +3,7 @@ use std::{
fmt::Debug,
ops::Range,
sync::{
-        atomic::{AtomicI8, AtomicUsize, Ordering},
+        atomic::{AtomicI32, AtomicUsize, Ordering},
Arc,
},
time::Duration,
@@ -138,7 +138,7 @@ struct CacheEntry {
///
/// When first created in the `Fetching` state, this will be set to -1, which will prevent
/// the fetching entry from being evicted by a prune operation
-    hits: AtomicI8,
+    hits: AtomicI32,
}

impl CacheEntry {
@@ -147,7 +147,7 @@
let _ = self
.hits
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
-                if x == i8::MAX {
+                if x == i32::MAX {
None
} else {
Some(x + 1)
@@ -157,7 +157,7 @@

/// Get the approximate memory footprint of this entry in bytes
fn size(&self) -> usize {
-        self.state.size() + std::mem::size_of::<AtomicI8>()
+        self.state.size() + std::mem::size_of::<AtomicI32>()
}
}

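A note on the hunk above: the hit counter is bumped with a CAS loop that saturates at the type's maximum rather than wrapping, so widening the field from `AtomicI8` to `AtomicI32` raises the saturation ceiling from 127 to `i32::MAX`. A self-contained sketch of the same saturating-increment pattern (an illustration, not the crate's code verbatim):

use std::sync::atomic::{AtomicI32, Ordering};

/// Saturating increment: the closure returns `None` at `i32::MAX`, so
/// `fetch_update` leaves the value untouched rather than wrapping around.
fn record_hit(hits: &AtomicI32) {
    let _ = hits.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
        if x == i32::MAX {
            None
        } else {
            Some(x + 1)
        }
    });
}

fn main() {
    let hits = AtomicI32::new(i32::MAX - 1);
    record_hit(&hits); // reaches i32::MAX
    record_hit(&hits); // saturates; stays at i32::MAX
    assert_eq!(hits.load(Ordering::SeqCst), i32::MAX);
}
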
@@ -184,6 +184,10 @@ impl CacheEntryState {
}

/// A cache for storing objects from object storage by their [`Path`]
+///
+/// This acts as a Least-Frequently-Used (LFU) cache that allows for concurrent reads. See the
+/// [`Cache::prune`] method for the implementation of how cache entries are pruned. Pruning must
+/// be invoked externally, e.g., on an interval.
#[derive(Debug)]
struct Cache {
/// The maximum amount of memory this cache should occupy
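The new doc comment says pruning must be driven externally, e.g., on an interval. One plausible way to wire that up is a background tokio task calling `prune` on a fixed period; this is a hypothetical sketch (the `spawn_pruner` helper and the stubbed `Cache` are assumptions for illustration, not code from this commit):

use std::{sync::Arc, time::Duration};

// Hypothetical stub standing in for the cache above; only `prune` matters here.
struct Cache;
impl Cache {
    async fn prune(&self) { /* evict the least-frequently-hit entries */ }
}

// Drive `prune` from a background task on a fixed period.
fn spawn_pruner(cache: Arc<Cache>, period: Duration) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            cache.prune().await;
        }
    })
}
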
Expand Down Expand Up @@ -247,7 +251,7 @@ impl Cache {
let (tx, _) = watch::channel(None);
let entry = CacheEntry {
state: CacheEntryState::Fetching(tx),
-            hits: AtomicI8::new(-1),
+            hits: AtomicI32::new(-1),
};
let additional = entry.size();
self.map.write().await.insert(path.clone(), entry);
@@ -266,7 +270,7 @@
let cache_value = Arc::new(value);
entry.state = CacheEntryState::Success(Arc::clone(&cache_value));
let watch_count = tx.receiver_count().min((i8::MAX - 1) as usize);
-        entry.hits.store((watch_count + 1) as i8, Ordering::SeqCst);
+        entry.hits.store((watch_count + 1) as i32, Ordering::SeqCst);
// TODO(trevor): what if size is greater than cache capacity?
let additional = entry.size();
self.used.fetch_add(additional, Ordering::SeqCst);
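For context on the changed `store` above: while an entry is fetching, each waiter holds a receiver on the entry's watch channel, so `tx.receiver_count()` approximates how many readers were blocked on the value, and the entry's hit count is seeded with that count plus one (the clamp on the unchanged context line above still references `i8::MAX`). A small sketch of the channel mechanics using tokio's watch API (the values here are hypothetical):

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // While a fetch is in flight, waiters subscribe to a watch channel,
    // mirroring the Fetching state in the diff above.
    let (tx, rx1) = watch::channel::<Option<u8>>(None);
    let rx2 = rx1.clone();

    // Two blocked readers are registered on the channel.
    assert_eq!(tx.receiver_count(), 2);

    // Seed the hit count as the diff does: waiters plus the initial fetch.
    let initial_hits = (tx.receiver_count() + 1) as i32;
    assert_eq!(initial_hits, 3);

    drop((rx1, rx2));
}
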
@@ -296,31 +300,43 @@
used >= self.capacity
}

-    /// Prune unused entries from the cache until the `max_freed_amount` has been pruned
+    /// Prune least frequently hit entries from the cache, stopping once the `max_freed_amount`
+    /// has been pruned.
///
/// Entries that are not pruned will have their hit count decremented, while entries with a
/// negative hit count will be skipped since those are entries that are in the process of being
/// fetched.
///
-    /// Entries that have been used will have their used counts decremented
+    /// This performs two passes over the cache's inner map:
+    /// 1. to check for the lowest hit count
+    /// 2. to do the actual pruning
+    ///
+    /// Since the inner map is an [`IndexMap`], the order of iteration during pruning will follow
+    /// the insertion order of the map, and therefore will prune "older" entries before "newer" ones.
async fn prune(&self) {
+        let mut map = self.map.write().await;
+        let lowest_hit_count = map
+            .iter()
+            .map(|(_, entry)| entry.hits.load(Ordering::Relaxed))
+            .filter(|hits| *hits >= 0)
+            .min()
+            .unwrap_or(0);
let mut freed = 0;
-        self.map.write().await.retain(|_, entry| {
-            if freed >= self.max_free_amount {
-                return true;
-            }
-            let hits = entry.hits.load(Ordering::SeqCst);
-            match hits.cmp(&0) {
-                std::cmp::Ordering::Less => {
-                    // entries that are < 0 are fetching, so we keep those
-                    true
-                }
-                std::cmp::Ordering::Equal => {
-                    // prune entries that have not been hit
-                    freed += entry.state.size();
-                    false
-                }
-                std::cmp::Ordering::Greater => {
-                    // decrement this entry's hit count for subsequent prunes
-                    entry.hits.fetch_sub(1, Ordering::SeqCst);
-                    true
-                }
-            }
+        map.retain(|_, entry| {
+            let hits = entry.hits.load(Ordering::Relaxed);
+            if hits < 0 {
+                // entries that are < 0 are fetching, so we keep those
+                true
+            } else if hits <= lowest_hit_count && freed < self.max_free_amount {
+                // prune entries that have been hit the least, unless we have already freed our
+                // max free amount:
+                freed += entry.state.size();
+                false
+            } else {
+                // decrement this entry's hit count for subsequent prunes by the lowest hit count;
+                // this will act as a reset while preserving the hit count ranking across entries
+                entry.hits.fetch_sub(lowest_hit_count, Ordering::Relaxed);
+                true
+            }
});
        self.used.fetch_sub(freed, Ordering::SeqCst);
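Taken together, the new `prune` is an LFU eviction pass with aging: pass one finds the lowest non-negative hit count, pass two evicts entries at that floor until `max_free_amount` bytes have been freed, and every survivor has the floor subtracted from its count so relative rankings are preserved across prunes. A simplified single-threaded sketch of the same logic (a hypothetical `Entry` type; a std `BTreeMap` stands in for the crate's `IndexMap`, so the insertion-order pruning bias noted in the doc comment does not apply here):

use std::collections::BTreeMap;

struct Entry {
    hits: i32, // -1 while the value is still being fetched
    size: usize,
}

fn prune(map: &mut BTreeMap<String, Entry>, max_free_amount: usize) -> usize {
    // Pass 1: find the lowest non-negative hit count (fetching entries excluded).
    let floor = map
        .values()
        .map(|e| e.hits)
        .filter(|h| *h >= 0)
        .min()
        .unwrap_or(0);
    // Pass 2: evict at the floor until enough is freed; age the survivors.
    let mut freed = 0;
    map.retain(|_, entry| {
        if entry.hits < 0 {
            true // still fetching: never evict
        } else if entry.hits <= floor && freed < max_free_amount {
            freed += entry.size;
            false
        } else {
            entry.hits -= floor; // reset while preserving relative ranking
            true
        }
    });
    freed
}

fn main() {
    let mut map = BTreeMap::new();
    map.insert("cold".to_string(), Entry { hits: 0, size: 10 });
    map.insert("hot".to_string(), Entry { hits: 5, size: 10 });
    map.insert("fetching".to_string(), Entry { hits: -1, size: 10 });
    assert_eq!(prune(&mut map, 100), 10); // only "cold" is evicted
    assert_eq!(map["hot"].hits, 5); // floor was 0, so ranking is unchanged
    assert!(map.contains_key("fetching"));
}
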
@@ -761,6 +777,7 @@ mod tests {


// GET paris from the cached store, this will not be served by the cache, because paris was
// evicted by neelix:
+        tokio::time::sleep(Duration::from_millis(100)).await;
assert_payload_at_equals!(cached_store, payload_2, path_2);
assert_eq!(4, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));