1use std::{
2 collections::VecDeque,
3 io,
4 sync::Arc,
5 task::{Context, Poll},
6};
78use event_listener::{Event, EventListener};
910#[cfg(unix)]
11use crate::OwnedFd;
12use crate::{
13 message_header::{MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE},
14 raw::Socket,
15 utils::padding_for_8_bytes,
16 Message, MessagePrimaryHeader,
17};
1819use futures_core::ready;
2021/// A low-level representation of a D-Bus connection
22///
23/// This wrapper is agnostic on the actual transport, using the `Socket` trait
24/// to abstract it. It is compatible with sockets both in blocking or non-blocking
25/// mode.
26///
27/// This wrapper abstracts away the serialization & buffering considerations of the
28/// protocol, and allows interaction based on messages, rather than bytes.
29#[derive(derivative::Derivative)]
30#[derivative(Debug)]
31pub struct Connection<S> {
32#[derivative(Debug = "ignore")]
33socket: S,
34 event: Event,
35 raw_in_buffer: Vec<u8>,
36#[cfg(unix)]
37raw_in_fds: Vec<OwnedFd>,
38 raw_in_pos: usize,
39 out_pos: usize,
40 out_msgs: VecDeque<Arc<Message>>,
41 prev_seq: u64,
42}
4344impl<S: Socket> Connection<S> {
45pub(crate) fn new(socket: S, raw_in_buffer: Vec<u8>) -> Connection<S> {
46 Connection {
47 socket,
48 event: Event::new(),
49 raw_in_pos: raw_in_buffer.len(),
50 raw_in_buffer,
51#[cfg(unix)]
52raw_in_fds: vec![],
53 out_pos: 0,
54 out_msgs: VecDeque::new(),
55 prev_seq: 0,
56 }
57 }
5859/// Attempt to flush the outgoing buffer
60 ///
61 /// This will try to write as many messages as possible from the
62 /// outgoing buffer into the socket, until an error is encountered.
63 ///
64 /// This method will thus only block if the socket is in blocking mode.
65pub fn try_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
66self.event.notify(usize::MAX);
67while let Some(msg) = self.out_msgs.front() {
68loop {
69let data = &msg.as_bytes()[self.out_pos..];
70if data.is_empty() {
71self.out_pos = 0;
72self.out_msgs.pop_front();
73break;
74 }
75#[cfg(unix)]
76let fds = if self.out_pos == 0 { msg.fds() } else { vec![] };
77self.out_pos += ready!(self.socket.poll_sendmsg(
78 cx,
79 data,
80#[cfg(unix)]
81&fds,
82 ))?;
83 }
84 }
85 Poll::Ready(Ok(()))
86 }
8788/// Enqueue a message to be sent out to the socket
89 ///
90 /// This method will *not* write anything to the socket, you need to call
91 /// `try_flush()` afterwards so that your message is actually sent out.
92pub fn enqueue_message(&mut self, msg: Arc<Message>) {
93self.out_msgs.push_back(msg);
94 }
9596/// Attempt to read a message from the socket
97 ///
98 /// This methods will read from the socket until either a full D-Bus message is
99 /// read or an error is encountered.
100 ///
101 /// If the socket is in non-blocking mode, it may read a partial message. In such case it
102 /// will buffer it internally and try to complete it the next time you call
103 /// `try_receive_message`.
104pub fn try_receive_message(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<Message>> {
105self.event.notify(usize::MAX);
106if self.raw_in_pos < MIN_MESSAGE_SIZE {
107self.raw_in_buffer.resize(MIN_MESSAGE_SIZE, 0);
108// We don't have enough data to make a proper message header yet.
109 // Some partial read may be in raw_in_buffer, so we try to complete it
110 // until we have MIN_MESSAGE_SIZE bytes
111 //
112 // Given that MIN_MESSAGE_SIZE is 16, this codepath is actually extremely unlikely
113 // to be taken more than once
114while self.raw_in_pos < MIN_MESSAGE_SIZE {
115let res = ready!(self
116.socket
117 .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?;
118let len = {
119#[cfg(unix)]
120{
121let (len, fds) = res;
122self.raw_in_fds.extend(fds);
123 len
124 }
125#[cfg(not(unix))]
126{
127 res
128 }
129 };
130self.raw_in_pos += len;
131if len == 0 {
132return Poll::Ready(Err(crate::Error::InputOutput(
133 std::io::Error::new(
134 std::io::ErrorKind::UnexpectedEof,
135"failed to receive message",
136 )
137 .into(),
138 )));
139 }
140 }
141 }
142143let (primary_header, fields_len) = MessagePrimaryHeader::read(&self.raw_in_buffer)?;
144let header_len = MIN_MESSAGE_SIZE + fields_len as usize;
145let body_padding = padding_for_8_bytes(header_len);
146let body_len = primary_header.body_len() as usize;
147let total_len = header_len + body_padding + body_len;
148if total_len > MAX_MESSAGE_SIZE {
149return Poll::Ready(Err(crate::Error::ExcessData));
150 }
151152// By this point we have a full primary header, so we know the exact length of the complete
153 // message.
154self.raw_in_buffer.resize(total_len, 0);
155156// Now we have an incomplete message; read the rest
157while self.raw_in_buffer.len() > self.raw_in_pos {
158let res = ready!(self
159.socket
160 .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?;
161let read = {
162#[cfg(unix)]
163{
164let (read, fds) = res;
165self.raw_in_fds.extend(fds);
166 read
167 }
168#[cfg(not(unix))]
169{
170 res
171 }
172 };
173self.raw_in_pos += read;
174 }
175176// If we reach here, the message is complete; return it
177self.raw_in_pos = 0;
178let bytes = std::mem::take(&mut self.raw_in_buffer);
179#[cfg(unix)]
180let fds = std::mem::take(&mut self.raw_in_fds);
181let seq = self.prev_seq + 1;
182self.prev_seq = seq;
183 Poll::Ready(Message::from_raw_parts(
184 bytes,
185#[cfg(unix)]
186fds,
187 seq,
188 ))
189 }
190191/// Close the connection.
192 ///
193 /// After this call, all reading and writing operations will fail.
194pub fn close(&self) -> crate::Result<()> {
195self.event.notify(usize::MAX);
196self.socket().close().map_err(|e| e.into())
197 }
198199/// Access the underlying socket
200 ///
201 /// This method is intended to provide access to the socket in order to access certain
202 /// properties (e.g peer credentials).
203 ///
204 /// You should not try to read or write from it directly, as it may corrupt the internal state
205 /// of this wrapper.
206pub fn socket(&self) -> &S {
207&self.socket
208 }
209210pub(crate) fn monitor_activity(&self) -> EventListener {
211self.event.listen()
212 }
213}
214215impl Connection<Box<dyn Socket>> {
216/// Same as `try_flush` above, except it wraps the method for use in [`std::future::Future`]
217 /// impls.
218pub(crate) fn flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
219self.try_flush(cx).map_err(Into::into)
220 }
221}
222223#[cfg(unix)]
224#[cfg(test)]
225mod tests {
226use super::{Arc, Connection};
227use crate::message::Message;
228use futures_util::future::poll_fn;
229use test_log::test;
230231#[test]
232fn raw_send_receive() {
233crate::block_on(raw_send_receive_async());
234 }
235236async fn raw_send_receive_async() {
237#[cfg(not(feature = "tokio"))]
238let (p0, p1) = std::os::unix::net::UnixStream::pair()
239 .map(|(p0, p1)| {
240 (
241 async_io::Async::new(p0).unwrap(),
242 async_io::Async::new(p1).unwrap(),
243 )
244 })
245 .unwrap();
246#[cfg(feature = "tokio")]
247let (p0, p1) = tokio::net::UnixStream::pair().unwrap();
248249let mut conn0 = Connection::new(p0, vec![]);
250let mut conn1 = Connection::new(p1, vec![]);
251252let msg = Message::method(
253None::<()>,
254None::<()>,
255"/",
256Some("org.zbus.p2p"),
257"Test",
258&(),
259 )
260 .unwrap();
261262 conn0.enqueue_message(Arc::new(msg));
263 poll_fn(|cx| conn0.try_flush(cx)).await.unwrap();
264265let ret = poll_fn(|cx| conn1.try_receive_message(cx)).await.unwrap();
266assert_eq!(ret.to_string(), "Method call Test");
267 }
268}