tokio/runtime/scheduler/
mod.rs

1cfg_rt! {
2    pub(crate) mod current_thread;
3    pub(crate) use current_thread::CurrentThread;
4
5    mod defer;
6    use defer::Defer;
7
8    pub(crate) mod inject;
9    pub(crate) use inject::Inject;
10
11    use crate::runtime::TaskHooks;
12
13    use crate::runtime::WorkerMetrics;
14}
15
16cfg_rt_multi_thread! {
17    mod block_in_place;
18    pub(crate) use block_in_place::block_in_place;
19
20    mod lock;
21    use lock::Lock;
22
23    pub(crate) mod multi_thread;
24    pub(crate) use multi_thread::MultiThread;
25}
26
27use crate::runtime::driver;
28
29#[derive(Debug, Clone)]
30pub(crate) enum Handle {
31    #[cfg(feature = "rt")]
32    CurrentThread(Arc<current_thread::Handle>),
33
34    #[cfg(feature = "rt-multi-thread")]
35    MultiThread(Arc<multi_thread::Handle>),
36
37    // TODO: This is to avoid triggering "dead code" warnings many other places
38    // in the codebase. Remove this during a later cleanup
39    #[cfg(not(feature = "rt"))]
40    #[allow(dead_code)]
41    Disabled,
42}
43
44#[cfg(feature = "rt")]
45pub(super) enum Context {
46    CurrentThread(current_thread::Context),
47
48    #[cfg(feature = "rt-multi-thread")]
49    MultiThread(multi_thread::Context),
50}
51
52impl Handle {
53    #[cfg_attr(not(feature = "full"), allow(dead_code))]
54    pub(crate) fn driver(&self) -> &driver::Handle {
55        match *self {
56            #[cfg(feature = "rt")]
57            Handle::CurrentThread(ref h) => &h.driver,
58
59            #[cfg(feature = "rt-multi-thread")]
60            Handle::MultiThread(ref h) => &h.driver,
61
62            #[cfg(not(feature = "rt"))]
63            Handle::Disabled => unreachable!(),
64        }
65    }
66}
67
68cfg_rt! {
69    use crate::future::Future;
70    use crate::loom::sync::Arc;
71    use crate::runtime::{blocking, task::Id};
72    use crate::runtime::context;
73    use crate::task::JoinHandle;
74    use crate::util::RngSeedGenerator;
75    use std::task::Waker;
76
77    macro_rules! match_flavor {
78        ($self:expr, $ty:ident($h:ident) => $e:expr) => {
79            match $self {
80                $ty::CurrentThread($h) => $e,
81
82                #[cfg(feature = "rt-multi-thread")]
83                $ty::MultiThread($h) => $e,
84            }
85        }
86    }
87
88    impl Handle {
89        #[track_caller]
90        pub(crate) fn current() -> Handle {
91            match context::with_current(Clone::clone) {
92                Ok(handle) => handle,
93                Err(e) => panic!("{}", e),
94            }
95        }
96
97        pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
98            match_flavor!(self, Handle(h) => &h.blocking_spawner)
99        }
100
101        pub(crate) fn is_local(&self) -> bool {
102            match self {
103                Handle::CurrentThread(h) => h.local_tid.is_some(),
104
105                #[cfg(feature = "rt-multi-thread")]
106                Handle::MultiThread(_) => false,
107            }
108        }
109
110        /// Returns true if this is a local runtime and the runtime is owned by the current thread.
111        pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool {
112            match self {
113                Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false),
114
115                #[cfg(feature = "rt-multi-thread")]
116                Handle::MultiThread(_) => false,
117            }
118        }
119
120        pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
121        where
122            F: Future + Send + 'static,
123            F::Output: Send + 'static,
124        {
125            match self {
126                Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
127
128                #[cfg(feature = "rt-multi-thread")]
129                Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
130            }
131        }
132
133        /// Spawn a local task
134        ///
135        /// # Safety
136        /// This should only be called in `LocalRuntime` if the runtime has been verified to be owned
137        /// by the current thread.
138        #[allow(irrefutable_let_patterns)]
139        pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
140        where
141            F: Future + 'static,
142            F::Output: 'static,
143        {
144            if let Handle::CurrentThread(h) = self {
145                current_thread::Handle::spawn_local(h, future, id)
146            } else {
147                panic!("Only current_thread and LocalSet have spawn_local internals implemented")
148            }
149        }
150
151        pub(crate) fn shutdown(&self) {
152            match *self {
153                Handle::CurrentThread(_) => {},
154
155                #[cfg(feature = "rt-multi-thread")]
156                Handle::MultiThread(ref h) => h.shutdown(),
157            }
158        }
159
160        pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
161            match_flavor!(self, Handle(h) => &h.seed_generator)
162        }
163
164        pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
165            match self {
166                Handle::CurrentThread(handle) => handle,
167                #[cfg(feature = "rt-multi-thread")]
168                _ => panic!("not a CurrentThread handle"),
169            }
170        }
171
172        pub(crate) fn hooks(&self) -> &TaskHooks {
173            match self {
174                Handle::CurrentThread(h) => &h.task_hooks,
175                #[cfg(feature = "rt-multi-thread")]
176                Handle::MultiThread(h) => &h.task_hooks,
177            }
178        }
179    }
180
181    impl Handle {
182        pub(crate) fn num_workers(&self) -> usize {
183            match self {
184                Handle::CurrentThread(_) => 1,
185                #[cfg(feature = "rt-multi-thread")]
186                Handle::MultiThread(handle) => handle.num_workers(),
187            }
188        }
189
190        pub(crate) fn num_alive_tasks(&self) -> usize {
191            match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
192        }
193
194        pub(crate) fn injection_queue_depth(&self) -> usize {
195            match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
196        }
197
198        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
199            match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
200        }
201    }
202
203    cfg_unstable_metrics! {
204        use crate::runtime::SchedulerMetrics;
205
206        impl Handle {
207            cfg_64bit_metrics! {
208                pub(crate) fn spawned_tasks_count(&self) -> u64 {
209                    match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
210                }
211            }
212
213            pub(crate) fn num_blocking_threads(&self) -> usize {
214                match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
215            }
216
217            pub(crate) fn num_idle_blocking_threads(&self) -> usize {
218                match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
219            }
220
221            pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
222                match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
223            }
224
225            pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
226                match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
227            }
228
229            pub(crate) fn blocking_queue_depth(&self) -> usize {
230                match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
231            }
232        }
233    }
234
235    impl Context {
236        #[track_caller]
237        pub(crate) fn expect_current_thread(&self) -> &current_thread::Context {
238            match self {
239                Context::CurrentThread(context) => context,
240                #[cfg(feature = "rt-multi-thread")]
241                _ => panic!("expected `CurrentThread::Context`")
242            }
243        }
244
245        pub(crate) fn defer(&self, waker: &Waker) {
246            match_flavor!(self, Context(context) => context.defer(waker));
247        }
248
249        cfg_rt_multi_thread! {
250            #[track_caller]
251            pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
252                match self {
253                    Context::MultiThread(context) => context,
254                    _ => panic!("expected `MultiThread::Context`")
255                }
256            }
257        }
258    }
259}
260
261cfg_not_rt! {
262    #[cfg(any(
263        feature = "net",
264        all(unix, feature = "process"),
265        all(unix, feature = "signal"),
266        feature = "time",
267    ))]
268    impl Handle {
269        #[track_caller]
270        pub(crate) fn current() -> Handle {
271            panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
272        }
273    }
274}