1use std::{collections::HashMap, marker::PhantomData, sync::Arc};
4use tracing::{debug, instrument, trace, trace_span, Instrument};
5
6use zbus_names::InterfaceName;
7use zvariant::{ObjectPath, Value};
8
9use crate::{
10 async_lock::RwLock,
11 connection::WeakConnection,
12 fdo,
13 fdo::ObjectManager,
14 message::{Header, Message},
15 Connection, Error, Result,
16};
17
18mod interface;
19pub(crate) use interface::ArcInterface;
20pub use interface::{DispatchResult, Interface, InterfaceDeref, InterfaceDerefMut, InterfaceRef};
21
22mod signal_emitter;
23pub use signal_emitter::SignalEmitter;
24#[deprecated(since = "5.0.0", note = "Please use `SignalEmitter` instead.")]
25pub type SignalContext<'s> = SignalEmitter<'s>;
26
27mod dispatch_notifier;
28pub use dispatch_notifier::ResponseDispatchNotifier;
29
30mod node;
31pub(crate) use node::Node;
32
33#[derive(Debug, Clone)]
91pub struct ObjectServer {
92 conn: WeakConnection,
93 root: Arc<RwLock<Node>>,
94}
95
96impl ObjectServer {
97 pub(crate) fn new(conn: &Connection) -> Self {
99 Self {
100 conn: conn.into(),
101 root: Arc::new(RwLock::new(Node::new(
102 "/".try_into().expect("zvariant bug"),
103 ))),
104 }
105 }
106
107 pub(crate) fn root(&self) -> &RwLock<Node> {
108 &self.root
109 }
110
111 pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
120 where
121 I: Interface,
122 P: TryInto<ObjectPath<'p>>,
123 P::Error: Into<Error>,
124 {
125 self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
126 .await
127 }
128
129 pub(crate) async fn add_arc_interface<'p, P>(
130 &self,
131 path: P,
132 name: InterfaceName<'static>,
133 arc_iface: ArcInterface,
134 ) -> Result<bool>
135 where
136 P: TryInto<ObjectPath<'p>>,
137 P::Error: Into<Error>,
138 {
139 let path = path.try_into().map_err(Into::into)?;
140 let mut root = self.root().write().await;
141 let (node, manager_path) = root.get_child_mut(&path, true);
142 let node = node.unwrap();
143 let added = node.add_arc_interface(name.clone(), arc_iface);
144 if added {
145 if name == ObjectManager::name() {
146 let emitter = SignalEmitter::new(&self.connection(), path)?;
148 let objects = node.get_managed_objects(self, &self.connection()).await?;
149 for (path, owned_interfaces) in objects {
150 let interfaces = owned_interfaces
151 .iter()
152 .map(|(i, props)| {
153 let props = props
154 .iter()
155 .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
156 .collect::<Result<_>>();
157 Ok((i.into(), props?))
158 })
159 .collect::<Result<_>>()?;
160 ObjectManager::interfaces_added(&emitter, path.into(), interfaces).await?;
161 }
162 } else if let Some(manager_path) = manager_path {
163 let emitter = SignalEmitter::new(&self.connection(), manager_path.clone())?;
164 let mut interfaces = HashMap::new();
165 let owned_props = node
166 .get_properties(self, &self.connection(), name.clone())
167 .await?;
168 let props = owned_props
169 .iter()
170 .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
171 .collect::<Result<_>>()?;
172 interfaces.insert(name, props);
173
174 ObjectManager::interfaces_added(&emitter, path, interfaces).await?;
175 }
176 }
177
178 Ok(added)
179 }
180
181 pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
186 where
187 I: Interface,
188 P: TryInto<ObjectPath<'p>>,
189 P::Error: Into<Error>,
190 {
191 let path = path.try_into().map_err(Into::into)?;
192 let mut root = self.root.write().await;
193 let (node, manager_path) = root.get_child_mut(&path, false);
194 let node = node.ok_or(Error::InterfaceNotFound)?;
195 if !node.remove_interface(I::name()) {
196 return Err(Error::InterfaceNotFound);
197 }
198 if let Some(manager_path) = manager_path {
199 let ctxt = SignalEmitter::new(&self.connection(), manager_path.clone())?;
200 ObjectManager::interfaces_removed(&ctxt, path.clone(), (&[I::name()]).into()).await?;
201 }
202 if node.is_empty() {
203 let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
204 let last_part = path_parts.next().unwrap();
205 let ppath = ObjectPath::from_string_unchecked(
206 path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
207 );
208 root.get_child_mut(&ppath, false)
209 .0
210 .unwrap()
211 .remove_node(last_part);
212 return Ok(true);
213 }
214 Ok(false)
215 }
216
217 pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
260 where
261 I: Interface,
262 P: TryInto<ObjectPath<'p>>,
263 P::Error: Into<Error>,
264 {
265 let path = path.try_into().map_err(Into::into)?;
266 let root = self.root().read().await;
267 let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
268
269 let lock = node
270 .interface_lock(I::name())
271 .ok_or(Error::InterfaceNotFound)?
272 .instance
273 .clone();
274
275 lock.read()
277 .await
278 .downcast_ref::<I>()
279 .ok_or(Error::InterfaceNotFound)?;
280
281 let conn = self.connection();
282 let emitter = SignalEmitter::new(&conn, path).unwrap().into_owned();
284
285 Ok(InterfaceRef {
286 emitter,
287 lock,
288 phantom: PhantomData,
289 })
290 }
291
292 async fn dispatch_call_to_iface(
293 &self,
294 iface: Arc<RwLock<dyn Interface>>,
295 connection: &Connection,
296 msg: &Message,
297 hdr: &Header<'_>,
298 ) -> fdo::Result<()> {
299 let member = hdr
300 .member()
301 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
302 let iface_name = hdr
303 .interface()
304 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
305
306 trace!("acquiring read lock on interface `{}`", iface_name);
307 let read_lock = iface.read().await;
308 trace!("acquired read lock on interface `{}`", iface_name);
309 match read_lock.call(self, connection, msg, member.as_ref()) {
310 DispatchResult::NotFound => {
311 return Err(fdo::Error::UnknownMethod(format!(
312 "Unknown method '{member}'"
313 )));
314 }
315 DispatchResult::Async(f) => {
316 return f.await.map_err(|e| match e {
317 Error::FDO(e) => *e,
318 e => fdo::Error::Failed(format!("{e}")),
319 });
320 }
321 DispatchResult::RequiresMut => {}
322 }
323 drop(read_lock);
324 trace!("acquiring write lock on interface `{}`", iface_name);
325 let mut write_lock = iface.write().await;
326 trace!("acquired write lock on interface `{}`", iface_name);
327 match write_lock.call_mut(self, connection, msg, member.as_ref()) {
328 DispatchResult::NotFound => {}
329 DispatchResult::RequiresMut => {}
330 DispatchResult::Async(f) => {
331 return f.await.map_err(|e| match e {
332 Error::FDO(e) => *e,
333 e => fdo::Error::Failed(format!("{e}")),
334 });
335 }
336 }
337 drop(write_lock);
338 Err(fdo::Error::UnknownMethod(format!(
339 "Unknown method '{member}'"
340 )))
341 }
342
343 async fn dispatch_method_call_try(
344 &self,
345 connection: &Connection,
346 msg: &Message,
347 hdr: &Header<'_>,
348 ) -> fdo::Result<()> {
349 let path = hdr
350 .path()
351 .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
352 let iface_name = hdr
353 .interface()
354 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
360 let _ = hdr
365 .member()
366 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
367
368 let (iface, with_spawn) = {
371 let root = self.root.read().await;
372 let node = root
373 .get_child(path)
374 .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
375
376 let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
377 fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
378 })?;
379 (iface.instance, iface.spawn_tasks_for_methods)
380 };
381
382 if with_spawn {
383 let executor = connection.executor().clone();
384 let task_name = format!("`{msg}` method dispatcher");
385 let connection = connection.clone();
386 let msg = msg.clone();
387 executor
388 .spawn(
389 async move {
390 let server = connection.object_server();
391 let hdr = msg.header();
392 if let Err(e) = server
393 .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
394 .await
395 {
396 debug!("Returning error: {}", e);
398 if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
399 debug!(
400 "Error dispatching message. Message: {:?}, error: {:?}",
401 msg, e
402 );
403 }
404 }
405 }
406 .instrument(trace_span!("{}", task_name)),
407 &task_name,
408 )
409 .detach();
410 Ok(())
411 } else {
412 self.dispatch_call_to_iface(iface, connection, msg, hdr)
413 .await
414 }
415 }
416
417 #[instrument(skip(self))]
430 pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
431 let conn = self.connection();
432
433 if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
434 debug!("Returning error: {}", e);
435 conn.reply_dbus_error(hdr, e).await?;
436 }
437 trace!("Handled: {}", msg);
438
439 Ok(())
440 }
441
442 pub(crate) fn connection(&self) -> Connection {
443 self.conn
444 .upgrade()
445 .expect("ObjectServer can't exist w/o an associated Connection")
446 }
447}
448
/// Converts the blocking `ObjectServer` wrapper into this type through its `into_inner`.
#[cfg(feature = "blocking-api")]
impl From<crate::blocking::ObjectServer> for ObjectServer {
    fn from(blocking: crate::blocking::ObjectServer) -> Self {
        crate::blocking::ObjectServer::into_inner(blocking)
    }
}