zbus/object_server/
mod.rs

1//! The object server API.
2
3use std::{collections::HashMap, marker::PhantomData, sync::Arc};
4use tracing::{debug, instrument, trace, trace_span, Instrument};
5
6use zbus_names::InterfaceName;
7use zvariant::{ObjectPath, Value};
8
9use crate::{
10    async_lock::RwLock,
11    connection::WeakConnection,
12    fdo,
13    fdo::ObjectManager,
14    message::{Header, Message},
15    Connection, Error, Result,
16};
17
18mod interface;
19pub(crate) use interface::ArcInterface;
20pub use interface::{DispatchResult, Interface, InterfaceDeref, InterfaceDerefMut, InterfaceRef};
21
22mod signal_emitter;
23pub use signal_emitter::SignalEmitter;
24#[deprecated(since = "5.0.0", note = "Please use `SignalEmitter` instead.")]
25pub type SignalContext<'s> = SignalEmitter<'s>;
26
27mod dispatch_notifier;
28pub use dispatch_notifier::ResponseDispatchNotifier;
29
30mod node;
31pub(crate) use node::Node;
32
33/// An object server, holding server-side D-Bus objects & interfaces.
34///
35/// Object servers hold interfaces on various object paths, and expose them over D-Bus.
36///
37/// All object paths will have the standard interfaces implemented on your behalf, such as
38/// `org.freedesktop.DBus.Introspectable` or `org.freedesktop.DBus.Properties`.
39///
40/// # Example
41///
42/// This example exposes the `org.myiface.Example.Quit` method on the `/org/zbus/path`
43/// path.
44///
45/// ```no_run
46/// # use std::error::Error;
47/// use zbus::{Connection, interface};
48/// use event_listener::Event;
49/// # use async_io::block_on;
50///
51/// struct Example {
52///     // Interfaces are owned by the ObjectServer. They can have
53///     // `&mut self` methods.
54///     quit_event: Event,
55/// }
56///
57/// impl Example {
58///     fn new(quit_event: Event) -> Self {
59///         Self { quit_event }
60///     }
61/// }
62///
63/// #[interface(name = "org.myiface.Example")]
64/// impl Example {
65///     // This will be the "Quit" D-Bus method.
66///     async fn quit(&mut self) {
67///         self.quit_event.notify(1);
68///     }
69///
70///     // See `interface` documentation to learn
71///     // how to expose properties & signals as well.
72/// }
73///
74/// # block_on(async {
75/// let connection = Connection::session().await?;
76///
77/// let quit_event = Event::new();
78/// let quit_listener = quit_event.listen();
79/// let interface = Example::new(quit_event);
80/// connection
81///     .object_server()
82///     .at("/org/zbus/path", interface)
83///     .await?;
84///
85/// quit_listener.await;
86/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
87/// # })?;
88/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
89/// ```
90#[derive(Debug, Clone)]
91pub struct ObjectServer {
92    conn: WeakConnection,
93    root: Arc<RwLock<Node>>,
94}
95
96impl ObjectServer {
97    /// Create a new D-Bus `ObjectServer`.
98    pub(crate) fn new(conn: &Connection) -> Self {
99        Self {
100            conn: conn.into(),
101            root: Arc::new(RwLock::new(Node::new(
102                "/".try_into().expect("zvariant bug"),
103            ))),
104        }
105    }
106
107    pub(crate) fn root(&self) -> &RwLock<Node> {
108        &self.root
109    }
110
111    /// Register a D-Bus [`Interface`] at a given path (see the example above).
112    ///
113    /// Typically you'd want your interfaces to be registered immediately after the associated
114    /// connection is established and therefore use [`zbus::connection::Builder::serve_at`] instead.
115    /// However, there are situations where you'd need to register interfaces dynamically and that's
116    /// where this method becomes useful.
117    ///
118    /// If the interface already exists at this path, returns false.
119    pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
120    where
121        I: Interface,
122        P: TryInto<ObjectPath<'p>>,
123        P::Error: Into<Error>,
124    {
125        self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
126            .await
127    }
128
129    pub(crate) async fn add_arc_interface<'p, P>(
130        &self,
131        path: P,
132        name: InterfaceName<'static>,
133        arc_iface: ArcInterface,
134    ) -> Result<bool>
135    where
136        P: TryInto<ObjectPath<'p>>,
137        P::Error: Into<Error>,
138    {
139        let path = path.try_into().map_err(Into::into)?;
140        let mut root = self.root().write().await;
141        let (node, manager_path) = root.get_child_mut(&path, true);
142        let node = node.unwrap();
143        let added = node.add_arc_interface(name.clone(), arc_iface);
144        if added {
145            if name == ObjectManager::name() {
146                // Just added an object manager. Need to signal all managed objects under it.
147                let emitter = SignalEmitter::new(&self.connection(), path)?;
148                let objects = node.get_managed_objects(self, &self.connection()).await?;
149                for (path, owned_interfaces) in objects {
150                    let interfaces = owned_interfaces
151                        .iter()
152                        .map(|(i, props)| {
153                            let props = props
154                                .iter()
155                                .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
156                                .collect::<Result<_>>();
157                            Ok((i.into(), props?))
158                        })
159                        .collect::<Result<_>>()?;
160                    ObjectManager::interfaces_added(&emitter, path.into(), interfaces).await?;
161                }
162            } else if let Some(manager_path) = manager_path {
163                let emitter = SignalEmitter::new(&self.connection(), manager_path.clone())?;
164                let mut interfaces = HashMap::new();
165                let owned_props = node
166                    .get_properties(self, &self.connection(), name.clone())
167                    .await?;
168                let props = owned_props
169                    .iter()
170                    .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
171                    .collect::<Result<_>>()?;
172                interfaces.insert(name, props);
173
174                ObjectManager::interfaces_added(&emitter, path, interfaces).await?;
175            }
176        }
177
178        Ok(added)
179    }
180
181    /// Unregister a D-Bus [`Interface`] at a given path.
182    ///
183    /// If there are no more interfaces left at that path, destroys the object as well.
184    /// Returns whether the object was destroyed.
185    pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
186    where
187        I: Interface,
188        P: TryInto<ObjectPath<'p>>,
189        P::Error: Into<Error>,
190    {
191        let path = path.try_into().map_err(Into::into)?;
192        let mut root = self.root.write().await;
193        let (node, manager_path) = root.get_child_mut(&path, false);
194        let node = node.ok_or(Error::InterfaceNotFound)?;
195        if !node.remove_interface(I::name()) {
196            return Err(Error::InterfaceNotFound);
197        }
198        if let Some(manager_path) = manager_path {
199            let ctxt = SignalEmitter::new(&self.connection(), manager_path.clone())?;
200            ObjectManager::interfaces_removed(&ctxt, path.clone(), (&[I::name()]).into()).await?;
201        }
202        if node.is_empty() {
203            let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
204            let last_part = path_parts.next().unwrap();
205            let ppath = ObjectPath::from_string_unchecked(
206                path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
207            );
208            root.get_child_mut(&ppath, false)
209                .0
210                .unwrap()
211                .remove_node(last_part);
212            return Ok(true);
213        }
214        Ok(false)
215    }
216
217    /// Get the interface at the given path.
218    ///
219    /// # Errors
220    ///
221    /// If the interface is not registered at the given path, an `Error::InterfaceNotFound` error is
222    /// returned.
223    ///
224    /// # Examples
225    ///
226    /// The typical use of this is property changes outside of a dispatched handler:
227    ///
228    /// ```no_run
229    /// # use std::error::Error;
230    /// # use zbus::{Connection, interface};
231    /// # use async_io::block_on;
232    /// #
233    /// struct MyIface(u32);
234    ///
235    /// #[interface(name = "org.myiface.MyIface")]
236    /// impl MyIface {
237    ///      #[zbus(property)]
238    ///      async fn count(&self) -> u32 {
239    ///          self.0
240    ///      }
241    /// }
242    ///
243    /// # block_on(async {
244    /// # let connection = Connection::session().await?;
245    /// #
246    /// # let path = "/org/zbus/path";
247    /// # connection.object_server().at(path, MyIface(0)).await?;
248    /// let iface_ref = connection
249    ///     .object_server()
250    ///     .interface::<_, MyIface>(path).await?;
251    /// let mut iface = iface_ref.get_mut().await;
252    /// iface.0 = 42;
253    /// iface.count_changed(iface_ref.signal_emitter()).await?;
254    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
255    /// # })?;
256    /// #
257    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
258    /// ```
259    pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
260    where
261        I: Interface,
262        P: TryInto<ObjectPath<'p>>,
263        P::Error: Into<Error>,
264    {
265        let path = path.try_into().map_err(Into::into)?;
266        let root = self.root().read().await;
267        let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
268
269        let lock = node
270            .interface_lock(I::name())
271            .ok_or(Error::InterfaceNotFound)?
272            .instance
273            .clone();
274
275        // Ensure what we return can later be dowcasted safely.
276        lock.read()
277            .await
278            .downcast_ref::<I>()
279            .ok_or(Error::InterfaceNotFound)?;
280
281        let conn = self.connection();
282        // SAFETY: We know that there is a valid path on the node as we already converted w/o error.
283        let emitter = SignalEmitter::new(&conn, path).unwrap().into_owned();
284
285        Ok(InterfaceRef {
286            emitter,
287            lock,
288            phantom: PhantomData,
289        })
290    }
291
292    async fn dispatch_call_to_iface(
293        &self,
294        iface: Arc<RwLock<dyn Interface>>,
295        connection: &Connection,
296        msg: &Message,
297        hdr: &Header<'_>,
298    ) -> fdo::Result<()> {
299        let member = hdr
300            .member()
301            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
302        let iface_name = hdr
303            .interface()
304            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
305
306        trace!("acquiring read lock on interface `{}`", iface_name);
307        let read_lock = iface.read().await;
308        trace!("acquired read lock on interface `{}`", iface_name);
309        match read_lock.call(self, connection, msg, member.as_ref()) {
310            DispatchResult::NotFound => {
311                return Err(fdo::Error::UnknownMethod(format!(
312                    "Unknown method '{member}'"
313                )));
314            }
315            DispatchResult::Async(f) => {
316                return f.await.map_err(|e| match e {
317                    Error::FDO(e) => *e,
318                    e => fdo::Error::Failed(format!("{e}")),
319                });
320            }
321            DispatchResult::RequiresMut => {}
322        }
323        drop(read_lock);
324        trace!("acquiring write lock on interface `{}`", iface_name);
325        let mut write_lock = iface.write().await;
326        trace!("acquired write lock on interface `{}`", iface_name);
327        match write_lock.call_mut(self, connection, msg, member.as_ref()) {
328            DispatchResult::NotFound => {}
329            DispatchResult::RequiresMut => {}
330            DispatchResult::Async(f) => {
331                return f.await.map_err(|e| match e {
332                    Error::FDO(e) => *e,
333                    e => fdo::Error::Failed(format!("{e}")),
334                });
335            }
336        }
337        drop(write_lock);
338        Err(fdo::Error::UnknownMethod(format!(
339            "Unknown method '{member}'"
340        )))
341    }
342
343    async fn dispatch_method_call_try(
344        &self,
345        connection: &Connection,
346        msg: &Message,
347        hdr: &Header<'_>,
348    ) -> fdo::Result<()> {
349        let path = hdr
350            .path()
351            .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
352        let iface_name = hdr
353            .interface()
354            // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same
355            // object have a method with the same name, it is undefined which of those
356            // methods will be invoked. Implementations may choose to either return an
357            // error, or deliver the message as though it had an arbitrary one of those
358            // interfaces.
359            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
360        // Check that the message has a member before spawning.
361        // Note that an unknown member will still spawn a task. We should instead gather
362        // all the details for the call before spawning.
363        // See also https://github.com/dbus2/zbus/issues/674 for future of Interface.
364        let _ = hdr
365            .member()
366            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
367
368        // Ensure the root lock isn't held while dispatching the message. That
369        // way, the object server can be mutated during that time.
370        let (iface, with_spawn) = {
371            let root = self.root.read().await;
372            let node = root
373                .get_child(path)
374                .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
375
376            let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
377                fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
378            })?;
379            (iface.instance, iface.spawn_tasks_for_methods)
380        };
381
382        if with_spawn {
383            let executor = connection.executor().clone();
384            let task_name = format!("`{msg}` method dispatcher");
385            let connection = connection.clone();
386            let msg = msg.clone();
387            executor
388                .spawn(
389                    async move {
390                        let server = connection.object_server();
391                        let hdr = msg.header();
392                        if let Err(e) = server
393                            .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
394                            .await
395                        {
396                            // When not spawning a task, this error is handled by the caller.
397                            debug!("Returning error: {}", e);
398                            if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
399                                debug!(
400                                    "Error dispatching message. Message: {:?}, error: {:?}",
401                                    msg, e
402                                );
403                            }
404                        }
405                    }
406                    .instrument(trace_span!("{}", task_name)),
407                    &task_name,
408                )
409                .detach();
410            Ok(())
411        } else {
412            self.dispatch_call_to_iface(iface, connection, msg, hdr)
413                .await
414        }
415    }
416
417    /// Dispatch an incoming message to a registered interface.
418    ///
419    /// The object server will handle the message by:
420    ///
421    /// - looking up the called object path & interface,
422    ///
423    /// - calling the associated method if one exists,
424    ///
425    /// - returning a message (responding to the caller with either a return or error message) to
426    ///   the caller through the associated server connection.
427    ///
428    /// Returns an error if the message is malformed.
429    #[instrument(skip(self))]
430    pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
431        let conn = self.connection();
432
433        if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
434            debug!("Returning error: {}", e);
435            conn.reply_dbus_error(hdr, e).await?;
436        }
437        trace!("Handled: {}", msg);
438
439        Ok(())
440    }
441
442    pub(crate) fn connection(&self) -> Connection {
443        self.conn
444            .upgrade()
445            .expect("ObjectServer can't exist w/o an associated Connection")
446    }
447}
448
449#[cfg(feature = "blocking-api")]
450impl From<crate::blocking::ObjectServer> for ObjectServer {
451    fn from(server: crate::blocking::ObjectServer) -> Self {
452        server.into_inner()
453    }
454}