From 11f17c0f874d90fd979142063d0db25c078719ae Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Wed, 21 Aug 2024 23:22:24 +0300 Subject: [PATCH 1/7] request all entity types in a single SQL query --- graph/src/blockchain/block_stream.rs | 54 ++++++++------ graph/src/components/store/mod.rs | 2 +- graph/src/components/store/traits.rs | 6 +- store/postgres/src/block_range.rs | 6 +- store/postgres/src/deployment_store.rs | 4 +- store/postgres/src/relational.rs | 9 ++- store/postgres/src/relational_queries.rs | 69 ++++++++++++------ store/postgres/src/writable.rs | 23 +++--- store/test-store/tests/graph/entity_cache.rs | 2 +- store/test-store/tests/postgres/writable.rs | 77 +++++--------------- 10 files changed, 126 insertions(+), 126 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 7704d772731..43d05b0e518 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -437,33 +437,39 @@ async fn get_entities_for_range( schema: &InputSchema, from: BlockNumber, to: BlockNumber, -) -> Result>, Error> { - let mut entities_by_block = BTreeMap::new(); - + /* + ) -> Result>, Error> { + let mut entities_by_block = BTreeMap::new(); + + for entity_name in &filter.entities { + let entity_type = schema.entity_type(entity_name)?; + + let entity_ranges = store.get_range(&entity_type, from..to)?; + + for (block_number, entity_vec) in entity_ranges { + let mut entity_vec = entity_vec + .into_iter() + .map(|e| EntityWithType { + entity_type: entity_type.clone(), + entity: e, + }) + .collect(); + + entities_by_block + .entry(block_number) + .and_modify(|existing_vec: &mut Vec| { + existing_vec.append(&mut entity_vec); + }) + .or_insert(entity_vec); + } + */ +) -> Result>, Error> { + let mut entity_types = vec![]; for entity_name in &filter.entities { let entity_type = schema.entity_type(entity_name)?; - - let entity_ranges = store.get_range(&entity_type, from..to)?; - - for (block_number, entity_vec) in entity_ranges { - let mut entity_vec = entity_vec - .into_iter() - .map(|e| EntityWithType { - entity_type: entity_type.clone(), - entity: e, - }) - .collect(); - - entities_by_block - .entry(block_number) - .and_modify(|existing_vec: &mut Vec| { - existing_vec.append(&mut entity_vec); - }) - .or_insert(entity_vec); - } + entity_types.push(entity_type); } - - Ok(entities_by_block) + Ok(store.get_range(entity_types, from..to)?) } impl TriggersAdapterWrapper { diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index d0414abf096..72fd88bed33 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1041,7 +1041,7 @@ impl ReadStore for EmptyStore { fn get_range( &self, - _entity_type: &EntityType, + _entity_types: Vec, _block_range: Range, ) -> Result>, StoreError> { Ok(BTreeMap::new()) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index d93665332c6..79587b6653d 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -231,7 +231,7 @@ pub trait ReadStore: Send + Sync + 'static { /// Looks up entities using the given store key for a range of blocks. fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError>; @@ -259,10 +259,10 @@ impl ReadStore for Arc { fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError> { - (**self).get_range(entity_type, block_range) + (**self).get_range(entity_types, block_range) } fn get_derived( diff --git a/store/postgres/src/block_range.rs b/store/postgres/src/block_range.rs index 2387f6abd0a..9c7af7cd361 100644 --- a/store/postgres/src/block_range.rs +++ b/store/postgres/src/block_range.rs @@ -161,7 +161,7 @@ impl EntityBlockRange { let BlockRange(start, finish) = block_range; self.compare_column(out); - out.push_sql(" >= "); + out.push_sql(">= "); match start { Bound::Included(block) => out.push_bind_param::(block)?, Bound::Excluded(block) => { @@ -170,9 +170,9 @@ impl EntityBlockRange { } Bound::Unbounded => unimplemented!(), }; - out.push_sql(" AND "); + out.push_sql(" and"); self.compare_column(out); - out.push_sql(" <= "); + out.push_sql("<= "); match finish { Bound::Included(block) => { out.push_bind_param::(block)?; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index f500f2aa6ed..608329ab5a2 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1059,12 +1059,12 @@ impl DeploymentStore { pub(crate) fn get_range( &self, site: Arc, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError> { let mut conn = self.get_conn()?; let layout = self.layout(&mut conn, site)?; - layout.find_range(&mut conn, entity_type, block_range) + layout.find_range(&mut conn, entity_types, block_range) } pub(crate) fn get_derived( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 792c7d8342e..99a59b35035 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -518,12 +518,15 @@ impl Layout { pub fn find_range( &self, conn: &mut PgConnection, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError> { - let table = self.table_for_entity(entity_type)?; + let mut tables = vec![]; + for et in entity_types { + tables.push(self.table_for_entity(&et)?.as_ref()) + } let mut entities: BTreeMap> = BTreeMap::new(); - if let Some(vec) = FindRangeQuery::new(table.as_ref(), block_range) + if let Some(vec) = FindRangeQuery::new(&tables, block_range) .get_results::(conn) .optional()? { diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index e01cc3db85b..e15a37298c8 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2008,14 +2008,20 @@ impl<'a, Conn> RunQueryDsl for FindQuery<'a> {} #[derive(Debug, Clone)] pub struct FindRangeQuery<'a> { - table: &'a Table, - eb_range: EntityBlockRange, + tables: &'a Vec<&'a Table>, + imm_range: EntityBlockRange, + mut_range: EntityBlockRange, } impl<'a> FindRangeQuery<'a> { - pub fn new(table: &'a Table, block_range: Range) -> Self { - let eb_range = EntityBlockRange::new(table.immutable, block_range); - Self { table, eb_range } + pub fn new(tables: &'a Vec<&Table>, block_range: Range) -> Self { + let imm_range = EntityBlockRange::new(true, block_range.clone()); + let mut_range = EntityBlockRange::new(false, block_range); + Self { + tables, + imm_range, + mut_range, + } } } @@ -2023,22 +2029,43 @@ impl<'a> QueryFragment for FindRangeQuery<'a> { fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.unsafe_to_cache_prepared(); - // Generate - // select '..' as entity, to_jsonb(e.*) as data - // from schema.table e where id = $1 - out.push_sql("select "); - out.push_bind_param::(self.table.object.as_str())?; - out.push_sql(" as entity, to_jsonb(e.*) as data\n"); - out.push_sql(" from "); - out.push_sql(self.table.qualified_name.as_str()); - out.push_sql(" e\n where "); - // TODO: do we need to care about it? - // if self.table.has_causality_region { - // out.push_sql("causality_region = "); - // out.push_bind_param::(&self.key.causality_region)?; - // out.push_sql(" and "); - // } - self.eb_range.contains(&mut out) + let mut iter = self.tables.iter().peekable(); + while let Some(table) = iter.next() { + // Generate + // select '..' as entity, to_jsonb(e.*) as data, block$ as block_number + // from schema.table e where id = $1 + out.push_sql("select "); + out.push_bind_param::(table.object.as_str())?; + out.push_sql(" as entity, to_jsonb(e.*) as data,"); + if table.immutable { + self.imm_range.compare_column(&mut out) + } else { + self.mut_range.compare_column(&mut out) + } + out.push_sql("as block_number\n"); + out.push_sql(" from "); + out.push_sql(table.qualified_name.as_str()); + out.push_sql(" e\n where"); + // TODO: do we need to care about it? + // if self.table.has_causality_region { + // out.push_sql("causality_region = "); + // out.push_bind_param::(&self.key.causality_region)?; + // out.push_sql(" and "); + // } + if table.immutable { + self.imm_range.contains(&mut out)?; + } else { + self.mut_range.contains(&mut out)?; + } + // more elements left? + if iter.peek().is_some() { + out.push_sql("\nunion all\n"); // with the next + } else { + out.push_sql("\norder by block_number"); // on the last + } + } + + Ok(()) } } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 6c2faef3e14..de95797b164 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -354,12 +354,15 @@ impl SyncStore { fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError> { retry::forever(&self.logger, "get_range", || { - self.writable - .get_range(self.site.cheap_clone(), entity_type, block_range.clone()) + self.writable.get_range( + self.site.cheap_clone(), + entity_types.clone(), + block_range.clone(), + ) }) } @@ -1230,10 +1233,10 @@ impl Queue { fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError> { - self.store.get_range(entity_type, block_range) + self.store.get_range(entity_types, block_range) } fn get_derived( @@ -1450,12 +1453,12 @@ impl Writer { fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError> { match self { - Writer::Sync(store) => store.get_range(entity_type, block_range), - Writer::Async { queue, .. } => queue.get_range(entity_type, block_range), + Writer::Sync(store) => store.get_range(entity_types, block_range), + Writer::Async { queue, .. } => queue.get_range(entity_types, block_range), } } @@ -1589,10 +1592,10 @@ impl ReadStore for WritableStore { fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, block_range: Range, ) -> Result>, StoreError> { - self.writer.get_range(entity_type, block_range) + self.writer.get_range(entity_types, block_range) } fn get_derived( diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 314f82d1072..a80b5d6d0c2 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -69,7 +69,7 @@ impl ReadStore for MockStore { fn get_range( &self, - _entity_type: &EntityType, + _entity_types: Vec, _block_range: Range, ) -> Result>, StoreError> { todo!() diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index b5f9842fe81..1eae8938730 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -125,7 +125,6 @@ async fn insert_count( count: u8, counter_type: &EntityType, id: &str, - id2: &str, ) { let count_key_local = |id: &str| counter_type.parse_key(id).unwrap(); let data = entity! { TEST_SUBGRAPH_SCHEMA => @@ -136,22 +135,9 @@ async fn insert_count( key: count_key_local(&data.get("id").unwrap().to_string()), data, }; - let data = entity! { TEST_SUBGRAPH_SCHEMA => - id: id2, - count :count as i32, - }; - let entity_op2 = EntityOperation::Set { - key: count_key_local(&data.get("id").unwrap().to_string()), - data, - }; - transact_entity_operations( - store, - deployment, - block_pointer(block), - vec![entity_op, entity_op2], - ) - .await - .unwrap(); + transact_entity_operations(store, deployment, block_pointer(block), vec![entity_op]) + .await + .unwrap(); } async fn insert_count_mutable( @@ -160,7 +146,7 @@ async fn insert_count_mutable( block: u8, count: u8, ) { - insert_count(store, deployment, block, count, &COUNTER_TYPE, "1", "2").await; + insert_count(store, deployment, block, count, &COUNTER_TYPE, "1").await; } async fn insert_count_immutable( @@ -175,8 +161,7 @@ async fn insert_count_immutable( block, count, &COUNTER2_TYPE, - &(block).to_string(), - &(block + 1).to_string(), + &(block / 2).to_string(), ) .await; } @@ -343,46 +328,22 @@ fn restart() { }) } -async fn read_range( - store: Arc, - writable: Arc, - deployment: DeploymentLocator, - mutable: bool, -) -> usize { - let subgraph_store = store.subgraph_store(); - writable.deployment_synced().unwrap(); - - for count in 1..=7 { - if mutable { - insert_count_mutable(&subgraph_store, &deployment, 2 * count, 4 * count).await - } else { - insert_count_immutable(&subgraph_store, &deployment, 2 * count, 4 * count).await - } - } - writable.flush().await.unwrap(); - - let br: Range = 4..8; - let et: &EntityType = if mutable { - &COUNTER_TYPE - } else { - &COUNTER2_TYPE - }; - let e = writable.get_range(et, br).unwrap(); - e.iter().map(|(_, v)| v.iter()).flatten().count() -} - #[test] -fn read_range_mutable() { +fn read_range_test() { run_test(|store, writable, deployment| async move { - let num_entities = read_range(store, writable, deployment, true).await; - assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open - }) -} + let subgraph_store = store.subgraph_store(); + writable.deployment_synced().unwrap(); -#[test] -fn read_range_immutable() { - run_test(|store, writable, deployment| async move { - let num_entities = read_range(store, writable, deployment, false).await; - assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open + for count in 1..=7 { + insert_count_mutable(&subgraph_store, &deployment, 2 * count, 4 * count).await; + insert_count_immutable(&subgraph_store, &deployment, 2 * count + 1, 4 * count).await; + } + writable.flush().await.unwrap(); + + let br: Range = 4..8; + writable.deployment_synced().unwrap(); + let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()]; + let e = writable.get_range(entity_types, br).unwrap(); + assert_eq!(e.len(), 5) // TODO: fix it - it should be 4 as the range is open }) } From 79154500a1340ef5b189bf0918d41a2babc3829f Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Fri, 23 Aug 2024 18:46:15 +0300 Subject: [PATCH 2/7] add types to the enities --- graph/src/blockchain/block_stream.rs | 28 +------------------- graph/src/components/store/mod.rs | 3 ++- graph/src/components/store/traits.rs | 6 ++--- store/postgres/src/deployment_store.rs | 4 +-- store/postgres/src/relational.rs | 20 +++++++++----- store/postgres/src/writable.rs | 10 +++---- store/test-store/tests/graph/entity_cache.rs | 4 +-- 7 files changed, 29 insertions(+), 46 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 43d05b0e518..e3f61688fde 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -437,33 +437,7 @@ async fn get_entities_for_range( schema: &InputSchema, from: BlockNumber, to: BlockNumber, - /* - ) -> Result>, Error> { - let mut entities_by_block = BTreeMap::new(); - - for entity_name in &filter.entities { - let entity_type = schema.entity_type(entity_name)?; - - let entity_ranges = store.get_range(&entity_type, from..to)?; - - for (block_number, entity_vec) in entity_ranges { - let mut entity_vec = entity_vec - .into_iter() - .map(|e| EntityWithType { - entity_type: entity_type.clone(), - entity: e, - }) - .collect(); - - entities_by_block - .entry(block_number) - .and_modify(|existing_vec: &mut Vec| { - existing_vec.append(&mut entity_vec); - }) - .or_insert(entity_vec); - } - */ -) -> Result>, Error> { +) -> Result>, Error> { let mut entity_types = vec![]; for entity_name in &filter.entities { let entity_type = schema.entity_type(entity_name)?; diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 72fd88bed33..ccaa78a9a4f 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -26,6 +26,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; +use crate::blockchain::block_stream::EntityWithType; use crate::blockchain::{Block, BlockHash, BlockPtr}; use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; @@ -1043,7 +1044,7 @@ impl ReadStore for EmptyStore { &self, _entity_types: Vec, _block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { Ok(BTreeMap::new()) } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 79587b6653d..a7cecad129b 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use web3::types::{Address, H256}; use super::*; -use crate::blockchain::block_stream::FirehoseCursor; +use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use crate::blockchain::{BlockTime, ChainIdentifier}; use crate::components::metrics::stopwatch::StopwatchMetrics; use crate::components::server::index_node::VersionInfo; @@ -233,7 +233,7 @@ pub trait ReadStore: Send + Sync + 'static { &self, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError>; + ) -> Result>, StoreError>; /// Reverse lookup fn get_derived( @@ -261,7 +261,7 @@ impl ReadStore for Arc { &self, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { (**self).get_range(entity_types, block_range) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 608329ab5a2..faa50a0d1ad 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -4,7 +4,7 @@ use diesel::pg::PgConnection; use diesel::r2d2::{ConnectionManager, PooledConnection}; use diesel::{prelude::*, sql_query}; use graph::anyhow::Context; -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::blockchain::BlockTime; use graph::components::store::write::RowGroup; use graph::components::store::{ @@ -1061,7 +1061,7 @@ impl DeploymentStore { site: Arc, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { let mut conn = self.get_conn()?; let layout = self.layout(&mut conn, site)?; layout.find_range(&mut conn, entity_types, block_range) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 99a59b35035..15a6f333c7d 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -24,6 +24,7 @@ use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Text; use diesel::{connection::SimpleConnection, Connection}; use diesel::{debug_query, sql_query, OptionalExtension, PgConnection, QueryResult, RunQueryDsl}; +use graph::blockchain::block_stream::EntityWithType; use graph::blockchain::BlockTime; use graph::cheap_clone::CheapClone; use graph::components::store::write::{RowGroup, WriteChunk}; @@ -520,23 +521,30 @@ impl Layout { conn: &mut PgConnection, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { let mut tables = vec![]; + let mut et_map: HashMap> = HashMap::new(); for et in entity_types { - tables.push(self.table_for_entity(&et)?.as_ref()) + tables.push(self.table_for_entity(&et)?.as_ref()); + et_map.insert(et.to_string(), Arc::new(et)); } - let mut entities: BTreeMap> = BTreeMap::new(); + let mut entities: BTreeMap> = BTreeMap::new(); if let Some(vec) = FindRangeQuery::new(&tables, block_range) .get_results::(conn) .optional()? { for e in vec { let block = e.clone().deserialize_block_number::()?; - let en = e.deserialize_with_layout::(self, None)?; + let entity_type = e.entity_type(&self.input_schema); + let entity = e.deserialize_with_layout::(self, None)?; + let ewt = EntityWithType { + entity_type, + entity, + }; match entities.get_mut(&block) { - Some(vec) => vec.push(en), + Some(vec) => vec.push(ewt), None => { - let _ = entities.insert(block, vec![en]); + let _ = entities.insert(block, vec![ewt]); } }; } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index de95797b164..3436db04004 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -5,7 +5,7 @@ use std::sync::{Mutex, RwLock, TryLockError as RwLockError}; use std::time::Instant; use std::{collections::BTreeMap, sync::Arc}; -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::blockchain::BlockTime; use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore}; use graph::constraint_violation; @@ -356,7 +356,7 @@ impl SyncStore { &self, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { retry::forever(&self.logger, "get_range", || { self.writable.get_range( self.site.cheap_clone(), @@ -1235,7 +1235,7 @@ impl Queue { &self, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { self.store.get_range(entity_types, block_range) } @@ -1455,7 +1455,7 @@ impl Writer { &self, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { match self { Writer::Sync(store) => store.get_range(entity_types, block_range), Writer::Async { queue, .. } => queue.get_range(entity_types, block_range), @@ -1594,7 +1594,7 @@ impl ReadStore for WritableStore { &self, entity_types: Vec, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { self.writer.get_range(entity_types, block_range) } diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index a80b5d6d0c2..15315d89333 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -1,4 +1,4 @@ -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::blockchain::BlockTime; use graph::components::store::{ DeploymentCursorTracker, DerivedEntityQuery, GetScope, LoadRelatedRequest, ReadStore, @@ -71,7 +71,7 @@ impl ReadStore for MockStore { &self, _entity_types: Vec, _block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { todo!() } From 20e6e8ed6d5338297634b18e2a00dc1649febae2 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Wed, 28 Aug 2024 20:09:51 +0300 Subject: [PATCH 3/7] query for both lower and upper range --- graph/src/blockchain/block_stream.rs | 9 +++ store/postgres/src/block_range.rs | 20 ++++-- store/postgres/src/relational.rs | 7 +- store/postgres/src/relational_queries.rs | 82 +++++++++++++----------- 4 files changed, 75 insertions(+), 43 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index e3f61688fde..caa9ca25e31 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -426,7 +426,16 @@ async fn scan_subgraph_triggers( } } +#[derive(Debug)] +pub enum EntitySubgraphOperation { + Create, + Modify, + Delete, +} + +#[derive(Debug)] pub struct EntityWithType { + pub entity_op: EntitySubgraphOperation, pub entity_type: EntityType, pub entity: Entity, } diff --git a/store/postgres/src/block_range.rs b/store/postgres/src/block_range.rs index 9c7af7cd361..7b06d2223bb 100644 --- a/store/postgres/src/block_range.rs +++ b/store/postgres/src/block_range.rs @@ -135,19 +135,23 @@ impl<'a> QueryFragment for BlockRangeUpperBoundClause<'a> { /// Helper for generating SQL fragments for selecting entities in a specific block range #[derive(Debug, Clone, Copy)] pub enum EntityBlockRange { - Mutable(BlockRange), // TODO: check if this is a proper type here (maybe Range?) + Mutable((BlockRange, bool)), // TODO: check if this is a proper type here (maybe Range?) Immutable(BlockRange), } impl EntityBlockRange { - pub fn new(immutable: bool, block_range: std::ops::Range) -> Self { + pub fn new( + immutable: bool, + block_range: std::ops::Range, + is_uppper_range: bool, + ) -> Self { let start: Bound = Bound::Included(block_range.start); let end: Bound = Bound::Excluded(block_range.end); let block_range: BlockRange = BlockRange(start, end); if immutable { Self::Immutable(block_range) } else { - Self::Mutable(block_range) + Self::Mutable((block_range, is_uppper_range)) } } @@ -155,7 +159,7 @@ impl EntityBlockRange { pub fn contains<'b>(&'b self, out: &mut AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.unsafe_to_cache_prepared(); let block_range = match self { - EntityBlockRange::Mutable(br) => br, + EntityBlockRange::Mutable((br, _)) => br, EntityBlockRange::Immutable(br) => br, }; let BlockRange(start, finish) = block_range; @@ -186,7 +190,13 @@ impl EntityBlockRange { pub fn compare_column(&self, out: &mut AstPass) { match self { - EntityBlockRange::Mutable(_) => out.push_sql(" lower(block_range) "), + EntityBlockRange::Mutable((_, is_upper_range)) => { + if *is_upper_range { + out.push_sql(" upper(block_range) ") + } else { + out.push_sql(" lower(block_range) ") + } + } EntityBlockRange::Immutable(_) => out.push_sql(" block$ "), } } diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 15a6f333c7d..392f4498dbd 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -24,7 +24,7 @@ use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Text; use diesel::{connection::SimpleConnection, Connection}; use diesel::{debug_query, sql_query, OptionalExtension, PgConnection, QueryResult, RunQueryDsl}; -use graph::blockchain::block_stream::EntityWithType; +use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType}; use graph::blockchain::BlockTime; use graph::cheap_clone::CheapClone; use graph::components::store::write::{RowGroup, WriteChunk}; @@ -529,15 +529,18 @@ impl Layout { et_map.insert(et.to_string(), Arc::new(et)); } let mut entities: BTreeMap> = BTreeMap::new(); - if let Some(vec) = FindRangeQuery::new(&tables, block_range) + if let Some(vec) = FindRangeQuery::new(&tables, false, block_range) .get_results::(conn) .optional()? { + // TODO: issue query with upper range and find modifictions and deletions for e in vec { let block = e.clone().deserialize_block_number::()?; let entity_type = e.entity_type(&self.input_schema); let entity = e.deserialize_with_layout::(self, None)?; + let entity_op = EntitySubgraphOperation::Create; let ewt = EntityWithType { + entity_op, entity_type, entity, }; diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index e15a37298c8..dfc55ee2225 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2009,16 +2009,22 @@ impl<'a, Conn> RunQueryDsl for FindQuery<'a> {} #[derive(Debug, Clone)] pub struct FindRangeQuery<'a> { tables: &'a Vec<&'a Table>, + is_upper_range: bool, imm_range: EntityBlockRange, mut_range: EntityBlockRange, } impl<'a> FindRangeQuery<'a> { - pub fn new(tables: &'a Vec<&Table>, block_range: Range) -> Self { - let imm_range = EntityBlockRange::new(true, block_range.clone()); - let mut_range = EntityBlockRange::new(false, block_range); + pub fn new( + tables: &'a Vec<&Table>, + is_upper_range: bool, + block_range: Range, + ) -> Self { + let imm_range = EntityBlockRange::new(true, block_range.clone(), false); + let mut_range = EntityBlockRange::new(false, block_range, is_upper_range); Self { tables, + is_upper_range, imm_range, mut_range, } @@ -2028,42 +2034,46 @@ impl<'a> FindRangeQuery<'a> { impl<'a> QueryFragment for FindRangeQuery<'a> { fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.unsafe_to_cache_prepared(); + let mut first = true; - let mut iter = self.tables.iter().peekable(); - while let Some(table) = iter.next() { - // Generate - // select '..' as entity, to_jsonb(e.*) as data, block$ as block_number - // from schema.table e where id = $1 - out.push_sql("select "); - out.push_bind_param::(table.object.as_str())?; - out.push_sql(" as entity, to_jsonb(e.*) as data,"); - if table.immutable { - self.imm_range.compare_column(&mut out) - } else { - self.mut_range.compare_column(&mut out) - } - out.push_sql("as block_number\n"); - out.push_sql(" from "); - out.push_sql(table.qualified_name.as_str()); - out.push_sql(" e\n where"); - // TODO: do we need to care about it? - // if self.table.has_causality_region { - // out.push_sql("causality_region = "); - // out.push_bind_param::(&self.key.causality_region)?; - // out.push_sql(" and "); - // } - if table.immutable { - self.imm_range.contains(&mut out)?; - } else { - self.mut_range.contains(&mut out)?; - } - // more elements left? - if iter.peek().is_some() { - out.push_sql("\nunion all\n"); // with the next - } else { - out.push_sql("\norder by block_number"); // on the last + for table in self.tables.iter() { + // the immutable entities don't have upper range and also can't be modified or deleted + if !(self.is_upper_range && table.immutable) { + if first { + first = false; + } else { + out.push_sql("\nunion all\n"); + } + + // Generate + // select '..' as entity, to_jsonb(e.*) as data, block$ as block_number + // from schema.table e where id = $1 + out.push_sql("select "); + out.push_bind_param::(table.object.as_str())?; + out.push_sql(" as entity, to_jsonb(e.*) as data,"); + if table.immutable { + self.imm_range.compare_column(&mut out) + } else { + self.mut_range.compare_column(&mut out) + } + out.push_sql("as block_number, id\n"); + out.push_sql(" from "); + out.push_sql(table.qualified_name.as_str()); + out.push_sql(" e\n where"); + // TODO: do we need to care about it? + // if self.table.has_causality_region { + // out.push_sql("causality_region = "); + // out.push_bind_param::(&self.key.causality_region)?; + // out.push_sql(" and "); + // } + if table.immutable { + self.imm_range.contains(&mut out)?; + } else { + self.mut_range.contains(&mut out)?; + } } } + out.push_sql("\norder by block_number, entity, id"); Ok(()) } From 98aa78ad013428a11be8991453c686d5fdfc54bf Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Mon, 2 Sep 2024 17:28:33 +0300 Subject: [PATCH 4/7] deduce the types of operation create, modify or delete --- store/postgres/src/relational.rs | 101 ++++++++++++++++++----- store/postgres/src/relational_queries.rs | 78 +++++++++-------- 2 files changed, 124 insertions(+), 55 deletions(-) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 392f4498dbd..e2475e8f220 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -53,8 +53,8 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use crate::relational_queries::{ - ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery, - FindPossibleDeletionsQuery, ReturnedEntityData, + ConflictingEntitiesData, ConflictingEntitiesQuery, EntityDataExt, FindChangesQuery, + FindDerivedQuery, FindPossibleDeletionsQuery, ReturnedEntityData, }; use crate::{ primary::{Namespace, Site}, @@ -529,28 +529,85 @@ impl Layout { et_map.insert(et.to_string(), Arc::new(et)); } let mut entities: BTreeMap> = BTreeMap::new(); - if let Some(vec) = FindRangeQuery::new(&tables, false, block_range) - .get_results::(conn) + let lower_vec = FindRangeQuery::new(&tables, false, block_range.clone()) + .get_results::(conn) .optional()? - { - // TODO: issue query with upper range and find modifictions and deletions - for e in vec { - let block = e.clone().deserialize_block_number::()?; - let entity_type = e.entity_type(&self.input_schema); - let entity = e.deserialize_with_layout::(self, None)?; - let entity_op = EntitySubgraphOperation::Create; - let ewt = EntityWithType { - entity_op, - entity_type, - entity, - }; - match entities.get_mut(&block) { - Some(vec) => vec.push(ewt), - None => { - let _ = entities.insert(block, vec![ewt]); + .unwrap_or_default(); + let upper_vec = FindRangeQuery::new(&tables, true, block_range) + .get_results::(conn) + .optional()? + .unwrap_or_default(); + let mut lower_iter = lower_vec.iter().fuse().peekable(); + let mut upper_iter = upper_vec.iter().fuse().peekable(); + let mut lower_now = lower_iter.next(); + let mut upper_now = upper_iter.next(); + let mut lower = lower_now.unwrap_or(&EntityDataExt::default()).clone(); + let mut upper = upper_now.unwrap_or(&EntityDataExt::default()).clone(); + let transform = |ede: EntityDataExt, + entity_op: EntitySubgraphOperation| + -> Result<(EntityWithType, BlockNumber), StoreError> { + let e = EntityData::new(ede.entity, ede.data); + let block = ede.block_number; + let entity_type = e.entity_type(&self.input_schema); + let entity = e.deserialize_with_layout::(self, None)?; + let ewt = EntityWithType { + entity_op, + entity_type, + entity, + }; + Ok((ewt, block)) + }; + while lower_now.is_some() || upper_now.is_some() { + let (ewt, block) = if lower_now.is_some() { + if upper_now.is_some() { + if lower > upper { + // we have upper bound at this block, but no lower bounds at the same block so it's deletion + let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?; + // advance upper + upper_now = upper_iter.next(); + upper = upper_now.unwrap_or(&EntityDataExt::default()).clone(); + (ewt, block) + } else if lower < upper { + // we have lower bound at this block but no upper bound at the same block so its creation + let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?; + // advance lower + lower_now = lower_iter.next(); + lower = lower_now.unwrap_or(&EntityDataExt::default()).clone(); + (ewt, block) + } else { + assert!(upper == lower); + let (ewt, block) = transform(lower, EntitySubgraphOperation::Modify)?; + // advance both + lower_now = lower_iter.next(); + lower = lower_now.unwrap_or(&EntityDataExt::default()).clone(); + upper_now = upper_iter.next(); + upper = upper_now.unwrap_or(&EntityDataExt::default()).clone(); + (ewt, block) } - }; - } + } else { + // we have lower bound at this block but no upper bound at the same block so its creation + let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?; + // advance lower + lower_now = lower_iter.next(); + lower = lower_now.unwrap_or(&EntityDataExt::default()).clone(); + (ewt, block) + } + } else { + // we have upper bound at this block, but no lower bounds at all so it's deletion + assert!(upper_now.is_some()); + let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?; + // advance upper + upper_now = upper_iter.next(); + upper = upper_now.unwrap_or(&EntityDataExt::default()).clone(); + (ewt, block) + }; + + match entities.get_mut(&block) { + Some(vec) => vec.push(ewt), + None => { + let _ = entities.insert(block, vec![ewt]); + } + }; } Ok(entities) } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index dfc55ee2225..2f8a936b535 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -17,7 +17,6 @@ use graph::data::store::{Id, IdType, NULL}; use graph::data::store::{IdList, IdRef, QueryObject}; use graph::data::value::{Object, Word}; use graph::data_source::CausalityRegion; -use graph::prelude::regex::Regex; use graph::prelude::{ anyhow, r, serde_json, BlockNumber, ChildMultiplicity, Entity, EntityCollection, EntityFilter, EntityLink, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityRange, EntityWindow, @@ -27,6 +26,7 @@ use graph::schema::{EntityKey, EntityType, FulltextAlgorithm, FulltextConfig, In use graph::{components::store::AttributeNames, data::store::scalar}; use inflector::Inflector; use itertools::Itertools; +use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::convert::TryFrom; use std::fmt::{self, Display}; @@ -478,40 +478,12 @@ pub struct EntityData { } impl EntityData { - pub fn entity_type(&self, schema: &InputSchema) -> EntityType { - schema.entity_type(&self.entity).unwrap() + pub fn new(entity: String, data: serde_json::Value) -> EntityData { + EntityData { entity, data } } - pub fn deserialize_block_number(self) -> Result { - use serde_json::Value as j; - match self.data { - j::Object(map) => { - let mut entries = map.into_iter().filter_map(move |(key, json)| { - if key == "block_range" { - let r = json.as_str().unwrap(); - let rx = Regex::new("\\[(?P[0-9]+),([0-9]+)?\\)").unwrap(); - let cap = rx.captures(r).unwrap(); - let start = cap - .name("start") - .map(|mtch| mtch.as_str().to_string()) - .unwrap(); - let n = start.parse::().unwrap(); - Some(n) - } else if key == "block$" { - let block = json.as_i64().unwrap() as i32; - Some(block) - } else { - None - } - }); - let en = entries.next().unwrap(); - assert!(entries.next().is_none()); // there should be just one block_range field - Ok(en) - } - _ => unreachable!( - "we use `to_json` in our queries, and will therefore always get an object back" - ), - } + pub fn entity_type(&self, schema: &InputSchema) -> EntityType { + schema.entity_type(&self.entity).unwrap() } /// Map the `EntityData` using the schema information in `Layout` @@ -586,6 +558,46 @@ impl EntityData { } } +#[derive(QueryableByName, Clone, Debug, Default, Eq)] +pub struct EntityDataExt { + #[diesel(sql_type = Text)] + pub entity: String, + #[diesel(sql_type = Jsonb)] + pub data: serde_json::Value, + #[diesel(sql_type = Integer)] + pub block_number: i32, + #[diesel(sql_type = Text)] + pub id: String, +} + +impl Ord for EntityDataExt { + fn cmp(&self, other: &Self) -> Ordering { + let ord = self.block_number.cmp(&other.block_number); + if ord != Ordering::Equal { + ord + } else { + let ord = self.entity.cmp(&other.entity); + if ord != Ordering::Equal { + ord + } else { + self.id.cmp(&other.id) + } + } + } +} + +impl PartialOrd for EntityDataExt { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for EntityDataExt { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + /// The equivalent of `graph::data::store::Value` but in a form that does /// not require further transformation during `walk_ast`. This form takes /// the idiosyncrasies of how we serialize values into account (e.g., that From 504ccdde5e116663c6e6a8f22475184a31d3080e Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Tue, 3 Sep 2024 13:41:55 +0300 Subject: [PATCH 5/7] add vid to the result struct in order to sort it afterwards --- graph/src/blockchain/block_stream.rs | 1 + store/postgres/src/relational.rs | 8 ++++++++ store/postgres/src/relational_queries.rs | 4 +++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index caa9ca25e31..9f8b6fce8a9 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -438,6 +438,7 @@ pub struct EntityWithType { pub entity_op: EntitySubgraphOperation, pub entity_type: EntityType, pub entity: Entity, + pub vid: i64, } async fn get_entities_for_range( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index e2475e8f220..4402dcefb6c 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -550,10 +550,12 @@ impl Layout { let block = ede.block_number; let entity_type = e.entity_type(&self.input_schema); let entity = e.deserialize_with_layout::(self, None)?; + let vid = ede.vid; let ewt = EntityWithType { entity_op, entity_type, entity, + vid, }; Ok((ewt, block)) }; @@ -609,6 +611,12 @@ impl Layout { } }; } + + // sort the elements in each blocks bucket by vid + for (_, vec) in &mut entities { + vec.sort_by(|a, b| a.vid.cmp(&b.vid)); + } + Ok(entities) } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 2f8a936b535..abe8e706ac9 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -568,6 +568,8 @@ pub struct EntityDataExt { pub block_number: i32, #[diesel(sql_type = Text)] pub id: String, + #[diesel(sql_type = BigInt)] + pub vid: i64, } impl Ord for EntityDataExt { @@ -2068,7 +2070,7 @@ impl<'a> QueryFragment for FindRangeQuery<'a> { } else { self.mut_range.compare_column(&mut out) } - out.push_sql("as block_number, id\n"); + out.push_sql("as block_number, id, vid\n"); out.push_sql(" from "); out.push_sql(table.qualified_name.as_str()); out.push_sql(" e\n where"); From 74093de914821fd1ee2a3efcb7ab714957627321 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Wed, 4 Sep 2024 18:29:40 +0300 Subject: [PATCH 6/7] better test --- store/postgres/src/block_range.rs | 2 +- store/postgres/src/relational_queries.rs | 3 +- store/test-store/tests/postgres/writable.rs | 112 ++++++++++++-------- 3 files changed, 68 insertions(+), 49 deletions(-) diff --git a/store/postgres/src/block_range.rs b/store/postgres/src/block_range.rs index 7b06d2223bb..13ede1096a6 100644 --- a/store/postgres/src/block_range.rs +++ b/store/postgres/src/block_range.rs @@ -135,7 +135,7 @@ impl<'a> QueryFragment for BlockRangeUpperBoundClause<'a> { /// Helper for generating SQL fragments for selecting entities in a specific block range #[derive(Debug, Clone, Copy)] pub enum EntityBlockRange { - Mutable((BlockRange, bool)), // TODO: check if this is a proper type here (maybe Range?) + Mutable((BlockRange, bool)), Immutable(BlockRange), } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index abe8e706ac9..9776aef9d01 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -578,6 +578,7 @@ impl Ord for EntityDataExt { if ord != Ordering::Equal { ord } else { + // TODO: check if this and the next cmp for string match postgress C localle let ord = self.entity.cmp(&other.entity); if ord != Ordering::Equal { ord @@ -2074,7 +2075,7 @@ impl<'a> QueryFragment for FindRangeQuery<'a> { out.push_sql(" from "); out.push_sql(table.qualified_name.as_str()); out.push_sql(" e\n where"); - // TODO: do we need to care about it? + // TODO: add casuality region to the query // if self.table.has_causality_region { // out.push_sql("causality_region = "); // out.push_bind_param::(&self.key.causality_region)?; diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index 1eae8938730..ef619c13323 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -1,10 +1,10 @@ -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::schema::{EntityKey, EntityType, InputSchema}; use lazy_static::lazy_static; -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::marker::PhantomData; use std::ops::Range; use test_store::*; @@ -123,49 +123,40 @@ async fn insert_count( deployment: &DeploymentLocator, block: u8, count: u8, - counter_type: &EntityType, - id: &str, + immutable: bool, ) { - let count_key_local = |id: &str| counter_type.parse_key(id).unwrap(); + let count_key_local = |counter_type: &EntityType, id: &str| counter_type.parse_key(id).unwrap(); let data = entity! { TEST_SUBGRAPH_SCHEMA => - id: id, - count :count as i32, + id: "1", + count: count as i32 }; - let entity_op = EntityOperation::Set { - key: count_key_local(&data.get("id").unwrap().to_string()), - data, + let entity_op = if (block != 3 && block != 5 && block != 7) || !immutable { + EntityOperation::Set { + key: count_key_local(&COUNTER_TYPE, &data.get("id").unwrap().to_string()), + data, + } + } else { + EntityOperation::Remove { + key: count_key_local(&COUNTER_TYPE, &data.get("id").unwrap().to_string()), + } }; - transact_entity_operations(store, deployment, block_pointer(block), vec![entity_op]) + let mut ops = vec![entity_op]; + if immutable && block < 6 { + let data = entity! { TEST_SUBGRAPH_SCHEMA => + id: &block.to_string(), + count :count as i32, + }; + let entity_op = EntityOperation::Set { + key: count_key_local(&COUNTER2_TYPE, &data.get("id").unwrap().to_string()), + data, + }; + ops.push(entity_op); + } + transact_entity_operations(store, deployment, block_pointer(block), ops) .await .unwrap(); } -async fn insert_count_mutable( - store: &Arc, - deployment: &DeploymentLocator, - block: u8, - count: u8, -) { - insert_count(store, deployment, block, count, &COUNTER_TYPE, "1").await; -} - -async fn insert_count_immutable( - store: &Arc, - deployment: &DeploymentLocator, - block: u8, - count: u8, -) { - insert_count( - store, - deployment, - block, - count, - &COUNTER2_TYPE, - &(block / 2).to_string(), - ) - .await; -} - async fn pause_writer(deployment: &DeploymentLocator) { flush(deployment).await.unwrap(); writable::allow_steps(deployment, 0).await; @@ -191,13 +182,13 @@ where } for count in 1..4 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } // Test reading back with pending writes to the same entity pause_writer(&deployment).await; for count in 4..7 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } assert_eq!(6, read_count()); @@ -206,7 +197,7 @@ where // Test reading back with pending writes and a pending revert for count in 7..10 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } writable .revert_block_operations(block_pointer(2), FirehoseCursor::None) @@ -331,19 +322,46 @@ fn restart() { #[test] fn read_range_test() { run_test(|store, writable, deployment| async move { + let result_entities = vec![ + r#"(1, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }])"#, + r#"(2, [EntityWithType { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2") }, vid: 2 }])"#, + r#"(3, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3") }, vid: 3 }])"#, + r#"(4, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4") }, vid: 4 }])"#, + r#"(5, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5") }, vid: 5 }])"#, + r#"(6, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#, + r#"(7, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#, + ]; let subgraph_store = store.subgraph_store(); writable.deployment_synced().unwrap(); - for count in 1..=7 { - insert_count_mutable(&subgraph_store, &deployment, 2 * count, 4 * count).await; - insert_count_immutable(&subgraph_store, &deployment, 2 * count + 1, 4 * count).await; + for count in 1..=5 { + insert_count(&subgraph_store, &deployment, count, 2 * count, true).await; } writable.flush().await.unwrap(); - - let br: Range = 4..8; writable.deployment_synced().unwrap(); + + let br: Range = 0..18; let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()]; - let e = writable.get_range(entity_types, br).unwrap(); - assert_eq!(e.len(), 5) // TODO: fix it - it should be 4 as the range is open + let e: BTreeMap> = writable + .get_range(entity_types.clone(), br.clone()) + .unwrap(); + assert_eq!(e.len(), 5); + for en in &e { + let index = *en.0 - 1; + let a = result_entities[index as usize]; + assert_eq!(a, format!("{:?}", en)); + } + for count in 6..=7 { + insert_count(&subgraph_store, &deployment, count, 2 * count, true).await; + } + writable.flush().await.unwrap(); + writable.deployment_synced().unwrap(); + let e: BTreeMap> = writable.get_range(entity_types, br).unwrap(); + assert_eq!(e.len(), 7); + for en in &e { + let index = *en.0 - 1; + let a = result_entities[index as usize]; + assert_eq!(a, format!("{:?}", en)); + } }) } From e03a54919965439f109c14e5df95a23272bfa2fa Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Fri, 6 Sep 2024 19:29:59 +0300 Subject: [PATCH 7/7] Add casuality region to the SQL querry --- graph/src/blockchain/block_stream.rs | 4 ++-- graph/src/components/store/mod.rs | 1 + graph/src/components/store/traits.rs | 4 +++- store/postgres/src/deployment_store.rs | 3 ++- store/postgres/src/relational.rs | 5 +++-- store/postgres/src/relational_queries.rs | 15 +++++++++------ store/postgres/src/writable.rs | 17 +++++++++++++---- store/test-store/tests/graph/entity_cache.rs | 1 + store/test-store/tests/postgres/writable.rs | 6 ++++-- 9 files changed, 38 insertions(+), 18 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 9f8b6fce8a9..9cc7e4b710e 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,5 +1,5 @@ use crate::blockchain::SubgraphFilter; -use crate::data_source::subgraph; +use crate::data_source::{subgraph, CausalityRegion}; use crate::substreams::Clock; use crate::substreams_rpc::response::Message as SubstreamsMessage; use crate::substreams_rpc::BlockScopedData; @@ -453,7 +453,7 @@ async fn get_entities_for_range( let entity_type = schema.entity_type(entity_name)?; entity_types.push(entity_type); } - Ok(store.get_range(entity_types, from..to)?) + Ok(store.get_range(entity_types, CausalityRegion::ONCHAIN, from..to)?) } impl TriggersAdapterWrapper { diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index ccaa78a9a4f..dc7dfe8150c 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1043,6 +1043,7 @@ impl ReadStore for EmptyStore { fn get_range( &self, _entity_types: Vec, + _causality_region: CausalityRegion, _block_range: Range, ) -> Result>, StoreError> { Ok(BTreeMap::new()) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index a7cecad129b..1fb46e344f7 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -232,6 +232,7 @@ pub trait ReadStore: Send + Sync + 'static { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError>; @@ -260,9 +261,10 @@ impl ReadStore for Arc { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { - (**self).get_range(entity_types, block_range) + (**self).get_range(entity_types, causality_region, block_range) } fn get_derived( diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index faa50a0d1ad..9870179a620 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1060,11 +1060,12 @@ impl DeploymentStore { &self, site: Arc, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { let mut conn = self.get_conn()?; let layout = self.layout(&mut conn, site)?; - layout.find_range(&mut conn, entity_types, block_range) + layout.find_range(&mut conn, entity_types, causality_region, block_range) } pub(crate) fn get_derived( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 4402dcefb6c..be9f155e4ba 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -520,6 +520,7 @@ impl Layout { &self, conn: &mut PgConnection, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { let mut tables = vec![]; @@ -529,11 +530,11 @@ impl Layout { et_map.insert(et.to_string(), Arc::new(et)); } let mut entities: BTreeMap> = BTreeMap::new(); - let lower_vec = FindRangeQuery::new(&tables, false, block_range.clone()) + let lower_vec = FindRangeQuery::new(&tables, causality_region, false, block_range.clone()) .get_results::(conn) .optional()? .unwrap_or_default(); - let upper_vec = FindRangeQuery::new(&tables, true, block_range) + let upper_vec = FindRangeQuery::new(&tables, causality_region, true, block_range) .get_results::(conn) .optional()? .unwrap_or_default(); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 9776aef9d01..94ae41eb262 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2024,6 +2024,7 @@ impl<'a, Conn> RunQueryDsl for FindQuery<'a> {} #[derive(Debug, Clone)] pub struct FindRangeQuery<'a> { tables: &'a Vec<&'a Table>, + causality_region: CausalityRegion, is_upper_range: bool, imm_range: EntityBlockRange, mut_range: EntityBlockRange, @@ -2032,6 +2033,7 @@ pub struct FindRangeQuery<'a> { impl<'a> FindRangeQuery<'a> { pub fn new( tables: &'a Vec<&Table>, + causality_region: CausalityRegion, is_upper_range: bool, block_range: Range, ) -> Self { @@ -2039,6 +2041,7 @@ impl<'a> FindRangeQuery<'a> { let mut_range = EntityBlockRange::new(false, block_range, is_upper_range); Self { tables, + causality_region, is_upper_range, imm_range, mut_range, @@ -2075,12 +2078,12 @@ impl<'a> QueryFragment for FindRangeQuery<'a> { out.push_sql(" from "); out.push_sql(table.qualified_name.as_str()); out.push_sql(" e\n where"); - // TODO: add casuality region to the query - // if self.table.has_causality_region { - // out.push_sql("causality_region = "); - // out.push_bind_param::(&self.key.causality_region)?; - // out.push_sql(" and "); - // } + // add casuality region to the query + if table.has_causality_region { + out.push_sql("causality_region = "); + out.push_bind_param::(&self.causality_region)?; + out.push_sql(" and "); + } if table.immutable { self.imm_range.contains(&mut out)?; } else { diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 3436db04004..ae89bc56e58 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -355,12 +355,14 @@ impl SyncStore { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { retry::forever(&self.logger, "get_range", || { self.writable.get_range( self.site.cheap_clone(), entity_types.clone(), + causality_region, block_range.clone(), ) }) @@ -1234,9 +1236,11 @@ impl Queue { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { - self.store.get_range(entity_types, block_range) + self.store + .get_range(entity_types, causality_region, block_range) } fn get_derived( @@ -1454,11 +1458,14 @@ impl Writer { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { match self { - Writer::Sync(store) => store.get_range(entity_types, block_range), - Writer::Async { queue, .. } => queue.get_range(entity_types, block_range), + Writer::Sync(store) => store.get_range(entity_types, causality_region, block_range), + Writer::Async { queue, .. } => { + queue.get_range(entity_types, causality_region, block_range) + } } } @@ -1593,9 +1600,11 @@ impl ReadStore for WritableStore { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { - self.writer.get_range(entity_types, block_range) + self.writer + .get_range(entity_types, causality_region, block_range) } fn get_derived( diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 15315d89333..9762dcf68a9 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -70,6 +70,7 @@ impl ReadStore for MockStore { fn get_range( &self, _entity_types: Vec, + _causality_region: CausalityRegion, _block_range: Range, ) -> Result>, StoreError> { todo!() diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index ef619c13323..2540c62f966 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -343,7 +343,7 @@ fn read_range_test() { let br: Range = 0..18; let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()]; let e: BTreeMap> = writable - .get_range(entity_types.clone(), br.clone()) + .get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone()) .unwrap(); assert_eq!(e.len(), 5); for en in &e { @@ -356,7 +356,9 @@ fn read_range_test() { } writable.flush().await.unwrap(); writable.deployment_synced().unwrap(); - let e: BTreeMap> = writable.get_range(entity_types, br).unwrap(); + let e: BTreeMap> = writable + .get_range(entity_types, CausalityRegion::ONCHAIN, br) + .unwrap(); assert_eq!(e.len(), 7); for en in &e { let index = *en.0 - 1;