Skip to content

Commit

Permalink
feat: make pipeline compatible with Conway era (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Aug 23, 2024
1 parent a593efd commit 89293d9
Show file tree
Hide file tree
Showing 31 changed files with 398 additions and 317 deletions.
199 changes: 129 additions & 70 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 11 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@ authors = ["Santiago Carmuega <[email protected]>"]


[dependencies]
pallas = "0.17.0"
# pallas = "0.17.0"
# pallas = { path = "../pallas/pallas" }
# pallas = { git = "https://github.com/txpipe/pallas.git" }

pallas-multiplexer = "0.18.2"
pallas-miniprotocols = "0.18.2"
pallas-primitives = "0.29.0"
pallas-traverse = "0.29.0"
pallas-addresses = "0.29.0"
pallas-codec = "0.29.0"
pallas-crypto = "0.29.0"

hex = "0.4.3"
net2 = "0.2.37"
bech32 = "0.8.1"
clap = { version = "3.2.6", features = ["derive"] }
log = "0.4.14"
env_logger = "0.9.0"
merge = "0.1.0"
config = { version = "0.13.0", default-features = false, features = [
"toml",
"json",
] }
config = { version = "0.13.0", default-features = false, features = ["toml", "json"] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
minicbor = "0.14.1"
Expand Down
8 changes: 4 additions & 4 deletions src/crosscut/args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pallas::network::miniprotocols::{
use pallas_miniprotocols::{
Point,
MAINNET_MAGIC,
TESTNET_MAGIC,
Expand Down Expand Up @@ -151,7 +151,7 @@ impl IntersectConfig {
/// Optional configuration to stop processing new blocks after processing:
/// 1. a block with the given hash
/// 2. the first block on or after a given absolute slot
/// 3. TODO: a total of X blocks
/// 3. TODO: a total of X blocks
#[derive(Deserialize, Debug, Clone)]
pub struct FinalizeConfig {
until_hash: Option<String>,
Expand All @@ -174,13 +174,13 @@ pub fn should_finalize(
return expected == &hex::encode(current);
}
}

if let Some(max) = config.max_block_slot {
if last_point.slot_or_default() >= max {
return true;
}
}

// if let Some(max) = config.max_block_quantity {
// if block_count >= max {
// return true;
Expand Down
4 changes: 2 additions & 2 deletions src/crosscut/epochs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// TODO this is temporary, we should actually use this code from Pallas as this
// is very generic code

use pallas::ledger::traverse::MultiEraBlock;
use pallas_traverse::MultiEraBlock;

fn post_byron_epoch_for_slot(shelley_known_slot: u64, shelley_epoch_length: u32, slot: u64) -> u64 {
let last_byron_epoch_no = 208;
Expand All @@ -25,7 +25,7 @@ pub fn block_epoch(chain: &super::ChainWellKnownInfo, block: &MultiEraBlock) ->
let slot = block.slot();

match block.era() {
pallas::ledger::traverse::Era::Byron => {
pallas_traverse::Era::Byron => {
byron_epoch_for_slot(chain.byron_epoch_length, chain.byron_slot_length, slot)
}
_ => post_byron_epoch_for_slot(chain.shelley_known_slot, chain.shelley_epoch_length, slot),
Expand Down
11 changes: 4 additions & 7 deletions src/crosscut/filters.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use pallas::ledger::{
addresses::Address,
traverse::{MultiEraBlock, MultiEraTx},
};
use pallas_addresses::Address;
use pallas_traverse::{MultiEraBlock, MultiEraTx};
use serde::Deserialize;

use crate::prelude::*;
Expand Down Expand Up @@ -93,7 +91,6 @@ pub struct TransactionPattern {
pub is_valid: Option<bool>,
}


#[derive(Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum Predicate {
Expand Down Expand Up @@ -224,7 +221,7 @@ fn eval_block(block: &MultiEraBlock, pattern: &BlockPattern) -> Result<bool, cra

fn eval_transaction(tx: &MultiEraTx, pattern: &TransactionPattern) -> Result<bool, crate::Error> {
if let Some(b) = pattern.is_valid {
return Ok(tx.is_valid() == b)
return Ok(tx.is_valid() == b);
}

Ok(false)
Expand Down Expand Up @@ -287,7 +284,7 @@ pub fn eval_predicate(

#[cfg(test)]
mod tests {
use pallas::ledger::traverse::MultiEraBlock;
use pallas_traverse::MultiEraBlock;

use crate::{
crosscut::policies::{ErrorAction, RuntimePolicy},
Expand Down
7 changes: 3 additions & 4 deletions src/enrich/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use gasket::{
runtime::{spawn_stage, WorkOutcome},
};

use pallas::{
codec::minicbor,
ledger::traverse::{Era, MultiEraBlock, MultiEraTx, OutputRef},
};
use pallas_codec::minicbor;
use pallas_traverse::{Era, MultiEraBlock, MultiEraTx, OutputRef};

use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use serde::Deserialize;
use sled::IVec;
Expand Down
10 changes: 4 additions & 6 deletions src/model.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{collections::HashMap, fmt::Debug};

use pallas::{
ledger::traverse::{Era, MultiEraBlock, MultiEraOutput, MultiEraTx, OutputRef},
network::miniprotocols::Point,
crypto::hash::Hash,
};
use pallas_crypto::hash::Hash;
use pallas_miniprotocols::Point;
use pallas_traverse::{Era, MultiEraBlock, MultiEraOutput, MultiEraTx, OutputRef};

use crate::prelude::*;

Expand Down Expand Up @@ -60,7 +58,7 @@ impl BlockContext {
.consumes()
.iter()
.map(|i| i.output_ref())
.map(|r| self.find_utxo(&r).map(|u| (r,u)))
.map(|r| self.find_utxo(&r).map(|u| (r, u)))
.map(|r| r.apply_policy(policy))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
Expand Down
39 changes: 26 additions & 13 deletions src/reducers/address_by_asset.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use pallas::ledger::traverse::MultiEraOutput;
use pallas::ledger::traverse::{Asset, MultiEraBlock};
use pallas_traverse::MultiEraAsset;
use pallas_traverse::MultiEraBlock;
use pallas_traverse::MultiEraOutput;
use pallas_traverse::MultiEraPolicyAssets;
use serde::Deserialize;

use crate::{model, prelude::*};
Expand All @@ -19,16 +21,20 @@ pub struct Reducer {
}

impl Reducer {
fn to_string_output(&self, asset: Asset) -> Option<String> {
match asset.policy_hex() {
Some(policy_id) if policy_id.eq(&self.config.policy_id_hex) => match asset {
Asset::NativeAsset(_, name, _) => match self.convert_to_ascii {
true => String::from_utf8(name).ok(),
false => Some(hex::encode(name)),
},
_ => None,
},
_ => None,
fn to_string_output(
&self,
policy: &MultiEraPolicyAssets,
asset: &MultiEraAsset,
) -> Option<String> {
let policy_id = policy.policy().to_string();

if policy_id.eq(&self.config.policy_id_hex) {
match self.convert_to_ascii {
true => asset.to_ascii_name(),
false => Some(hex::encode(asset.name())),
}
} else {
None
}
}

Expand All @@ -40,7 +46,14 @@ impl Reducer {
let asset_names: Vec<_> = txo
.non_ada_assets()
.into_iter()
.filter_map(|x| self.to_string_output(x))
.flat_map(|policy| {
policy
.assets()
.iter()
.map(|asset| self.to_string_output(&policy, asset))
.collect::<Vec<_>>()
})
.flatten()
.collect();

if asset_names.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions src/reducers/address_by_txo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use gasket::error::AsWorkError;
use pallas::crypto::hash::Hash;
use pallas::ledger::traverse::MultiEraBlock;
use pallas_crypto::hash::Hash;
use pallas_traverse::MultiEraBlock;
use serde::Deserialize;

use crate::prelude::*;
Expand Down
6 changes: 3 additions & 3 deletions src/reducers/addresses_by_stake.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use pallas::ledger::addresses::{Address, StakeAddress};
use pallas::ledger::traverse::MultiEraBlock;
use pallas_addresses::{Address, StakeAddress};
use pallas_traverse::MultiEraBlock;
use serde::Deserialize;

use crate::{crosscut, model, prelude::*};
Expand Down Expand Up @@ -91,7 +91,7 @@ impl Config {
#[cfg(test)]
mod test {
use super::any_address_to_stake_bech32;
use pallas::ledger::addresses::Address;
use pallas_addresses::Address;

#[test]
fn stake_bech32() {
Expand Down
50 changes: 21 additions & 29 deletions src/reducers/asset_holders_by_asset_id.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use pallas::ledger::traverse::{Asset, MultiEraOutput};
use pallas::ledger::traverse::{MultiEraBlock, OutputRef};
use pallas_traverse::MultiEraOutput;
use pallas_traverse::{MultiEraBlock, OutputRef};
use serde::Deserialize;

use crate::{crosscut, model, prelude::*};
use pallas::crypto::hash::Hash;
use pallas_crypto::hash::Hash;

use crate::crosscut::epochs::block_epoch;
use std::str::FromStr;
Expand Down Expand Up @@ -76,22 +76,18 @@ impl Reducer {

let address = utxo.address().map(|addr| addr.to_string()).or_panic()?;

for asset in utxo.assets() {
match asset {
Asset::NativeAsset(policy_id, _, quantity) => {
if self.is_policy_id_accepted(&policy_id) {
let subject = asset.subject();
let key = self.config_key(subject, epoch_no);
let delta = quantity as i64 * (-1);
for policy in utxo.non_ada_assets() {
for asset in policy.assets() {
if self.is_policy_id_accepted(policy.policy()) {
let subject = format!("{}.{}", policy.policy(), hex::encode(asset.name()));
let key = self.config_key(subject, epoch_no);
let delta = asset.output_coin().unwrap_or_default() as i64 * (-1);

let crdt =
model::CRDTCommand::SortedSetRemove(key, address.to_string(), delta);
let crdt = model::CRDTCommand::SortedSetRemove(key, address.to_string(), delta);

output.send(gasket::messaging::Message::from(crdt))?;
}
output.send(gasket::messaging::Message::from(crdt))?;
}
_ => (),
};
}
}

Ok(())
Expand All @@ -108,22 +104,18 @@ impl Reducer {
.map(|addr| addr.to_string())
.or_panic()?;

for asset in tx_output.assets() {
match asset {
Asset::NativeAsset(policy_id, _, quantity) => {
if self.is_policy_id_accepted(&policy_id) {
let subject = asset.subject();
let key = self.config_key(subject, epoch_no);
let delta = quantity as i64;
for policy in tx_output.non_ada_assets() {
for asset in policy.assets() {
if self.is_policy_id_accepted(policy.policy()) {
let subject = format!("{}.{}", policy.policy(), hex::encode(asset.name()));
let key = self.config_key(subject, epoch_no);
let delta = asset.output_coin().unwrap_or_default() as i64;

let crdt =
model::CRDTCommand::SortedSetAdd(key, address.to_string(), delta);
let crdt = model::CRDTCommand::SortedSetAdd(key, address.to_string(), delta);

output.send(gasket::messaging::Message::from(crdt))?;
}
output.send(gasket::messaging::Message::from(crdt))?;
}
_ => {}
};
}
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/reducers/balance_by_address.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use pallas::ledger::traverse::MultiEraOutput;
use pallas::ledger::traverse::{MultiEraBlock, OutputRef};
use pallas_traverse::MultiEraOutput;
use pallas_traverse::{MultiEraBlock, OutputRef};
use serde::Deserialize;

use crate::{crosscut, model, prelude::*};
Expand Down
15 changes: 6 additions & 9 deletions src/reducers/block_header_by_hash.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pallas::ledger::traverse::MultiEraBlock;
use pallas_traverse::MultiEraBlock;
use serde::Deserialize;

use crate::prelude::*;
Expand All @@ -23,20 +23,17 @@ impl Reducer {
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
if filter_matches_block!(self, block, ctx) {
let value = block
.header()
.cbor()
.to_vec();

let value = block.header().cbor().to_vec();

let crdt = model::CRDTCommand::any_write_wins(
self.config.key_prefix.as_deref(),
block.hash(),
value
value,
);

output.send(gasket::messaging::Message::from(crdt))?;
}

Ok(())
}
}
Expand Down
Loading

0 comments on commit 89293d9

Please sign in to comment.