tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
/// Builds a Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// use tokio::runtime::Builder;
35///
36/// fn main() {
37/// // build runtime
38/// let runtime = Builder::new_multi_thread()
39/// .worker_threads(4)
40/// .thread_name("my-custom-name")
41/// .thread_stack_size(3 * 1024 * 1024)
42/// .build()
43/// .unwrap();
44///
45/// // use runtime ...
46/// }
47/// ```
48pub struct Builder {
49 /// Runtime type
50 kind: Kind,
51
52 /// Whether or not to enable the I/O driver
53 enable_io: bool,
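    /// Maximum number of I/O events to process per tick of the I/O driver
    /// (set via `max_io_events_per_tick`).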
54 nevents: usize,
55
56 /// Whether or not to enable the time driver
57 enable_time: bool,
58
59 /// Whether or not the clock should start paused.
60 start_paused: bool,
61
    /// The number of worker threads used by the Runtime.
63 ///
64 /// Only used when not using the current-thread executor.
65 worker_threads: Option<usize>,
66
    /// Cap on the number of additional (blocking) threads spawned by the runtime.
68 max_blocking_threads: usize,
69
70 /// Name fn used for threads spawned by the runtime.
71 pub(super) thread_name: ThreadNameFn,
72
73 /// Stack size used for threads spawned by the runtime.
74 pub(super) thread_stack_size: Option<usize>,
75
76 /// Callback to run after each thread starts.
77 pub(super) after_start: Option<Callback>,
78
79 /// To run before each worker thread stops
80 pub(super) before_stop: Option<Callback>,
81
82 /// To run before each worker thread is parked.
83 pub(super) before_park: Option<Callback>,
84
85 /// To run after each thread is unparked.
86 pub(super) after_unpark: Option<Callback>,
87
88 /// To run before each task is spawned.
89 pub(super) before_spawn: Option<TaskCallback>,
90
91 /// To run before each poll
92 #[cfg(tokio_unstable)]
93 pub(super) before_poll: Option<TaskCallback>,
94
95 /// To run after each poll
96 #[cfg(tokio_unstable)]
97 pub(super) after_poll: Option<TaskCallback>,
98
99 /// To run after each task is terminated.
100 pub(super) after_termination: Option<TaskCallback>,
101
102 /// Customizable keep alive timeout for `BlockingPool`
103 pub(super) keep_alive: Option<Duration>,
104
105 /// How many ticks before pulling a task from the global/remote queue?
106 ///
107 /// When `None`, the value is unspecified and behavior details are left to
108 /// the scheduler. Each scheduler flavor could choose to either pick its own
109 /// default value or use some other strategy to decide when to poll from the
110 /// global queue. For example, the multi-threaded scheduler uses a
111 /// self-tuning strategy based on mean task poll times.
112 pub(super) global_queue_interval: Option<u32>,
113
114 /// How many ticks before yielding to the driver for timer and I/O events?
115 pub(super) event_interval: u32,
116
    /// When true, the multi-threaded scheduler LIFO slot should not be used.
118 ///
119 /// This option should only be exposed as unstable.
120 pub(super) disable_lifo_slot: bool,
121
122 /// Specify a random number generator seed to provide deterministic results
123 pub(super) seed_generator: RngSeedGenerator,
124
125 /// When true, enables task poll count histogram instrumentation.
126 pub(super) metrics_poll_count_histogram_enable: bool,
127
128 /// Configures the task poll count histogram
129 pub(super) metrics_poll_count_histogram: HistogramBuilder,
130
131 #[cfg(tokio_unstable)]
132 pub(super) unhandled_panic: UnhandledPanic,
133}
134
135cfg_unstable! {
136 /// How the runtime should respond to unhandled panics.
137 ///
138 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
139 /// to configure the runtime behavior when a spawned task panics.
140 ///
141 /// See [`Builder::unhandled_panic`] for more details.
142 #[derive(Debug, Clone)]
143 #[non_exhaustive]
144 pub enum UnhandledPanic {
145 /// The runtime should ignore panics on spawned tasks.
146 ///
147 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
148 /// tasks continue running normally.
149 ///
150 /// This is the default behavior.
151 ///
152 /// # Examples
153 ///
154 /// ```
155 /// use tokio::runtime::{self, UnhandledPanic};
156 ///
157 /// # pub fn main() {
158 /// let rt = runtime::Builder::new_current_thread()
159 /// .unhandled_panic(UnhandledPanic::Ignore)
160 /// .build()
161 /// .unwrap();
162 ///
163 /// let task1 = rt.spawn(async { panic!("boom"); });
164 /// let task2 = rt.spawn(async {
165 /// // This task completes normally
166 /// "done"
167 /// });
168 ///
169 /// rt.block_on(async {
170 /// // The panic on the first task is forwarded to the `JoinHandle`
171 /// assert!(task1.await.is_err());
172 ///
173 /// // The second task completes normally
174 /// assert!(task2.await.is_ok());
175 /// })
176 /// # }
177 /// ```
178 ///
179 /// [`JoinHandle`]: struct@crate::task::JoinHandle
180 Ignore,
181
        /// The runtime should immediately shut down if a spawned task panics.
        ///
        /// The runtime will immediately shut down even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
187 ///
188 /// # Examples
189 ///
190 /// ```should_panic
191 /// use tokio::runtime::{self, UnhandledPanic};
192 ///
193 /// # pub fn main() {
194 /// let rt = runtime::Builder::new_current_thread()
195 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
196 /// .build()
197 /// .unwrap();
198 ///
199 /// rt.spawn(async { panic!("boom"); });
200 /// rt.spawn(async {
201 /// // This task never completes.
202 /// });
203 ///
204 /// rt.block_on(async {
205 /// // Do some work
206 /// # loop { tokio::task::yield_now().await; }
207 /// })
208 /// # }
209 /// ```
210 ///
211 /// [`JoinHandle`]: struct@crate::task::JoinHandle
212 ShutdownRuntime,
213 }
214}
215
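/// Factory function used to generate names for threads spawned by the runtime.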
216pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
217
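/// The scheduler flavor that the builder will construct.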
218#[derive(Clone, Copy)]
219pub(crate) enum Kind {
220 CurrentThread,
221 #[cfg(feature = "rt-multi-thread")]
222 MultiThread,
223}
224
225impl Builder {
226 /// Returns a new builder with the current thread scheduler selected.
227 ///
228 /// Configuration methods can be chained on the return value.
229 ///
230 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
231 /// [`LocalSet`].
232 ///
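    /// # Examples
    ///
    /// A small sketch combining the current-thread runtime with a `LocalSet`
    /// in order to run a task that is not `Send`:
    ///
    /// ```
    /// use std::rc::Rc;
    /// use tokio::runtime::Builder;
    /// use tokio::task::LocalSet;
    ///
    /// let rt = Builder::new_current_thread().build().unwrap();
    /// let local = LocalSet::new();
    ///
    /// local.block_on(&rt, async {
    ///     // `Rc` is not `Send`, so this future could not be spawned on a
    ///     // multi-threaded runtime.
    ///     let not_send = Rc::new(5);
    ///     tokio::task::spawn_local(async move {
    ///         println!("value: {}", not_send);
    ///     })
    ///     .await
    ///     .unwrap();
    /// });
    /// ```
    ///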
233 /// [`LocalSet`]: crate::task::LocalSet
234 pub fn new_current_thread() -> Builder {
235 #[cfg(loom)]
236 const EVENT_INTERVAL: u32 = 4;
237 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
238 #[cfg(not(loom))]
239 const EVENT_INTERVAL: u32 = 61;
240
241 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
242 }
243
244 /// Returns a new builder with the multi thread scheduler selected.
245 ///
246 /// Configuration methods can be chained on the return value.
247 #[cfg(feature = "rt-multi-thread")]
248 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
249 pub fn new_multi_thread() -> Builder {
250 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
251 Builder::new(Kind::MultiThread, 61)
252 }
253
254 /// Returns a new runtime builder initialized with default configuration
255 /// values.
256 ///
257 /// Configuration methods can be chained on the return value.
258 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
259 Builder {
260 kind,
261
262 // I/O defaults to "off"
263 enable_io: false,
264 nevents: 1024,
265
266 // Time defaults to "off"
267 enable_time: false,
268
269 // The clock starts not-paused
270 start_paused: false,
271
272 // Read from environment variable first in multi-threaded mode.
273 // Default to lazy auto-detection (one thread per CPU core)
274 worker_threads: None,
275
276 max_blocking_threads: 512,
277
278 // Default thread name
279 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
280
281 // Do not set a stack size by default
282 thread_stack_size: None,
283
284 // No worker thread callbacks
285 after_start: None,
286 before_stop: None,
287 before_park: None,
288 after_unpark: None,
289
290 before_spawn: None,
291 after_termination: None,
292
293 #[cfg(tokio_unstable)]
294 before_poll: None,
295 #[cfg(tokio_unstable)]
296 after_poll: None,
297
298 keep_alive: None,
299
300 // Defaults for these values depend on the scheduler kind, so we get them
301 // as parameters.
302 global_queue_interval: None,
303 event_interval,
304
305 seed_generator: RngSeedGenerator::new(RngSeed::new()),
306
307 #[cfg(tokio_unstable)]
308 unhandled_panic: UnhandledPanic::Ignore,
309
310 metrics_poll_count_histogram_enable: false,
311
312 metrics_poll_count_histogram: HistogramBuilder::default(),
313
314 disable_lifo_slot: false,
315 }
316 }
317
318 /// Enables both I/O and time drivers.
319 ///
320 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
321 /// individually. If additional components are added to Tokio in the future,
322 /// `enable_all` will include these future components.
323 ///
324 /// # Examples
325 ///
326 /// ```
327 /// use tokio::runtime;
328 ///
329 /// let rt = runtime::Builder::new_multi_thread()
330 /// .enable_all()
331 /// .build()
332 /// .unwrap();
333 /// ```
334 pub fn enable_all(&mut self) -> &mut Self {
335 #[cfg(any(
336 feature = "net",
337 all(unix, feature = "process"),
338 all(unix, feature = "signal")
339 ))]
340 self.enable_io();
341 #[cfg(feature = "time")]
342 self.enable_time();
343
344 self
345 }
346
347 /// Sets the number of worker threads the `Runtime` will use.
348 ///
    /// This can be any number above 0, though it is advised to keep this value
    /// on the smaller side.
    ///
    /// This will override the value read from the `TOKIO_WORKER_THREADS`
    /// environment variable.
353 ///
354 /// # Default
355 ///
356 /// The default value is the number of cores available to the system.
357 ///
358 /// When using the `current_thread` runtime this method has no effect.
359 ///
360 /// # Examples
361 ///
362 /// ## Multi threaded runtime with 4 threads
363 ///
364 /// ```
365 /// use tokio::runtime;
366 ///
367 /// // This will spawn a work-stealing runtime with 4 worker threads.
368 /// let rt = runtime::Builder::new_multi_thread()
369 /// .worker_threads(4)
370 /// .build()
371 /// .unwrap();
372 ///
373 /// rt.spawn(async move {});
374 /// ```
375 ///
376 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
377 ///
378 /// ```
379 /// use tokio::runtime;
380 ///
381 /// // Create a runtime that _must_ be driven from a call
382 /// // to `Runtime::block_on`.
383 /// let rt = runtime::Builder::new_current_thread()
384 /// .build()
385 /// .unwrap();
386 ///
387 /// // This will run the runtime and future on the current thread
388 /// rt.block_on(async move {});
389 /// ```
390 ///
391 /// # Panics
392 ///
393 /// This will panic if `val` is not larger than `0`.
394 #[track_caller]
395 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
396 assert!(val > 0, "Worker threads cannot be set to 0");
397 self.worker_threads = Some(val);
398 self
399 }
400
401 /// Specifies the limit for additional threads spawned by the Runtime.
402 ///
403 /// These threads are used for blocking operations like tasks spawned
404 /// through [`spawn_blocking`], this includes but is not limited to:
405 /// - [`fs`] operations
406 /// - dns resolution through [`ToSocketAddrs`]
407 /// - writing to [`Stdout`] or [`Stderr`]
408 /// - reading from [`Stdin`]
409 ///
    /// Unlike [`worker_threads`], these threads are not always active and will exit
411 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
412 ///
413 /// It's recommended to not set this limit too low in order to avoid hanging on operations
414 /// requiring [`spawn_blocking`].
415 ///
416 /// The default value is 512.
417 ///
418 /// # Queue Behavior
419 ///
420 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
421 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
422 /// method has not been reached, a new thread will be spawned. If no idle thread is available
423 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
    /// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
425 /// it could potentially grow unbounded.
426 ///
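    /// # Examples
    ///
    /// A minimal sketch that caps the blocking pool at 64 threads (the
    /// specific limit is only illustrative):
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .max_blocking_threads(64)
    ///     .build()
    ///     .unwrap();
    /// ```
    ///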
427 /// # Panics
428 ///
429 /// This will panic if `val` is not larger than `0`.
430 ///
431 /// # Upgrading from 0.x
432 ///
433 /// In old versions `max_threads` limited both blocking and worker threads, but the
434 /// current `max_blocking_threads` does not include async worker threads in the count.
435 ///
436 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
437 /// [`fs`]: mod@crate::fs
438 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
439 /// [`Stdout`]: struct@crate::io::Stdout
440 /// [`Stdin`]: struct@crate::io::Stdin
441 /// [`Stderr`]: struct@crate::io::Stderr
442 /// [`worker_threads`]: Self::worker_threads
443 /// [`thread_keep_alive`]: Self::thread_keep_alive
444 #[track_caller]
445 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
446 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
447 assert!(val > 0, "Max blocking threads cannot be set to 0");
448 self.max_blocking_threads = val;
449 self
450 }
451
    /// Sets the name of threads spawned by the `Runtime`'s thread pool.
453 ///
454 /// The default name is "tokio-runtime-worker".
455 ///
456 /// # Examples
457 ///
458 /// ```
459 /// # use tokio::runtime;
460 ///
461 /// # pub fn main() {
462 /// let rt = runtime::Builder::new_multi_thread()
463 /// .thread_name("my-pool")
464 /// .build();
465 /// # }
466 /// ```
467 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
468 let val = val.into();
469 self.thread_name = std::sync::Arc::new(move || val.clone());
470 self
471 }
472
473 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
474 ///
475 /// The default name fn is `|| "tokio-runtime-worker".into()`.
476 ///
477 /// # Examples
478 ///
479 /// ```
480 /// # use tokio::runtime;
481 /// # use std::sync::atomic::{AtomicUsize, Ordering};
482 /// # pub fn main() {
483 /// let rt = runtime::Builder::new_multi_thread()
484 /// .thread_name_fn(|| {
485 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
486 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
487 /// format!("my-pool-{}", id)
488 /// })
489 /// .build();
490 /// # }
491 /// ```
492 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
493 where
494 F: Fn() -> String + Send + Sync + 'static,
495 {
496 self.thread_name = std::sync::Arc::new(f);
497 self
498 }
499
500 /// Sets the stack size (in bytes) for worker threads.
501 ///
502 /// The actual stack size may be greater than this value if the platform
    /// specifies a minimal stack size.
504 ///
505 /// The default stack size for spawned threads is 2 MiB, though this
506 /// particular stack size is subject to change in the future.
507 ///
508 /// # Examples
509 ///
510 /// ```
511 /// # use tokio::runtime;
512 ///
513 /// # pub fn main() {
514 /// let rt = runtime::Builder::new_multi_thread()
515 /// .thread_stack_size(32 * 1024)
516 /// .build();
517 /// # }
518 /// ```
519 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
520 self.thread_stack_size = Some(val);
521 self
522 }
523
524 /// Executes function `f` after each thread is started but before it starts
525 /// doing work.
526 ///
527 /// This is intended for bookkeeping and monitoring use cases.
528 ///
529 /// # Examples
530 ///
531 /// ```
532 /// # use tokio::runtime;
533 /// # pub fn main() {
534 /// let runtime = runtime::Builder::new_multi_thread()
535 /// .on_thread_start(|| {
536 /// println!("thread started");
537 /// })
538 /// .build();
539 /// # }
540 /// ```
541 #[cfg(not(loom))]
542 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
543 where
544 F: Fn() + Send + Sync + 'static,
545 {
546 self.after_start = Some(std::sync::Arc::new(f));
547 self
548 }
549
550 /// Executes function `f` before each thread stops.
551 ///
552 /// This is intended for bookkeeping and monitoring use cases.
553 ///
554 /// # Examples
555 ///
556 /// ```
557 /// # use tokio::runtime;
558 /// # pub fn main() {
559 /// let runtime = runtime::Builder::new_multi_thread()
560 /// .on_thread_stop(|| {
561 /// println!("thread stopping");
562 /// })
563 /// .build();
564 /// # }
565 /// ```
566 #[cfg(not(loom))]
567 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
568 where
569 F: Fn() + Send + Sync + 'static,
570 {
571 self.before_stop = Some(std::sync::Arc::new(f));
572 self
573 }
574
575 /// Executes function `f` just before a thread is parked (goes idle).
576 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
577 /// can be called, and may result in this thread being unparked immediately.
578 ///
579 /// This can be used to start work only when the executor is idle, or for bookkeeping
580 /// and monitoring purposes.
581 ///
582 /// Note: There can only be one park callback for a runtime; calling this function
583 /// more than once replaces the last callback defined, rather than adding to it.
584 ///
585 /// # Examples
586 ///
587 /// ## Multithreaded executor
588 /// ```
589 /// # use std::sync::Arc;
590 /// # use std::sync::atomic::{AtomicBool, Ordering};
591 /// # use tokio::runtime;
592 /// # use tokio::sync::Barrier;
593 /// # pub fn main() {
594 /// let once = AtomicBool::new(true);
595 /// let barrier = Arc::new(Barrier::new(2));
596 ///
597 /// let runtime = runtime::Builder::new_multi_thread()
598 /// .worker_threads(1)
599 /// .on_thread_park({
600 /// let barrier = barrier.clone();
601 /// move || {
602 /// let barrier = barrier.clone();
603 /// if once.swap(false, Ordering::Relaxed) {
604 /// tokio::spawn(async move { barrier.wait().await; });
605 /// }
606 /// }
607 /// })
608 /// .build()
609 /// .unwrap();
610 ///
611 /// runtime.block_on(async {
612 /// barrier.wait().await;
613 /// })
614 /// # }
615 /// ```
616 /// ## Current thread executor
617 /// ```
618 /// # use std::sync::Arc;
619 /// # use std::sync::atomic::{AtomicBool, Ordering};
620 /// # use tokio::runtime;
621 /// # use tokio::sync::Barrier;
622 /// # pub fn main() {
623 /// let once = AtomicBool::new(true);
624 /// let barrier = Arc::new(Barrier::new(2));
625 ///
626 /// let runtime = runtime::Builder::new_current_thread()
627 /// .on_thread_park({
628 /// let barrier = barrier.clone();
629 /// move || {
630 /// let barrier = barrier.clone();
631 /// if once.swap(false, Ordering::Relaxed) {
632 /// tokio::spawn(async move { barrier.wait().await; });
633 /// }
634 /// }
635 /// })
636 /// .build()
637 /// .unwrap();
638 ///
639 /// runtime.block_on(async {
640 /// barrier.wait().await;
641 /// })
642 /// # }
643 /// ```
644 #[cfg(not(loom))]
645 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
646 where
647 F: Fn() + Send + Sync + 'static,
648 {
649 self.before_park = Some(std::sync::Arc::new(f));
650 self
651 }
652
653 /// Executes function `f` just after a thread unparks (starts executing tasks).
654 ///
655 /// This is intended for bookkeeping and monitoring use cases; note that work
656 /// in this callback will increase latencies when the application has allowed one or
657 /// more runtime threads to go idle.
658 ///
659 /// Note: There can only be one unpark callback for a runtime; calling this function
660 /// more than once replaces the last callback defined, rather than adding to it.
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// # use tokio::runtime;
666 /// # pub fn main() {
667 /// let runtime = runtime::Builder::new_multi_thread()
668 /// .on_thread_unpark(|| {
669 /// println!("thread unparking");
670 /// })
671 /// .build();
672 ///
673 /// runtime.unwrap().block_on(async {
674 /// tokio::task::yield_now().await;
675 /// println!("Hello from Tokio!");
676 /// })
677 /// # }
678 /// ```
679 #[cfg(not(loom))]
680 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
681 where
682 F: Fn() + Send + Sync + 'static,
683 {
684 self.after_unpark = Some(std::sync::Arc::new(f));
685 self
686 }
687
688 /// Executes function `f` just before a task is spawned.
689 ///
690 /// `f` is called within the Tokio context, so functions like
691 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
692 /// invoked immediately.
693 ///
694 /// This can be used for bookkeeping or monitoring purposes.
695 ///
696 /// Note: There can only be one spawn callback for a runtime; calling this function more
697 /// than once replaces the last callback defined, rather than adding to it.
698 ///
699 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
700 ///
701 /// **Note**: This is an [unstable API][unstable]. The public API of this type
702 /// may break in 1.x releases. See [the documentation on unstable
703 /// features][unstable] for details.
704 ///
705 /// [unstable]: crate#unstable-features
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// # use tokio::runtime;
711 /// # pub fn main() {
712 /// let runtime = runtime::Builder::new_current_thread()
713 /// .on_task_spawn(|_| {
714 /// println!("spawning task");
715 /// })
716 /// .build()
717 /// .unwrap();
718 ///
719 /// runtime.block_on(async {
720 /// tokio::task::spawn(std::future::ready(()));
721 ///
722 /// for _ in 0..64 {
723 /// tokio::task::yield_now().await;
724 /// }
725 /// })
726 /// # }
727 /// ```
728 #[cfg(all(not(loom), tokio_unstable))]
729 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
730 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
731 where
732 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
733 {
734 self.before_spawn = Some(std::sync::Arc::new(f));
735 self
736 }
737
    /// Executes function `f` just before a task is polled.
739 ///
740 /// `f` is called within the Tokio context, so functions like
741 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
742 /// invoked immediately.
743 ///
744 /// **Note**: This is an [unstable API][unstable]. The public API of this type
745 /// may break in 1.x releases. See [the documentation on unstable
746 /// features][unstable] for details.
747 ///
748 /// [unstable]: crate#unstable-features
749 ///
750 /// # Examples
751 ///
752 /// ```
753 /// # use std::sync::{atomic::AtomicUsize, Arc};
754 /// # use tokio::task::yield_now;
755 /// # pub fn main() {
756 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
757 /// let poll_start = poll_start_counter.clone();
758 /// let rt = tokio::runtime::Builder::new_multi_thread()
759 /// .enable_all()
760 /// .on_before_task_poll(move |meta| {
761 /// println!("task {} is about to be polled", meta.id())
762 /// })
763 /// .build()
764 /// .unwrap();
765 /// let task = rt.spawn(async {
766 /// yield_now().await;
767 /// });
768 /// let _ = rt.block_on(task);
769 ///
770 /// # }
771 /// ```
772 #[cfg(tokio_unstable)]
773 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
774 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
775 where
776 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
777 {
778 self.before_poll = Some(std::sync::Arc::new(f));
779 self
780 }
781
    /// Executes function `f` just after a task is polled.
783 ///
784 /// `f` is called within the Tokio context, so functions like
785 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
786 /// invoked immediately.
787 ///
788 /// **Note**: This is an [unstable API][unstable]. The public API of this type
789 /// may break in 1.x releases. See [the documentation on unstable
790 /// features][unstable] for details.
791 ///
792 /// [unstable]: crate#unstable-features
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// # use std::sync::{atomic::AtomicUsize, Arc};
798 /// # use tokio::task::yield_now;
799 /// # pub fn main() {
800 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
801 /// let poll_stop = poll_stop_counter.clone();
802 /// let rt = tokio::runtime::Builder::new_multi_thread()
803 /// .enable_all()
804 /// .on_after_task_poll(move |meta| {
805 /// println!("task {} completed polling", meta.id());
806 /// })
807 /// .build()
808 /// .unwrap();
809 /// let task = rt.spawn(async {
810 /// yield_now().await;
811 /// });
812 /// let _ = rt.block_on(task);
813 ///
814 /// # }
815 /// ```
816 #[cfg(tokio_unstable)]
817 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
818 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
819 where
820 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
821 {
822 self.after_poll = Some(std::sync::Arc::new(f));
823 self
824 }
825
826 /// Executes function `f` just after a task is terminated.
827 ///
828 /// `f` is called within the Tokio context, so functions like
829 /// [`tokio::spawn`](crate::spawn) can be called.
830 ///
831 /// This can be used for bookkeeping or monitoring purposes.
832 ///
833 /// Note: There can only be one task termination callback for a runtime; calling this
834 /// function more than once replaces the last callback defined, rather than adding to it.
835 ///
836 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
837 ///
838 /// **Note**: This is an [unstable API][unstable]. The public API of this type
839 /// may break in 1.x releases. See [the documentation on unstable
840 /// features][unstable] for details.
841 ///
842 /// [unstable]: crate#unstable-features
843 ///
844 /// # Examples
845 ///
846 /// ```
847 /// # use tokio::runtime;
848 /// # pub fn main() {
849 /// let runtime = runtime::Builder::new_current_thread()
850 /// .on_task_terminate(|_| {
851 /// println!("killing task");
852 /// })
853 /// .build()
854 /// .unwrap();
855 ///
856 /// runtime.block_on(async {
857 /// tokio::task::spawn(std::future::ready(()));
858 ///
859 /// for _ in 0..64 {
860 /// tokio::task::yield_now().await;
861 /// }
862 /// })
863 /// # }
864 /// ```
865 #[cfg(all(not(loom), tokio_unstable))]
866 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
867 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
868 where
869 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
870 {
871 self.after_termination = Some(std::sync::Arc::new(f));
872 self
873 }
874
875 /// Creates the configured `Runtime`.
876 ///
877 /// The returned `Runtime` instance is ready to spawn tasks.
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// use tokio::runtime::Builder;
883 ///
884 /// let rt = Builder::new_multi_thread().build().unwrap();
885 ///
886 /// rt.block_on(async {
887 /// println!("Hello from the Tokio runtime");
888 /// });
889 /// ```
890 pub fn build(&mut self) -> io::Result<Runtime> {
891 match &self.kind {
892 Kind::CurrentThread => self.build_current_thread_runtime(),
893 #[cfg(feature = "rt-multi-thread")]
894 Kind::MultiThread => self.build_threaded_runtime(),
895 }
896 }
897
898 /// Creates the configured `LocalRuntime`.
899 ///
900 /// The returned `LocalRuntime` instance is ready to spawn tasks.
901 ///
    /// # Panics
    ///
    /// This will panic if `current_thread` is not the selected runtime flavor.
    /// All other runtime flavors are unsupported by [`LocalRuntime`].
    ///
    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
907 ///
908 /// # Examples
909 ///
910 /// ```
911 /// use tokio::runtime::Builder;
912 ///
913 /// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
914 ///
915 /// rt.block_on(async {
916 /// println!("Hello from the Tokio runtime");
917 /// });
918 /// ```
919 #[allow(unused_variables, unreachable_patterns)]
920 #[cfg(tokio_unstable)]
921 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
922 pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
923 match &self.kind {
924 Kind::CurrentThread => self.build_current_thread_local_runtime(),
925 _ => panic!("Only current_thread is supported when building a local runtime"),
926 }
927 }
928
929 fn get_cfg(&self) -> driver::Cfg {
930 driver::Cfg {
931 enable_pause_time: match self.kind {
932 Kind::CurrentThread => true,
933 #[cfg(feature = "rt-multi-thread")]
934 Kind::MultiThread => false,
935 },
936 enable_io: self.enable_io,
937 enable_time: self.enable_time,
938 start_paused: self.start_paused,
939 nevents: self.nevents,
940 }
941 }
942
943 /// Sets a custom timeout for a thread in the blocking pool.
944 ///
    /// By default, an idle thread in the blocking pool is kept alive for 10
    /// seconds before it exits. This method overrides that timeout.
947 ///
948 /// # Example
949 ///
950 /// ```
951 /// # use tokio::runtime;
952 /// # use std::time::Duration;
953 /// # pub fn main() {
954 /// let rt = runtime::Builder::new_multi_thread()
955 /// .thread_keep_alive(Duration::from_millis(100))
956 /// .build();
957 /// # }
958 /// ```
959 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
960 self.keep_alive = Some(duration);
961 self
962 }
963
964 /// Sets the number of scheduler ticks after which the scheduler will poll the global
965 /// task queue.
966 ///
967 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
968 ///
969 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
970 /// [the module documentation] for the default behavior of the multi-thread scheduler.
971 ///
972 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
973 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
974 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
975 /// getting started on new work, especially if tasks frequently yield rather than complete
976 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
977 /// is a good choice when most tasks quickly complete polling.
978 ///
979 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
980 ///
981 /// # Panics
982 ///
983 /// This function will panic if 0 is passed as an argument.
984 ///
985 /// # Examples
986 ///
987 /// ```
988 /// # use tokio::runtime;
989 /// # pub fn main() {
990 /// let rt = runtime::Builder::new_multi_thread()
991 /// .global_queue_interval(31)
992 /// .build();
993 /// # }
994 /// ```
995 #[track_caller]
996 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
997 assert!(val > 0, "global_queue_interval must be greater than 0");
998 self.global_queue_interval = Some(val);
999 self
1000 }
1001
1002 /// Sets the number of scheduler ticks after which the scheduler will poll for
1003 /// external events (timers, I/O, and so on).
1004 ///
1005 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1006 ///
1007 /// By default, the event interval is `61` for all scheduler types.
1008 ///
1009 /// Setting the event interval determines the effective "priority" of delivering
1010 /// these external events (which may wake up additional tasks), compared to
1011 /// executing tasks that are currently ready to run. A smaller value is useful
1012 /// when tasks frequently spend a long time in polling, or frequently yield,
1013 /// which can result in overly long delays picking up I/O events. Conversely,
1014 /// picking up new events requires extra synchronization and syscall overhead,
1015 /// so if tasks generally complete their polling quickly, a higher event interval
1016 /// will minimize that overhead while still keeping the scheduler responsive to
1017 /// events.
1018 ///
1019 /// # Examples
1020 ///
1021 /// ```
1022 /// # use tokio::runtime;
1023 /// # pub fn main() {
1024 /// let rt = runtime::Builder::new_multi_thread()
1025 /// .event_interval(31)
1026 /// .build();
1027 /// # }
1028 /// ```
1029 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1030 self.event_interval = val;
1031 self
1032 }
1033
1034 cfg_unstable! {
1035 /// Configure how the runtime responds to an unhandled panic on a
1036 /// spawned task.
1037 ///
1038 /// By default, an unhandled panic (i.e. a panic not caught by
1039 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1040 /// execution. The panic's error value is forwarded to the task's
1041 /// [`JoinHandle`] and all other spawned tasks continue running.
1042 ///
1043 /// The `unhandled_panic` option enables configuring this behavior.
1044 ///
1045 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1046 /// spawned tasks have no impact on the runtime's execution.
1047 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
        /// shut down immediately when a spawned task panics even if that
1049 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1050 /// will immediately terminate and further calls to
1051 /// [`Runtime::block_on`] will panic.
1052 ///
        /// # Panics
        ///
        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
        /// on a runtime other than the current thread runtime.
1056 ///
1057 /// # Unstable
1058 ///
1059 /// This option is currently unstable and its implementation is
1060 /// incomplete. The API may change or be removed in the future. See
1061 /// issue [tokio-rs/tokio#4516] for more details.
1062 ///
1063 /// # Examples
1064 ///
1065 /// The following demonstrates a runtime configured to shutdown on
1066 /// panic. The first spawned task panics and results in the runtime
1067 /// shutting down. The second spawned task never has a chance to
1068 /// execute. The call to `block_on` will panic due to the runtime being
        /// forcibly shut down.
1070 ///
1071 /// ```should_panic
1072 /// use tokio::runtime::{self, UnhandledPanic};
1073 ///
1074 /// # pub fn main() {
1075 /// let rt = runtime::Builder::new_current_thread()
1076 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1077 /// .build()
1078 /// .unwrap();
1079 ///
1080 /// rt.spawn(async { panic!("boom"); });
1081 /// rt.spawn(async {
1082 /// // This task never completes.
1083 /// });
1084 ///
1085 /// rt.block_on(async {
1086 /// // Do some work
1087 /// # loop { tokio::task::yield_now().await; }
1088 /// })
1089 /// # }
1090 /// ```
1091 ///
1092 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1093 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1094 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1095 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1096 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1097 }
1098
1099 self.unhandled_panic = behavior;
1100 self
1101 }
1102
1103 /// Disables the LIFO task scheduler heuristic.
1104 ///
1105 /// The multi-threaded scheduler includes a heuristic for optimizing
1106 /// message-passing patterns. This heuristic results in the **last**
1107 /// scheduled task being polled first.
1108 ///
1109 /// To implement this heuristic, each worker thread has a slot which
1110 /// holds the task that should be polled next. However, this slot cannot
1111 /// be stolen by other worker threads, which can result in lower total
1112 /// throughput when tasks tend to have longer poll times.
1113 ///
1114 /// This configuration option will disable this heuristic resulting in
1115 /// all scheduled tasks being pushed into the worker-local queue, which
1116 /// is stealable.
1117 ///
1118 /// Consider trying this option when the task "scheduled" time is high
1119 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1120 /// collect this data.
1121 ///
1122 /// # Unstable
1123 ///
1124 /// This configuration option is considered a workaround for the LIFO
1125 /// slot not being stealable. When the slot becomes stealable, we will
1126 /// revisit whether or not this option is necessary. See
1127 /// issue [tokio-rs/tokio#4941].
1128 ///
1129 /// # Examples
1130 ///
1131 /// ```
1132 /// use tokio::runtime;
1133 ///
1134 /// let rt = runtime::Builder::new_multi_thread()
1135 /// .disable_lifo_slot()
1136 /// .build()
1137 /// .unwrap();
1138 /// ```
1139 ///
1140 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1141 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1142 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1143 self.disable_lifo_slot = true;
1144 self
1145 }
1146
1147 /// Specifies the random number generation seed to use within all
1148 /// threads associated with the runtime being built.
1149 ///
1150 /// This option is intended to make certain parts of the runtime
1151 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1152 /// [`tokio::select!`] it will ensure that the order that branches are
1153 /// polled is deterministic.
1154 ///
1155 /// In addition to the code specifying `rng_seed` and interacting with
1156 /// the runtime, the internals of Tokio and the Rust compiler may affect
1157 /// the sequences of random numbers. In order to ensure repeatable
1158 /// results, the version of Tokio, the versions of all other
1159 /// dependencies that interact with Tokio, and the Rust compiler version
1160 /// should also all remain constant.
1161 ///
1162 /// # Examples
1163 ///
1164 /// ```
1165 /// # use tokio::runtime::{self, RngSeed};
1166 /// # pub fn main() {
1167 /// let seed = RngSeed::from_bytes(b"place your seed here");
1168 /// let rt = runtime::Builder::new_current_thread()
1169 /// .rng_seed(seed)
1170 /// .build();
1171 /// # }
1172 /// ```
1173 ///
1174 /// [`tokio::select!`]: crate::select
1175 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1176 self.seed_generator = RngSeedGenerator::new(seed);
1177 self
1178 }
1179 }
1180
1181 cfg_unstable_metrics! {
1182 /// Enables tracking the distribution of task poll times.
1183 ///
1184 /// Task poll times are not instrumented by default as doing so requires
1185 /// calling [`Instant::now()`] twice per task poll, which could add
1186 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1187 /// metrics data.
1188 ///
1189 /// The histogram uses fixed bucket sizes. In other words, the histogram
1190 /// buckets are not dynamic based on input values. Use the
1191 /// `metrics_poll_time_histogram` builder methods to configure the
1192 /// histogram details.
1193 ///
1194 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1195 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1196 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1197 /// to select [`LogHistogram`] instead.
1198 ///
1199 /// # Examples
1200 ///
1201 /// ```
1202 /// use tokio::runtime;
1203 ///
1204 /// let rt = runtime::Builder::new_multi_thread()
1205 /// .enable_metrics_poll_time_histogram()
1206 /// .build()
1207 /// .unwrap();
1208 /// # // Test default values here
1209 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1210 /// # let m = rt.handle().metrics();
1211 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1212 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1213 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1214 /// ```
1215 ///
1216 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1217 /// [`Instant::now()`]: std::time::Instant::now
1218 /// [`LogHistogram`]: crate::runtime::LogHistogram
1219 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1220 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1221 self.metrics_poll_count_histogram_enable = true;
1222 self
1223 }
1224
1225 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1226 ///
1227 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1228 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1229 #[doc(hidden)]
1230 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1231 self.enable_metrics_poll_time_histogram()
1232 }
1233
1234 /// Sets the histogram scale for tracking the distribution of task poll
1235 /// times.
1236 ///
1237 /// Tracking the distribution of task poll times can be done using a
1238 /// linear or log scale. When using linear scale, each histogram bucket
1239 /// will represent the same range of poll times. When using log scale,
1240 /// each histogram bucket will cover a range twice as big as the
1241 /// previous bucket.
1242 ///
1243 /// **Default:** linear scale.
1244 ///
1245 /// # Examples
1246 ///
1247 /// ```
1248 /// use tokio::runtime::{self, HistogramScale};
1249 ///
1250 /// # #[allow(deprecated)]
1251 /// let rt = runtime::Builder::new_multi_thread()
1252 /// .enable_metrics_poll_time_histogram()
1253 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1254 /// .build()
1255 /// .unwrap();
1256 /// ```
1257 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1258 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
            self.metrics_poll_count_histogram.legacy_mut(|b| b.scale = histogram_scale);
1260 self
1261 }
1262
1263 /// Configure the histogram for tracking poll times
1264 ///
1265 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1266 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1267 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1268 ///
1269 /// # Examples
1270 /// Configure a [`LogHistogram`] with [default configuration]:
1271 /// ```
1272 /// use tokio::runtime;
1273 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1274 ///
1275 /// let rt = runtime::Builder::new_multi_thread()
1276 /// .enable_metrics_poll_time_histogram()
1277 /// .metrics_poll_time_histogram_configuration(
1278 /// HistogramConfiguration::log(LogHistogram::default())
1279 /// )
1280 /// .build()
1281 /// .unwrap();
1282 /// ```
1283 ///
1284 /// Configure a linear histogram with 100 buckets, each 10μs wide
1285 /// ```
1286 /// use tokio::runtime;
1287 /// use std::time::Duration;
1288 /// use tokio::runtime::HistogramConfiguration;
1289 ///
1290 /// let rt = runtime::Builder::new_multi_thread()
1291 /// .enable_metrics_poll_time_histogram()
1292 /// .metrics_poll_time_histogram_configuration(
1293 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1294 /// )
1295 /// .build()
1296 /// .unwrap();
1297 /// ```
1298 ///
1299 /// Configure a [`LogHistogram`] with the following settings:
1300 /// - Measure times from 100ns to 120s
1301 /// - Max error of 0.1
1302 /// - No more than 1024 buckets
1303 /// ```
1304 /// use std::time::Duration;
1305 /// use tokio::runtime;
1306 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1307 ///
1308 /// let rt = runtime::Builder::new_multi_thread()
1309 /// .enable_metrics_poll_time_histogram()
1310 /// .metrics_poll_time_histogram_configuration(
1311 /// HistogramConfiguration::log(LogHistogram::builder()
1312 /// .max_value(Duration::from_secs(120))
1313 /// .min_value(Duration::from_nanos(100))
1314 /// .max_error(0.1)
1315 /// .max_buckets(1024)
1316 /// .expect("configuration uses 488 buckets")
1317 /// )
1318 /// )
1319 /// .build()
1320 /// .unwrap();
1321 /// ```
1322 ///
1323 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1324 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1325 /// where each bucket is twice the size of the previous bucket.
1326 /// ```rust
1327 /// use std::time::Duration;
1328 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1329 /// let rt = tokio::runtime::Builder::new_current_thread()
1330 /// .enable_all()
1331 /// .enable_metrics_poll_time_histogram()
1332 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1333 /// LogHistogram::builder()
1334 /// .min_value(Duration::from_micros(20))
1335 /// .max_value(Duration::from_millis(4))
1336 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1337 /// .precision_exact(0)
1338 /// .max_buckets(10)
1339 /// .unwrap(),
1340 /// ))
1341 /// .build()
1342 /// .unwrap();
1343 /// ```
1344 ///
1345 /// [`LogHistogram`]: crate::runtime::LogHistogram
1346 /// [default configuration]: crate::runtime::LogHistogramBuilder
1347 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1348 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1349 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1350 self
1351 }
1352
1353 /// Sets the histogram resolution for tracking the distribution of task
1354 /// poll times.
1355 ///
1356 /// The resolution is the histogram's first bucket's range. When using a
1357 /// linear histogram scale, each bucket will cover the same range. When
1358 /// using a log scale, each bucket will cover a range twice as big as
1359 /// the previous bucket. In the log case, the resolution represents the
1360 /// smallest bucket range.
1361 ///
1362 /// Note that, when using log scale, the resolution is rounded up to the
1363 /// nearest power of 2 in nanoseconds.
1364 ///
1365 /// **Default:** 100 microseconds.
1366 ///
1367 /// # Examples
1368 ///
1369 /// ```
1370 /// use tokio::runtime;
1371 /// use std::time::Duration;
1372 ///
1373 /// # #[allow(deprecated)]
1374 /// let rt = runtime::Builder::new_multi_thread()
1375 /// .enable_metrics_poll_time_histogram()
1376 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1377 /// .build()
1378 /// .unwrap();
1379 /// ```
1380 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1381 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1382 assert!(resolution > Duration::from_secs(0));
1383 // Sanity check the argument and also make the cast below safe.
1384 assert!(resolution <= Duration::from_secs(1));
1385
1386 let resolution = resolution.as_nanos() as u64;
1387
            self.metrics_poll_count_histogram.legacy_mut(|b| b.resolution = resolution);
1389 self
1390 }
1391
1392 /// Sets the number of buckets for the histogram tracking the
1393 /// distribution of task poll times.
1394 ///
1395 /// The last bucket tracks all greater values that fall out of other
        /// ranges. So, if the histogram is configured with a linear scale, a
        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1398 /// polls that take more than 450ms to complete.
1399 ///
1400 /// **Default:** 10
1401 ///
1402 /// # Examples
1403 ///
1404 /// ```
1405 /// use tokio::runtime;
1406 ///
1407 /// # #[allow(deprecated)]
1408 /// let rt = runtime::Builder::new_multi_thread()
1409 /// .enable_metrics_poll_time_histogram()
1410 /// .metrics_poll_count_histogram_buckets(15)
1411 /// .build()
1412 /// .unwrap();
1413 /// ```
1414 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1415 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
            self.metrics_poll_count_histogram.legacy_mut(|b| b.num_buckets = buckets);
1417 self
1418 }
1419 }
1420
1421 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1422 use crate::runtime::runtime::Scheduler;
1423
1424 let (scheduler, handle, blocking_pool) =
1425 self.build_current_thread_runtime_components(None)?;
1426
1427 Ok(Runtime::from_parts(
1428 Scheduler::CurrentThread(scheduler),
1429 handle,
1430 blocking_pool,
1431 ))
1432 }
1433
1434 #[cfg(tokio_unstable)]
1435 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1436 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1437
1438 let tid = std::thread::current().id();
1439
1440 let (scheduler, handle, blocking_pool) =
1441 self.build_current_thread_runtime_components(Some(tid))?;
1442
1443 Ok(LocalRuntime::from_parts(
1444 LocalRuntimeScheduler::CurrentThread(scheduler),
1445 handle,
1446 blocking_pool,
1447 ))
1448 }
1449
1450 fn build_current_thread_runtime_components(
1451 &mut self,
1452 local_tid: Option<ThreadId>,
1453 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1454 use crate::runtime::scheduler;
1455 use crate::runtime::Config;
1456
1457 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1458
1459 // Blocking pool
1460 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1461 let blocking_spawner = blocking_pool.spawner().clone();
1462
1463 // Generate a rng seed for this runtime.
1464 let seed_generator_1 = self.seed_generator.next_generator();
1465 let seed_generator_2 = self.seed_generator.next_generator();
1466
        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to make progress, it lets the timer or
        // the reactor generate new stimuli so the futures can continue.
1471 let (scheduler, handle) = CurrentThread::new(
1472 driver,
1473 driver_handle,
1474 blocking_spawner,
1475 seed_generator_2,
1476 Config {
1477 before_park: self.before_park.clone(),
1478 after_unpark: self.after_unpark.clone(),
1479 before_spawn: self.before_spawn.clone(),
1480 #[cfg(tokio_unstable)]
1481 before_poll: self.before_poll.clone(),
1482 #[cfg(tokio_unstable)]
1483 after_poll: self.after_poll.clone(),
1484 after_termination: self.after_termination.clone(),
1485 global_queue_interval: self.global_queue_interval,
1486 event_interval: self.event_interval,
1487 #[cfg(tokio_unstable)]
1488 unhandled_panic: self.unhandled_panic.clone(),
1489 disable_lifo_slot: self.disable_lifo_slot,
1490 seed_generator: seed_generator_1,
1491 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1492 },
1493 local_tid,
1494 );
1495
1496 let handle = Handle {
1497 inner: scheduler::Handle::CurrentThread(handle),
1498 };
1499
1500 Ok((scheduler, handle, blocking_pool))
1501 }
1502
1503 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1504 if self.metrics_poll_count_histogram_enable {
1505 Some(self.metrics_poll_count_histogram.clone())
1506 } else {
1507 None
1508 }
1509 }
1510}
1511
1512cfg_io_driver! {
1513 impl Builder {
1514 /// Enables the I/O driver.
1515 ///
1516 /// Doing this enables using net, process, signal, and some I/O types on
1517 /// the runtime.
1518 ///
1519 /// # Examples
1520 ///
1521 /// ```
1522 /// use tokio::runtime;
1523 ///
1524 /// let rt = runtime::Builder::new_multi_thread()
1525 /// .enable_io()
1526 /// .build()
1527 /// .unwrap();
1528 /// ```
1529 pub fn enable_io(&mut self) -> &mut Self {
1530 self.enable_io = true;
1531 self
1532 }
1533
1534 /// Enables the I/O driver and configures the max number of events to be
1535 /// processed per tick.
1536 ///
1537 /// # Examples
1538 ///
1539 /// ```
1540 /// use tokio::runtime;
1541 ///
1542 /// let rt = runtime::Builder::new_current_thread()
1543 /// .enable_io()
1544 /// .max_io_events_per_tick(1024)
1545 /// .build()
1546 /// .unwrap();
1547 /// ```
1548 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1549 self.nevents = capacity;
1550 self
1551 }
1552 }
1553}
1554
1555cfg_time! {
1556 impl Builder {
1557 /// Enables the time driver.
1558 ///
1559 /// Doing this enables using `tokio::time` on the runtime.
1560 ///
1561 /// # Examples
1562 ///
1563 /// ```
1564 /// use tokio::runtime;
1565 ///
1566 /// let rt = runtime::Builder::new_multi_thread()
1567 /// .enable_time()
1568 /// .build()
1569 /// .unwrap();
1570 /// ```
1571 pub fn enable_time(&mut self) -> &mut Self {
1572 self.enable_time = true;
1573 self
1574 }
1575 }
1576}
1577
1578cfg_test_util! {
1579 impl Builder {
        /// Controls whether the runtime's clock starts paused or advancing.
1581 ///
1582 /// Pausing time requires the current-thread runtime; construction of
1583 /// the runtime will panic otherwise.
1584 ///
1585 /// # Examples
1586 ///
1587 /// ```
1588 /// use tokio::runtime;
1589 ///
1590 /// let rt = runtime::Builder::new_current_thread()
1591 /// .enable_time()
1592 /// .start_paused(true)
1593 /// .build()
1594 /// .unwrap();
1595 /// ```
1596 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1597 self.start_paused = start_paused;
1598 self
1599 }
1600 }
1601}
1602
1603cfg_rt_multi_thread! {
1604 impl Builder {
1605 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1606 use crate::loom::sys::num_cpus;
1607 use crate::runtime::{Config, runtime::Scheduler};
1608 use crate::runtime::scheduler::{self, MultiThread};
1609
1610 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1611
1612 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1613
1614 // Create the blocking pool
1615 let blocking_pool =
1616 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1617 let blocking_spawner = blocking_pool.spawner().clone();
1618
1619 // Generate a rng seed for this runtime.
1620 let seed_generator_1 = self.seed_generator.next_generator();
1621 let seed_generator_2 = self.seed_generator.next_generator();
1622
1623 let (scheduler, handle, launch) = MultiThread::new(
1624 worker_threads,
1625 driver,
1626 driver_handle,
1627 blocking_spawner,
1628 seed_generator_2,
1629 Config {
1630 before_park: self.before_park.clone(),
1631 after_unpark: self.after_unpark.clone(),
1632 before_spawn: self.before_spawn.clone(),
1633 #[cfg(tokio_unstable)]
1634 before_poll: self.before_poll.clone(),
1635 #[cfg(tokio_unstable)]
1636 after_poll: self.after_poll.clone(),
1637 after_termination: self.after_termination.clone(),
1638 global_queue_interval: self.global_queue_interval,
1639 event_interval: self.event_interval,
1640 #[cfg(tokio_unstable)]
1641 unhandled_panic: self.unhandled_panic.clone(),
1642 disable_lifo_slot: self.disable_lifo_slot,
1643 seed_generator: seed_generator_1,
1644 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1645 },
1646 );
1647
1648 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1649
1650 // Spawn the thread pool workers
1651 let _enter = handle.enter();
1652 launch.launch();
1653
1654 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1655 }
1656 }
1657}
1658
1659impl fmt::Debug for Builder {
1660 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1661 fmt.debug_struct("Builder")
1662 .field("worker_threads", &self.worker_threads)
1663 .field("max_blocking_threads", &self.max_blocking_threads)
1664 .field(
1665 "thread_name",
1666 &"<dyn Fn() -> String + Send + Sync + 'static>",
1667 )
1668 .field("thread_stack_size", &self.thread_stack_size)
1669 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1670 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1671 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1672 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1673 .finish()
1674 }
1675}