I have a PR swapping from `Command::output()` to a custom implementation that uses threads to handle the subprocess's stdout/stderr while the program is running, instead of waiting for it to complete. The approach was motivated by this answer.
```rust
// Executes the given command, capturing its output and exit code in the returned Invocation.
// If output_streams is present the output of the command is _also_ written to these streams
// concurrently, in order to support displaying a command's output while simultaneously caching
// it (instead of waiting for the command to complete before outputting anything).
fn execute_subprocess(
    cmd: impl Into<std::process::Command>,
    output_streams: Option<(impl Write+Send, impl Write+Send)>
) -> Result<Invocation> {
    fn maybe_tee(mut source: impl Read, mut sink: Option<impl Write>) -> std::io::Result<Vec<u8>> {
        let mut ret = Vec::new();
        // This initialization can be avoided (safely) once
        // https://github.com/rust-lang/rust/issues/78485 is stable.
        let mut buf = [0u8; 1024 * 10];
        loop {
            let num_read = source.read(&mut buf)?;
            if num_read == 0 {
                break;
            }
            let buf = &buf[..num_read];
            if let Some(ref mut sink) = sink {
                sink.write_all(buf)?;
                sink.flush()?;
            }
            ret.extend(buf);
        }
        Ok(ret)
    }

    let (out_sink, err_sink) = match output_streams {
        Some((out, err)) => (Some(out), Some(err)),
        None => (None, None),
    };

    let mut command: std::process::Command = cmd.into();
    use std::process::Stdio;
    let command = command.stdout(Stdio::piped()).stderr(Stdio::piped());
    let start = Instant::now();
    let mut child = command.spawn().with_context(|| format!("Failed to run command: {:?}", command))?;

    let child_out = child.stdout.take().ok_or(anyhow!("cannot attach to child stdout"))?;
    let child_err = child.stderr.take().ok_or(anyhow!("cannot attach to child stderr"))?;

    // Using scoped threads means we can take a Write+Send instead of a W+S+'static, allowing
    // callers to pass mutable references (such as `&mut Vec<u8>`). See also
    // https://stackoverflow.com/q/32750829/113632
    let (stdout, stderr) = std::thread::scope(|s| {
        let thread_out = s.spawn(|| maybe_tee(child_out, out_sink));
        let thread_err = s.spawn(|| maybe_tee(child_err, err_sink));
        let stdout = thread_out.join().expect("child stdout thread failed to join").context("stdout pipe failed")?;
        let stderr = thread_err.join().expect("child stderr thread failed to join").context("stderr pipe failed")?;
        anyhow::Ok((stdout, stderr))
    })?;
    let status = child.wait()?;
    let runtime = start.elapsed();

    Ok(Invocation {
        stdout,
        stderr,
        // TODO handle signals, see https://stackoverflow.com/q/66272686
        exit_code: status.code().unwrap_or(126),
        runtime,
    })
}
```
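One possible way to shrink `maybe_tee` (a sketch, not verified against the rest of the PR): wrap the capture buffer and the optional sink in a small `Write` adapter and let `std::io::copy` drive the loop. `io::copy` manages its own buffer, so the zero-initialized array goes away, and it retries reads that fail with `ErrorKind::Interrupted`, which the hand-written loop does not. The `Tee` type is a hypothetical name introduced here; byte slices stand in for the child's pipes so the example runs on its own.

```rust
use std::io::{self, Read, Write};

// Hypothetical adapter (not from the PR): every chunk written to it is appended to
// `buf` and, when a sink is present, also forwarded to that sink and flushed.
struct Tee<'a, W: Write> {
    buf: &'a mut Vec<u8>,
    sink: Option<W>,
}

impl<W: Write> Write for Tee<'_, W> {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        if let Some(sink) = self.sink.as_mut() {
            // Forward the whole chunk before recording it, so a sink error aborts
            // the copy without capturing bytes the sink never received.
            sink.write_all(data)?;
            // Flush eagerly, as the original loop does, so output appears promptly.
            sink.flush()?;
        }
        self.buf.extend_from_slice(data);
        Ok(data.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        match self.sink.as_mut() {
            Some(sink) => sink.flush(),
            None => Ok(()),
        }
    }
}

// Same contract as the PR's `maybe_tee`, but `io::copy` owns the read loop and buffer
// and retries reads that fail with `ErrorKind::Interrupted`.
fn maybe_tee(mut source: impl Read, sink: Option<impl Write>) -> io::Result<Vec<u8>> {
    let mut ret = Vec::new();
    io::copy(&mut source, &mut Tee { buf: &mut ret, sink })?;
    Ok(ret)
}

fn main() -> io::Result<()> {
    // Capture and echo into an in-memory sink, as a unit test might.
    let mut echoed = Vec::new();
    let captured = maybe_tee(&b"hello"[..], Some(&mut echoed))?;
    assert_eq!(captured, b"hello");
    assert_eq!(echoed, b"hello");

    // Capture only; `None` still needs a concrete sink type.
    let captured = maybe_tee(&b"quiet"[..], None::<io::Sink>)?;
    assert_eq!(captured, b"quiet");
    Ok(())
}
```

Flushing on every write keeps the original behavior, so partial lines still show up promptly when the sink is a line-buffered `Stdout`.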
I'd appreciate any thoughts on the implementation, and especially the API, of this function.¹ In particular:
- Callers are expected to provide a pair of `impl Write+Send` types, which is compatible with `std::io::Stdout`/`Stderr` as well as user-provided types such as `&mut Vec<u8>` (specifically intended for unit testing).
  - Unfortunately `StdoutLock`/`StderrLock` are not `Send` and therefore cannot be specified here. In principle I believe that would be a better option, but I'm not sure how to support that along with other `Write` impls.
- `std::thread::scope` is used to simplify the sinks' signature: without scoping I believe it's necessary to use `impl Write+Send+'static`, which prevents using borrowed types like `&mut Vec<u8>`.
- I'm trying to avoid `.expect()`, but I'm not sure how to do so with the `.join()` results; when I swap the `expect` for `?` I hit errors.
- Any general feedback about idiomatic Rust and patterns that could be improved or simplified here.
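For the `StdoutLock` and `.join()` bullets, one possible direction (a sketch under stated assumptions, not the PR's code): take `Send` closures that *create* the writers and call them on the worker threads. A `StdoutLock` is then created and used on a single thread, so it never needs to be `Send`, and `&mut Vec<u8>` still works for tests by wrapping it in a `move` closure. The sketch also joins both handles and maps the panic payload returned by `join()` (a `Box<dyn Any + Send>`, which does not implement `std::error::Error`, hence the `?` errors) into an `io::Error`; in the PR's anyhow-based code the same mapping could presumably produce an `anyhow::Error` instead. `tee_with`, `capture_both` and `panic_to_error` are hypothetical names, byte slices stand in for the child's pipes, and it assumes Rust 1.63+ (`std::thread::scope`) and 1.61+ (`Stdout::lock` returning `StdoutLock<'static>`).

```rust
use std::any::Any;
use std::io::{self, Read, Write};
use std::thread;

// Hypothetical variant of `maybe_tee`: the sink is produced by `make_sink` on the
// worker thread, so the writer type `W` itself never has to be `Send`.
fn tee_with<W, F>(mut source: impl Read, make_sink: Option<F>) -> io::Result<Vec<u8>>
where
    W: Write,
    F: FnOnce() -> W,
{
    let mut sink = make_sink.map(|make| make());
    let mut ret = Vec::new();
    let mut buf = [0u8; 8 * 1024];
    loop {
        let n = match source.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        if let Some(sink) = sink.as_mut() {
            sink.write_all(&buf[..n])?;
            sink.flush()?;
        }
        ret.extend_from_slice(&buf[..n]);
    }
    Ok(ret)
}

// Turns a thread's panic payload into an ordinary error instead of panicking again.
fn panic_to_error(payload: Box<dyn Any + Send>) -> io::Error {
    let msg = if let Some(s) = payload.downcast_ref::<&str>() {
        s.to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "worker thread panicked".to_string()
    };
    io::Error::new(io::ErrorKind::Other, msg)
}

// Hypothetical stand-in for the threaded part of `execute_subprocess`.
fn capture_both<FO, FE, WO, WE>(
    out: impl Read + Send,
    err: impl Read + Send,
    sinks: Option<(FO, FE)>,
) -> io::Result<(Vec<u8>, Vec<u8>)>
where
    FO: FnOnce() -> WO + Send,
    FE: FnOnce() -> WE + Send,
    WO: Write,
    WE: Write,
{
    let (out_sink, err_sink) = match sinks {
        Some((o, e)) => (Some(o), Some(e)),
        None => (None, None),
    };
    thread::scope(|s| -> io::Result<(Vec<u8>, Vec<u8>)> {
        let t_out = s.spawn(|| tee_with(out, out_sink));
        let t_err = s.spawn(|| tee_with(err, err_sink));
        // Join both handles before any `?`, so an early return never leaves an
        // unjoined, panicked thread for `scope` to re-panic on.
        let out_res = t_out.join();
        let err_res = t_err.join();
        // First `?`: the thread panicked. Second `?`: the pipe itself failed.
        let stdout = out_res.map_err(panic_to_error)??;
        let stderr = err_res.map_err(panic_to_error)??;
        Ok((stdout, stderr))
    })
}

fn main() -> io::Result<()> {
    // The non-`Send` locks are created inside the worker threads.
    let (out, err) = capture_both(
        &b"to stdout\n"[..],
        &b"to stderr\n"[..],
        Some((|| io::stdout().lock(), || io::stderr().lock())),
    )?;
    assert_eq!(out, b"to stdout\n");
    assert_eq!(err, b"to stderr\n");
    Ok(())
}
```

One trade-off carries over from the original signature: passing `None` still requires naming concrete types, e.g. `None::<(fn() -> io::Sink, fn() -> io::Sink)>`.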
¹ Of course, if you're interested in taking a look at the whole PR, that'd be greatly appreciated! :)