From 1114f611afc120614859d6350c94e49c0a5baa32 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Mon, 19 Jan 2026 17:44:06 +0100 Subject: [PATCH 1/6] node: Update p2panda to new API --- Cargo.lock | 29 ++--- reflection-node/Cargo.toml | 12 +-- reflection-node/src/author_tracker.rs | 8 +- reflection-node/src/lib.rs | 1 + reflection-node/src/network.rs | 123 ++++++++++++++++++++++ reflection-node/src/node_inner.rs | 90 ++++------------ reflection-node/src/subscription_inner.rs | 72 +++++-------- reflection-node/src/topic.rs | 17 +-- reflection-node/src/topic_store.rs | 2 +- 9 files changed, 204 insertions(+), 150 deletions(-) create mode 100644 reflection-node/src/network.rs diff --git a/Cargo.lock b/Cargo.lock index 55aa002..7626ff3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3686,7 +3686,7 @@ dependencies = [ [[package]] name = "p2panda-core" version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=a83a80b2ce0733c9437ceeaae33503d3b3742436#a83a80b2ce0733c9437ceeaae33503d3b3742436" +source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" dependencies = [ "blake3", "ciborium", @@ -3699,9 +3699,9 @@ dependencies = [ ] [[package]] -name = "p2panda-discovery-next" +name = "p2panda-discovery" version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=a83a80b2ce0733c9437ceeaae33503d3b3742436#a83a80b2ce0733c9437ceeaae33503d3b3742436" +source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" dependencies = [ "blake3", "futures-util", @@ -3713,9 +3713,9 @@ dependencies = [ ] [[package]] -name = "p2panda-net-next" +name = "p2panda-net" version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=a83a80b2ce0733c9437ceeaae33503d3b3742436#a83a80b2ce0733c9437ceeaae33503d3b3742436" +source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" dependencies = [ "ciborium", "futures-channel", @@ -3725,8 +3725,9 @@ dependencies = [ "iroh-base", "iroh-gossip", "p2panda-core", - "p2panda-discovery-next", - "p2panda-sync-next", + "p2panda-discovery", + "p2panda-store", + "p2panda-sync", "ractor", "rand 0.9.2", "rand_chacha 0.9.0", @@ -3741,7 +3742,7 @@ dependencies = [ [[package]] name = "p2panda-store" version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=a83a80b2ce0733c9437ceeaae33503d3b3742436#a83a80b2ce0733c9437ceeaae33503d3b3742436" +source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" dependencies = [ "ciborium", "hex", @@ -3754,7 +3755,7 @@ dependencies = [ [[package]] name = "p2panda-stream" version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=a83a80b2ce0733c9437ceeaae33503d3b3742436#a83a80b2ce0733c9437ceeaae33503d3b3742436" +source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" dependencies = [ "ciborium", "futures-channel", @@ -3767,9 +3768,9 @@ dependencies = [ ] [[package]] -name = "p2panda-sync-next" +name = "p2panda-sync" version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=a83a80b2ce0733c9437ceeaae33503d3b3742436#a83a80b2ce0733c9437ceeaae33503d3b3742436" +source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" dependencies = [ "futures", "futures-util", @@ -4349,11 +4350,11 @@ dependencies = [ "hex", "iroh", "p2panda-core", - "p2panda-discovery-next", - "p2panda-net-next", + "p2panda-discovery", + "p2panda-net", "p2panda-store", "p2panda-stream", - "p2panda-sync-next", + "p2panda-sync", "rand_chacha 0.9.0", "serde", "serde_bytes", diff --git a/reflection-node/Cargo.toml b/reflection-node/Cargo.toml index d6b00ec..26a191a 100644 --- a/reflection-node/Cargo.toml +++ b/reflection-node/Cargo.toml @@ -12,12 +12,12 @@ authors = [ thiserror = "2.0.17" chrono = "0.4.42" ciborium = "0.2.2" -p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "a83a80b2ce0733c9437ceeaae33503d3b3742436" } -p2panda-discovery = { git = "https://github.com/p2panda/p2panda", rev = "a83a80b2ce0733c9437ceeaae33503d3b3742436", package = "p2panda-discovery-next" } -p2panda-net = { git = "https://github.com/p2panda/p2panda", rev = "a83a80b2ce0733c9437ceeaae33503d3b3742436", package = "p2panda-net-next" } -p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "a83a80b2ce0733c9437ceeaae33503d3b3742436", features = ["sqlite"], default-features = false } -p2panda-stream = { git = "https://github.com/p2panda/p2panda", rev = "a83a80b2ce0733c9437ceeaae33503d3b3742436" } -p2panda-sync = { git = "https://github.com/p2panda/p2panda", rev = "a83a80b2ce0733c9437ceeaae33503d3b3742436", package = "p2panda-sync-next" } +p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } +p2panda-discovery = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } +p2panda-net = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } +p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c", features = ["sqlite"], default-features = false } +p2panda-stream = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } +p2panda-sync = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } serde = { version = "1.0.228", features = ["derive"] } serde_bytes = "0.11.19" sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite", "chrono"], default-features = false } diff --git a/reflection-node/src/author_tracker.rs b/reflection-node/src/author_tracker.rs index a7d5791..f665f2b 100644 --- a/reflection-node/src/author_tracker.rs +++ b/reflection-node/src/author_tracker.rs @@ -10,7 +10,7 @@ use crate::topic::SubscribableTopic; use chrono::Utc; use p2panda_core::cbor::{DecodeError, decode_cbor, encode_cbor}; use p2panda_core::{PrivateKey, PublicKey}; -use p2panda_net::streams::EphemeralStream; +use p2panda_net::gossip::GossipHandle; use tokio::sync::{Mutex, RwLock}; use tracing::error; @@ -47,7 +47,7 @@ pub struct AuthorTracker { last_ping: Mutex>, subscribable_topic: Arc, node: Arc, - tx: RwLock>, + tx: RwLock>, } impl AuthorTracker { @@ -60,7 +60,7 @@ impl AuthorTracker { }) } - pub async fn set_topic_tx(&self, tx: Option) { + pub async fn set_topic_tx(&self, tx: Option) { let mut tx_guard = self.tx.write().await; // Send good bye message to the network if let Some(tx) = tx_guard.as_ref() { @@ -174,7 +174,7 @@ impl AuthorTracker { } } -async fn send_message(private_key: &PrivateKey, tx: &EphemeralStream, message: AuthorMessage) { +async fn send_message(private_key: &PrivateKey, tx: &GossipHandle, message: AuthorMessage) { // FIXME: We need to add the current time to the message, // because iroh doesn't broadcast twice the same message message. let author_message = match encode_cbor(&(&message, SystemTime::now())) { diff --git a/reflection-node/src/lib.rs b/reflection-node/src/lib.rs index bcd59d8..5eb23d1 100644 --- a/reflection-node/src/lib.rs +++ b/reflection-node/src/lib.rs @@ -1,5 +1,6 @@ mod author_tracker; mod ephemerial_operation; +mod network; pub mod node; mod node_inner; mod operation; diff --git a/reflection-node/src/network.rs b/reflection-node/src/network.rs new file mode 100644 index 0000000..4ed6fe4 --- /dev/null +++ b/reflection-node/src/network.rs @@ -0,0 +1,123 @@ +use std::sync::LazyLock; + +use thiserror::Error; +use tracing::error; + +use p2panda_core::Hash; +use p2panda_core::PrivateKey; +use p2panda_net::address_book::{AddressBook, AddressBookError}; +use p2panda_net::gossip::{Gossip, GossipError}; +use p2panda_net::iroh_endpoint::{Endpoint, EndpointError}; +use p2panda_net::iroh_mdns::{MdnsDiscovery, MdnsDiscoveryError, MdnsDiscoveryMode}; +use p2panda_net::{TopicId, addrs::NodeInfo}; + +use crate::operation::ReflectionExtensions; +use crate::operation_store::OperationStore; +use crate::topic_store::{LogId, TopicStore}; + +static RELAY_URL: LazyLock = LazyLock::new(|| { + "https://euc1-1.relay.n0.iroh-canary.iroh.link" + .parse() + .expect("valid relay URL") +}); + +static BOOTSTRAP_NODE: LazyLock = LazyLock::new(|| { + let endpoint_addr = iroh::EndpointAddr::new( + "7ccdbeed587a8ec8c71cdc9b98e941ac597e11b0216aac1387ef81089a4930b2" + .parse() + .expect("valid bootstrap node id"), + ) + .with_relay_url(RELAY_URL.clone()); + NodeInfo::from(endpoint_addr).bootstrap() +}); + +type TopicSyncManager = p2panda_sync::manager::TopicSyncManager< + TopicId, + p2panda_store::SqliteStore, + TopicStore, + LogId, + ReflectionExtensions, +>; +pub type LogSync = p2panda_net::sync::LogSync< + p2panda_store::SqliteStore, + LogId, + ReflectionExtensions, + TopicStore, +>; +pub type LogSyncError = p2panda_net::sync::LogSyncError; +pub type SyncHandle = p2panda_net::sync::SyncHandle; +pub type SyncHandleError = p2panda_net::sync::SyncHandleError; + +#[derive(Error, Debug)] +pub enum NetworkError { + #[error(transparent)] + Gossip(#[from] GossipError), + #[error(transparent)] + LogSync(#[from] LogSyncError), + #[error(transparent)] + AddressBook(#[from] AddressBookError), + #[error(transparent)] + MdnsDiscovery(#[from] MdnsDiscoveryError), + #[error(transparent)] + Endpoint(#[from] EndpointError), +} + +#[allow(dead_code)] +pub struct Network { + pub(crate) mdns_discovery: MdnsDiscovery, + pub(crate) gossip: Gossip, + pub(crate) log_sync: LogSync, + pub(crate) endpoint: Endpoint, +} + +// FIXME: Endpoint, LogSync, MdnsDiscovery, and Gossip should implement debug +impl std::fmt::Debug for Network { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Network").finish() + } +} + +impl Network { + pub async fn new( + private_key: &PrivateKey, + network_id: &Hash, + topic_store: &TopicStore, + operation_store: &OperationStore, + ) -> Result { + let address_book = AddressBook::builder().spawn().await?; + + if let Err(error) = address_book.insert_node_info(BOOTSTRAP_NODE.clone()).await { + error!("Failed to add bootstrap node to the address book: {error}"); + } + + let endpoint = Endpoint::builder(address_book.clone()) + .network_id(network_id.into()) + .private_key(private_key.clone()) + .relay_url(RELAY_URL.clone()) + .spawn() + .await?; + + let mdns_discovery = MdnsDiscovery::builder(address_book.clone(), endpoint.clone()) + .mode(MdnsDiscoveryMode::Active) + .spawn() + .await?; + let gossip = Gossip::builder(address_book.clone(), endpoint.clone()) + .spawn() + .await?; + let log_sync = LogSync::builder( + operation_store.clone_inner(), + topic_store.clone(), + endpoint.clone(), + gossip.clone(), + ) + .spawn() + .await?; + + Ok(Network { + mdns_discovery, + gossip, + log_sync, + endpoint, + }) + } +} diff --git a/reflection-node/src/node_inner.rs b/reflection-node/src/node_inner.rs index c91f7ac..a946440 100644 --- a/reflection-node/src/node_inner.rs +++ b/reflection-node/src/node_inner.rs @@ -1,34 +1,21 @@ use std::path::PathBuf; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use crate::ephemerial_operation::EphemerialOperation; +use crate::network::Network; use crate::node::{ConnectionMode, NodeError}; -use crate::operation::ReflectionExtensions; use crate::operation_store::OperationStore; use crate::subscription_inner::SubscriptionInner; use crate::topic::{SubscribableTopic, TopicError}; -use crate::topic_store::{LogId, TopicStore}; +use crate::topic_store::TopicStore; use crate::utils::CombinedMigrationSource; use p2panda_core::{Hash, PrivateKey}; -use p2panda_discovery::address_book::AddressBookStore; -use p2panda_discovery::address_book::memory::MemoryStore as MemoryAddressBook; -use p2panda_net::{MdnsDiscoveryMode, NodeInfo, TopicId}; -use p2panda_net::{Network, NetworkBuilder}; +use p2panda_net::TopicId; use p2panda_store::sqlite::store::migrations as operation_store_migrations; -use p2panda_sync::managers::topic_sync_manager::TopicSyncManagerConfig; -use rand_chacha::rand_core::SeedableRng; use sqlx::{migrate::Migrator, sqlite}; use tokio::sync::{Notify, RwLock}; -use tracing::{error, info, warn}; - -pub type TopicSyncManager = p2panda_sync::managers::topic_sync_manager::TopicSyncManager< - TopicId, - p2panda_store::SqliteStore, - TopicStore, - LogId, - ReflectionExtensions, ->; +use tracing::{info, warn}; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub(crate) enum MessageType { @@ -42,26 +29,10 @@ pub struct NodeInner { pub(crate) topic_store: TopicStore, pub(crate) private_key: PrivateKey, pub(crate) network_id: Hash, - pub(crate) network: RwLock>>, + pub(crate) network: RwLock>, pub(crate) network_notifier: Notify, } -static RELAY_URL: LazyLock = LazyLock::new(|| { - "https://euc1-1.relay.n0.iroh-canary.iroh.link" - .parse() - .expect("valid relay URL") -}); - -static BOOTSTRAP_NODE: LazyLock = LazyLock::new(|| { - let endpoint_addr = iroh::EndpointAddr::new( - "7ccdbeed587a8ec8c71cdc9b98e941ac597e11b0216aac1387ef81089a4930b2" - .parse() - .expect("valid bootstrap node id"), - ) - .with_relay_url(RELAY_URL.clone()); - NodeInfo::from(endpoint_addr).bootstrap() -}); - impl NodeInner { pub async fn new( network_id: Hash, @@ -102,7 +73,14 @@ impl NodeInner { unimplemented!("Bluetooth is currently not implemented") } ConnectionMode::Network => { - setup_network(&private_key, &network_id, &topic_store, &operation_store).await + match Network::new(&private_key, &network_id, &topic_store, &operation_store).await + { + Ok(network) => Some(network), + Err(error) => { + warn!("Failed to startup network: {error}"); + None + } + } } }; @@ -129,13 +107,20 @@ impl NodeInner { unimplemented!("Bluetooth is currently not implemented") } ConnectionMode::Network => { - setup_network( + match Network::new( &self.private_key, &self.network_id, &self.topic_store, &self.operation_store, ) .await + { + Ok(network) => Some(network), + Err(error) => { + warn!("Failed to startup network: {error}"); + None + } + } } }; @@ -179,34 +164,3 @@ impl NodeInner { Ok(()) } } - -async fn setup_network( - private_key: &PrivateKey, - network_id: &Hash, - topic_store: &TopicStore, - operation_store: &OperationStore, -) -> Option> { - let address_book = MemoryAddressBook::new(rand_chacha::ChaCha20Rng::from_os_rng()); - - if let Err(error) = address_book.insert_node_info(BOOTSTRAP_NODE.clone()).await { - error!("Failed to add bootstrap node to the address book: {error}"); - } - - let sync_conf = TopicSyncManagerConfig { - store: operation_store.clone_inner(), - topic_map: topic_store.clone(), - }; - let network = NetworkBuilder::new(network_id.into()) - .private_key(private_key.clone()) - .mdns(MdnsDiscoveryMode::Active) - .relay(RELAY_URL.clone()) - .build(address_book, sync_conf) - .await; - - if let Err(error) = network { - warn!("Failed to startup network: {error}"); - None - } else { - network.ok() - } -} diff --git a/reflection-node/src/subscription_inner.rs b/reflection-node/src/subscription_inner.rs index a52eea5..8c59540 100644 --- a/reflection-node/src/subscription_inner.rs +++ b/reflection-node/src/subscription_inner.rs @@ -3,17 +3,13 @@ use std::ops::{Deref, DerefMut, Drop}; use std::sync::Arc; use chrono::Utc; -use p2panda_core::Hash; use p2panda_core::{ Body, Header, cbor::{decode_cbor, encode_cbor}, }; -use p2panda_net::{ - Network, TopicId, - streams::{EphemeralStream, EventuallyConsistentStream}, -}; +use p2panda_net::{TopicId, gossip::GossipHandle}; use p2panda_stream::IngestExt; -use p2panda_sync::protocols::topic_log_sync::TopicLogSyncEvent; +use p2panda_sync::TopicLogSyncEvent; use tokio::{ sync::{RwLock, mpsc}, task::{AbortHandle, spawn}, @@ -23,14 +19,16 @@ use tracing::{error, info, warn}; use crate::author_tracker::{AuthorMessage, AuthorTracker}; use crate::ephemerial_operation::EphemerialOperation; +use crate::network::Network; +use crate::network::SyncHandle; use crate::node_inner::MessageType; -use crate::node_inner::{NodeInner, TopicSyncManager}; +use crate::node_inner::NodeInner; use crate::operation::{LogType, ReflectionExtensions}; use crate::topic::{SubscribableTopic, TopicError}; pub struct SubscriptionInner { - ephemeral_tx: RwLock>, - tx: RwLock>>, + ephemeral_tx: RwLock>, + tx: RwLock>, pub(crate) node: Arc, pub(crate) id: TopicId, pub(crate) subscribable_topic: Arc, @@ -246,18 +244,14 @@ impl SubscriptionInner { // FIXME: return errors async fn setup_network( node: &Arc, - network: &Network, + network: &Network, id: TopicId, subscribable_topic: &Arc, author_tracker: &Arc>, -) -> ( - Option>, - Option, - Vec, -) { +) -> (Option, Option, Vec) { let mut abort_handles = Vec::with_capacity(3); - let stream = match network.stream(id, true).await { + let stream = match network.log_sync.stream(id, true).await { Ok(result) => result, Err(error) => { warn!( @@ -275,7 +269,7 @@ async fn setup_network( mpsc::channel::<(Header, Option, Vec)>(128); let abort_handle = spawn(async move { - while let Ok(event) = topic_rx.recv().await { + while let Some(Ok(event)) = topic_rx.next().await { match event.event() { TopicLogSyncEvent::Operation(operation) => { match validate_and_unpack(operation.as_ref().to_owned(), id) { @@ -297,13 +291,8 @@ async fn setup_network( abort_handles.push(abort_handle); - // Generate a different id than the eventually consistent streams to avoid collisions. - // - // @TODO(adz): We want to throw an error if users try to subscribe with the same id across - // different streams. - let ephemeral_id = Hash::new(id); - let ephemeral_stream = network.ephemeral_stream(ephemeral_id.into()).await.unwrap(); - let mut ephemeral_rx = ephemeral_stream.subscribe().await.unwrap(); + let ephemeral_stream = network.gossip.stream(id).await.unwrap(); + let mut ephemeral_rx = ephemeral_stream.subscribe(); let ephemeral_tx = ephemeral_stream; author_tracker.set_topic_tx(Some(ephemeral_tx)).await; @@ -311,7 +300,14 @@ async fn setup_network( let author_tracker_clone = author_tracker.clone(); let subscribable_topic_clone = subscribable_topic.clone(); let abort_handle = spawn(async move { - while let Ok(bytes) = ephemeral_rx.recv().await { + while let Some(bytes) = ephemeral_rx.next().await { + let bytes = match bytes { + Ok(bytes) => bytes, + Err(error) => { + error!("Error while reciving ephemeral message: {error}"); + continue; + } + }; match decode_cbor(&bytes[..]) { Ok(MessageType::Ephemeral(operation)) => { if let Some((author, body)) = operation.validate_and_unpack() { @@ -400,8 +396,7 @@ async fn setup_network( info!("Network subscription set up for topic {}", hex::encode(id)); - let ephemeral_id = Hash::new(id); - let ephemeral_tx = network.ephemeral_stream(ephemeral_id.into()).await.unwrap(); + let ephemeral_tx = network.gossip.stream(id).await.unwrap(); (Some(topic_tx), Some(ephemeral_tx), abort_handles) } @@ -409,8 +404,8 @@ async fn setup_network( async fn teardown_network( id: &TopicId, author_tracker: &Arc>, - tx: Option>, - ephemeral_tx: Option, + tx: Option, + ephemeral_tx: Option, abort_handles: Vec, ) { for handle in abort_handles { @@ -419,27 +414,14 @@ async fn teardown_network( author_tracker.set_topic_tx(None).await; - if let Some(ephemeral_tx) = ephemeral_tx - && let Err(error) = ephemeral_tx.close() - { - error!( - "Failed to tear down ephemeral channel for topic {}: {error}", - hex::encode(id) - ); - } - - if let Some(tx) = tx { - if let Err(error) = tx.close() { - error!( - "Failed to tear down persistent channel for topic {}: {error}", - hex::encode(id) - ); - } + if tx.is_some() { info!( "Network subscription torn down for topic {}", hex::encode(id) ); } + drop(tx); + drop(ephemeral_tx); } type OperationWithRawHeader = (Header, Option, Vec); diff --git a/reflection-node/src/topic.rs b/reflection-node/src/topic.rs index c2319d0..d7a5202 100644 --- a/reflection-node/src/topic.rs +++ b/reflection-node/src/topic.rs @@ -1,21 +1,15 @@ use std::sync::Arc; -use crate::operation::ReflectionExtensions; +use crate::network::SyncHandleError; use crate::operation_store::CreationError; use crate::subscription_inner::SubscriptionInner; -use p2panda_core::{Operation, PublicKey}; -use p2panda_net::streams::StreamError; +use p2panda_core::PublicKey; +use p2panda_net::gossip::GossipHandleError; use thiserror::Error; use tokio::task::{AbortHandle, JoinError}; use tracing::info; -impl From>> for TopicError { - fn from(value: StreamError>) -> Self { - TopicError::Publish(Box::new(value)) - } -} - #[derive(Debug, Error)] pub enum TopicError { #[error(transparent)] @@ -25,10 +19,9 @@ pub enum TopicError { #[error(transparent)] Encode(#[from] p2panda_core::cbor::EncodeError), #[error(transparent)] - // FIXME: The error is huge so but it into a Box - Publish(Box>>), + Publish(#[from] SyncHandleError), #[error(transparent)] - PublishEphemeral(#[from] StreamError>), + PublishEphemeral(#[from] GossipHandleError), #[error(transparent)] Runtime(#[from] JoinError), } diff --git a/reflection-node/src/topic_store.rs b/reflection-node/src/topic_store.rs index 6d33a9b..9948368 100644 --- a/reflection-node/src/topic_store.rs +++ b/reflection-node/src/topic_store.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc}; use p2panda_core::PublicKey; use p2panda_net::TopicId; use p2panda_store::LogStore; -use p2panda_sync::{log_sync::Logs, topic_log_sync::TopicLogMap}; +use p2panda_sync::{Logs, traits::TopicLogMap}; use serde::{Deserialize, Serialize}; use sqlx::{FromRow, Row}; use tracing::error; From 799f9603d32a7fa7ff2ce31f01c749a20fc85e02 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Mon, 26 Jan 2026 11:17:31 +0000 Subject: [PATCH 2/6] Bump all p2panda crates to v0.5.0 --- Cargo.lock | 33 ++++++++++++++--------- reflection-node/Cargo.toml | 12 ++++----- reflection-node/src/network.rs | 13 ++------- reflection-node/src/subscription_inner.rs | 9 ++++--- reflection-node/src/topic.rs | 14 +++++++--- reflection-node/src/topic_store.rs | 5 ++-- 6 files changed, 47 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7626ff3..a1b079d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2592,7 +2592,8 @@ dependencies = [ [[package]] name = "iroh-gossip" version = "0.95.0" -source = "git+https://github.com/p2panda/iroh-gossip?rev=6fad0f740c031876dbb412c7b8237776314d763e#6fad0f740c031876dbb412c7b8237776314d763e" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "026dd31b487ec5e80ac0240f4eb70cd6c0a2800f6ef44beca5329443c194bb22" dependencies = [ "blake3", "bytes", @@ -3685,8 +3686,9 @@ dependencies = [ [[package]] name = "p2panda-core" -version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f4d6c893ad3c2733cef96f8732dc129942794d0643a1882032ee3af964af608" dependencies = [ "blake3", "ciborium", @@ -3700,8 +3702,9 @@ dependencies = [ [[package]] name = "p2panda-discovery" -version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd89b9dff73d36c4e8aad9cb5c4cbce222f54064c06e861eda1b9e50346b8535" dependencies = [ "blake3", "futures-util", @@ -3714,8 +3717,9 @@ dependencies = [ [[package]] name = "p2panda-net" -version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5c13c7f545454292d5f1ca66bad82d5035ede00710ac516c2d57cccbb7f18b2" dependencies = [ "ciborium", "futures-channel", @@ -3741,8 +3745,9 @@ dependencies = [ [[package]] name = "p2panda-store" -version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19fe570753376178f807a548041deddea836d06b391e1d9eb82b4282e1f83ec0" dependencies = [ "ciborium", "hex", @@ -3754,8 +3759,9 @@ dependencies = [ [[package]] name = "p2panda-stream" -version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd512da2ee180ab571acf10cf2dd0410c211cd6f1bcba4478513c1caf03b716a" dependencies = [ "ciborium", "futures-channel", @@ -3769,8 +3775,9 @@ dependencies = [ [[package]] name = "p2panda-sync" -version = "0.4.0" -source = "git+https://github.com/p2panda/p2panda?rev=d9bff24ce2805302fcbc6554fbaba51b278e918c#d9bff24ce2805302fcbc6554fbaba51b278e918c" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84093572a6bf473fad4809813ebb7a5395eaa0144af806bc85ab65a24019bdc3" dependencies = [ "futures", "futures-util", diff --git a/reflection-node/Cargo.toml b/reflection-node/Cargo.toml index 26a191a..63fe53c 100644 --- a/reflection-node/Cargo.toml +++ b/reflection-node/Cargo.toml @@ -12,12 +12,12 @@ authors = [ thiserror = "2.0.17" chrono = "0.4.42" ciborium = "0.2.2" -p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } -p2panda-discovery = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } -p2panda-net = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } -p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c", features = ["sqlite"], default-features = false } -p2panda-stream = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } -p2panda-sync = { git = "https://github.com/p2panda/p2panda", rev = "d9bff24ce2805302fcbc6554fbaba51b278e918c" } +p2panda-core = { version = "0.5.0" } +p2panda-discovery = { version = "0.5.0" } +p2panda-net = { version = "0.5.0" } +p2panda-store = { version = "0.5.0", features = ["sqlite"], default-features = false } +p2panda-stream = { version = "0.5.0" } +p2panda-sync = { version = "0.5.0" } serde = { version = "1.0.228", features = ["derive"] } serde_bytes = "0.11.19" sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite", "chrono"], default-features = false } diff --git a/reflection-node/src/network.rs b/reflection-node/src/network.rs index 4ed6fe4..03e4cdc 100644 --- a/reflection-node/src/network.rs +++ b/reflection-node/src/network.rs @@ -6,10 +6,10 @@ use tracing::error; use p2panda_core::Hash; use p2panda_core::PrivateKey; use p2panda_net::address_book::{AddressBook, AddressBookError}; +use p2panda_net::addrs::NodeInfo; use p2panda_net::gossip::{Gossip, GossipError}; use p2panda_net::iroh_endpoint::{Endpoint, EndpointError}; use p2panda_net::iroh_mdns::{MdnsDiscovery, MdnsDiscoveryError, MdnsDiscoveryMode}; -use p2panda_net::{TopicId, addrs::NodeInfo}; use crate::operation::ReflectionExtensions; use crate::operation_store::OperationStore; @@ -31,22 +31,13 @@ static BOOTSTRAP_NODE: LazyLock = LazyLock::new(|| { NodeInfo::from(endpoint_addr).bootstrap() }); -type TopicSyncManager = p2panda_sync::manager::TopicSyncManager< - TopicId, - p2panda_store::SqliteStore, - TopicStore, - LogId, - ReflectionExtensions, ->; pub type LogSync = p2panda_net::sync::LogSync< p2panda_store::SqliteStore, LogId, ReflectionExtensions, TopicStore, >; -pub type LogSyncError = p2panda_net::sync::LogSyncError; -pub type SyncHandle = p2panda_net::sync::SyncHandle; -pub type SyncHandleError = p2panda_net::sync::SyncHandleError; +pub type LogSyncError = p2panda_net::sync::LogSyncError; #[derive(Error, Debug)] pub enum NetworkError { diff --git a/reflection-node/src/subscription_inner.rs b/reflection-node/src/subscription_inner.rs index 8c59540..be8b7f6 100644 --- a/reflection-node/src/subscription_inner.rs +++ b/reflection-node/src/subscription_inner.rs @@ -3,13 +3,14 @@ use std::ops::{Deref, DerefMut, Drop}; use std::sync::Arc; use chrono::Utc; +use p2panda_core::Operation; use p2panda_core::{ Body, Header, cbor::{decode_cbor, encode_cbor}, }; use p2panda_net::{TopicId, gossip::GossipHandle}; use p2panda_stream::IngestExt; -use p2panda_sync::TopicLogSyncEvent; +use p2panda_sync::protocols::TopicLogSyncEvent as Event; use tokio::{ sync::{RwLock, mpsc}, task::{AbortHandle, spawn}, @@ -20,12 +21,14 @@ use tracing::{error, info, warn}; use crate::author_tracker::{AuthorMessage, AuthorTracker}; use crate::ephemerial_operation::EphemerialOperation; use crate::network::Network; -use crate::network::SyncHandle; use crate::node_inner::MessageType; use crate::node_inner::NodeInner; use crate::operation::{LogType, ReflectionExtensions}; use crate::topic::{SubscribableTopic, TopicError}; +pub type SyncHandle = + p2panda_net::sync::SyncHandle, Event>; + pub struct SubscriptionInner { ephemeral_tx: RwLock>, tx: RwLock>, @@ -271,7 +274,7 @@ async fn setup_network( let abort_handle = spawn(async move { while let Some(Ok(event)) = topic_rx.next().await { match event.event() { - TopicLogSyncEvent::Operation(operation) => { + Event::Operation(operation) => { match validate_and_unpack(operation.as_ref().to_owned(), id) { Ok(data) => { persistent_tx.send(data).await.unwrap(); diff --git a/reflection-node/src/topic.rs b/reflection-node/src/topic.rs index d7a5202..cb824fe 100644 --- a/reflection-node/src/topic.rs +++ b/reflection-node/src/topic.rs @@ -1,15 +1,21 @@ use std::sync::Arc; -use crate::network::SyncHandleError; +use crate::operation::ReflectionExtensions; use crate::operation_store::CreationError; use crate::subscription_inner::SubscriptionInner; -use p2panda_core::PublicKey; -use p2panda_net::gossip::GossipHandleError; +use p2panda_core::{Operation, PublicKey}; +use p2panda_sync::protocols::TopicLogSyncEvent; use thiserror::Error; +use tokio::sync::mpsc; use tokio::task::{AbortHandle, JoinError}; use tracing::info; +pub type SyncHandleError = p2panda_net::sync::SyncHandleError< + Operation, + TopicLogSyncEvent, +>; + #[derive(Debug, Error)] pub enum TopicError { #[error(transparent)] @@ -21,7 +27,7 @@ pub enum TopicError { #[error(transparent)] Publish(#[from] SyncHandleError), #[error(transparent)] - PublishEphemeral(#[from] GossipHandleError), + PublishEphemeral(#[from] mpsc::error::SendError>), #[error(transparent)] Runtime(#[from] JoinError), } diff --git a/reflection-node/src/topic_store.rs b/reflection-node/src/topic_store.rs index 9948368..b757abf 100644 --- a/reflection-node/src/topic_store.rs +++ b/reflection-node/src/topic_store.rs @@ -5,7 +5,8 @@ use chrono::{DateTime, Utc}; use p2panda_core::PublicKey; use p2panda_net::TopicId; use p2panda_store::LogStore; -use p2panda_sync::{Logs, traits::TopicLogMap}; +use p2panda_sync::protocols::Logs; +use p2panda_sync::traits::TopicMap; use serde::{Deserialize, Serialize}; use sqlx::{FromRow, Row}; use tracing::error; @@ -238,7 +239,7 @@ impl LogId { } } -impl TopicLogMap for TopicStore { +impl TopicMap> for TopicStore { type Error = sqlx::Error; async fn get(&self, topic: &TopicId) -> Result, Self::Error> { From a3b0d35cc5bcbc4831ca27aa230ea9e9f1a8a3bb Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Mon, 26 Jan 2026 11:21:01 +0000 Subject: [PATCH 3/6] Patch required iroh-gossip version in Cargo.toml --- reflection-node/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/reflection-node/Cargo.toml b/reflection-node/Cargo.toml index 63fe53c..12ade4f 100644 --- a/reflection-node/Cargo.toml +++ b/reflection-node/Cargo.toml @@ -28,3 +28,6 @@ test-log = { version = "0.2.18", default-features = false, features = ["trace", hex = "0.4.3" rand_chacha = { version = "0.9.0", features = ["os_rng"] } iroh = "0.95.1" + +[patch.crates-io] +iroh-gossip = { git = "https://github.com/p2panda/iroh-gossip", rev = "533c34a2758518ece19c1de9f21bc40d61f9b5a5" } \ No newline at end of file From e46fe74b71d636a8c0b36a5dbb692ae015118ab3 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 27 Jan 2026 12:45:48 +0000 Subject: [PATCH 4/6] Update bootstrap node id --- reflection-node/src/network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reflection-node/src/network.rs b/reflection-node/src/network.rs index 03e4cdc..2938037 100644 --- a/reflection-node/src/network.rs +++ b/reflection-node/src/network.rs @@ -23,7 +23,7 @@ static RELAY_URL: LazyLock = LazyLock::new(|| { static BOOTSTRAP_NODE: LazyLock = LazyLock::new(|| { let endpoint_addr = iroh::EndpointAddr::new( - "7ccdbeed587a8ec8c71cdc9b98e941ac597e11b0216aac1387ef81089a4930b2" + "9f63a15ab95959a992af96bf72fbc3e7dc98eeb4799f788bb07b20125053e795" .parse() .expect("valid bootstrap node id"), ) From a2bbd345131811308285b1a8d37093b9c2721a9e Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 28 Jan 2026 10:07:06 +0000 Subject: [PATCH 5/6] Instantiate Discovery service when building network --- reflection-node/src/network.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/reflection-node/src/network.rs b/reflection-node/src/network.rs index 2938037..212e572 100644 --- a/reflection-node/src/network.rs +++ b/reflection-node/src/network.rs @@ -1,5 +1,7 @@ use std::sync::LazyLock; +use p2panda_net::Discovery; +use p2panda_net::discovery::DiscoveryError; use thiserror::Error; use tracing::error; @@ -50,12 +52,15 @@ pub enum NetworkError { #[error(transparent)] MdnsDiscovery(#[from] MdnsDiscoveryError), #[error(transparent)] + Discovery(#[from] DiscoveryError), + #[error(transparent)] Endpoint(#[from] EndpointError), } #[allow(dead_code)] pub struct Network { pub(crate) mdns_discovery: MdnsDiscovery, + pub(crate) discovery: Discovery, pub(crate) gossip: Gossip, pub(crate) log_sync: LogSync, pub(crate) endpoint: Endpoint, @@ -92,9 +97,15 @@ impl Network { .mode(MdnsDiscoveryMode::Active) .spawn() .await?; + + let discovery = Discovery::builder(address_book.clone(), endpoint.clone()) + .spawn() + .await?; + let gossip = Gossip::builder(address_book.clone(), endpoint.clone()) .spawn() .await?; + let log_sync = LogSync::builder( operation_store.clone_inner(), topic_store.clone(), @@ -106,6 +117,7 @@ impl Network { Ok(Network { mdns_discovery, + discovery, gossip, log_sync, endpoint, From 187fa6a5db7d69b2e1433db06875b21758907e0d Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 28 Jan 2026 11:59:23 +0000 Subject: [PATCH 6/6] Move iroh-gossip patch into workspace Cargo.toml --- Cargo.toml | 3 +++ reflection-node/Cargo.toml | 5 +---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9a8dcb2..5c76ead 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,6 @@ [workspace] resolver = "2" members = ["reflection-app", "reflection-doc", "reflection-node"] + +[patch.crates-io] +iroh-gossip = { git = "https://github.com/p2panda/iroh-gossip", rev = "533c34a2758518ece19c1de9f21bc40d61f9b5a5" } \ No newline at end of file diff --git a/reflection-node/Cargo.toml b/reflection-node/Cargo.toml index 12ade4f..668527b 100644 --- a/reflection-node/Cargo.toml +++ b/reflection-node/Cargo.toml @@ -27,7 +27,4 @@ tracing = "0.1" test-log = { version = "0.2.18", default-features = false, features = ["trace", "color"] } hex = "0.4.3" rand_chacha = { version = "0.9.0", features = ["os_rng"] } -iroh = "0.95.1" - -[patch.crates-io] -iroh-gossip = { git = "https://github.com/p2panda/iroh-gossip", rev = "533c34a2758518ece19c1de9f21bc40d61f9b5a5" } \ No newline at end of file +iroh = "0.95.1" \ No newline at end of file