Skip to content

Commit

Permalink
refactor: upgrade pipeline to use latest Gasket lib (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan authored Dec 15, 2023
1 parent 2799ce1 commit b075a3d
Show file tree
Hide file tree
Showing 52 changed files with 2,298 additions and 2,300 deletions.
916 changes: 796 additions & 120 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 11 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,26 @@ authors = ["Santiago Carmuega <[email protected]>"]


[dependencies]
pallas = "0.17.0"
pallas = "0.19.1"
# pallas = { path = "../pallas/pallas" }
# pallas = { git = "https://github.com/txpipe/pallas.git" }

gasket = { version = "0.5.0", features = ["derive"] }
# gasket = { path = "../../construkts/gasket-rs", features = ["derive"] }
# gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] }

hex = "0.4.3"
net2 = "0.2.37"
bech32 = "0.8.1"
clap = { version = "3.2.6", features = ["derive"] }
log = "0.4.14"
env_logger = "0.9.0"
merge = "0.1.0"
config = { version = "0.13.0", default-features = false, features = [
"toml",
"json",
] }
config = { version = "0.13.0", default-features = false, features = ["toml","json"] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
minicbor = "0.14.1"
prometheus_exporter = { version = "0.8.4", default-features = false }
# gasket = { path = "../../construkts/gasket-rs" }
gasket = { git = "https://github.com/construkts/gasket-rs.git" }
thiserror = "1.0.30"
redis = "0.21.5"
sled = "0.34.7"
lazy_static = "1.4.0"
rayon = "1.5.3"
Expand All @@ -50,6 +48,10 @@ indicatif = { version = "0.17.0-rc.11", optional = true }

# required for CI to complete successfully
openssl = { version = "0.10", optional = true, features = ["vendored"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
async-trait = "0.1.73"
r2d2_redis = "0.14.0"

[features]
async = ["futures", "tokio"]
Expand Down
26 changes: 26 additions & 0 deletions examples/redis/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[chain]
type = "mainnet"

[intersect]
type = "Point"
value = [
104699772,
"19525913a14c4540a782d188c333f2c54d1845620aef56e3166a2c1fffb800fc"
]

[enrich]
type = "Sled"
db_path = "./"

[[reducers]]
type = "FullUtxosByAddress"
filter = ["addr1z8snz7c4974vzdpxu65ruphl3zjdvtxw8strf2c2tmqnxz2j2c79gy9l76sdg0xwhd7r0c0kna0tycz4y5s6mlenh8pq0xmsha"]

[storage]
type = "Redis"
url = "redis://127.0.0.1/1"

18 changes: 18 additions & 0 deletions examples/redis/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: "3"
services:
redis:
image: redis
container_name: redis
ports:
- "6379:6379"
networks:
- redis-network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 5

networks:
redis-network:
driver: bridge
80 changes: 35 additions & 45 deletions src/bin/scrolls/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ use std::{
time::{Duration, Instant},
};

use gasket::metrics::Reading;
use lazy_static::{__Deref, lazy_static};
use log::Log;
use scrolls::bootstrap::Pipeline;
use gasket::{metrics::Reading, runtime::Tether};
use lazy_static::lazy_static;
use tracing::{debug, error, warn};

#[derive(clap::ValueEnum, Clone)]
pub enum Mode {
/// shows progress as a plain sequence of logs
Plain,
/// shows aggregated progress and metrics
TUI,
Tui,
}

impl Default for Mode {
Expand All @@ -24,7 +23,7 @@ impl Default for Mode {

struct TuiConsole {
chainsync_progress: indicatif::ProgressBar,
received_blocks: indicatif::ProgressBar,
fetched_blocks: indicatif::ProgressBar,
reducer_ops_count: indicatif::ProgressBar,
storage_ops_count: indicatif::ProgressBar,
enrich_inserts: indicatif::ProgressBar,
Expand Down Expand Up @@ -62,7 +61,7 @@ impl TuiConsole {
.unwrap(),
),
),
received_blocks: Self::build_counter_spinner("received blocks", &container),
fetched_blocks: Self::build_counter_spinner("fetched blocks", &container),
enrich_inserts: Self::build_counter_spinner("enrich inserts", &container),
enrich_removes: Self::build_counter_spinner("enrich removes", &container),
enrich_matches: Self::build_counter_spinner("enrich matches", &container),
Expand All @@ -73,17 +72,16 @@ impl TuiConsole {
}
}

fn refresh(&self, pipeline: &Pipeline) {
for tether in pipeline.tethers.iter() {
fn refresh<'a>(&self, tethers: impl Iterator<Item = &'a Tether>) {
for tether in tethers {
let state = match tether.check_state() {
gasket::runtime::TetherState::Dropped => "dropped!",
gasket::runtime::TetherState::Blocked(_) => "blocked!",
gasket::runtime::TetherState::Alive(x) => match x {
gasket::runtime::StageState::Bootstrap => "bootstrapping...",
gasket::runtime::StageState::Working => "working...",
gasket::runtime::StageState::Idle => "idle...",
gasket::runtime::StageState::StandBy => "stand-by...",
gasket::runtime::StageState::Teardown => "tearing down...",
gasket::runtime::StagePhase::Bootstrap => "bootstrapping...",
gasket::runtime::StagePhase::Working => "working...",
gasket::runtime::StagePhase::Teardown => "tearing down...",
gasket::runtime::StagePhase::Ended => "ended",
},
};

Expand All @@ -97,9 +95,9 @@ impl TuiConsole {
(_, "last_block", Reading::Gauge(x)) => {
self.chainsync_progress.set_position(x as u64);
}
(_, "received_blocks", Reading::Count(x)) => {
self.received_blocks.set_position(x);
self.received_blocks.set_message(state);
(_, "fetched_blocks", Reading::Count(x)) => {
self.chainsync_progress.set_position(x);
self.chainsync_progress.set_message(state);
}
("reducers", "ops_count", Reading::Count(x)) => {
self.reducer_ops_count.set_position(x);
Expand Down Expand Up @@ -142,19 +140,6 @@ impl TuiConsole {
}
}

impl Log for TuiConsole {
fn enabled(&self, metadata: &log::Metadata) -> bool {
metadata.level() >= log::Level::Info
}

fn log(&self, record: &log::Record) {
self.chainsync_progress
.set_message(format!("{}", record.args()))
}

fn flush(&self) {}
}

struct PlainConsole {
last_report: Mutex<Instant>,
}
Expand All @@ -166,31 +151,34 @@ impl PlainConsole {
}
}

fn refresh(&self, pipeline: &Pipeline) {
fn refresh<'a>(&self, tethers: impl Iterator<Item = &'a Tether>) {
let mut last_report = self.last_report.lock().unwrap();

if last_report.elapsed() <= Duration::from_secs(10) {
return;
}

for tether in pipeline.tethers.iter() {
for tether in tethers {
match tether.check_state() {
gasket::runtime::TetherState::Dropped => {
log::error!("[{}] stage tether has been dropped", tether.name());
error!("[{}] stage tether has been dropped", tether.name());
}
gasket::runtime::TetherState::Blocked(_) => {
log::warn!("[{}] stage tehter is blocked or not reporting state", tether.name());
warn!(
"[{}] stage tehter is blocked or not reporting state",
tether.name()
);
}
gasket::runtime::TetherState::Alive(state) => {
log::debug!("[{}] stage is alive with state: {:?}", tether.name(), state);
debug!("[{}] stage is alive with state: {:?}", tether.name(), state);
match tether.read_metrics() {
Ok(readings) => {
for (key, value) in readings {
log::debug!("[{}] metric `{}` = {:?}", tether.name(), key, value);
debug!("[{}] metric `{}` = {:?}", tether.name(), key, value);
}
}
Err(err) => {
log::error!("[{}] error reading metrics: {}", tether.name(), err)
error!("[{}] error reading metrics: {}", tether.name(), err)
}
}
}
Expand All @@ -210,17 +198,19 @@ lazy_static! {
}

pub fn initialize(mode: &Option<Mode>) {
match mode {
Some(Mode::TUI) => log::set_logger(TUI_CONSOLE.deref())
.map(|_| log::set_max_level(log::LevelFilter::Info))
.unwrap(),
_ => env_logger::init(),
if !matches!(mode, Some(Mode::Tui)) {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();
}
}

pub fn refresh(mode: &Option<Mode>, pipeline: &Pipeline) {
pub fn refresh<'a>(mode: &Option<Mode>, tethers: impl Iterator<Item = &'a Tether>) {
match mode {
Some(Mode::TUI) => TUI_CONSOLE.refresh(pipeline),
_ => PLAIN_CONSOLE.refresh(pipeline),
Some(Mode::Tui) => TUI_CONSOLE.refresh(tethers),
_ => PLAIN_CONSOLE.refresh(tethers),
}
}
Loading

0 comments on commit b075a3d

Please sign in to comment.