zbus/connection/socket/
command.rs

1use std::{io, os::fd::BorrowedFd};
2
3#[cfg(not(feature = "tokio"))]
4use async_process::{Child, ChildStdin, ChildStdout};
5
6#[cfg(feature = "tokio")]
7use tokio::{
8    io::{AsyncReadExt, ReadBuf},
9    process::{Child, ChildStdin, ChildStdout},
10};
11
12use super::{ReadHalf, RecvmsgResult, Socket, Split, WriteHalf};
13
/// A Command stream socket.
///
/// This socket communicates with a spawned child process via its standard input
/// and output streams.
///
/// Splitting the socket yields `stdout` as the read half and `stdin` as the
/// write half.
#[derive(Debug)]
pub(crate) struct Command {
    /// Handle to the child's standard input; becomes the write half on split.
    stdin: ChildStdin,
    /// Handle to the child's standard output; becomes the read half on split.
    stdout: ChildStdout,
}
23
24impl Command {
25    fn into_split(self) -> (ChildStdout, ChildStdin) {
26        (self.stdout, self.stdin)
27    }
28}
29
30impl Socket for Command {
31    type ReadHalf = ChildStdout;
32    type WriteHalf = ChildStdin;
33
34    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
35        let (read, write) = self.into_split();
36
37        Split { read, write }
38    }
39}
40
41impl TryFrom<&mut Child> for Command {
42    type Error = crate::Error;
43
44    fn try_from(child: &mut Child) -> Result<Self, Self::Error> {
45        let stdin = child
46            .stdin
47            .take()
48            .ok_or(crate::Error::Failure("child stdin not found".into()))?;
49
50        let stdout = child
51            .stdout
52            .take()
53            .ok_or(crate::Error::Failure("child stdout not found".into()))?;
54
55        Ok(Command { stdin, stdout })
56    }
57}
58
59#[cfg(not(feature = "tokio"))]
60#[async_trait::async_trait]
61impl ReadHalf for ChildStdout {
62    async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
63        match futures_lite::AsyncReadExt::read(&mut self, buf).await {
64            Err(e) => Err(e),
65            Ok(len) => {
66                #[cfg(unix)]
67                let ret = (len, vec![]);
68                #[cfg(not(unix))]
69                let ret = len;
70                Ok(ret)
71            }
72        }
73    }
74}
75
#[cfg(feature = "tokio")]
#[async_trait::async_trait]
impl ReadHalf for ChildStdout {
    /// Reads bytes from the child's stdout into `buf`.
    ///
    /// On success the result carries the number of bytes placed in `buf`. On
    /// unix it is paired with an empty list, since this implementation never
    /// produces any file descriptors.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
        let mut dest = ReadBuf::new(buf);
        self.read_buf(&mut dest).await?;

        // The byte count is taken from the filled portion of the buffer
        // rather than from the value `read_buf` returned.
        let received = dest.filled().len();
        #[cfg(unix)]
        let received = (received, vec![]);

        Ok(received)
    }
}
90
91#[cfg(not(feature = "tokio"))]
92#[async_trait::async_trait]
93impl WriteHalf for ChildStdin {
94    async fn sendmsg(
95        &mut self,
96        buf: &[u8],
97        #[cfg(unix)] fds: &[BorrowedFd<'_>],
98    ) -> io::Result<usize> {
99        #[cfg(unix)]
100        if !fds.is_empty() {
101            return Err(io::Error::new(
102                io::ErrorKind::InvalidInput,
103                "fds cannot be sent with a command stream",
104            ));
105        }
106
107        futures_lite::AsyncWriteExt::write(&mut self, buf).await
108    }
109
110    async fn close(&mut self) -> io::Result<()> {
111        futures_lite::AsyncWriteExt::close(&mut self).await
112    }
113}
114
#[cfg(feature = "tokio")]
#[async_trait::async_trait]
impl WriteHalf for ChildStdin {
    /// Writes `buf` to the child's stdin and returns the byte count reported
    /// by the underlying write.
    ///
    /// # Errors
    ///
    /// On unix, a non-empty `fds` is rejected with
    /// [`io::ErrorKind::InvalidInput`] before anything is written. Errors from
    /// the write itself are returned as-is.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[BorrowedFd<'_>],
    ) -> io::Result<usize> {
        #[cfg(unix)]
        if !fds.is_empty() {
            let kind = io::ErrorKind::InvalidInput;
            let unsupported = io::Error::new(kind, "fds cannot be sent with a command stream");

            return Err(unsupported);
        }

        let written = tokio::io::AsyncWriteExt::write(&mut self, buf).await?;

        Ok(written)
    }

    /// Closes the stdin handle via `tokio::io::AsyncWriteExt::shutdown`.
    async fn close(&mut self) -> io::Result<()> {
        let outcome = tokio::io::AsyncWriteExt::shutdown(&mut self).await;

        outcome
    }
}