zbus/raw/
connection.rs

1use std::{
2    collections::VecDeque,
3    io,
4    sync::Arc,
5    task::{Context, Poll},
6};
7
8use event_listener::{Event, EventListener};
9
10#[cfg(unix)]
11use crate::OwnedFd;
12use crate::{
13    message_header::{MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE},
14    raw::Socket,
15    utils::padding_for_8_bytes,
16    Message, MessagePrimaryHeader,
17};
18
19use futures_core::ready;
20
/// A low-level representation of a D-Bus connection
///
/// This wrapper is agnostic of the actual transport, using the `Socket` trait
/// to abstract it. It is compatible with sockets in both blocking and non-blocking
/// mode.
///
/// This wrapper abstracts away the serialization & buffering considerations of the
/// protocol, and allows interaction based on messages, rather than bytes.
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct Connection<S> {
    /// The underlying transport. Left out of the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    socket: S,
    /// Notified on every flush, receive and close attempt; see `monitor_activity`.
    event: Event,
    /// Bytes of the incoming message that is currently being assembled.
    raw_in_buffer: Vec<u8>,
    /// File descriptors received so far for the incoming message being assembled.
    #[cfg(unix)]
    raw_in_fds: Vec<OwnedFd>,
    /// Number of bytes at the start of `raw_in_buffer` that have actually been received.
    raw_in_pos: usize,
    /// Number of bytes of the front message in `out_msgs` that have already been written.
    out_pos: usize,
    /// Messages waiting to be written to the socket, oldest first.
    out_msgs: VecDeque<Arc<Message>>,
    /// Sequence number handed to the most recently received message (0 before the first one).
    prev_seq: u64,
}
43
44impl<S: Socket> Connection<S> {
45    pub(crate) fn new(socket: S, raw_in_buffer: Vec<u8>) -> Connection<S> {
46        Connection {
47            socket,
48            event: Event::new(),
49            raw_in_pos: raw_in_buffer.len(),
50            raw_in_buffer,
51            #[cfg(unix)]
52            raw_in_fds: vec![],
53            out_pos: 0,
54            out_msgs: VecDeque::new(),
55            prev_seq: 0,
56        }
57    }
58
59    /// Attempt to flush the outgoing buffer
60    ///
61    /// This will try to write as many messages as possible from the
62    /// outgoing buffer into the socket, until an error is encountered.
63    ///
64    /// This method will thus only block if the socket is in blocking mode.
65    pub fn try_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
66        self.event.notify(usize::MAX);
67        while let Some(msg) = self.out_msgs.front() {
68            loop {
69                let data = &msg.as_bytes()[self.out_pos..];
70                if data.is_empty() {
71                    self.out_pos = 0;
72                    self.out_msgs.pop_front();
73                    break;
74                }
75                #[cfg(unix)]
76                let fds = if self.out_pos == 0 { msg.fds() } else { vec![] };
77                self.out_pos += ready!(self.socket.poll_sendmsg(
78                    cx,
79                    data,
80                    #[cfg(unix)]
81                    &fds,
82                ))?;
83            }
84        }
85        Poll::Ready(Ok(()))
86    }
87
    /// Enqueue a message to be sent out to the socket
    ///
    /// The message is appended to the back of the outgoing queue, so messages are sent in
    /// the order they were enqueued.
    ///
    /// This method will *not* write anything to the socket; you need to call
    /// `try_flush()` afterwards so that your message is actually sent out.
    pub fn enqueue_message(&mut self, msg: Arc<Message>) {
        self.out_msgs.push_back(msg);
    }
95
96    /// Attempt to read a message from the socket
97    ///
98    /// This methods will read from the socket until either a full D-Bus message is
99    /// read or an error is encountered.
100    ///
101    /// If the socket is in non-blocking mode, it may read a partial message. In such case it
102    /// will buffer it internally and try to complete it the next time you call
103    /// `try_receive_message`.
104    pub fn try_receive_message(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<Message>> {
105        self.event.notify(usize::MAX);
106        if self.raw_in_pos < MIN_MESSAGE_SIZE {
107            self.raw_in_buffer.resize(MIN_MESSAGE_SIZE, 0);
108            // We don't have enough data to make a proper message header yet.
109            // Some partial read may be in raw_in_buffer, so we try to complete it
110            // until we have MIN_MESSAGE_SIZE bytes
111            //
112            // Given that MIN_MESSAGE_SIZE is 16, this codepath is actually extremely unlikely
113            // to be taken more than once
114            while self.raw_in_pos < MIN_MESSAGE_SIZE {
115                let res = ready!(self
116                    .socket
117                    .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?;
118                let len = {
119                    #[cfg(unix)]
120                    {
121                        let (len, fds) = res;
122                        self.raw_in_fds.extend(fds);
123                        len
124                    }
125                    #[cfg(not(unix))]
126                    {
127                        res
128                    }
129                };
130                self.raw_in_pos += len;
131                if len == 0 {
132                    return Poll::Ready(Err(crate::Error::InputOutput(
133                        std::io::Error::new(
134                            std::io::ErrorKind::UnexpectedEof,
135                            "failed to receive message",
136                        )
137                        .into(),
138                    )));
139                }
140            }
141        }
142
143        let (primary_header, fields_len) = MessagePrimaryHeader::read(&self.raw_in_buffer)?;
144        let header_len = MIN_MESSAGE_SIZE + fields_len as usize;
145        let body_padding = padding_for_8_bytes(header_len);
146        let body_len = primary_header.body_len() as usize;
147        let total_len = header_len + body_padding + body_len;
148        if total_len > MAX_MESSAGE_SIZE {
149            return Poll::Ready(Err(crate::Error::ExcessData));
150        }
151
152        // By this point we have a full primary header, so we know the exact length of the complete
153        // message.
154        self.raw_in_buffer.resize(total_len, 0);
155
156        // Now we have an incomplete message; read the rest
157        while self.raw_in_buffer.len() > self.raw_in_pos {
158            let res = ready!(self
159                .socket
160                .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?;
161            let read = {
162                #[cfg(unix)]
163                {
164                    let (read, fds) = res;
165                    self.raw_in_fds.extend(fds);
166                    read
167                }
168                #[cfg(not(unix))]
169                {
170                    res
171                }
172            };
173            self.raw_in_pos += read;
174        }
175
176        // If we reach here, the message is complete; return it
177        self.raw_in_pos = 0;
178        let bytes = std::mem::take(&mut self.raw_in_buffer);
179        #[cfg(unix)]
180        let fds = std::mem::take(&mut self.raw_in_fds);
181        let seq = self.prev_seq + 1;
182        self.prev_seq = seq;
183        Poll::Ready(Message::from_raw_parts(
184            bytes,
185            #[cfg(unix)]
186            fds,
187            seq,
188        ))
189    }
190
191    /// Close the connection.
192    ///
193    /// After this call, all reading and writing operations will fail.
194    pub fn close(&self) -> crate::Result<()> {
195        self.event.notify(usize::MAX);
196        self.socket().close().map_err(|e| e.into())
197    }
198
    /// Access the underlying socket
    ///
    /// This method is intended to provide access to the socket in order to access certain
    /// properties (e.g. peer credentials).
    ///
    /// You should not try to read from or write to it directly, as that may corrupt the
    /// internal state of this wrapper.
    pub fn socket(&self) -> &S {
        &self.socket
    }
209
    /// Get a listener for activity on this connection.
    ///
    /// The returned listener is notified the next time `try_flush`, `try_receive_message`
    /// or `close` is called, regardless of whether that call succeeds.
    pub(crate) fn monitor_activity(&self) -> EventListener {
        self.event.listen()
    }
213}
214
impl Connection<Box<dyn Socket>> {
    /// Same as `try_flush` above, except it wraps the method for use in [`std::future::Future`]
    /// impls.
    ///
    /// The only difference is the error type: the I/O error from `try_flush` is converted
    /// into this crate's error type.
    pub(crate) fn flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        self.try_flush(cx).map_err(Into::into)
    }
}
222
#[cfg(unix)]
#[cfg(test)]
mod tests {
    use super::{Arc, Connection};
    use crate::message::Message;
    use futures_util::future::poll_fn;
    use test_log::test;

    /// Drives the async round-trip test on the crate's executor.
    #[test]
    fn raw_send_receive() {
        crate::block_on(raw_send_receive_async());
    }

    /// Sends one method call over a connected Unix socket pair and checks that the other
    /// end receives it.
    async fn raw_send_receive_async() {
        #[cfg(not(feature = "tokio"))]
        let (p0, p1) = {
            let (left, right) = std::os::unix::net::UnixStream::pair().unwrap();
            (
                async_io::Async::new(left).unwrap(),
                async_io::Async::new(right).unwrap(),
            )
        };
        #[cfg(feature = "tokio")]
        let (p0, p1) = tokio::net::UnixStream::pair().unwrap();

        let mut sender = Connection::new(p0, vec![]);
        let mut receiver = Connection::new(p1, vec![]);

        let call = Message::method(
            None::<()>,
            None::<()>,
            "/",
            Some("org.zbus.p2p"),
            "Test",
            &(),
        )
        .unwrap();

        // Queue the call and push it out through the first end of the pair.
        sender.enqueue_message(Arc::new(call));
        poll_fn(|cx| sender.try_flush(cx)).await.unwrap();

        // The second end must see the very same method call.
        let received = poll_fn(|cx| receiver.try_receive_message(cx))
            .await
            .unwrap();
        assert_eq!(received.to_string(), "Method call Test");
    }
}