tokio/runtime/signal/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
//! Signal driver
use crate::runtime::{driver, io};
use crate::signal::registry::globals;
use mio::net::UnixStream;
use std::io::{self as std_io, Read};
use std::sync::{Arc, Weak};
use std::time::Duration;
/// Responsible for registering wakeups when an OS signal is received, and
/// subsequently dispatching notifications to any signal listeners as appropriate.
///
/// Note: this driver relies on having an enabled IO driver in order to listen to
/// pipe write wakeups.
#[derive(Debug)]
pub(crate) struct Driver {
/// Thread parker. The `Driver` park implementation delegates to this.
io: io::Driver,
/// A pipe for receiving wake events from the signal handler
receiver: UnixStream,
/// Shared state. The driver keeps a strong ref and the handle keeps a weak
/// ref. The weak ref is used to check if the driver is still active before
/// trying to register a signal handler.
inner: Arc<()>,
}
#[derive(Debug, Default)]
pub(crate) struct Handle {
/// Paired w/ the `Arc` above and is used to check if the driver is still
/// around before attempting to register a signal handler.
inner: Weak<()>,
}
// ===== impl Driver =====
impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};
// NB: We give each driver a "fresh" receiver file descriptor to avoid
// the issues described in alexcrichton/tokio-process#42.
//
// In the past we would reuse the actual receiver file descriptor and
// swallow any errors around double registration of the same descriptor.
// I'm not sure if the second (failed) registration simply doesn't end
// up receiving wake up notifications, or there could be some race
// condition when consuming readiness events, but having distinct
// descriptors appears to mitigate this.
//
// Unfortunately we cannot just use a single global UnixStream instance
// either, since we can't assume they will always be registered with the
// exact same reactor.
//
// Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
// with registering dups with the same reactor. In this case, duping is
// safe as each dup is registered with separate reactors **and** we
// only expect at least one dup to receive the notification.
// Manually drop as we don't actually own this instance of UnixStream.
let receiver_fd = globals().receiver.as_raw_fd();
// safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let mut receiver = UnixStream::from_std(original.try_clone()?);
io_handle.register_signal_receiver(&mut receiver)?;
Ok(Self {
io,
receiver,
inner: Arc::new(()),
})
}
/// Returns a handle to this event loop which can be sent across threads
/// and can be used as a proxy to the event loop itself.
pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::downgrade(&self.inner),
}
}
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.io.park(handle);
self.process();
}
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
self.io.park_timeout(handle, duration);
self.process();
}
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.io.shutdown(handle);
}
fn process(&mut self) {
// If the signal pipe has not received a readiness event, then there is
// nothing else to do.
if !self.io.consume_signal_ready() {
return;
}
// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
#[allow(clippy::unused_io_amount)]
loop {
match self.receiver.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {e}"),
}
}
// Broadcast any signals which were received
globals().broadcast();
}
}
// ===== impl Handle =====
impl Handle {
pub(crate) fn check_inner(&self) -> std_io::Result<()> {
if self.inner.strong_count() > 0 {
Ok(())
} else {
Err(std_io::Error::new(
std_io::ErrorKind::Other,
"signal driver gone",
))
}
}
}