// tokio/runtime/scheduler/mod.rs
1cfg_rt! {
2 pub(crate) mod current_thread;
3 pub(crate) use current_thread::CurrentThread;
4
5 mod defer;
6 use defer::Defer;
7
8 pub(crate) mod inject;
9 pub(crate) use inject::Inject;
10
11 use crate::runtime::TaskHooks;
12
13 use crate::runtime::WorkerMetrics;
14}
15
16cfg_rt_multi_thread! {
17 mod block_in_place;
18 pub(crate) use block_in_place::block_in_place;
19
20 mod lock;
21 use lock::Lock;
22
23 pub(crate) mod multi_thread;
24 pub(crate) use multi_thread::MultiThread;
25}
26
27use crate::runtime::driver;
28
/// Handle to the scheduler behind a runtime, with one variant per scheduler
/// flavor compiled into the crate.
///
/// Both runtime-backed variants keep their flavor-specific handle inside an
/// `Arc`.
#[derive(Debug, Clone)]
pub(crate) enum Handle {
    /// Handle to the current-thread scheduler (needs the `rt` feature).
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    /// Handle to the multi-thread scheduler (needs the `rt-multi-thread`
    /// feature).
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),

    /// Placeholder variant that exists only when the `rt` feature is off.
    // Nothing in this module constructs it (`driver()` treats it as
    // unreachable), hence the `dead_code` allowance.
    #[cfg(not(feature = "rt"))]
    #[allow(dead_code)]
    Disabled,
}
43
/// Scheduler context, with one variant per scheduler flavor compiled into the
/// crate.
///
/// The type only exists when the `rt` feature is enabled.
#[cfg(feature = "rt")]
pub(super) enum Context {
    /// Context belonging to the current-thread scheduler.
    CurrentThread(current_thread::Context),

    /// Context belonging to the multi-thread scheduler (needs the
    /// `rt-multi-thread` feature).
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),
}
51
52impl Handle {
53 #[cfg_attr(not(feature = "full"), allow(dead_code))]
54 pub(crate) fn driver(&self) -> &driver::Handle {
55 match *self {
56 #[cfg(feature = "rt")]
57 Handle::CurrentThread(ref h) => &h.driver,
58
59 #[cfg(feature = "rt-multi-thread")]
60 Handle::MultiThread(ref h) => &h.driver,
61
62 #[cfg(not(feature = "rt"))]
63 Handle::Disabled => unreachable!(),
64 }
65 }
66}
67
68cfg_rt! {
69 use crate::future::Future;
70 use crate::loom::sync::Arc;
71 use crate::runtime::{blocking, task::Id};
72 use crate::runtime::context;
73 use crate::task::JoinHandle;
74 use crate::util::RngSeedGenerator;
75 use std::task::Waker;
76
// Expands to a `match` on `$scrutinee` over the flavor variants of the enum
// `$flavor`. Each arm binds the variant's payload to `$inner` and evaluates
// `$body`. The `MultiThread` arm is only emitted when the `rt-multi-thread`
// feature is enabled.
macro_rules! match_flavor {
    ($scrutinee:expr, $flavor:ident($inner:ident) => $body:expr) => {
        match $scrutinee {
            $flavor::CurrentThread($inner) => $body,

            #[cfg(feature = "rt-multi-thread")]
            $flavor::MultiThread($inner) => $body,
        }
    };
}
87
88 impl Handle {
89 #[track_caller]
90 pub(crate) fn current() -> Handle {
91 match context::with_current(Clone::clone) {
92 Ok(handle) => handle,
93 Err(e) => panic!("{}", e),
94 }
95 }
96
97 pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
98 match_flavor!(self, Handle(h) => &h.blocking_spawner)
99 }
100
101 pub(crate) fn is_local(&self) -> bool {
102 match self {
103 Handle::CurrentThread(h) => h.local_tid.is_some(),
104
105 #[cfg(feature = "rt-multi-thread")]
106 Handle::MultiThread(_) => false,
107 }
108 }
109
110 pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool {
112 match self {
113 Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false),
114
115 #[cfg(feature = "rt-multi-thread")]
116 Handle::MultiThread(_) => false,
117 }
118 }
119
120 pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
121 where
122 F: Future + Send + 'static,
123 F::Output: Send + 'static,
124 {
125 match self {
126 Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
127
128 #[cfg(feature = "rt-multi-thread")]
129 Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
130 }
131 }
132
133 #[allow(irrefutable_let_patterns)]
139 pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
140 where
141 F: Future + 'static,
142 F::Output: 'static,
143 {
144 if let Handle::CurrentThread(h) = self {
145 current_thread::Handle::spawn_local(h, future, id)
146 } else {
147 panic!("Only current_thread and LocalSet have spawn_local internals implemented")
148 }
149 }
150
151 pub(crate) fn shutdown(&self) {
152 match *self {
153 Handle::CurrentThread(_) => {},
154
155 #[cfg(feature = "rt-multi-thread")]
156 Handle::MultiThread(ref h) => h.shutdown(),
157 }
158 }
159
160 pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
161 match_flavor!(self, Handle(h) => &h.seed_generator)
162 }
163
164 pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
165 match self {
166 Handle::CurrentThread(handle) => handle,
167 #[cfg(feature = "rt-multi-thread")]
168 _ => panic!("not a CurrentThread handle"),
169 }
170 }
171
172 pub(crate) fn hooks(&self) -> &TaskHooks {
173 match self {
174 Handle::CurrentThread(h) => &h.task_hooks,
175 #[cfg(feature = "rt-multi-thread")]
176 Handle::MultiThread(h) => &h.task_hooks,
177 }
178 }
179 }
180
181 impl Handle {
182 pub(crate) fn num_workers(&self) -> usize {
183 match self {
184 Handle::CurrentThread(_) => 1,
185 #[cfg(feature = "rt-multi-thread")]
186 Handle::MultiThread(handle) => handle.num_workers(),
187 }
188 }
189
190 pub(crate) fn num_alive_tasks(&self) -> usize {
191 match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
192 }
193
194 pub(crate) fn injection_queue_depth(&self) -> usize {
195 match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
196 }
197
198 pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
199 match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
200 }
201 }
202
203 cfg_unstable_metrics! {
204 use crate::runtime::SchedulerMetrics;
205
206 impl Handle {
207 cfg_64bit_metrics! {
208 pub(crate) fn spawned_tasks_count(&self) -> u64 {
209 match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
210 }
211 }
212
213 pub(crate) fn num_blocking_threads(&self) -> usize {
214 match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
215 }
216
217 pub(crate) fn num_idle_blocking_threads(&self) -> usize {
218 match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
219 }
220
221 pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
222 match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
223 }
224
225 pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
226 match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
227 }
228
229 pub(crate) fn blocking_queue_depth(&self) -> usize {
230 match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
231 }
232 }
233 }
234
235 impl Context {
236 #[track_caller]
237 pub(crate) fn expect_current_thread(&self) -> ¤t_thread::Context {
238 match self {
239 Context::CurrentThread(context) => context,
240 #[cfg(feature = "rt-multi-thread")]
241 _ => panic!("expected `CurrentThread::Context`")
242 }
243 }
244
245 pub(crate) fn defer(&self, waker: &Waker) {
246 match_flavor!(self, Context(context) => context.defer(waker));
247 }
248
249 cfg_rt_multi_thread! {
250 #[track_caller]
251 pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
252 match self {
253 Context::MultiThread(context) => context,
254 _ => panic!("expected `MultiThread::Context`")
255 }
256 }
257 }
258 }
259}
260
261cfg_not_rt! {
262 #[cfg(any(
263 feature = "net",
264 all(unix, feature = "process"),
265 all(unix, feature = "signal"),
266 feature = "time",
267 ))]
268 impl Handle {
269 #[track_caller]
270 pub(crate) fn current() -> Handle {
271 panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
272 }
273 }
274}