Skip to content

Conversation

@jsparber
Copy link
Collaborator

No description provided.

@jsparber jsparber force-pushed the jsparber/new_p2panda branch from 23c36d4 to 3f9f4a6 Compare January 19, 2026 16:52
@jsparber jsparber force-pushed the jsparber/new_p2panda branch from 3f9f4a6 to dad8f49 Compare January 19, 2026 17:28
@jsparber jsparber force-pushed the jsparber/new_p2panda branch from dad8f49 to 1114f61 Compare January 20, 2026 17:14

let abort_handle = spawn(async move {
while let Ok(event) = topic_rx.recv().await {
while let Some(Ok(event)) = topic_rx.next().await {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the error that can happen on each item of the stream? Is the error something persistent, or can we just go to the next item in the stream when we see an error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing for the GossipSubscription

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code GossipSubscription error is only https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/errors/enum.BroadcastStreamRecvError.html and it will be fine to keep reading after an error. I think the item stream should return the specific error not an enum of all errors.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now looked also at the SyncSubscription is the same as the GossipSubscription. So the only error that can happen is https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/errors/enum.BroadcastStreamRecvError.html. I wonder if that error couldn't be handled internally.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now looked also at the SyncSubscription is the same as the GossipSubscription. So the only error that can happen is https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/errors/enum.BroadcastStreamRecvError.html. I wonder if that error couldn't be handled internally.

I'm actually touching on this area for my work to remove the unwanted trait leakage from p2panda-sync. I will make it so that BroadcastStreamReccError is returned from the sync subscription. I could do the same with gossip as well.

What did you mean by handling the error internally though?

Copy link
Member

@adzialocha adzialocha Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error indicates that messages have been dropped, which I believe is important to know, even though the user doesn't have a chance to recover from this in the gossip case?

However, this is an issue with this particular mpmc channel implementation of tokio. We could switch to flume or something else which offers bounded channels who will instead apply backpressure rather than dropping items. I believe we can then also hide the https://docs.rs/flume/latest/flume/enum.RecvError.html error since it'll be equal to a Stream termination (yielding None). Looks like they implement it exactly like that: https://docs.rs/flume/latest/src/flume/async.rs.html#573

Copy link
Collaborator Author

@jsparber jsparber Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What did you mean by handling the error internally though?

I mean not exposing the error to the consumer at all, and make sure internally that it doesn't happen or that it's handled. For the consumer it's quite unclear how to handle the error, is there a way to request missed operations because of the slow read?

@mycognosist
Copy link
Collaborator

Here's an issue @sandreae and I found while testing this together:

  • We moved the patch into the workspace root Cargo.toml and then tested Reflection
    • Initial sync and updates work for a new doc
    • Then one peer exits the doc (back to the main screen) and opens it again
    • No connection ever occurs...
    • We think this might have something to do with some state not being dropped
    • We believe that the network should be torn down when you leave a doc, and recreated when you join one
    • This behaviour does not occur on the current Flatpak version of Reflection

Any ideas, @jsparber ?

@jsparber
Copy link
Collaborator Author

  • We believe that the network should be torn down when you leave a doc, and recreated when you join one

Could make sense even though it might make entering a document slower. Anyhow, this doesn't seam relevant here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants