Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 146 additions & 93 deletions src/executor/shared/fifo.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::prelude::*;
use anyhow::Context;
use futures::StreamExt;
use nix::{sys::time::TimeValLike, time::clock_gettime};
use runner_shared::artifacts::ExecutionTimestamps;
use runner_shared::fifo::{Command as FifoCommand, MarkerType};
use runner_shared::fifo::{RUNNER_ACK_FIFO, RUNNER_CTL_FIFO};
use std::cmp::Ordering;
use std::path::{Path, PathBuf};
use std::{collections::HashSet, time::Duration};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncWriteExt;
use tokio::net::unix::pid_t;
use tokio::net::unix::pipe::OpenOptions as TokioPipeOpenOptions;
use tokio::net::unix::pipe::Receiver as TokioPipeReader;
use tokio::net::unix::pipe::Sender as TokioPipeSender;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};

fn create_fifo<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<()> {
// Remove the previous FIFO (if it exists)
Expand Down Expand Up @@ -71,7 +73,7 @@ pub struct FifoBenchmarkData {

pub struct RunnerFifo {
ack_fifo: TokioPipeSender,
ctl_fifo: TokioPipeReader,
ctl_reader: FramedRead<TokioPipeReader, LengthDelimitedCodec>,
}

fn get_pipe_open_options() -> TokioPipeOpenOptions {
Expand All @@ -84,30 +86,37 @@ fn get_pipe_open_options() -> TokioPipeOpenOptions {

impl RunnerFifo {
pub fn new() -> anyhow::Result<Self> {
create_fifo(RUNNER_CTL_FIFO)?;
create_fifo(RUNNER_ACK_FIFO)?;
Self::open(RUNNER_CTL_FIFO.as_ref(), RUNNER_ACK_FIFO.as_ref())
}

pub fn open(ctl_path: &Path, ack_path: &Path) -> anyhow::Result<Self> {
create_fifo(ctl_path)?;
create_fifo(ack_path)?;

let ack_fifo = get_pipe_open_options().open_sender(ack_path)?;
let ctl_fifo = get_pipe_open_options().open_receiver(ctl_path)?;

let ack_fifo = get_pipe_open_options().open_sender(RUNNER_ACK_FIFO)?;
let ctl_fifo = get_pipe_open_options().open_receiver(RUNNER_CTL_FIFO)?;
let codec = LengthDelimitedCodec::builder()
.length_field_length(4)
.little_endian()
.new_codec();
let ctl_reader = FramedRead::new(ctl_fifo, codec);

Ok(Self { ctl_fifo, ack_fifo })
Ok(Self {
ack_fifo,
ctl_reader,
})
}

pub async fn recv_cmd(&mut self) -> anyhow::Result<FifoCommand> {
let mut len_buffer = [0u8; 4];
self.ctl_fifo.read_exact(&mut len_buffer).await?;
let message_len = u32::from_le_bytes(len_buffer) as usize;

let mut buffer = vec![0u8; message_len];
loop {
if self.ctl_fifo.read_exact(&mut buffer).await.is_ok() {
break;
}
}
let bytes = self
.ctl_reader
.next()
.await
.ok_or_else(|| anyhow::anyhow!("FIFO stream closed"))??;

let decoded = bincode::deserialize(&buffer).with_context(|| {
format!("Failed to deserialize FIFO command (len: {message_len}, data: {buffer:?})")
})?;
let decoded = bincode::deserialize(&bytes)
.with_context(|| format!("Failed to deserialize FIFO command (data: {bytes:?})"))?;
Ok(decoded)
}

Expand Down Expand Up @@ -145,90 +154,96 @@ impl RunnerFifo {
};

let mut benchmark_started = false;

// Outer loop: continues until health check fails
loop {
let is_alive = health_check().await?;
if !is_alive {
break;
}
// Inner loop: process commands until timeout/error
loop {
let result = tokio::time::timeout(Duration::from_secs(1), self.recv_cmd()).await;
let cmd = match result {
Ok(Ok(cmd)) => cmd,
Ok(Err(e)) => {
warn!("Failed to parse FIFO command: {e}");
break;
}
Err(_) => break, // Timeout
};
trace!("Received command: {cmd:?}");

let result = tokio::time::timeout(Duration::from_secs(1), self.recv_cmd()).await;
let cmd = match result {
Ok(Ok(cmd)) => cmd,
Ok(Err(e)) => {
warn!("Failed to parse FIFO command: {e}");
break;
// Try executor-specific handler first
if let Some(response) = handle_cmd(&cmd).await? {
self.send_cmd(response).await?;
continue;
}
Err(_) => continue,
};
trace!("Received command: {cmd:?}");

// Try executor-specific handler first
if let Some(response) = handle_cmd(&cmd).await? {
self.send_cmd(response).await?;
continue;
}

// Fall through to shared implementation for standard commands
match &cmd {
FifoCommand::CurrentBenchmark { pid, uri } => {
bench_order_by_timestamp.push((current_time(), uri.to_string()));
bench_pids.insert(*pid);
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::StartBenchmark => {
if !benchmark_started {
benchmark_started = true;
markers.push(MarkerType::SampleStart(current_time()));
} else {
warn!("Received duplicate StartBenchmark command, ignoring");
// Fall through to shared implementation for standard commands
match &cmd {
FifoCommand::CurrentBenchmark { pid, uri } => {
bench_order_by_timestamp.push((current_time(), uri.to_string()));
bench_pids.insert(*pid);
self.send_cmd(FifoCommand::Ack).await?;
}
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::StopBenchmark => {
if benchmark_started {
benchmark_started = false;
markers.push(MarkerType::SampleEnd(current_time()));
} else {
warn!("Received StopBenchmark command before StartBenchmark, ignoring");
FifoCommand::StartBenchmark => {
if !benchmark_started {
benchmark_started = true;
markers.push(MarkerType::SampleStart(current_time()));
} else {
warn!("Received duplicate StartBenchmark command, ignoring");
}
self.send_cmd(FifoCommand::Ack).await?;
}
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::SetIntegration { name, version } => {
integration = Some((name.into(), version.into()));
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::AddMarker { marker, .. } => {
markers.push(*marker);
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::SetVersion(protocol_version) => {
match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) {
Ordering::Less => {
if *protocol_version
< runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION
{
bail!(
"Integration is using a version of the protocol that is smaller than the minimal supported protocol version ({protocol_version} < {}). \
Please update the integration to a supported version.",
runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION
);
}
self.send_cmd(FifoCommand::Ack).await?;
FifoCommand::StopBenchmark => {
if benchmark_started {
benchmark_started = false;
markers.push(MarkerType::SampleEnd(current_time()));
} else {
warn!("Received StopBenchmark command before StartBenchmark, ignoring");
}
Ordering::Greater => bail!(
"Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.",
runner_shared::fifo::CURRENT_PROTOCOL_VERSION
),
Ordering::Equal => {
self.send_cmd(FifoCommand::Ack).await?;
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::SetIntegration { name, version } => {
integration = Some((name.into(), version.into()));
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::AddMarker { marker, .. } => {
markers.push(*marker);
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::SetVersion(protocol_version) => {
match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) {
Ordering::Less => {
if *protocol_version
< runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION
{
bail!(
"Integration is using a version of the protocol that is smaller than the minimal supported protocol version ({protocol_version} < {}). \
Please update the integration to a supported version.",
runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION
);
}
self.send_cmd(FifoCommand::Ack).await?;
}
Ordering::Greater => bail!(
"Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.",
runner_shared::fifo::CURRENT_PROTOCOL_VERSION
),
Ordering::Equal => {
self.send_cmd(FifoCommand::Ack).await?;
}
}
}
}
_ => {
warn!("Unhandled FIFO command: {cmd:?}");
self.send_cmd(FifoCommand::Err).await?;
_ => {
warn!("Unhandled FIFO command: {cmd:?}");
self.send_cmd(FifoCommand::Err).await?;
}
}
}

let is_alive = health_check().await?;
if !is_alive {
info!("Process terminated, stopping the command handler");
break;
}
}

let marker_result = ExecutionTimestamps::new(&bench_order_by_timestamp, &markers);
Expand All @@ -240,3 +255,41 @@ impl RunnerFifo {
Ok((marker_result, fifo_data))
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::io::AsyncWriteExt;

// Regression test for cancelling `recv_cmd` while a frame is only partially
// available on the control FIFO.
//
// A writer sends one length-prefixed `FifoCommand::Ack` frame in two parts
// separated by a pause. The first `recv_cmd` call is wrapped in a timeout
// shorter than that pause and is expected to be cancelled; the second call
// must still return the complete command.
//
// NOTE(review): the function name says "is_not_cancel_safe", but the final
// assertion requires that `recv_cmd` IS cancel-safe — consider renaming.
#[tokio::test]
async fn recv_cmd_is_not_cancel_safe() {
// Use FIFOs inside a fresh temporary directory instead of the default
// RUNNER_CTL_FIFO / RUNNER_ACK_FIFO paths.
let temp_dir = tempfile::tempdir().unwrap();
let ctl_path = temp_dir.path().join("ctl_fifo");
let ack_path = temp_dir.path().join("ack_fifo");

// `open` creates both FIFOs; the test then opens its own sender on the
// control FIFO to play the role of the peer writing commands.
let mut fifo = RunnerFifo::open(&ctl_path, &ack_path).unwrap();
let mut writer = get_pipe_open_options().open_sender(&ctl_path).unwrap();

// Build the frame by hand: a 4-byte little-endian length followed by the
// bincode-serialized command.
let cmd = FifoCommand::Ack;
let payload = bincode::serialize(&cmd).unwrap();
let len_bytes = (payload.len() as u32).to_le_bytes();

// Write the length and the first payload byte, pause, then write the rest,
// so the frame stays incomplete for the duration of the sleep.
tokio::spawn(async move {
writer.write_all(&len_bytes).await.unwrap();
writer.write_all(&payload[..1]).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
writer.write_all(&payload[1..]).await.unwrap();
});

// The 10ms timeout is shorter than the writer's 50ms pause, so this call is
// expected to time out, which drops the pending `recv_cmd` future.
let first = tokio::time::timeout(Duration::from_millis(10), fifo.recv_cmd()).await;
assert!(first.is_err(), "Expected timeout on first recv_cmd");

// The 200ms timeout leaves room for the remaining payload bytes to arrive.
let second = tokio::time::timeout(Duration::from_millis(200), fifo.recv_cmd()).await;

// Succeeds only if the cancelled first call did not discard the bytes that
// were already written for this frame.
assert!(
matches!(second, Ok(Ok(FifoCommand::Ack))),
"recv_cmd should be cancel-safe: expected Ok(Ok(Ack)), got: {second:?}"
);
}
}
50 changes: 28 additions & 22 deletions src/executor/wall_time/perf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ use runner_shared::metadata::PerfMetadata;
use runner_shared::unwind_data::UnwindData;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{cell::OnceCell, collections::HashMap, process::ExitStatus};
use tokio::sync::Mutex;

mod jit_dump;
mod memory_mappings;
Expand Down Expand Up @@ -205,10 +202,12 @@ impl PerfRunner {
let cmd = wrap_with_sudo(wrapped_builder)?.build();
debug!("cmd: {cmd:?}");

let on_process_started = async |_| -> anyhow::Result<()> {
let on_process_started = async |pid| -> anyhow::Result<()> {
// If we output pipedata, we do not parse the perf map during teardown yet, so we need to parse memory
// maps as we receive the `CurrentBenchmark` fifo commands.
let data = Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata).await?;
let data =
Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata, pid as pid_t)
.await?;
self.benchmark_data.set(data).unwrap_or_else(|_| {
error!("Failed to set benchmark data in PerfRunner");
});
Expand Down Expand Up @@ -248,37 +247,44 @@ impl PerfRunner {

async fn handle_fifo(
mut runner_fifo: RunnerFifo,
perf_fifo: PerfFifo,
mut perf_fifo: PerfFifo,
parse_memory_maps: bool,
pid: pid_t,
) -> anyhow::Result<BenchmarkData> {
let mut symbols_by_pid = HashMap::<pid_t, ProcessSymbols>::new();
let mut unwind_data_by_pid = HashMap::<pid_t, Vec<UnwindData>>::new();

let perf_fifo = Arc::new(Mutex::new(perf_fifo));
let mut perf_ping_timeout = 5;
// The runner spawns a `bash` process, which will execute perf and the benchmark. To check if it
// terminated, we have to check if all the sub-processes terminated.
// We can't check for the `bash` process, because it will terminate after the FIFO handler has
// finished (because we're using a single-threaded runtime).
let mut sys = sysinfo::System::new();
let health_check = async || {
let perf_ping = tokio::time::timeout(Duration::from_secs(perf_ping_timeout), async {
perf_fifo.lock().await.ping().await
})
.await;
if let Ok(Err(_)) | Err(_) = perf_ping {
debug!("Failed to ping perf FIFO, ending perf fifo loop");
return Ok(false);
}
// Perf has started successfully, we can decrease the timeout for future pings
perf_ping_timeout = 1;
sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
let process = sys.process(sysinfo::Pid::from_u32(pid as u32));

match process {
None => Ok(false),
Some(_proc) => {
let has_children = sys.processes().values().any(|p| {
p.parent()
.map(|parent_pid| parent_pid.as_u32() == pid as u32)
.unwrap_or(false)
});

Ok(true)
Ok(has_children)
}
}
};

let on_cmd = async |cmd: &FifoCommand| {
#[allow(deprecated)]
match cmd {
FifoCommand::StartBenchmark => {
perf_fifo.lock().await.start_events().await?;
perf_fifo.start_events().await?;
}
FifoCommand::StopBenchmark => {
perf_fifo.lock().await.stop_events().await?;
perf_fifo.stop_events().await?;
}
FifoCommand::CurrentBenchmark { pid, .. } => {
#[cfg(target_os = "linux")]
Expand All @@ -294,7 +300,7 @@ impl PerfRunner {
}
}
FifoCommand::PingPerf => {
if perf_fifo.lock().await.ping().await.is_err() {
if perf_fifo.ping().await.is_err() {
return Ok(Some(FifoCommand::Err));
}
return Ok(Some(FifoCommand::Ack));
Expand Down
Loading