accesskit_unix/
executor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright 2024 The AccessKit Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (found in
// the LICENSE-APACHE file) or the MIT license (found in
// the LICENSE-MIT file), at your option.

// Derived from zbus.
// Copyright 2024 Zeeshan Ali Khan.
// Licensed under the MIT license (found in the LICENSE-MIT file).

#[cfg(not(feature = "tokio"))]
use async_executor::Executor as AsyncExecutor;
#[cfg(not(feature = "tokio"))]
use async_task::Task as AsyncTask;
#[cfg(feature = "tokio")]
use std::marker::PhantomData;
#[cfg(not(feature = "tokio"))]
use std::sync::Arc;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
#[cfg(feature = "tokio")]
use tokio::task::JoinHandle;

/// A wrapper around the underlying runtime/executor.
///
/// This is used to run asynchronous tasks internally and allows integration with various runtimes.
/// See [`crate::Connection::executor`] for an example of integration with external runtimes.
///
/// **Note:** You can (and should) completely ignore this type when building with `tokio` feature
/// enabled.
///
/// Without the `tokio` feature this variant owns an `async_executor::Executor` behind an `Arc`,
/// so cloning the wrapper clones the `Arc` rather than creating a new executor.
// NOTE(review): the `crate::Connection::executor` link above appears to be carried over from
// zbus (this file is derived from it); confirm that item exists in this crate or drop the link.
#[cfg(not(feature = "tokio"))]
#[derive(Debug, Clone)]
pub(crate) struct Executor<'a> {
    // The `'a` lifetime is forwarded to `async_executor::Executor<'a>`.
    executor: Arc<AsyncExecutor<'a>>,
}
/// A wrapper around the underlying runtime/executor (`tokio` variant).
///
/// With the `tokio` feature enabled this type holds no executor of its own: its methods spawn
/// via `tokio::task` and await futures directly. The `PhantomData` field only exists so that
/// the `'a` lifetime parameter is used, keeping the type signature identical to the
/// non-`tokio` variant.
#[cfg(feature = "tokio")]
#[derive(Debug, Clone)]
pub(crate) struct Executor<'a> {
    phantom: PhantomData<&'a ()>,
}

impl<'a> Executor<'a> {
    /// Schedules `future` on the underlying runtime and returns a [`Task`] handle for it.
    ///
    /// `name` is only consumed when building with the `tokio` feature *and* the
    /// `tokio_unstable` cfg, where it becomes the name of the spawned tokio task. In every
    /// other configuration it is ignored.
    ///
    /// # Panics
    ///
    /// With `tokio` + `tokio_unstable`, panics if `tokio::task::Builder::spawn` returns an
    /// error.
    pub(crate) fn spawn<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
        #[allow(unused)] name: &str,
    ) -> Task<T> {
        // Exactly one of the three bindings below survives cfg-stripping.
        #[cfg(not(feature = "tokio"))]
        let handle = self.executor.spawn(future);

        // Upstream (zbus) notes that this call always returns `Ok`, hence the bare `unwrap`;
        // that claim has not been re-verified here.
        #[cfg(all(feature = "tokio", tokio_unstable))]
        let handle = tokio::task::Builder::new()
            .name(name)
            .spawn(future)
            .unwrap();

        #[cfg(all(feature = "tokio", not(tokio_unstable)))]
        let handle = tokio::task::spawn(future);

        Task(Some(handle))
    }

    /// Builds a fresh `Executor`.
    ///
    /// Without the `tokio` feature this allocates a new `async_executor::Executor` inside an
    /// `Arc`; with it, the value is just a marker.
    pub(crate) fn new() -> Self {
        #[cfg(not(feature = "tokio"))]
        let this = Self {
            executor: Arc::new(AsyncExecutor::new()),
        };

        #[cfg(feature = "tokio")]
        let this = Self {
            phantom: PhantomData,
        };

        this
    }

    /// Drives the executor until `future` resolves and returns its output.
    ///
    /// Without the `tokio` feature the future is handed to the wrapped executor's `run`
    /// method. With the `tokio` feature there is no executor to drive, so the future is
    /// simply awaited.
    pub(crate) async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        #[cfg(not(feature = "tokio"))]
        let output = self.executor.run(future).await;

        #[cfg(feature = "tokio")]
        let output = future.await;

        output
    }
}

/// A wrapper around the task API of the underlying runtime/executor.
///
/// This follows the semantics of `async_task::Task` on drop:
///
/// * it will be cancelled, rather than detached. For detaching, use the `detach` method.
/// * errors from the task cancellation will be ignored. If you need to know about task errors,
///   convert the task to a `FallibleTask` using the `fallible` method.
// NOTE(review): no `fallible` method is defined on this wrapper in this file; the sentence
// above looks inherited from upstream and may need to be removed or reworded.
//
// The inner handle is kept in an `Option` so that `detach` can move it out of `self` with
// `take()` even though this type implements `Drop`.
#[cfg(not(feature = "tokio"))]
#[derive(Debug)]
pub(crate) struct Task<T>(Option<AsyncTask<T>>);
/// A wrapper around the task API of the underlying runtime/executor (`tokio` variant).
///
/// Holds a `tokio::task::JoinHandle`. The `Drop` impl of this wrapper calls `abort()` on the
/// handle if it is still present, so dropping cancels the task; use the `detach` method to let
/// the task keep running instead.
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub(crate) struct Task<T>(Option<JoinHandle<T>>);

impl<T> Task<T> {
    /// Lets the task keep running in the background instead of cancelling it on drop.
    ///
    /// The inner handle is moved out of the wrapper first, so the wrapper's own `Drop` impl
    /// finds nothing left to abort.
    ///
    /// # Panics
    ///
    /// Panics if the inner handle has already been taken.
    // `unused`: this method may have no callers in some build configurations.
    #[allow(unused, unused_mut)]
    pub(crate) fn detach(mut self) {
        let inner = self.0.take();

        #[cfg(not(feature = "tokio"))]
        inner.expect("async_task::Task is none").detach();

        // Letting go of the `JoinHandle` without calling `abort()` leaves the task running.
        #[cfg(feature = "tokio")]
        drop(inner.expect("tokio::task::JoinHandle is none"));
    }
}

impl<T> Drop for Task<T> {
    /// Cancels the task when the wrapper goes away, unless it was detached first.
    ///
    /// With the `tokio` feature, a still-present `JoinHandle` is aborted explicitly. Without
    /// it this body is empty and the inner `async_task::Task` is simply dropped along with
    /// the field.
    fn drop(&mut self) {
        #[cfg(feature = "tokio")]
        {
            // `None` here means `detach` already took the handle; nothing to abort then.
            let pending = self.0.take();
            if let Some(handle) = pending {
                handle.abort();
            }
        }
    }
}

impl<T> Future for Task<T> {
    type Output = T;

    /// Polls the wrapped task handle and yields the task's output once it is ready.
    ///
    /// # Panics
    ///
    /// Panics if the inner handle has already been taken. With the `tokio` feature it also
    /// panics when the `JoinHandle` resolves to an error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        #[cfg(not(feature = "tokio"))]
        {
            let inner = this.0.as_mut().expect("async_task::Task is none");
            Pin::new(inner).poll(cx)
        }

        #[cfg(feature = "tokio")]
        {
            let inner = this.0.as_mut().expect("tokio::task::JoinHandle is none");
            // Unwrap the join result so both backends expose a plain `T`.
            match Pin::new(inner).poll(cx) {
                Poll::Ready(joined) => Poll::Ready(joined.expect("tokio::task::JoinHandle error")),
                Poll::Pending => Poll::Pending,
            }
        }
    }
}