Skip to content

DUNE-DAQ/ipm

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

ipm

Inter-Process Messaging

The IPM library provides the low-level library for for sending messages between DUNE DAQ processes. IPM deals with messages consisting of arrays of bytes: higher-level concepts such as object serialization/deserialization will be handled by other libraries and processes building on IPM.

IPM provides two communication patterns:

  1. Sender/Receiver, a pattern in which one sender talks to one receiver
  2. Publisher/Subscriber, a pattern in which one sender talks to zero or more receivers. Each message goes to all subscribers

Users should interact with IPM via the interfaces dunedaq::ipm::Sender, dunedaq::ipm::Receiver and dunedaq::ipm::Subscriber, which are created using the factory functions dunedaq::ipm::makeIPM(Sender|Receiver|Subscriber), which each take a string argument giving the implementation type. The currently-available implementation types all use ZeroMQ, and are:

  • ZmqSender implementing dunedaq::ipm::Sender in the sender/receiver pattern
  • ZmqReceiver implementing dunedaq::ipm::Receiver
  • ZmqPublisher implementing dunedaq::ipm::Sender in the publisher/subscriber pattern
  • ZmqSubscriber implementing dunedaq::ipm::Subscriber

Additioanlly, the CallbackAdapter class implements callback functionality for the ZmqReceiver and ZmqSubscriber class, since ZeroMQ does not have a native callback functionality. It does this by managing a thread which calls receive in a loop, calling the given function when data is returned.

Basic example of the sender/receiver pattern:

// Sender side
std::shared_ptr<dunedaq::ipm::Sender> sender=dunedaq::ipm::makeIPMSender("ZmqSender");
sender->connect_for_sends({ {"connection_string", "tcp://127.0.0.1:12345"} });
void* message= ... ;
// Last arg is send timeout
sender->send(message, message_size, std::chrono::milliseconds(10));

// Receiver side
std::shared_ptr<dunedaq::ipm::Receiver> receiver=dunedaq::ipm::makeIPMReceiver("ZmqReceiver");
receiver->connect_for_receives({ {"connection_string", "tcp://127.0.0.1:12345"} });
// Arg is receive timeout
Receiver::Response response=receiver->receive(std::chrono::milliseconds(10));
// ... do something with response.data or response.metadata

Basic example of the publisher/subscriber pattern:

// Publisher side
std::shared_ptr<dunedaq::ipm::Sender> publisher=dunedaq::ipm::makeIPMSender("ZmqPublisher");
publisher->connect_for_sends({ {"connection_string", "tcp://127.0.0.1:12345"} });
void* message= ... ;
// Third arg is send timeout; last arg is topic for subscribers to subscribe to
publisher->send(message, message_size, std::chrono::milliseconds(10), "topic");

// Subscriber side
std::shared_ptr<dunedaq::ipm::Subscriber> subscriber=dunedaq::ipm::makeIPMReceiver("ZmqSubscriber");
subscriber->connect_for_receives({ {"connection_string", "tcp://127.0.0.1:12345"} });
subscriber->subscribe("topic");
// Arg is receive timeout
Receiver::Response response=subscriber->receive(std::chrono::milliseconds(10));
// ... do something with response.data or response.metadata

More complete examples can be found in the test/plugins directory.

There is an asymmetry between send and receive, where send takes a void* and receive returns a std::vector<char>. This is a result of the fact that IPM does not own the memory being passed to send, but it does have to transfer the memory returned from receive.

API Diagram

Class Diagrams

ZeroMQ Configuration Variables

Currently, ZmqContext.hpp has two environment variables used to configure ZeroMQ within each application:

  • IPM_ZMQ_IO_THREADS: Sets the number of threads in the ZeroMQ context. ipm does not specify a default, the ZeroMQ default is 1.
  • IPM_ZMQ_MAX_SOCKETS: Set the maximum number of sockets allowed on the context. ipm uses a minimum value of 16636.

About

Inter-Process Messaging.

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published

Contributors 13