use std::{
future::Future,
sync::{atomic::AtomicBool, Arc, Mutex},
time::Duration,
};
use avail_rust::prelude::*;
use tokio::task::JoinHandle;
/// Shorthand for a value of type `T` shared behind an `Arc<Mutex<T>>`.
type SharedLock<T> = Arc<Mutex<T>>;
/// Selects which background task `Indexer::run` spawns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Kind {
/// Runs `Indexer::task_man`, which repeatedly asks the client for the finalized block hash.
Manual,
/// Runs `Indexer::task_sub`, which consumes a finalized-block subscription stream.
Stream,
}
/// Example entry point demonstrating the three ways of consuming indexed blocks.
///
/// 1. Starts an [`Indexer`] in [`Kind::Stream`] mode.
/// 2. Fetches three blocks one by one through a [`Subscription`].
/// 3. Rewinds the subscription by up to 100 blocks and fetches three historical blocks.
/// 4. Registers [`callback`] and lets it run for 25 seconds before shutting down.
///
/// # Errors
///
/// Returns a [`ClientError`] if the SDK cannot be created.
pub async fn run() -> Result<(), ClientError> {
    let sdk = SDK::new(SDK::turing_endpoint()).await?;
    // `sdk` is not used again, so it is moved instead of cloned.
    let mut indexer = Indexer::new(sdk);
    indexer.run(Kind::Stream);

    // Fetching blocks in procedural way
    let mut sub = indexer.subscribe().await;
    for _ in 0..3 {
        let block = sub.fetch().await;
        println!("Current: Block Height: {}, Block Hash: {:?}", block.height, block.hash);
    }

    // Fetching historical blocks
    // Saturate at zero: a plain `-= 100` on a `u32` panics in debug builds (and wraps in
    // release builds) when the chain is shorter than 100 blocks.
    sub.block_height = sub.block_height.saturating_sub(100);
    for _ in 0..3 {
        let block = sub.fetch().await;
        println!(
            "Historical: Block Height: {}, Block Hash: {:?}",
            block.height, block.hash
        );
    }

    // Callback
    let mut sub = indexer.callback(callback).await;
    tokio::time::sleep(Duration::from_secs(25)).await;

    sub.shutdown();
    indexer.shutdown();
    // Short pause after requesting shutdown before returning.
    tokio::time::sleep(Duration::from_secs(3)).await;

    Ok(())
}
async fn callback(block: IndexedBlock) {
println!("Callback: Block Height: {}, Block Hash: {:?}", block.height, block.hash)
}
/// Keeps track of the most recent block written by a background task and serves
/// blocks by height on request.
#[derive(Clone)]
struct Indexer {
/// Latest block stored by the background task; `None` until the first block arrives.
block: SharedLock<Option<IndexedBlock>>,
/// SDK handle whose client is used for block lookups.
sdk: Arc<SDK>,
/// Handle of the task spawned by `run`; `None` until `run` stores one. `shutdown` aborts it.
thread: SharedLock<Option<JoinHandle<()>>>,
}
impl Indexer {
    /// Creates an indexer around `sdk` with no cached block and no background task.
    pub fn new(sdk: SDK) -> Self {
        Self {
            block: Arc::new(Mutex::new(None)),
            sdk: Arc::new(sdk),
            thread: Arc::new(Mutex::new(None)),
        }
    }

    /// Spawns the background task that keeps the cached block up to date.
    ///
    /// Does nothing if a task handle is already stored. `kind` selects between
    /// [`Self::task_man`] and [`Self::task_sub`].
    ///
    /// # Panics
    ///
    /// Panics if the handle mutex is poisoned, or (via `tokio::spawn`) when called
    /// outside of a Tokio runtime.
    pub fn run(&mut self, kind: Kind) {
        // Keep one guard across the check and the store so two clones cannot both spawn,
        // and write through the existing mutex so every clone sees the new handle
        // (replacing the `Arc` itself would detach clones created earlier).
        let mut handle = self.thread.lock().unwrap();
        if handle.is_some() {
            return;
        }

        let block = self.block.clone();
        let sdk = self.sdk.clone();
        *handle = Some(tokio::spawn(async move {
            println!("Kind: {:?}", kind);
            match kind {
                Kind::Manual => Self::task_man(block, sdk).await,
                Kind::Stream => Self::task_sub(block, sdk).await,
            };
        }));
    }

    /// Aborts the background task, if one is running.
    ///
    /// The stored handle is removed, so a later call to [`Self::run`] spawns a fresh task
    /// instead of silently returning.
    ///
    /// # Panics
    ///
    /// Panics if the handle mutex is poisoned.
    pub fn shutdown(&mut self) {
        if let Some(task) = self.thread.lock().unwrap().take() {
            task.abort();
        }
    }

    /// Returns the block at `block_height`.
    ///
    /// Waits, polling every 5 seconds, until the cached block is at or beyond the requested
    /// height. If the cached block is exactly the requested one it is returned directly;
    /// otherwise the block is looked up through the SDK client.
    ///
    /// # Panics
    ///
    /// Panics if the block mutex is poisoned or if either client call fails.
    pub async fn get_block(&self, block_height: u32) -> IndexedBlock {
        loop {
            // Clone out of the mutex so the guard is released before any `.await`.
            let latest = self.block.lock().unwrap().clone();
            match latest {
                Some(latest) if latest.height == block_height => return latest,
                // The requested block is older than the cached one: fetch it below.
                Some(latest) if latest.height > block_height => break,
                // Nothing cached yet, or the requested height has not been reached.
                _ => tokio::time::sleep(Duration::from_secs(5)).await,
            }
        }

        let block_hash = self.sdk.client.block_hash(block_height).await.unwrap();
        let block = Block::new(&self.sdk.client, block_hash).await.unwrap();
        IndexedBlock {
            height: block_height,
            hash: block_hash,
            block,
        }
    }

    /// Creates a [`Subscription`] starting at the height of the currently cached block.
    ///
    /// Waits, polling every 5 seconds, until a block has been cached.
    ///
    /// # Panics
    ///
    /// Panics if the block mutex is poisoned.
    pub async fn subscribe(&self) -> Subscription {
        let block_height = loop {
            // The guard is a temporary of this statement and is released before the sleep.
            let height = self.block.lock().unwrap().as_ref().map(|b| b.height);
            match height {
                Some(height) => break height,
                None => tokio::time::sleep(Duration::from_secs(5)).await,
            }
        };
        Subscription::new(block_height, self.clone())
    }

    /// Subscribes and spawns a task that passes every fetched block to `cb`.
    ///
    /// The returned [`Subscription`] shares its `shutdown` flag with the spawned task. The
    /// task checks the flag before each fetch and again before invoking `cb`, and stops
    /// once it reads `true`.
    pub async fn callback<F>(&self, cb: fn(IndexedBlock) -> F) -> Subscription
    where
        F: Future + std::marker::Send + 'static,
    {
        let sub = self.subscribe().await;
        let mut worker = sub.clone();
        tokio::spawn(async move {
            // `Relaxed` suffices: the flag is a standalone stop signal and guards no other data.
            while !worker.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                let block = worker.fetch().await;
                // `fetch` may have waited a while; do not deliver a block after shutdown.
                if worker.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                    break;
                }
                cb(block).await;
            }
        });
        sub
    }

    /// Background task for [`Kind::Manual`]: polls the client for the finalized block hash.
    ///
    /// When the hash equals the cached one the task sleeps for 15 seconds; otherwise it
    /// loads the block and its number and replaces the cached block.
    ///
    /// # Panics
    ///
    /// Panics if the block mutex is poisoned or if any client call fails.
    async fn task_man(shared_block: SharedLock<Option<IndexedBlock>>, sdk: Arc<SDK>) {
        loop {
            let new_hash = sdk.client.finalized_block_hash().await.unwrap();
            // Copy the hash out so the guard is released before awaiting.
            let cur_hash = shared_block.lock().unwrap().as_ref().map(|b| b.hash);
            if cur_hash.is_some_and(|hash| hash == new_hash) {
                tokio::time::sleep(Duration::from_secs(15)).await;
                continue;
            }

            let new_block = Block::new(&sdk.client, new_hash).await.unwrap();
            let new_height = sdk.client.block_number(new_hash).await.unwrap();
            *shared_block.lock().unwrap() = Some(IndexedBlock {
                height: new_height,
                hash: new_hash,
                block: new_block,
            });
        }
    }

    /// Background task for [`Kind::Stream`]: caches every block from the finalized-block
    /// subscription.
    ///
    /// A stream error for which `is_disconnected_will_reconnect()` is true is logged and
    /// skipped.
    ///
    /// # Panics
    ///
    /// Panics if subscribing fails, if the stream ends, on any other stream error, if the
    /// block conversion fails, or if the block mutex is poisoned.
    async fn task_sub(shared_block: SharedLock<Option<IndexedBlock>>, sdk: Arc<SDK>) {
        let mut stream = sdk.client.blocks().subscribe_finalized().await.unwrap();
        loop {
            let block = match stream.next().await.unwrap() {
                Ok(block) => block,
                Err(e) => {
                    if e.is_disconnected_will_reconnect() {
                        println!("The RPC connection was lost and we may have missed a few blocks");
                        continue;
                    }
                    panic!("Something is wrong");
                },
            };

            let height = block.number();
            let hash = block.hash();
            let block = Block::from_block(block).await.unwrap();
            *shared_block.lock().unwrap() = Some(IndexedBlock { height, hash, block });
        }
    }
}
/// Cursor over indexed blocks: remembers which height to request next from an `Indexer`.
#[derive(Clone)]
struct Subscription {
/// Indexer used by `fetch` to resolve a height into a block.
pub indexer: Indexer,
/// Height requested by the next `fetch` call; `fetch` increments it by one afterwards.
pub block_height: u32,
/// Flag initialised to `false`; clones of this subscription share the same flag through the `Arc`.
pub shutdown: Arc<AtomicBool>,
}
impl Subscription {
    /// Creates a subscription whose first [`Self::fetch`] requests `block_height`.
    ///
    /// The shutdown flag starts out as `false`.
    pub fn new(block_height: u32, indexer: Indexer) -> Self {
        Self {
            indexer,
            block_height,
            shutdown: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Returns the block at the current height and advances the cursor by one.
    pub async fn fetch(&mut self) -> IndexedBlock {
        let block = self.indexer.get_block(self.block_height).await;
        self.block_height += 1;
        block
    }

    /// Raises the shutdown flag shared by all clones of this subscription.
    ///
    /// The flag starts as `false`, so it has to be set to `true` here; storing `false`
    /// would leave it unchanged and make this call a no-op.
    pub fn shutdown(&mut self) {
        // `Relaxed` suffices: the flag is a standalone stop signal and guards no other data.
        self.shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
    }
}
/// A block bundled with the height and hash it was indexed under.
#[derive(Clone)]
struct IndexedBlock {
/// Block number.
height: u32,
/// Block hash.
hash: H256,
/// The SDK block object for this height/hash.
block: Block,
}