1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use enumflags2::BitFlags;
4use event_listener::Event;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10 collections::{HashMap, HashSet},
11 vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::ObjectPath;
25
26use crate::{
27 address::{self, Address},
28 fdo::RequestNameFlags,
29 names::{InterfaceName, WellKnownName},
30 object_server::{ArcInterface, Interface},
31 Connection, Error, Executor, Guid, OwnedGuid, Result,
32};
33
34use super::{
35 handshake::{AuthMechanism, Authenticated},
36 socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
37};
38
/// Fallback limit handed to `Connection::set_max_queued` by `Builder::build_` when
/// `Builder::max_queued` was never called.
39const DEFAULT_MAX_QUEUED: usize = 64;
40
/// What a `Builder` connects through once `build` is called.
///
/// Consumed by `Builder::target_connect`, which turns every variant into a boxed split socket.
41#[derive(Debug)]
42enum Target {
    /// A caller-supplied Unix domain socket stream (see `Builder::unix_stream`).
43 #[cfg(any(unix, not(feature = "tokio")))]
44 UnixStream(UnixStream),
    /// A caller-supplied TCP stream (see `Builder::tcp_stream`).
45 TcpStream(TcpStream),
    /// A caller-supplied vsock stream; only present when one of the vsock features is enabled.
46 #[cfg(any(
47 all(feature = "vsock", not(feature = "tokio")),
48 feature = "tokio-vsock"
49 ))]
50 VsockStream(VsockStream),
    /// An address that is connected to during `build`; its GUID (if any) is carried along.
51 Address(Address),
    /// A caller-supplied split socket that still has to go through the handshake.
52 Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
    /// A caller-supplied split socket that is treated as already authenticated, so the
    /// handshake is skipped and the builder's stored GUID is used as the server GUID.
53 AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
54}
55
/// Interfaces queued for registration: keyed first by object path, then by interface name.
56type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
57
/// Accumulates the settings for a `Connection` and creates it via `Builder::build`.
///
/// Every configuration method consumes the builder and hands it back, so calls are chained.
58#[derive(Debug)]
71#[must_use]
72pub struct Builder<'a> {
    // Where to connect; always `Some` after construction and taken by `target_connect`.
73 target: Option<Target>,
    // Explicit queue limit; `DEFAULT_MAX_QUEUED` is used when this is `None`.
74 max_queued: Option<usize>,
    // GUID set by `server()` or `authenticated_socket()`.
75 guid: Option<Guid<'a>>,
    // When `true` the connection is not treated as a bus connection (`is_bus_conn == false`).
77 #[cfg(feature = "p2p")]
78 p2p: bool,
    // Whether `build` spawns the executor thread; only read when the `tokio` feature is off.
79 internal_executor: bool,
    // Interfaces to add to the object server during `build_`.
80 interfaces: Interfaces<'a>,
    // Well-known names requested at the end of `build_`.
81 names: HashSet<WellKnownName<'a>>,
    // Forwarded to the client/server handshake; `None` leaves the choice to the handshake code.
82 auth_mechanism: Option<AuthMechanism>,
    // Unique name handed to the server-side handshake (or stored on a pre-authenticated socket).
83 #[cfg(feature = "bus-impl")]
84 unique_name: Option<crate::names::UniqueName<'a>>,
    // Flags passed along with every name request made in `build_`.
85 request_name_flags: BitFlags<RequestNameFlags>,
    // Forwarded unchanged to `Connection::new`.
86 method_timeout: Option<std::time::Duration>,
87}
88
89impl<'a> Builder<'a> {
90 pub fn session() -> Result<Self> {
92 Ok(Self::new(Target::Address(Address::session()?)))
93 }
94
95 pub fn system() -> Result<Self> {
97 Ok(Self::new(Target::Address(Address::system()?)))
98 }
99
100 pub fn address<A>(address: A) -> Result<Self>
132 where
133 A: TryInto<Address>,
134 A::Error: Into<Error>,
135 {
136 Ok(Self::new(Target::Address(
137 address.try_into().map_err(Into::into)?,
138 )))
139 }
140
141 #[cfg(any(unix, not(feature = "tokio")))]
152 pub fn unix_stream(stream: UnixStream) -> Self {
153 Self::new(Target::UnixStream(stream))
154 }
155
156 pub fn tcp_stream(stream: TcpStream) -> Self {
162 Self::new(Target::TcpStream(stream))
163 }
164
165 #[cfg(any(
171 all(feature = "vsock", not(feature = "tokio")),
172 feature = "tokio-vsock"
173 ))]
174 pub fn vsock_stream(stream: VsockStream) -> Self {
175 Self::new(Target::VsockStream(stream))
176 }
177
178 pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
180 Self::new(Target::Socket(socket.into()))
181 }
182
183 pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
188 where
189 S: Into<BoxedSplit>,
190 G: TryInto<Guid<'a>>,
191 G::Error: Into<Error>,
192 {
193 let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
194 builder.guid = Some(guid.try_into().map_err(Into::into)?);
195
196 Ok(builder)
197 }
198
199 pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
201 self.auth_mechanism = Some(auth_mechanism);
202
203 self
204 }
205
206 #[cfg(feature = "p2p")]
210 pub fn p2p(mut self) -> Self {
211 self.p2p = true;
212
213 self
214 }
215
216 #[cfg(feature = "p2p")]
227 pub fn server<G>(mut self, guid: G) -> Result<Self>
228 where
229 G: TryInto<Guid<'a>>,
230 G::Error: Into<Error>,
231 {
232 self.guid = Some(guid.try_into().map_err(Into::into)?);
233
234 Ok(self)
235 }
236
237 pub fn max_queued(mut self, max: usize) -> Self {
263 self.max_queued = Some(max);
264
265 self
266 }
267
268 pub fn internal_executor(mut self, enabled: bool) -> Self {
274 self.internal_executor = enabled;
275
276 self
277 }
278
279 pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
289 where
290 I: Interface,
291 P: TryInto<ObjectPath<'a>>,
292 P::Error: Into<Error>,
293 {
294 let path = path.try_into().map_err(Into::into)?;
295 let entry = self.interfaces.entry(path).or_default();
296 entry.insert(I::name(), ArcInterface::new(iface));
297 Ok(self)
298 }
299
300 pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
310 where
311 W: TryInto<WellKnownName<'a>>,
312 W::Error: Into<Error>,
313 {
314 let well_known_name = well_known_name.try_into().map_err(Into::into)?;
315 self.names.insert(well_known_name);
316
317 Ok(self)
318 }
319
320 pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
323 self.request_name_flags
324 .set(RequestNameFlags::AllowReplacement, allow_replacement);
325 self
326 }
327
328 pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
331 self.request_name_flags
332 .set(RequestNameFlags::ReplaceExisting, replace_existing);
333 self
334 }
335
336 #[cfg(feature = "bus-impl")]
346 pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
347 where
348 U: TryInto<crate::names::UniqueName<'a>>,
349 U::Error: Into<Error>,
350 {
351 if !self.p2p {
352 panic!("unique name can only be set for peer-to-peer connections");
353 }
354 let name = unique_name.try_into().map_err(Into::into)?;
355 self.unique_name = Some(name);
356
357 Ok(self)
358 }
359
360 pub fn method_timeout(mut self, timeout: std::time::Duration) -> Self {
366 self.method_timeout = Some(timeout);
367
368 self
369 }
370
371 pub async fn build(self) -> Result<Connection> {
378 let executor = Executor::new();
379 #[cfg(not(feature = "tokio"))]
380 let internal_executor = self.internal_executor;
381 let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
383
384 #[cfg(not(feature = "tokio"))]
385 start_internal_executor(&executor, internal_executor)?;
386
387 Ok(conn)
388 }
389
390 async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
391 #[cfg(feature = "p2p")]
392 let is_bus_conn = !self.p2p;
393 #[cfg(not(feature = "p2p"))]
394 let is_bus_conn = true;
395
396 let mut auth = self.connect(is_bus_conn).await?;
397
398 let socket_read = auth.socket_read.take().unwrap();
400 let already_received_bytes = auth.already_received_bytes.drain(..).collect();
401 #[cfg(unix)]
402 let already_received_fds = auth.already_received_fds.drain(..).collect();
403
404 let mut conn = Connection::new(auth, is_bus_conn, executor, self.method_timeout).await?;
405 conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
406
407 if !self.interfaces.is_empty() {
408 let object_server = conn.ensure_object_server(false);
409 for (path, interfaces) in self.interfaces {
410 for (name, iface) in interfaces {
411 let added = object_server
412 .add_arc_interface(path.clone(), name.clone(), iface.clone())
413 .await?;
414 if !added {
415 return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
416 }
417 }
418 }
419
420 let started_event = Event::new();
421 let listener = started_event.listen();
422 conn.start_object_server(Some(started_event));
423
424 listener.await;
425 }
426
427 conn.init_socket_reader(
429 socket_read,
430 already_received_bytes,
431 #[cfg(unix)]
432 already_received_fds,
433 );
434
435 for name in self.names {
436 conn.request_name_with_flags(name, self.request_name_flags)
437 .await?;
438 }
439
440 Ok(conn)
441 }
442
443 fn new(target: Target) -> Self {
444 Self {
445 target: Some(target),
446 #[cfg(feature = "p2p")]
447 p2p: false,
448 max_queued: None,
449 guid: None,
450 internal_executor: true,
451 interfaces: HashMap::new(),
452 names: HashSet::new(),
453 auth_mechanism: None,
454 #[cfg(feature = "bus-impl")]
455 unique_name: None,
456 request_name_flags: BitFlags::default(),
457 method_timeout: None,
458 }
459 }
460
461 async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
462 #[cfg(not(feature = "bus-impl"))]
463 let unique_name = None;
464 #[cfg(feature = "bus-impl")]
465 let unique_name = self.unique_name.take().map(Into::into);
466
467 #[allow(unused_mut)]
468 let (mut stream, server_guid, authenticated) = self.target_connect().await?;
469 if authenticated {
470 let (socket_read, socket_write) = stream.take();
471 Ok(Authenticated {
472 #[cfg(unix)]
473 cap_unix_fd: socket_read.can_pass_unix_fd(),
474 socket_read: Some(socket_read),
475 socket_write,
476 server_guid: server_guid.unwrap(),
478 already_received_bytes: vec![],
479 unique_name,
480 #[cfg(unix)]
481 already_received_fds: vec![],
482 })
483 } else {
484 #[cfg(feature = "p2p")]
485 match self.guid.take() {
486 None => {
487 Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn)
489 .await
490 }
491 Some(guid) => {
492 if !self.p2p {
493 return Err(Error::Unsupported);
494 }
495
496 let creds = stream.read_mut().peer_credentials().await?;
497 #[cfg(unix)]
498 let client_uid = creds.unix_user_id();
499 #[cfg(windows)]
500 let client_sid = creds.into_windows_sid();
501
502 Authenticated::server(
503 stream,
504 guid.to_owned().into(),
505 #[cfg(unix)]
506 client_uid,
507 #[cfg(windows)]
508 client_sid,
509 self.auth_mechanism,
510 unique_name,
511 )
512 .await
513 }
514 }
515
516 #[cfg(not(feature = "p2p"))]
517 Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn).await
518 }
519 }
520
521 async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
522 let mut authenticated = false;
523 let mut guid = None;
524 let split = match self.target.take().unwrap() {
527 #[cfg(not(feature = "tokio"))]
528 Target::UnixStream(stream) => Async::new(stream)?.into(),
529 #[cfg(all(unix, feature = "tokio"))]
530 Target::UnixStream(stream) => stream.into(),
531 #[cfg(not(feature = "tokio"))]
532 Target::TcpStream(stream) => Async::new(stream)?.into(),
533 #[cfg(feature = "tokio")]
534 Target::TcpStream(stream) => stream.into(),
535 #[cfg(all(feature = "vsock", not(feature = "tokio")))]
536 Target::VsockStream(stream) => Async::new(stream)?.into(),
537 #[cfg(feature = "tokio-vsock")]
538 Target::VsockStream(stream) => stream.into(),
539 Target::Address(address) => {
540 guid = address.guid().map(|g| g.to_owned().into());
541 match address.connect().await? {
542 #[cfg(any(unix, not(feature = "tokio")))]
543 address::transport::Stream::Unix(stream) => stream.into(),
544 #[cfg(unix)]
545 address::transport::Stream::Unixexec(stream) => stream.into(),
546 address::transport::Stream::Tcp(stream) => stream.into(),
547 #[cfg(any(
548 all(feature = "vsock", not(feature = "tokio")),
549 feature = "tokio-vsock"
550 ))]
551 address::transport::Stream::Vsock(stream) => stream.into(),
552 }
553 }
554 Target::Socket(stream) => stream,
555 Target::AuthenticatedSocket(stream) => {
556 authenticated = true;
557 guid = self.guid.take().map(Into::into);
558 stream
559 }
560 };
561
562 Ok((split, guid, authenticated))
563 }
564}
565
566#[cfg(not(feature = "tokio"))]
571fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
572 if internal_executor {
573 let executor = executor.clone();
574 std::thread::Builder::new()
575 .name("zbus::Connection executor".into())
576 .spawn(move || {
577 crate::utils::block_on(async move {
578 while !executor.is_empty() {
580 executor.tick().await;
581 }
582 })
583 })?;
584 }
585
586 Ok(())
587}