1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use enumflags2::BitFlags;
4use event_listener::Event;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10 collections::{HashMap, HashSet},
11 vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::ObjectPath;
25
26use crate::{
27 address::{self, Address},
28 fdo::RequestNameFlags,
29 names::{InterfaceName, WellKnownName},
30 object_server::{ArcInterface, Interface},
31 Connection, Error, Executor, Guid, OwnedGuid, Result,
32};
33
34use super::{
35 handshake::{AuthMechanism, Authenticated},
36 socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
37};
38
/// Value handed to `Connection::set_max_queued` when the caller never picked one through
/// [`Builder::max_queued`].
const DEFAULT_MAX_QUEUED: usize = 64;
40
/// What a [`Builder`] connects to once it is built.
///
/// The concrete `UnixStream`, `TcpStream` and `VsockStream` types are selected by the
/// feature- and platform-gated imports at the top of this file.
#[derive(Debug)]
enum Target {
    /// A Unix domain socket stream supplied by the caller.
    #[cfg(any(unix, not(feature = "tokio")))]
    UnixStream(UnixStream),
    /// A TCP stream supplied by the caller.
    TcpStream(TcpStream),
    /// A VSOCK stream supplied by the caller.
    #[cfg(any(
        all(feature = "vsock", not(feature = "tokio")),
        feature = "tokio-vsock"
    ))]
    VsockStream(VsockStream),
    /// An address that still has to be connected to.
    Address(Address),
    /// A caller-supplied socket, already split into read and write halves, that still has to be
    /// authenticated.
    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
    /// Like `Socket`, but treated as already authenticated, so the handshake is skipped.
    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
}
55
/// Interfaces queued for registration, keyed first by object path and then by interface name.
type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
57
/// A builder for [`Connection`].
///
/// Obtain one from a constructor such as [`Builder::session`], [`Builder::address`] or
/// [`Builder::socket`], tweak it with the chainable setters, then call [`Builder::build`].
#[derive(Debug)]
#[must_use]
pub struct Builder<'a> {
    /// Where to connect; taken (left as `None`) when the connection attempt starts.
    target: Option<Target>,
    /// Value for `Connection::set_max_queued`; `None` selects `DEFAULT_MAX_QUEUED`.
    max_queued: Option<usize>,
    /// GUID set by [`Builder::server`] or [`Builder::authenticated_socket`].
    guid: Option<Guid<'a>>,
    /// Whether this is a peer-to-peer connection rather than a bus connection.
    #[cfg(feature = "p2p")]
    p2p: bool,
    /// Whether `build` should spawn the thread that drives the executor (non-tokio builds).
    internal_executor: bool,
    /// Interfaces to register on the object server during `build`.
    interfaces: Interfaces<'a>,
    /// Well-known names to request during `build`.
    names: HashSet<WellKnownName<'a>>,
    /// Authentication mechanism forwarded to the handshake, if one was chosen.
    auth_mechanism: Option<AuthMechanism>,
    /// Unique name forwarded to the handshake, if one was chosen.
    #[cfg(feature = "bus-impl")]
    unique_name: Option<crate::names::UniqueName<'a>>,
    /// Flags used for every name request issued during `build`.
    request_name_flags: BitFlags<RequestNameFlags>,
}
87
88impl<'a> Builder<'a> {
89 pub fn session() -> Result<Self> {
91 Ok(Self::new(Target::Address(Address::session()?)))
92 }
93
94 pub fn system() -> Result<Self> {
96 Ok(Self::new(Target::Address(Address::system()?)))
97 }
98
99 pub fn address<A>(address: A) -> Result<Self>
131 where
132 A: TryInto<Address>,
133 A::Error: Into<Error>,
134 {
135 Ok(Self::new(Target::Address(
136 address.try_into().map_err(Into::into)?,
137 )))
138 }
139
140 #[cfg(any(unix, not(feature = "tokio")))]
151 pub fn unix_stream(stream: UnixStream) -> Self {
152 Self::new(Target::UnixStream(stream))
153 }
154
155 pub fn tcp_stream(stream: TcpStream) -> Self {
161 Self::new(Target::TcpStream(stream))
162 }
163
164 #[cfg(any(
170 all(feature = "vsock", not(feature = "tokio")),
171 feature = "tokio-vsock"
172 ))]
173 pub fn vsock_stream(stream: VsockStream) -> Self {
174 Self::new(Target::VsockStream(stream))
175 }
176
177 pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
179 Self::new(Target::Socket(socket.into()))
180 }
181
182 pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
187 where
188 S: Into<BoxedSplit>,
189 G: TryInto<Guid<'a>>,
190 G::Error: Into<Error>,
191 {
192 let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
193 builder.guid = Some(guid.try_into().map_err(Into::into)?);
194
195 Ok(builder)
196 }
197
198 pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
200 self.auth_mechanism = Some(auth_mechanism);
201
202 self
203 }
204
205 #[cfg(feature = "p2p")]
209 pub fn p2p(mut self) -> Self {
210 self.p2p = true;
211
212 self
213 }
214
215 #[cfg(feature = "p2p")]
226 pub fn server<G>(mut self, guid: G) -> Result<Self>
227 where
228 G: TryInto<Guid<'a>>,
229 G::Error: Into<Error>,
230 {
231 self.guid = Some(guid.try_into().map_err(Into::into)?);
232
233 Ok(self)
234 }
235
236 pub fn max_queued(mut self, max: usize) -> Self {
262 self.max_queued = Some(max);
263
264 self
265 }
266
267 pub fn internal_executor(mut self, enabled: bool) -> Self {
273 self.internal_executor = enabled;
274
275 self
276 }
277
278 pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
288 where
289 I: Interface,
290 P: TryInto<ObjectPath<'a>>,
291 P::Error: Into<Error>,
292 {
293 let path = path.try_into().map_err(Into::into)?;
294 let entry = self.interfaces.entry(path).or_default();
295 entry.insert(I::name(), ArcInterface::new(iface));
296 Ok(self)
297 }
298
299 pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
309 where
310 W: TryInto<WellKnownName<'a>>,
311 W::Error: Into<Error>,
312 {
313 let well_known_name = well_known_name.try_into().map_err(Into::into)?;
314 self.names.insert(well_known_name);
315
316 Ok(self)
317 }
318
319 pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
322 self.request_name_flags
323 .set(RequestNameFlags::AllowReplacement, allow_replacement);
324 self
325 }
326
327 pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
330 self.request_name_flags
331 .set(RequestNameFlags::ReplaceExisting, replace_existing);
332 self
333 }
334
335 #[cfg(feature = "bus-impl")]
345 pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
346 where
347 U: TryInto<crate::names::UniqueName<'a>>,
348 U::Error: Into<Error>,
349 {
350 if !self.p2p {
351 panic!("unique name can only be set for peer-to-peer connections");
352 }
353 let name = unique_name.try_into().map_err(Into::into)?;
354 self.unique_name = Some(name);
355
356 Ok(self)
357 }
358
359 pub async fn build(self) -> Result<Connection> {
366 let executor = Executor::new();
367 #[cfg(not(feature = "tokio"))]
368 let internal_executor = self.internal_executor;
369 let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
371
372 #[cfg(not(feature = "tokio"))]
373 start_internal_executor(&executor, internal_executor)?;
374
375 Ok(conn)
376 }
377
378 async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
379 #[cfg(feature = "p2p")]
380 let is_bus_conn = !self.p2p;
381 #[cfg(not(feature = "p2p"))]
382 let is_bus_conn = true;
383
384 let mut auth = self.connect(is_bus_conn).await?;
385
386 let socket_read = auth.socket_read.take().unwrap();
388 let already_received_bytes = auth.already_received_bytes.drain(..).collect();
389 #[cfg(unix)]
390 let already_received_fds = auth.already_received_fds.drain(..).collect();
391
392 let mut conn = Connection::new(auth, is_bus_conn, executor).await?;
393 conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
394
395 if !self.interfaces.is_empty() {
396 let object_server = conn.ensure_object_server(false);
397 for (path, interfaces) in self.interfaces {
398 for (name, iface) in interfaces {
399 let added = object_server
400 .add_arc_interface(path.clone(), name.clone(), iface.clone())
401 .await?;
402 if !added {
403 return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
404 }
405 }
406 }
407
408 let started_event = Event::new();
409 let listener = started_event.listen();
410 conn.start_object_server(Some(started_event));
411
412 listener.await;
413 }
414
415 conn.init_socket_reader(
417 socket_read,
418 already_received_bytes,
419 #[cfg(unix)]
420 already_received_fds,
421 );
422
423 for name in self.names {
424 conn.request_name_with_flags(name, self.request_name_flags)
425 .await?;
426 }
427
428 Ok(conn)
429 }
430
431 fn new(target: Target) -> Self {
432 Self {
433 target: Some(target),
434 #[cfg(feature = "p2p")]
435 p2p: false,
436 max_queued: None,
437 guid: None,
438 internal_executor: true,
439 interfaces: HashMap::new(),
440 names: HashSet::new(),
441 auth_mechanism: None,
442 #[cfg(feature = "bus-impl")]
443 unique_name: None,
444 request_name_flags: BitFlags::default(),
445 }
446 }
447
448 async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
449 #[cfg(not(feature = "bus-impl"))]
450 let unique_name = None;
451 #[cfg(feature = "bus-impl")]
452 let unique_name = self.unique_name.take().map(Into::into);
453
454 #[allow(unused_mut)]
455 let (mut stream, server_guid, authenticated) = self.target_connect().await?;
456 if authenticated {
457 let (socket_read, socket_write) = stream.take();
458 Ok(Authenticated {
459 #[cfg(unix)]
460 cap_unix_fd: socket_read.can_pass_unix_fd(),
461 socket_read: Some(socket_read),
462 socket_write,
463 server_guid: server_guid.unwrap(),
465 already_received_bytes: vec![],
466 unique_name,
467 #[cfg(unix)]
468 already_received_fds: vec![],
469 })
470 } else {
471 #[cfg(feature = "p2p")]
472 match self.guid.take() {
473 None => {
474 Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn)
476 .await
477 }
478 Some(guid) => {
479 if !self.p2p {
480 return Err(Error::Unsupported);
481 }
482
483 let creds = stream.read_mut().peer_credentials().await?;
484 #[cfg(unix)]
485 let client_uid = creds.unix_user_id();
486 #[cfg(windows)]
487 let client_sid = creds.into_windows_sid();
488
489 Authenticated::server(
490 stream,
491 guid.to_owned().into(),
492 #[cfg(unix)]
493 client_uid,
494 #[cfg(windows)]
495 client_sid,
496 self.auth_mechanism,
497 unique_name,
498 )
499 .await
500 }
501 }
502
503 #[cfg(not(feature = "p2p"))]
504 Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn).await
505 }
506 }
507
508 async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
509 let mut authenticated = false;
510 let mut guid = None;
511 let split = match self.target.take().unwrap() {
514 #[cfg(not(feature = "tokio"))]
515 Target::UnixStream(stream) => Async::new(stream)?.into(),
516 #[cfg(all(unix, feature = "tokio"))]
517 Target::UnixStream(stream) => stream.into(),
518 #[cfg(not(feature = "tokio"))]
519 Target::TcpStream(stream) => Async::new(stream)?.into(),
520 #[cfg(feature = "tokio")]
521 Target::TcpStream(stream) => stream.into(),
522 #[cfg(all(feature = "vsock", not(feature = "tokio")))]
523 Target::VsockStream(stream) => Async::new(stream)?.into(),
524 #[cfg(feature = "tokio-vsock")]
525 Target::VsockStream(stream) => stream.into(),
526 Target::Address(address) => {
527 guid = address.guid().map(|g| g.to_owned().into());
528 match address.connect().await? {
529 #[cfg(any(unix, not(feature = "tokio")))]
530 address::transport::Stream::Unix(stream) => stream.into(),
531 #[cfg(unix)]
532 address::transport::Stream::Unixexec(stream) => stream.into(),
533 address::transport::Stream::Tcp(stream) => stream.into(),
534 #[cfg(any(
535 all(feature = "vsock", not(feature = "tokio")),
536 feature = "tokio-vsock"
537 ))]
538 address::transport::Stream::Vsock(stream) => stream.into(),
539 }
540 }
541 Target::Socket(stream) => stream,
542 Target::AuthenticatedSocket(stream) => {
543 authenticated = true;
544 guid = self.guid.take().map(Into::into);
545 stream
546 }
547 };
548
549 Ok((split, guid, authenticated))
550 }
551}
552
553#[cfg(not(feature = "tokio"))]
558fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
559 if internal_executor {
560 let executor = executor.clone();
561 std::thread::Builder::new()
562 .name("zbus::Connection executor".into())
563 .spawn(move || {
564 crate::utils::block_on(async move {
565 while !executor.is_empty() {
567 executor.tick().await;
568 }
569 })
570 })?;
571 }
572
573 Ok(())
574}