reverie/reverie-examples/chunky_print.rs
Jason White cc6d88bfd3 ci: Fix clippy warnings (#23)
Summary:
Fixes all current clippy warnings[1] so CI is green again.

Fixing the warning `clippy::needless_pass_by_ref_mut` became a little involved. The internal version of clippy isn't recent enough to have this warning and so just doing `#[allow(clippy::needless_pass_by_ref_mut)]` leads to another error. The resulting change fixes the clippy warning and gets rid of some of the shenanigans I was doing to avoid allocating path buffers within the child process.

[1]: https://github.com/facebookexperimental/reverie/actions/runs/6164196219/job/16729396694

Pull Request resolved: https://github.com/facebookexperimental/reverie/pull/23

Reviewed By: VladimirMakaev

Differential Revision: D49209202

Pulled By: jasonwhite

fbshipit-source-id: b03ff432783910bef11fc239d146659dc2c0db30
2023-09-13 15:01:16 -07:00

274 lines
8.8 KiB
Rust

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
//! This tool will chunk together printed output from each thread, over fixed
//! time intervals.
use std::collections::HashMap;
use std::fmt::Write;
use std::io;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use clap::Parser;
use reverie::syscalls::Addr;
use reverie::syscalls::MemoryAccess;
use reverie::syscalls::Syscall;
use reverie::Error;
use reverie::GlobalTool;
use reverie::Guest;
use reverie::Tid;
use reverie::Tool;
use reverie_util::CommonToolArguments;
use serde::Deserialize;
use serde::Serialize;
use tracing::debug;
use tracing::info;
use tracing::trace;
/// How many system calls (in each thread) define an epoch?
const EPOCH: u64 = 10;
#[derive(PartialEq, Debug, Eq, Hash, Clone, Serialize, Deserialize, Copy)]
pub enum Which {
Stderr,
Stdout,
}
/// Send individual print attepmts (write calls) to the global object:
#[derive(PartialEq, Debug, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum Msg {
/// Route a print over to the tracer to issue.
Print(Which, Vec<u8>),
/// Tick the logical clock.
Tick,
/// Print all buffered messages, cutting off the epoch early
Flush,
}
type LogicalTime = u64;
#[derive(Debug, Default)]
struct ChunkyPrintGlobal(Mutex<Inner>);
#[derive(Debug, Default)]
struct Inner {
times: HashMap<Tid, LogicalTime>,
printbuf: HashMap<Tid, Vec<(Which, Vec<u8>)>>,
epoch_num: u64,
}
#[reverie::global_tool]
impl GlobalTool for ChunkyPrintGlobal {
type Request = Msg;
type Response = ();
type Config = ();
async fn receive_rpc(&self, from: Tid, m: Msg) {
let mut mg = self.0.lock().unwrap();
match m {
Msg::Print(w, s) => {
let v = mg.printbuf.entry(from).or_default();
v.push((w, s));
}
Msg::Tick => {
let ticks = mg.times.entry(from).or_insert(0);
*ticks += 1;
mg.check_epoch();
}
Msg::Flush => {
let _ = mg.flush_messages();
}
}
}
}
impl Inner {
/// Check if the epoch has expired and flush the buffer.
fn check_epoch(&mut self) {
if self.times.iter().all(|(_p, t)| (*t > EPOCH)) {
let _ = self.flush_messages();
self.times.iter_mut().for_each(|(_, t)| *t -= EPOCH);
self.epoch_num += 1;
}
}
fn flush_messages(&mut self) -> io::Result<()> {
let non_empty = self
.printbuf
.iter()
.fold(0, |acc, (_, v)| if v.is_empty() { acc } else { acc + 1 });
if non_empty > 1 {
let mut strbuf = String::new();
for (tid, v) in self.printbuf.iter() {
let _ = write!(&mut strbuf, "tid {}:{{", tid);
let mut iter = v.iter();
if let Some((_, b)) = iter.next() {
let _ = write!(&mut strbuf, "{}", b.len());
for (_, b) in iter {
let _ = write!(&mut strbuf, ", {}", b.len());
}
}
let _ = write!(&mut strbuf, "}} ");
}
info!(
" [chunky_print] {} threads concurrent output in epoch {}, sizes: {}",
non_empty, self.epoch_num, strbuf
);
} else {
debug!(
" [chunky_print] output from {} thread(s) in epoch {}: {} bytes",
non_empty,
self.epoch_num,
self.printbuf
.iter()
.fold(0, |acc, (_, v)| v.iter().fold(acc, |a, (_, b)| a + b.len()))
);
}
for (tid, v) in self.printbuf.iter_mut() {
for (w, b) in v.iter() {
match w {
Which::Stdout => {
trace!(
" [chunky_print] writing {} bytes to stdout from tid {}",
b.len(),
tid
);
io::Write::write_all(&mut io::stdout(), b)?;
}
Which::Stderr => {
trace!(
" [chunky_print] writing {} bytes to stderr from tid {}",
b.len(),
tid
);
io::Write::write_all(&mut io::stderr(), b)?;
}
}
}
v.clear();
}
io::Write::flush(&mut io::stdout())?;
io::Write::flush(&mut io::stderr())?;
Ok(())
}
}
#[derive(Debug, Default)]
struct ChunkyPrintLocal {
stdout_disconnected: AtomicBool,
stderr_disconnected: AtomicBool,
}
impl Clone for ChunkyPrintLocal {
fn clone(&self) -> Self {
ChunkyPrintLocal {
stdout_disconnected: AtomicBool::new(self.stdout_disconnected.load(Ordering::SeqCst)),
stderr_disconnected: AtomicBool::new(self.stderr_disconnected.load(Ordering::SeqCst)),
}
}
}
fn read_tracee_memory<T: Guest<ChunkyPrintLocal>>(
guest: &T,
addr: Addr<u8>,
len: usize,
) -> Result<Vec<u8>, Error> {
let mut buf = vec![0; len];
guest.memory().read_exact(addr, &mut buf)?;
Ok(buf)
}
#[reverie::tool]
impl Tool for ChunkyPrintLocal {
type GlobalState = ChunkyPrintGlobal;
type ThreadState = ();
async fn handle_syscall_event<T: Guest<Self>>(
&self,
guest: &mut T,
call: Syscall,
) -> Result<i64, Error> {
let _ = guest.send_rpc(Msg::Tick).await;
match call {
// Here we make some attempt to catch redirections:
// FIXME: De-dup the dup
#[cfg(target_arch = "x86_64")]
Syscall::Dup2(d) => {
match d.newfd() {
1 => self.stdout_disconnected.store(true, Ordering::SeqCst),
2 => self.stderr_disconnected.store(true, Ordering::SeqCst),
_ => {}
}
guest.tail_inject(call).await
}
Syscall::Dup3(d) => {
match d.newfd() {
1 => self.stdout_disconnected.store(true, Ordering::SeqCst),
2 => self.stderr_disconnected.store(true, Ordering::SeqCst),
_ => {}
}
guest.tail_inject(call).await
}
Syscall::Write(w) => {
match w.fd() {
1 | 2 => {
let which = if w.fd() == 1 {
if self.stdout_disconnected.load(Ordering::SeqCst) {
debug!(
" [chunky_print] letting through write on redirected stdout, {} bytes.",
w.len()
);
return guest.tail_inject(call).await;
}
Which::Stdout
} else {
if self.stderr_disconnected.load(Ordering::SeqCst) {
debug!(
" [chunky_print] letting through write on redirected stderr, {} bytes.",
w.len()
);
return guest.tail_inject(call).await;
}
Which::Stderr
};
let buf = read_tracee_memory(guest, w.buf().unwrap(), w.len())?;
let _ = guest.send_rpc(Msg::Print(which, buf)).await;
info!(
" [chunky_print] suppressed write of {} bytes to fd {}",
w.len(),
w.fd()
);
// Suppress the original system call:
Ok(w.len() as i64)
}
_ => guest.tail_inject(call).await,
}
}
_ => guest.tail_inject(call).await,
}
}
}
#[tokio::main]
async fn main() -> Result<(), Error> {
let args = CommonToolArguments::from_args();
let log_guard = args.init_tracing();
let tracer = reverie_ptrace::TracerBuilder::<ChunkyPrintLocal>::new(args.into())
.spawn()
.await?;
let (status, global_state) = tracer.wait().await?;
trace!(" [chunky_print] global exit, flushing last messages.");
let _ = global_state.0.lock().unwrap().flush_messages();
drop(log_guard); // Flush logs before exiting.
status.raise_or_exit()
}