zbus/connection/socket/
command.rs1use std::{io, os::fd::BorrowedFd};
2
3#[cfg(not(feature = "tokio"))]
4use async_process::{Child, ChildStdin, ChildStdout};
5
6#[cfg(feature = "tokio")]
7use tokio::{
8 io::{AsyncReadExt, ReadBuf},
9 process::{Child, ChildStdin, ChildStdout},
10};
11
12use super::{ReadHalf, RecvmsgResult, Socket, Split, WriteHalf};
13
14#[derive(Debug)]
19pub(crate) struct Command {
20 stdin: ChildStdin,
21 stdout: ChildStdout,
22}
23
24impl Command {
25 fn into_split(self) -> (ChildStdout, ChildStdin) {
26 (self.stdout, self.stdin)
27 }
28}
29
30impl Socket for Command {
31 type ReadHalf = ChildStdout;
32 type WriteHalf = ChildStdin;
33
34 fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
35 let (read, write) = self.into_split();
36
37 Split { read, write }
38 }
39}
40
41impl TryFrom<&mut Child> for Command {
42 type Error = crate::Error;
43
44 fn try_from(child: &mut Child) -> Result<Self, Self::Error> {
45 let stdin = child
46 .stdin
47 .take()
48 .ok_or(crate::Error::Failure("child stdin not found".into()))?;
49
50 let stdout = child
51 .stdout
52 .take()
53 .ok_or(crate::Error::Failure("child stdout not found".into()))?;
54
55 Ok(Command { stdin, stdout })
56 }
57}
58
59#[cfg(not(feature = "tokio"))]
60#[async_trait::async_trait]
61impl ReadHalf for ChildStdout {
62 async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
63 match futures_lite::AsyncReadExt::read(&mut self, buf).await {
64 Err(e) => Err(e),
65 Ok(len) => {
66 #[cfg(unix)]
67 let ret = (len, vec![]);
68 #[cfg(not(unix))]
69 let ret = len;
70 Ok(ret)
71 }
72 }
73 }
74}
75
76#[cfg(feature = "tokio")]
77#[async_trait::async_trait]
78impl ReadHalf for ChildStdout {
79 async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
80 let mut read_buf = ReadBuf::new(buf);
81 self.read_buf(&mut read_buf).await.map(|_| {
82 let ret = read_buf.filled().len();
83 #[cfg(unix)]
84 let ret = (ret, vec![]);
85
86 ret
87 })
88 }
89}
90
91#[cfg(not(feature = "tokio"))]
92#[async_trait::async_trait]
93impl WriteHalf for ChildStdin {
94 async fn sendmsg(
95 &mut self,
96 buf: &[u8],
97 #[cfg(unix)] fds: &[BorrowedFd<'_>],
98 ) -> io::Result<usize> {
99 #[cfg(unix)]
100 if !fds.is_empty() {
101 return Err(io::Error::new(
102 io::ErrorKind::InvalidInput,
103 "fds cannot be sent with a command stream",
104 ));
105 }
106
107 futures_lite::AsyncWriteExt::write(&mut self, buf).await
108 }
109
110 async fn close(&mut self) -> io::Result<()> {
111 futures_lite::AsyncWriteExt::close(&mut self).await
112 }
113}
114
115#[cfg(feature = "tokio")]
116#[async_trait::async_trait]
117impl WriteHalf for ChildStdin {
118 async fn sendmsg(
119 &mut self,
120 buf: &[u8],
121 #[cfg(unix)] fds: &[BorrowedFd<'_>],
122 ) -> io::Result<usize> {
123 #[cfg(unix)]
124 if !fds.is_empty() {
125 return Err(io::Error::new(
126 io::ErrorKind::InvalidInput,
127 "fds cannot be sent with a command stream",
128 ));
129 }
130
131 tokio::io::AsyncWriteExt::write(&mut self, buf).await
132 }
133
134 async fn close(&mut self) -> io::Result<()> {
135 tokio::io::AsyncWriteExt::shutdown(&mut self).await
136 }
137}