1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use event_listener::Event;
4use static_assertions::assert_impl_all;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10 collections::{HashMap, HashSet, VecDeque},
11 convert::TryInto,
12 sync::Arc,
13};
14#[cfg(feature = "tokio")]
15use tokio::net::TcpStream;
16#[cfg(all(unix, feature = "tokio"))]
17use tokio::net::UnixStream;
18#[cfg(feature = "tokio-vsock")]
19use tokio_vsock::VsockStream;
20#[cfg(windows)]
21use uds_windows::UnixStream;
22#[cfg(all(feature = "vsock", not(feature = "tokio")))]
23use vsock::VsockStream;
24
25use zvariant::{ObjectPath, Str};
26
27use crate::{
28 address::{self, Address},
29 async_lock::RwLock,
30 handshake,
31 names::{InterfaceName, UniqueName, WellKnownName},
32 raw::Socket,
33 AuthMechanism, Authenticated, Connection, Error, Executor, Guid, Interface, Result,
34};
35
36const DEFAULT_MAX_QUEUED: usize = 64;
37
38#[derive(Debug)]
39enum Target {
40 UnixStream(UnixStream),
41 TcpStream(TcpStream),
42 #[cfg(any(
43 all(feature = "vsock", not(feature = "tokio")),
44 feature = "tokio-vsock"
45 ))]
46 VsockStream(VsockStream),
47 Address(Address),
48 Socket(Box<dyn Socket>),
49}
50
51type Interfaces<'a> =
52 HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, Arc<RwLock<dyn Interface>>>>;
53
54#[derive(derivative::Derivative)]
56#[derivative(Debug)]
57#[must_use]
58pub struct ConnectionBuilder<'a> {
59 target: Target,
60 max_queued: Option<usize>,
61 guid: Option<&'a Guid>,
62 p2p: bool,
63 internal_executor: bool,
64 #[derivative(Debug = "ignore")]
65 interfaces: Interfaces<'a>,
66 names: HashSet<WellKnownName<'a>>,
67 auth_mechanisms: Option<VecDeque<AuthMechanism>>,
68 unique_name: Option<UniqueName<'a>>,
69 cookie_context: Option<handshake::CookieContext<'a>>,
70 cookie_id: Option<usize>,
71}
72
73assert_impl_all!(ConnectionBuilder<'_>: Send, Sync, Unpin);
74
75impl<'a> ConnectionBuilder<'a> {
76 pub fn session() -> Result<Self> {
78 Ok(Self::new(Target::Address(Address::session()?)))
79 }
80
81 pub fn system() -> Result<Self> {
83 Ok(Self::new(Target::Address(Address::system()?)))
84 }
85
86 pub fn address<A>(address: A) -> Result<Self>
118 where
119 A: TryInto<Address>,
120 A::Error: Into<Error>,
121 {
122 Ok(Self::new(Target::Address(
123 address.try_into().map_err(Into::into)?,
124 )))
125 }
126
127 pub fn unix_stream(stream: UnixStream) -> Self {
133 Self::new(Target::UnixStream(stream))
134 }
135
136 pub fn tcp_stream(stream: TcpStream) -> Self {
142 Self::new(Target::TcpStream(stream))
143 }
144
145 #[cfg(any(
151 all(feature = "vsock", not(feature = "tokio")),
152 feature = "tokio-vsock"
153 ))]
154 pub fn vsock_stream(stream: VsockStream) -> Self {
155 Self::new(Target::VsockStream(stream))
156 }
157
158 pub fn socket<S: Socket + 'static>(socket: S) -> Self {
160 Self::new(Target::Socket(Box::new(socket)))
161 }
162
163 pub fn auth_mechanisms(mut self, auth_mechanisms: &[AuthMechanism]) -> Self {
165 self.auth_mechanisms = Some(VecDeque::from(auth_mechanisms.to_vec()));
166
167 self
168 }
169
170 pub fn cookie_context<C>(mut self, context: C) -> Result<Self>
181 where
182 C: Into<Str<'a>>,
183 {
184 self.cookie_context = Some(context.into().try_into()?);
185
186 Ok(self)
187 }
188
189 pub fn cookie_id(mut self, id: usize) -> Self {
196 self.cookie_id = Some(id);
197
198 self
199 }
200
201 pub fn p2p(mut self) -> Self {
203 self.p2p = true;
204
205 self
206 }
207
208 pub fn server(mut self, guid: &'a Guid) -> Self {
213 self.guid = Some(guid);
214
215 self
216 }
217
218 pub fn max_queued(mut self, max: usize) -> Self {
244 self.max_queued = Some(max);
245
246 self
247 }
248
249 pub fn internal_executor(mut self, enabled: bool) -> Self {
255 self.internal_executor = enabled;
256
257 self
258 }
259
260 pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
267 where
268 I: Interface,
269 P: TryInto<ObjectPath<'a>>,
270 P::Error: Into<Error>,
271 {
272 let path = path.try_into().map_err(Into::into)?;
273 let entry = self.interfaces.entry(path).or_default();
274 entry.insert(I::name(), Arc::new(RwLock::new(iface)));
275
276 Ok(self)
277 }
278
279 pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
286 where
287 W: TryInto<WellKnownName<'a>>,
288 W::Error: Into<Error>,
289 {
290 let well_known_name = well_known_name.try_into().map_err(Into::into)?;
291 self.names.insert(well_known_name);
292
293 Ok(self)
294 }
295
296 pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
305 where
306 U: TryInto<UniqueName<'a>>,
307 U::Error: Into<Error>,
308 {
309 if !self.p2p {
310 panic!("unique name can only be set for peer-to-peer connections");
311 }
312 let name = unique_name.try_into().map_err(Into::into)?;
313 self.unique_name = Some(name);
314
315 Ok(self)
316 }
317
318 pub async fn build(self) -> Result<Connection> {
325 let executor = Executor::new();
326 #[cfg(not(feature = "tokio"))]
327 let internal_executor = self.internal_executor;
328 let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
330
331 #[cfg(not(feature = "tokio"))]
332 start_internal_executor(&executor, internal_executor)?;
333
334 Ok(conn)
335 }
336
337 async fn build_(self, executor: Executor<'static>) -> Result<Connection> {
338 let stream = match self.target {
339 #[cfg(not(feature = "tokio"))]
340 Target::UnixStream(stream) => Box::new(Async::new(stream)?) as Box<dyn Socket>,
341 #[cfg(all(unix, feature = "tokio"))]
342 Target::UnixStream(stream) => Box::new(stream) as Box<dyn Socket>,
343 #[cfg(all(not(unix), feature = "tokio"))]
344 Target::UnixStream(_) => return Err(Error::Unsupported),
345 #[cfg(not(feature = "tokio"))]
346 Target::TcpStream(stream) => Box::new(Async::new(stream)?) as Box<dyn Socket>,
347 #[cfg(feature = "tokio")]
348 Target::TcpStream(stream) => Box::new(stream) as Box<dyn Socket>,
349 #[cfg(all(feature = "vsock", not(feature = "tokio")))]
350 Target::VsockStream(stream) => Box::new(Async::new(stream)?) as Box<dyn Socket>,
351 #[cfg(feature = "tokio-vsock")]
352 Target::VsockStream(stream) => Box::new(stream) as Box<dyn Socket>,
353 Target::Address(address) => match address.connect().await? {
354 #[cfg(any(unix, not(feature = "tokio")))]
355 address::Stream::Unix(stream) => Box::new(stream) as Box<dyn Socket>,
356 address::Stream::Tcp(stream) => Box::new(stream) as Box<dyn Socket>,
357 #[cfg(any(
358 all(feature = "vsock", not(feature = "tokio")),
359 feature = "tokio-vsock"
360 ))]
361 address::Stream::Vsock(stream) => Box::new(stream) as Box<dyn Socket>,
362 },
363 Target::Socket(stream) => stream,
364 };
365 let auth = match self.guid {
366 None => {
367 Authenticated::client(stream, self.auth_mechanisms).await?
369 }
370 Some(guid) => {
371 if !self.p2p {
372 return Err(Error::Unsupported);
373 }
374
375 #[cfg(unix)]
376 let client_uid = stream.uid()?;
377
378 #[cfg(windows)]
379 let client_sid = stream.peer_sid();
380
381 Authenticated::server(
382 stream,
383 guid.clone(),
384 #[cfg(unix)]
385 client_uid,
386 #[cfg(windows)]
387 client_sid,
388 self.auth_mechanisms,
389 self.cookie_id,
390 self.cookie_context.unwrap_or_default(),
391 )
392 .await?
393 }
394 };
395
396 let mut conn = Connection::new(auth, !self.p2p, executor).await?;
397 conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
398 if let Some(unique_name) = self.unique_name {
399 conn.set_unique_name(unique_name)?;
400 }
401
402 if !self.interfaces.is_empty() {
403 let object_server = conn.sync_object_server(false, None);
404 for (path, interfaces) in self.interfaces {
405 for (name, iface) in interfaces {
406 let future = object_server.at_ready(path.to_owned(), name, || iface);
407 let added = future.await?;
408 assert!(added);
410 }
411 }
412
413 let started_event = Event::new();
414 let listener = started_event.listen();
415 conn.start_object_server(Some(started_event));
416
417 listener.await;
418 }
419
420 conn.init_socket_reader();
422
423 if !self.p2p {
424 conn.hello_bus().await?;
426 }
427
428 for name in self.names {
429 conn.request_name(name).await?;
430 }
431
432 Ok(conn)
433 }
434
435 fn new(target: Target) -> Self {
436 Self {
437 target,
438 p2p: false,
439 max_queued: None,
440 guid: None,
441 internal_executor: true,
442 interfaces: HashMap::new(),
443 names: HashSet::new(),
444 auth_mechanisms: None,
445 unique_name: None,
446 cookie_id: None,
447 cookie_context: None,
448 }
449 }
450}
451
452#[cfg(not(feature = "tokio"))]
457fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
458 if internal_executor {
459 let executor = executor.clone();
460 std::thread::Builder::new()
461 .name("zbus::Connection executor".into())
462 .spawn(move || {
463 crate::utils::block_on(async move {
464 while !executor.is_empty() {
466 executor.tick().await;
467 }
468 })
469 })?;
470 }
471
472 Ok(())
473}