Split mempool into tx and da mempool (#636)

* Split mempool into tx and da mempool

* Fix tests

* Differentiate between da and cl mempool msgs in consensus

* Common mempool msg type

---------

Co-authored-by: Gusto <bacvinka@gmail.com>
This commit is contained in:
Daniel Sanchez 2024-04-18 15:07:28 +02:00 committed by GitHub
parent 7e4d00cc78
commit e085b6bef4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 602 additions and 344 deletions

View File

@ -22,8 +22,8 @@ use utoipa_swagger_ui::SwaggerUi;
use full_replication::{Blob, Certificate};
use nomos_core::{da::blob, header::HeaderId, tx::Transaction};
use nomos_mempool::{
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, openapi::Status,
MempoolMetrics,
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
tx::service::openapi::Status, MempoolMetrics,
};
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_storage::backends::StorageSerde;
@ -351,10 +351,9 @@ where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
make_request_and_return_response!(mempool::add::<
make_request_and_return_response!(mempool::add_tx::<
NetworkBackend,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
nomos_mempool::Transaction,
Tx,
<Tx as Transaction>::Hash,
>(&handle, tx, Transaction::hash))
@ -372,10 +371,9 @@ async fn add_cert(
State(handle): State<OverwatchHandle>,
Json(cert): Json<Certificate>,
) -> Response {
make_request_and_return_response!(mempool::add::<
make_request_and_return_response!(mempool::add_cert::<
NetworkBackend,
MempoolNetworkAdapter<Certificate, <Blob as blob::Blob>::Hash>,
nomos_mempool::Certificate,
Certificate,
<Blob as blob::Blob>::Hash,
>(

View File

@ -24,10 +24,7 @@ use nomos_da::{
};
use nomos_log::Logger;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::{
backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService,
Transaction as TxDiscriminant,
};
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
#[cfg(feature = "metrics")]
use nomos_metrics::Metrics;
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
@ -42,6 +39,7 @@ pub use nomos_core::{
da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
tx::select::FillSize as FillSizeWithTx,
};
use nomos_mempool::da::service::DaMempoolService;
use nomos_network::NetworkService;
use nomos_system_sig::SystemSig;
use overwatch_derive::*;
@ -78,20 +76,29 @@ pub type DataAvailability = DataAvailabilityService<
DaNetworkAdapter<Blob, Attestation>,
>;
type Mempool<K, V, D> = MempoolService<MempoolNetworkAdapter<K, V>, MockPool<HeaderId, K, V>, D>;
pub type DaMempool = DaMempoolService<
MempoolNetworkAdapter<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
MockPool<
HeaderId,
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
>;
pub type TxMempool = TxMempoolService<
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
>;
#[derive(Services)]
pub struct Nomos {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<NetworkBackend>>,
cl_mempool: ServiceHandle<Mempool<Tx, <Tx as Transaction>::Hash, TxDiscriminant>>,
da_mempool: ServiceHandle<
Mempool<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
CertDiscriminant,
>,
>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
cryptarchia: ServiceHandle<Cryptarchia>,
http: ServiceHandle<ApiService<AxumBackend<Tx, Wire, MB16>>>,
da: ServiceHandle<DataAvailability>,

View File

@ -73,7 +73,7 @@ fn main() -> Result<()> {
network: config.network,
logging: config.log,
http: config.http,
cl_mempool: nomos_mempool::Settings {
cl_mempool: nomos_mempool::TxMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::CL_TOPIC),
@ -81,7 +81,7 @@ fn main() -> Result<()> {
},
registry: registry.clone(),
},
da_mempool: nomos_mempool::Settings {
da_mempool: nomos_mempool::DaMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),

View File

@ -3,18 +3,15 @@ use core::{fmt::Debug, hash::Hash};
use nomos_core::header::HeaderId;
use nomos_core::tx::Transaction;
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
openapi::{MempoolMetrics, Status},
MempoolMsg, MempoolService, Transaction as TxDiscriminant,
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
tx::service::openapi::Status, MempoolMetrics, MempoolMsg, TxMempoolService,
};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
type ClMempoolService<T> = MempoolService<
type ClMempoolService<T> = TxMempoolService<
MempoolNetworkAdapter<T, <T as Transaction>::Hash>,
MockPool<HeaderId, T, <T as Transaction>::Hash>,
TxDiscriminant,
>;
pub async fn cl_mempool_metrics<T>(

View File

@ -5,18 +5,17 @@ use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter,
DaMsg, DataAvailabilityService,
};
use nomos_mempool::da::service::DaMempoolService;
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
openapi::{MempoolMetrics, Status},
Certificate as CertDiscriminant, MempoolMsg, MempoolService,
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
tx::service::openapi::Status,
};
use nomos_mempool::{MempoolMetrics, MempoolMsg};
use tokio::sync::oneshot;
pub type DaMempoolService = MempoolService<
pub type MempoolServiceDa = DaMempoolService<
MempoolNetworkAdapter<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<HeaderId, Certificate, <Blob as blob::Blob>::Hash>,
CertDiscriminant,
>;
pub type DataAvailability = DataAvailabilityService<
@ -28,7 +27,7 @@ pub type DataAvailability = DataAvailabilityService<
pub async fn da_mempool_metrics(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
) -> Result<MempoolMetrics, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let relay = handle.relay::<MempoolServiceDa>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Metrics {
@ -44,7 +43,7 @@ pub async fn da_mempool_status(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Status<HeaderId>>, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let relay = handle.relay::<MempoolServiceDa>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Status {

View File

@ -1,12 +1,13 @@
use core::{fmt::Debug, hash::Hash};
use nomos_core::header::HeaderId;
use nomos_mempool::{
backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService,
backend::mockpool::MockPool, network::NetworkAdapter, DaMempoolService, MempoolMsg,
TxMempoolService,
};
use nomos_network::backends::NetworkBackend;
use tokio::sync::oneshot;
pub async fn add<N, A, D, Item, Key>(
pub async fn add_tx<N, A, Item, Key>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
item: Item,
converter: impl Fn(&Item) -> Key,
@ -15,12 +16,45 @@ where
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Item = Item, Key = Key> + Send + Sync + 'static,
A::Settings: Send + Sync,
D: Discriminant,
Item: Clone + Debug + Send + Sync + 'static + Hash,
Key: Clone + Debug + Ord + Hash + 'static,
{
let relay = handle
.relay::<MempoolService<A, MockPool<HeaderId, Item, Key>, D>>()
.relay::<TxMempoolService<A, MockPool<HeaderId, Item, Key>>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Add {
key: converter(&item),
item,
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
match receiver.await {
Ok(Ok(())) => Ok(()),
Ok(Err(())) => Err("mempool error".into()),
Err(e) => Err(e.into()),
}
}
pub async fn add_cert<N, A, Item, Key>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
item: Item,
converter: impl Fn(&Item) -> Key,
) -> Result<(), super::DynError>
where
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Item = Item, Key = Key> + Send + Sync + 'static,
A::Settings: Send + Sync,
Item: Clone + Debug + Send + Sync + 'static + Hash,
Key: Clone + Debug + Ord + Hash + 'static,
{
let relay = handle
.relay::<DaMempoolService<A, MockPool<HeaderId, Item, Key>>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();

View File

@ -15,8 +15,8 @@ use nomos_core::{
header::cryptarchia::Builder,
};
use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant,
MempoolMsg, MempoolService, Transaction as TxDiscriminant,
backend::MemPool, network::NetworkAdapter as MempoolAdapter, DaMempoolService, MempoolMsg,
TxMempoolService,
};
use nomos_network::NetworkService;
use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService};
@ -149,8 +149,8 @@ where
// underlying networking backend. We need this so we can relay and check the types properly
// when implementing ServiceCore for CryptarchiaConsensus
network_relay: Relay<NetworkService<A::Backend>>,
cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>,
da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>,
cl_mempool_relay: Relay<TxMempoolService<ClPoolAdapter, ClPool>>,
da_mempool_relay: Relay<DaMempoolService<DaPoolAdapter, DaPool>>,
block_subscription_sender: broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
storage_relay: Relay<StorageService<Storage>>,
}

View File

@ -0,0 +1 @@
pub mod service;

View File

@ -0,0 +1,244 @@
/// Re-export for OpenAPI
#[cfg(feature = "openapi")]
pub mod openapi {
pub use crate::backend::Status;
}
// std
use std::fmt::Debug;
// crates
// TODO: Add again after metrics refactor
// #[cfg(feature = "metrics")]
// use super::metrics::Metrics;
use futures::StreamExt;
use nomos_metrics::NomosRegistry;
// internal
use crate::backend::MemPool;
use crate::network::NetworkAdapter;
use crate::{MempoolMetrics, MempoolMsg};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{OutboundRelay, Relay},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use tracing::error;
pub struct DaMempoolService<N, P>
where
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
pool: P,
// TODO: Add again after metrics refactor
// #[cfg(feature = "metrics")]
// metrics: Option<Metrics>,
}
impl<N, P> ServiceData for DaMempoolService<N, P>
where
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
{
const SERVICE_ID: ServiceId = "mempool-da";
type Settings = DaMempoolSettings<P::Settings, N::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<<P as MemPool>::BlockId, <P as MemPool>::Item, <P as MemPool>::Key>;
}
#[async_trait::async_trait]
impl<N, P> ServiceCore for DaMempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Send + Debug + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
// TODO: Refactor metrics to be reusable then replug it again
// #[cfg(feature = "metrics")]
// let metrics = settings
// .registry
// .map(|reg| Metrics::new(reg, service_state.id()));
Ok(Self {
service_state,
network_relay,
pool: P::new(settings.backend),
// #[cfg(feature = "metrics")]
// metrics,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut service_state,
network_relay,
mut pool,
..
} = self;
let mut network_relay: OutboundRelay<_> = network_relay
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
let adapter = N::new(
service_state.settings_reader.get_updated_settings().network,
network_relay.clone(),
);
let adapter = adapter.await;
let mut network_items = adapter.transactions_stream().await;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
// TODO: replug metrics once refactor is done
// #[cfg(feature = "metrics")]
// if let Some(metrics) = &self.metrics { metrics.record(&msg) }
Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
}
Some((key, item )) = network_items.next() => {
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl<N, P> DaMempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Debug + Send + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
async fn handle_mempool_message(
message: MempoolMsg<P::BlockId, P::Item, P::Key>,
pool: &mut P,
network_relay: &mut OutboundRelay<NetworkMsg<N::Backend>>,
service_state: &mut ServiceStateHandle<Self>,
) {
match message {
MempoolMsg::Add {
item,
key,
reply_channel,
} => {
match pool.add_item(key, item.clone()) {
Ok(_id) => {
// Broadcast the item to the network
let net = network_relay.clone();
let settings = service_state.settings_reader.get_updated_settings().network;
// move sending to a new task so local operations can complete in the meantime
tokio::spawn(async move {
let adapter = N::new(settings, net).await;
adapter.send(item).await;
});
if let Err(e) = reply_channel.send(Ok(())) {
tracing::debug!("Failed to send reply to AddTx: {:?}", e);
}
}
Err(e) => {
tracing::debug!("could not add tx to the pool due to: {}", e);
}
}
}
MempoolMsg::View {
ancestor_hint,
reply_channel,
} => {
reply_channel
.send(pool.view(ancestor_hint))
.unwrap_or_else(|_| tracing::debug!("could not send back pool view"));
}
MempoolMsg::MarkInBlock { ids, block } => {
pool.mark_in_block(&ids, block);
}
#[cfg(test)]
MempoolMsg::BlockItems {
block,
reply_channel,
} => {
reply_channel
.send(pool.block_items(block))
.unwrap_or_else(|_| tracing::debug!("could not send back block items"));
}
MempoolMsg::Prune { ids } => {
pool.prune(&ids);
}
MempoolMsg::Metrics { reply_channel } => {
let metrics = MempoolMetrics {
pending_items: pool.pending_item_count(),
last_item_timestamp: pool.last_item_timestamp(),
};
reply_channel
.send(metrics)
.unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics"));
}
MempoolMsg::Status {
items,
reply_channel,
} => {
reply_channel
.send(pool.status(&items))
.unwrap_or_else(|_| tracing::debug!("could not send back mempool status"));
}
}
}
}
#[derive(Clone, Debug)]
pub struct DaMempoolSettings<B, N> {
pub backend: B,
pub network: N,
pub registry: Option<NomosRegistry>,
}

View File

@ -1,67 +1,15 @@
pub mod backend;
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod da;
pub mod network;
pub mod tx;
/// Re-export for OpenAPI
#[cfg(feature = "openapi")]
pub mod openapi {
pub use super::{backend::Status, MempoolMetrics};
}
// std
use std::{
fmt::{Debug, Error, Formatter},
marker::PhantomData,
};
// crates
use futures::StreamExt;
#[cfg(feature = "metrics")]
use metrics::Metrics;
use nomos_metrics::NomosRegistry;
use backend::Status;
use overwatch_rs::services::relay::RelayMessage;
use std::fmt::{Debug, Error, Formatter};
use tokio::sync::oneshot::Sender;
// internal
use crate::network::NetworkAdapter;
use backend::{MemPool, Status};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{OutboundRelay, Relay, RelayMessage},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use tracing::error;
pub struct MempoolService<N, P, D>
where
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
D: Discriminant,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
pool: P,
#[cfg(feature = "metrics")]
metrics: Option<Metrics>,
// This is an hack because SERVICE_ID has to be univoque and associated const
// values can't depend on generic parameters.
// Unfortunately, this means that the mempools for certificates and transactions
// would have the same SERVICE_ID and break overwatch asumptions.
_d: PhantomData<D>,
}
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(serde::Serialize, serde::Deserialize)]
pub struct MempoolMetrics {
pub pending_items: usize,
pub last_item_timestamp: u64,
}
pub use da::service::{DaMempoolService, DaMempoolSettings};
pub use tx::service::{TxMempoolService, TxMempoolSettings};
pub enum MempoolMsg<BlockId, Item, Key> {
Add {
@ -123,223 +71,14 @@ where
}
}
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(serde::Serialize, serde::Deserialize)]
pub struct MempoolMetrics {
pub pending_items: usize,
pub last_item_timestamp: u64,
}
impl<BlockId: 'static, Item: 'static, Key: 'static> RelayMessage
for MempoolMsg<BlockId, Item, Key>
{
}
pub struct Transaction;
pub struct Certificate;
pub trait Discriminant {
const ID: &'static str;
}
impl Discriminant for Transaction {
const ID: &'static str = "mempool-cl";
}
impl Discriminant for Certificate {
const ID: &'static str = "mempool-da";
}
impl<N, P, D> ServiceData for MempoolService<N, P, D>
where
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
D: Discriminant,
{
const SERVICE_ID: ServiceId = D::ID;
type Settings = Settings<P::Settings, N::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<<P as MemPool>::BlockId, <P as MemPool>::Item, <P as MemPool>::Key>;
}
#[async_trait::async_trait]
impl<N, P, D> ServiceCore for MempoolService<N, P, D>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Send + Debug + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
D: Discriminant + Send,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
#[cfg(feature = "metrics")]
let metrics = settings
.registry
.map(|reg| Metrics::new(reg, service_state.id()));
Ok(Self {
service_state,
network_relay,
pool: P::new(settings.backend),
#[cfg(feature = "metrics")]
metrics,
_d: PhantomData,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut service_state,
network_relay,
mut pool,
..
} = self;
let mut network_relay: OutboundRelay<_> = network_relay
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
let adapter = N::new(
service_state.settings_reader.get_updated_settings().network,
network_relay.clone(),
);
let adapter = adapter.await;
let mut network_items = adapter.transactions_stream().await;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
#[cfg(feature = "metrics")]
if let Some(metrics) = &self.metrics { metrics.record(&msg) }
Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
}
Some((key, item )) = network_items.next() => {
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl<N, P, D> MempoolService<N, P, D>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Debug + Send + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
D: Discriminant + Send,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
async fn handle_mempool_message(
message: MempoolMsg<P::BlockId, P::Item, P::Key>,
pool: &mut P,
network_relay: &mut OutboundRelay<NetworkMsg<N::Backend>>,
service_state: &mut ServiceStateHandle<Self>,
) {
match message {
MempoolMsg::Add {
item,
key,
reply_channel,
} => {
match pool.add_item(key, item.clone()) {
Ok(_id) => {
// Broadcast the item to the network
let net = network_relay.clone();
let settings = service_state.settings_reader.get_updated_settings().network;
// move sending to a new task so local operations can complete in the meantime
tokio::spawn(async move {
let adapter = N::new(settings, net).await;
adapter.send(item).await;
});
if let Err(e) = reply_channel.send(Ok(())) {
tracing::debug!("Failed to send reply to AddTx: {:?}", e);
}
}
Err(e) => {
tracing::debug!("could not add tx to the pool due to: {}", e);
}
}
}
MempoolMsg::View {
ancestor_hint,
reply_channel,
} => {
reply_channel
.send(pool.view(ancestor_hint))
.unwrap_or_else(|_| tracing::debug!("could not send back pool view"));
}
MempoolMsg::MarkInBlock { ids, block } => {
pool.mark_in_block(&ids, block);
}
#[cfg(test)]
MempoolMsg::BlockItems {
block,
reply_channel,
} => {
reply_channel
.send(pool.block_items(block))
.unwrap_or_else(|_| tracing::debug!("could not send back block items"));
}
MempoolMsg::Prune { ids } => {
pool.prune(&ids);
}
MempoolMsg::Metrics { reply_channel } => {
let metrics = MempoolMetrics {
pending_items: pool.pending_item_count(),
last_item_timestamp: pool.last_item_timestamp(),
};
reply_channel
.send(metrics)
.unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics"));
}
MempoolMsg::Status {
items,
reply_channel,
} => {
reply_channel
.send(pool.status(&items))
.unwrap_or_else(|_| tracing::debug!("could not send back mempool status"));
}
}
}
}
#[derive(Clone, Debug)]
pub struct Settings<B, N> {
pub backend: B,
pub network: N,
pub registry: Option<NomosRegistry>,
}

View File

@ -4,6 +4,6 @@ use serde::{Deserialize, Serialize};
// internal
#[derive(Serialize, Deserialize)]
pub struct TransactionMsg<Tx> {
pub tx: Tx,
pub struct PayloadMsg<Payload> {
pub payload: Payload,
}

View File

@ -8,7 +8,7 @@ use nomos_metrics::{
};
use overwatch_rs::services::ServiceId;
// internal
use crate::MempoolMsg;
use super::service::TxMempoolMsg;
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)]
enum MempoolMsgType {
@ -18,17 +18,17 @@ enum MempoolMsgType {
MarkInBlock,
}
impl<BlockId, I, K> From<&MempoolMsg<BlockId, I, K>> for MempoolMsgType
impl<BlockId, I, K> From<&TxMempoolMsg<BlockId, I, K>> for MempoolMsgType
where
I: 'static + Debug,
K: 'static + Debug,
{
fn from(event: &MempoolMsg<BlockId, I, K>) -> Self {
fn from(event: &TxMempoolMsg<BlockId, I, K>) -> Self {
match event {
MempoolMsg::Add { .. } => MempoolMsgType::Add,
MempoolMsg::View { .. } => MempoolMsgType::View,
MempoolMsg::Prune { .. } => MempoolMsgType::Prune,
MempoolMsg::MarkInBlock { .. } => MempoolMsgType::MarkInBlock,
TxMempoolMsg::Add { .. } => MempoolMsgType::Add,
TxMempoolMsg::View { .. } => MempoolMsgType::View,
TxMempoolMsg::Prune { .. } => MempoolMsgType::Prune,
TxMempoolMsg::MarkInBlock { .. } => MempoolMsgType::MarkInBlock,
_ => unimplemented!(),
}
}
@ -60,16 +60,16 @@ impl Metrics {
Self { messages }
}
pub(crate) fn record<BlockId, I, K>(&self, msg: &MempoolMsg<BlockId, I, K>)
pub(crate) fn record<BlockId, I, K>(&self, msg: &TxMempoolMsg<BlockId, I, K>)
where
I: 'static + Debug,
K: 'static + Debug,
{
match msg {
MempoolMsg::Add { .. }
| MempoolMsg::View { .. }
| MempoolMsg::Prune { .. }
| MempoolMsg::MarkInBlock { .. } => {
TxMempoolMsg::Add { .. }
| TxMempoolMsg::View { .. }
| TxMempoolMsg::Prune { .. }
| TxMempoolMsg::MarkInBlock { .. } => {
self.messages
.get_or_create(&MessageLabels { label: msg.into() })
.inc();

View File

@ -0,0 +1,3 @@
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod service;

View File

@ -0,0 +1,240 @@
/// Re-export for OpenAPI
#[cfg(feature = "openapi")]
pub mod openapi {
pub use crate::backend::Status;
}
// std
use std::fmt::Debug;
// crates
#[cfg(feature = "metrics")]
use super::metrics::Metrics;
use futures::StreamExt;
use nomos_metrics::NomosRegistry;
// internal
use crate::backend::MemPool;
use crate::network::NetworkAdapter;
use crate::{MempoolMetrics, MempoolMsg};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{OutboundRelay, Relay},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use tracing::error;
pub struct TxMempoolService<N, P>
where
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
pool: P,
#[cfg(feature = "metrics")]
metrics: Option<Metrics>,
}
impl<N, P> ServiceData for TxMempoolService<N, P>
where
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
{
const SERVICE_ID: ServiceId = "mempool-cl";
type Settings = TxMempoolSettings<P::Settings, N::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<<P as MemPool>::BlockId, <P as MemPool>::Item, <P as MemPool>::Key>;
}
#[async_trait::async_trait]
impl<N, P> ServiceCore for TxMempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Send + Debug + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
#[cfg(feature = "metrics")]
let metrics = settings
.registry
.map(|reg| Metrics::new(reg, service_state.id()));
Ok(Self {
service_state,
network_relay,
pool: P::new(settings.backend),
#[cfg(feature = "metrics")]
metrics,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut service_state,
network_relay,
mut pool,
..
} = self;
let mut network_relay: OutboundRelay<_> = network_relay
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
let adapter = N::new(
service_state.settings_reader.get_updated_settings().network,
network_relay.clone(),
);
let adapter = adapter.await;
let mut network_items = adapter.transactions_stream().await;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
#[cfg(feature = "metrics")]
if let Some(metrics) = &self.metrics { metrics.record(&msg) }
Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
}
Some((key, item )) = network_items.next() => {
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl<N, P> TxMempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Debug + Send + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
async fn handle_mempool_message(
message: MempoolMsg<P::BlockId, P::Item, P::Key>,
pool: &mut P,
network_relay: &mut OutboundRelay<NetworkMsg<N::Backend>>,
service_state: &mut ServiceStateHandle<Self>,
) {
match message {
MempoolMsg::Add {
item,
key,
reply_channel,
} => {
match pool.add_item(key, item.clone()) {
Ok(_id) => {
// Broadcast the item to the network
let net = network_relay.clone();
let settings = service_state.settings_reader.get_updated_settings().network;
// move sending to a new task so local operations can complete in the meantime
tokio::spawn(async move {
let adapter = N::new(settings, net).await;
adapter.send(item).await;
});
if let Err(e) = reply_channel.send(Ok(())) {
tracing::debug!("Failed to send reply to AddTx: {:?}", e);
}
}
Err(e) => {
tracing::debug!("could not add tx to the pool due to: {}", e);
}
}
}
MempoolMsg::View {
ancestor_hint,
reply_channel,
} => {
reply_channel
.send(pool.view(ancestor_hint))
.unwrap_or_else(|_| tracing::debug!("could not send back pool view"));
}
MempoolMsg::MarkInBlock { ids, block } => {
pool.mark_in_block(&ids, block);
}
#[cfg(test)]
MempoolMsg::BlockItems {
block,
reply_channel,
} => {
reply_channel
.send(pool.block_items(block))
.unwrap_or_else(|_| tracing::debug!("could not send back block items"));
}
MempoolMsg::Prune { ids } => {
pool.prune(&ids);
}
MempoolMsg::Metrics { reply_channel } => {
let metrics = MempoolMetrics {
pending_items: pool.pending_item_count(),
last_item_timestamp: pool.last_item_timestamp(),
};
reply_channel
.send(metrics)
.unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics"));
}
MempoolMsg::Status {
items,
reply_channel,
} => {
reply_channel
.send(pool.status(&items))
.unwrap_or_else(|_| tracing::debug!("could not send back mempool status"));
}
}
}
}
#[derive(Clone, Debug)]
pub struct TxMempoolSettings<B, N> {
pub backend: B,
pub network: N,
pub registry: Option<NomosRegistry>,
}

View File

@ -13,7 +13,7 @@ use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC},
MempoolMsg, MempoolService, Settings, Transaction,
MempoolMsg, TxMempoolService, TxMempoolSettings,
};
#[derive(Services)]
@ -21,11 +21,7 @@ struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<
MempoolService<
MockAdapter,
MockPool<HeaderId, MockTransaction<MockMessage>, MockTxId>,
Transaction,
>,
TxMempoolService<MockAdapter, MockPool<HeaderId, MockTransaction<MockMessage>, MockTxId>>,
>,
}
@ -65,7 +61,7 @@ fn test_mockmempool() {
weights: None,
},
},
mockpool: Settings {
mockpool: TxMempoolSettings {
backend: (),
network: (),
registry: None,
@ -78,11 +74,11 @@ fn test_mockmempool() {
.unwrap();
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app.handle().relay::<MempoolService<
MockAdapter,
MockPool<HeaderId, MockTransaction<MockMessage>, MockTxId>,
Transaction,
>>();
let mempool =
app.handle().relay::<TxMempoolService<
MockAdapter,
MockPool<HeaderId, MockTransaction<MockMessage>, MockTxId>,
>>();
app.spawn(async move {
let network_outbound = network.connect().await.unwrap();