From c010d98a3b09af4f04ec2f22052bd5bf11331211 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Tue, 20 Jan 2026 18:01:07 +0100 Subject: [PATCH 1/4] fix(runner): dont run health check in hot path Running the perf ping everytime before we receive a command puts unnecessary load on the other end, and also has the chance of failing to receive an answer in time. --- src/executor/shared/fifo.rs | 153 +++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 74 deletions(-) diff --git a/src/executor/shared/fifo.rs b/src/executor/shared/fifo.rs index 56d1c525..e769f81c 100644 --- a/src/executor/shared/fifo.rs +++ b/src/executor/shared/fifo.rs @@ -145,89 +145,94 @@ impl RunnerFifo { }; let mut benchmark_started = false; - loop { - let is_alive = health_check().await?; - if !is_alive { - break; - } - let result = tokio::time::timeout(Duration::from_secs(1), self.recv_cmd()).await; - let cmd = match result { - Ok(Ok(cmd)) => cmd, - Ok(Err(e)) => { - warn!("Failed to parse FIFO command: {e}"); - break; + // Outer loop: continues until health check fails + loop { + // Inner loop: process commands until timeout/error + loop { + let result = tokio::time::timeout(Duration::from_secs(1), self.recv_cmd()).await; + let cmd = match result { + Ok(Ok(cmd)) => cmd, + Ok(Err(e)) => { + warn!("Failed to parse FIFO command: {e}"); + break; + } + Err(_) => break, // Timeout + }; + trace!("Received command: {cmd:?}"); + + // Try executor-specific handler first + if let Some(response) = handle_cmd(&cmd).await? { + self.send_cmd(response).await?; + continue; } - Err(_) => continue, - }; - trace!("Received command: {cmd:?}"); - - // Try executor-specific handler first - if let Some(response) = handle_cmd(&cmd).await? { - self.send_cmd(response).await?; - continue; - } - // Fall through to shared implementation for standard commands - match &cmd { - FifoCommand::CurrentBenchmark { pid, uri } => { - bench_order_by_timestamp.push((current_time(), uri.to_string())); - bench_pids.insert(*pid); - self.send_cmd(FifoCommand::Ack).await?; - } - FifoCommand::StartBenchmark => { - if !benchmark_started { - benchmark_started = true; - markers.push(MarkerType::SampleStart(current_time())); - } else { - warn!("Received duplicate StartBenchmark command, ignoring"); + // Fall through to shared implementation for standard commands + match &cmd { + FifoCommand::CurrentBenchmark { pid, uri } => { + bench_order_by_timestamp.push((current_time(), uri.to_string())); + bench_pids.insert(*pid); + self.send_cmd(FifoCommand::Ack).await?; } - self.send_cmd(FifoCommand::Ack).await?; - } - FifoCommand::StopBenchmark => { - if benchmark_started { - benchmark_started = false; - markers.push(MarkerType::SampleEnd(current_time())); - } else { - warn!("Received StopBenchmark command before StartBenchmark, ignoring"); + FifoCommand::StartBenchmark => { + if !benchmark_started { + benchmark_started = true; + markers.push(MarkerType::SampleStart(current_time())); + } else { + warn!("Received duplicate StartBenchmark command, ignoring"); + } + self.send_cmd(FifoCommand::Ack).await?; } - self.send_cmd(FifoCommand::Ack).await?; - } - FifoCommand::SetIntegration { name, version } => { - integration = Some((name.into(), version.into())); - self.send_cmd(FifoCommand::Ack).await?; - } - FifoCommand::AddMarker { marker, .. } => { - markers.push(*marker); - self.send_cmd(FifoCommand::Ack).await?; - } - FifoCommand::SetVersion(protocol_version) => { - match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) { - Ordering::Less => { - if *protocol_version - < runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION - { - bail!( - "Integration is using a version of the protocol that is smaller than the minimal supported protocol version ({protocol_version} < {}). \ - Please update the integration to a supported version.", - runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION - ); - } - self.send_cmd(FifoCommand::Ack).await?; + FifoCommand::StopBenchmark => { + if benchmark_started { + benchmark_started = false; + markers.push(MarkerType::SampleEnd(current_time())); + } else { + warn!("Received StopBenchmark command before StartBenchmark, ignoring"); } - Ordering::Greater => bail!( - "Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.", - runner_shared::fifo::CURRENT_PROTOCOL_VERSION - ), - Ordering::Equal => { - self.send_cmd(FifoCommand::Ack).await?; + self.send_cmd(FifoCommand::Ack).await?; + } + FifoCommand::SetIntegration { name, version } => { + integration = Some((name.into(), version.into())); + self.send_cmd(FifoCommand::Ack).await?; + } + FifoCommand::AddMarker { marker, .. } => { + markers.push(*marker); + self.send_cmd(FifoCommand::Ack).await?; + } + FifoCommand::SetVersion(protocol_version) => { + match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) { + Ordering::Less => { + if *protocol_version + < runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION + { + bail!( + "Integration is using a version of the protocol that is smaller than the minimal supported protocol version ({protocol_version} < {}). \ + Please update the integration to a supported version.", + runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION + ); + } + self.send_cmd(FifoCommand::Ack).await?; + } + Ordering::Greater => bail!( + "Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.", + runner_shared::fifo::CURRENT_PROTOCOL_VERSION + ), + Ordering::Equal => { + self.send_cmd(FifoCommand::Ack).await?; + } } } + _ => { + warn!("Unhandled FIFO command: {cmd:?}"); + self.send_cmd(FifoCommand::Err).await?; + } } - _ => { - warn!("Unhandled FIFO command: {cmd:?}"); - self.send_cmd(FifoCommand::Err).await?; - } + } + + let is_alive = health_check().await?; + if !is_alive { + break; } } From 9f8122986d7b8c8abb4c24e11e3c80072cc4b88d Mon Sep 17 00:00:00 2001 From: not-matthias Date: Wed, 21 Jan 2026 18:51:09 +0100 Subject: [PATCH 2/4] chore(runner): add cancel-safety test for FIFO --- src/executor/shared/fifo.rs | 50 ++++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/src/executor/shared/fifo.rs b/src/executor/shared/fifo.rs index e769f81c..36b87f85 100644 --- a/src/executor/shared/fifo.rs +++ b/src/executor/shared/fifo.rs @@ -84,11 +84,15 @@ fn get_pipe_open_options() -> TokioPipeOpenOptions { impl RunnerFifo { pub fn new() -> anyhow::Result { - create_fifo(RUNNER_CTL_FIFO)?; - create_fifo(RUNNER_ACK_FIFO)?; + Self::open(RUNNER_CTL_FIFO.as_ref(), RUNNER_ACK_FIFO.as_ref()) + } + + pub fn open(ctl_path: &Path, ack_path: &Path) -> anyhow::Result { + create_fifo(ctl_path)?; + create_fifo(ack_path)?; - let ack_fifo = get_pipe_open_options().open_sender(RUNNER_ACK_FIFO)?; - let ctl_fifo = get_pipe_open_options().open_receiver(RUNNER_CTL_FIFO)?; + let ack_fifo = get_pipe_open_options().open_sender(ack_path)?; + let ctl_fifo = get_pipe_open_options().open_receiver(ctl_path)?; Ok(Self { ctl_fifo, ack_fifo }) } @@ -245,3 +249,41 @@ impl RunnerFifo { Ok((marker_result, fifo_data)) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use tokio::io::AsyncWriteExt; + + #[tokio::test] + async fn recv_cmd_is_not_cancel_safe() { + let temp_dir = tempfile::tempdir().unwrap(); + let ctl_path = temp_dir.path().join("ctl_fifo"); + let ack_path = temp_dir.path().join("ack_fifo"); + + let mut fifo = RunnerFifo::open(&ctl_path, &ack_path).unwrap(); + let mut writer = get_pipe_open_options().open_sender(&ctl_path).unwrap(); + + let cmd = FifoCommand::Ack; + let payload = bincode::serialize(&cmd).unwrap(); + let len_bytes = (payload.len() as u32).to_le_bytes(); + + tokio::spawn(async move { + writer.write_all(&len_bytes).await.unwrap(); + writer.write_all(&payload[..1]).await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + writer.write_all(&payload[1..]).await.unwrap(); + }); + + let first = tokio::time::timeout(Duration::from_millis(10), fifo.recv_cmd()).await; + assert!(first.is_err(), "Expected timeout on first recv_cmd"); + + let second = tokio::time::timeout(Duration::from_millis(200), fifo.recv_cmd()).await; + + assert!( + matches!(second, Ok(Ok(FifoCommand::Ack))), + "recv_cmd should be cancel-safe: expected Ok(Ok(Ack)), got: {second:?}" + ); + } +} From e2d87f9196d3d9810d62e4010839c60bae0fb582 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Wed, 21 Jan 2026 18:52:39 +0100 Subject: [PATCH 3/4] fix(runner): make FIFO cancel-safe --- src/executor/shared/fifo.rs | 39 +++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/executor/shared/fifo.rs b/src/executor/shared/fifo.rs index 36b87f85..a7eb6ea6 100644 --- a/src/executor/shared/fifo.rs +++ b/src/executor/shared/fifo.rs @@ -1,5 +1,6 @@ use crate::prelude::*; use anyhow::Context; +use futures::StreamExt; use nix::{sys::time::TimeValLike, time::clock_gettime}; use runner_shared::artifacts::ExecutionTimestamps; use runner_shared::fifo::{Command as FifoCommand, MarkerType}; @@ -7,11 +8,12 @@ use runner_shared::fifo::{RUNNER_ACK_FIFO, RUNNER_CTL_FIFO}; use std::cmp::Ordering; use std::path::{Path, PathBuf}; use std::{collections::HashSet, time::Duration}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use tokio::net::unix::pid_t; use tokio::net::unix::pipe::OpenOptions as TokioPipeOpenOptions; use tokio::net::unix::pipe::Receiver as TokioPipeReader; use tokio::net::unix::pipe::Sender as TokioPipeSender; +use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; fn create_fifo>(path: P) -> anyhow::Result<()> { // Remove the previous FIFO (if it exists) @@ -71,7 +73,7 @@ pub struct FifoBenchmarkData { pub struct RunnerFifo { ack_fifo: TokioPipeSender, - ctl_fifo: TokioPipeReader, + ctl_reader: FramedRead, } fn get_pipe_open_options() -> TokioPipeOpenOptions { @@ -94,24 +96,27 @@ impl RunnerFifo { let ack_fifo = get_pipe_open_options().open_sender(ack_path)?; let ctl_fifo = get_pipe_open_options().open_receiver(ctl_path)?; - Ok(Self { ctl_fifo, ack_fifo }) + let codec = LengthDelimitedCodec::builder() + .length_field_length(4) + .little_endian() + .new_codec(); + let ctl_reader = FramedRead::new(ctl_fifo, codec); + + Ok(Self { + ack_fifo, + ctl_reader, + }) } pub async fn recv_cmd(&mut self) -> anyhow::Result { - let mut len_buffer = [0u8; 4]; - self.ctl_fifo.read_exact(&mut len_buffer).await?; - let message_len = u32::from_le_bytes(len_buffer) as usize; - - let mut buffer = vec![0u8; message_len]; - loop { - if self.ctl_fifo.read_exact(&mut buffer).await.is_ok() { - break; - } - } - - let decoded = bincode::deserialize(&buffer).with_context(|| { - format!("Failed to deserialize FIFO command (len: {message_len}, data: {buffer:?})") - })?; + let bytes = self + .ctl_reader + .next() + .await + .ok_or_else(|| anyhow::anyhow!("FIFO stream closed"))??; + + let decoded = bincode::deserialize(&bytes) + .with_context(|| format!("Failed to deserialize FIFO command (data: {bytes:?})"))?; Ok(decoded) } From 90355943abf7d8c50f213257255d6c6cfca9f97e Mon Sep 17 00:00:00 2001 From: not-matthias Date: Thu, 22 Jan 2026 12:01:21 +0100 Subject: [PATCH 4/4] fix(runner): replace perf fifo check with pid check The FIFO check is very error prone and also slows down the teardown, since we're waiting for the last timeout. --- src/executor/shared/fifo.rs | 1 + src/executor/wall_time/perf/mod.rs | 50 +++++++++++++++++------------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/executor/shared/fifo.rs b/src/executor/shared/fifo.rs index a7eb6ea6..03dd9022 100644 --- a/src/executor/shared/fifo.rs +++ b/src/executor/shared/fifo.rs @@ -241,6 +241,7 @@ impl RunnerFifo { let is_alive = health_check().await?; if !is_alive { + info!("Process terminated, stopping the command handler"); break; } } diff --git a/src/executor/wall_time/perf/mod.rs b/src/executor/wall_time/perf/mod.rs index 3ea55d5d..5fcbc218 100644 --- a/src/executor/wall_time/perf/mod.rs +++ b/src/executor/wall_time/perf/mod.rs @@ -31,10 +31,7 @@ use runner_shared::metadata::PerfMetadata; use runner_shared::unwind_data::UnwindData; use std::path::Path; use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; use std::{cell::OnceCell, collections::HashMap, process::ExitStatus}; -use tokio::sync::Mutex; mod jit_dump; mod memory_mappings; @@ -205,10 +202,12 @@ impl PerfRunner { let cmd = wrap_with_sudo(wrapped_builder)?.build(); debug!("cmd: {cmd:?}"); - let on_process_started = async |_| -> anyhow::Result<()> { + let on_process_started = async |pid| -> anyhow::Result<()> { // If we output pipedata, we do not parse the perf map during teardown yet, so we need to parse memory // maps as we receive the `CurrentBenchmark` fifo commands. - let data = Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata).await?; + let data = + Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata, pid as pid_t) + .await?; self.benchmark_data.set(data).unwrap_or_else(|_| { error!("Failed to set benchmark data in PerfRunner"); }); @@ -248,37 +247,44 @@ impl PerfRunner { async fn handle_fifo( mut runner_fifo: RunnerFifo, - perf_fifo: PerfFifo, + mut perf_fifo: PerfFifo, parse_memory_maps: bool, + pid: pid_t, ) -> anyhow::Result { let mut symbols_by_pid = HashMap::::new(); let mut unwind_data_by_pid = HashMap::>::new(); - let perf_fifo = Arc::new(Mutex::new(perf_fifo)); - let mut perf_ping_timeout = 5; + // The runner spawns a `bash` process, which will execute perf and the benchmark. To check if it + // terminated, we have to check if all the sub-processes terminated. + // We can't check for the `bash` process, because it will terminate after the FIFO handler has + // finished (because we're using a single-threaded runtime). + let mut sys = sysinfo::System::new(); let health_check = async || { - let perf_ping = tokio::time::timeout(Duration::from_secs(perf_ping_timeout), async { - perf_fifo.lock().await.ping().await - }) - .await; - if let Ok(Err(_)) | Err(_) = perf_ping { - debug!("Failed to ping perf FIFO, ending perf fifo loop"); - return Ok(false); - } - // Perf has started successfully, we can decrease the timeout for future pings - perf_ping_timeout = 1; + sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true); + let process = sys.process(sysinfo::Pid::from_u32(pid as u32)); + + match process { + None => Ok(false), + Some(_proc) => { + let has_children = sys.processes().values().any(|p| { + p.parent() + .map(|parent_pid| parent_pid.as_u32() == pid as u32) + .unwrap_or(false) + }); - Ok(true) + Ok(has_children) + } + } }; let on_cmd = async |cmd: &FifoCommand| { #[allow(deprecated)] match cmd { FifoCommand::StartBenchmark => { - perf_fifo.lock().await.start_events().await?; + perf_fifo.start_events().await?; } FifoCommand::StopBenchmark => { - perf_fifo.lock().await.stop_events().await?; + perf_fifo.stop_events().await?; } FifoCommand::CurrentBenchmark { pid, .. } => { #[cfg(target_os = "linux")] @@ -294,7 +300,7 @@ impl PerfRunner { } } FifoCommand::PingPerf => { - if perf_fifo.lock().await.ping().await.is_err() { + if perf_fifo.ping().await.is_err() { return Ok(Some(FifoCommand::Err)); } return Ok(Some(FifoCommand::Ack));