zbus/connection/socket/
tcp.rs

1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use std::io;
4#[cfg(unix)]
5use std::os::fd::BorrowedFd;
6#[cfg(not(feature = "tokio"))]
7use std::{net::TcpStream, sync::Arc};
8
9use super::{ReadHalf, RecvmsgResult, WriteHalf};
10#[cfg(feature = "tokio")]
11use super::{Socket, Split};
12
13#[cfg(not(feature = "tokio"))]
14#[async_trait::async_trait]
15impl ReadHalf for Arc<Async<TcpStream>> {
16    async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
17        match futures_lite::AsyncReadExt::read(&mut self.as_ref(), buf).await {
18            Err(e) => Err(e),
19            Ok(len) => {
20                #[cfg(unix)]
21                let ret = (len, vec![]);
22                #[cfg(not(unix))]
23                let ret = len;
24                Ok(ret)
25            }
26        }
27    }
28
29    #[cfg(windows)]
30    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
31        let stream = self.clone();
32        crate::Task::spawn_blocking(
33            move || {
34                use crate::win32::{tcp_stream_get_peer_pid, ProcessToken};
35
36                let pid = tcp_stream_get_peer_pid(stream.get_ref())? as _;
37                let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None })
38                    .and_then(|process_token| process_token.sid())?;
39                io::Result::Ok(
40                    crate::fdo::ConnectionCredentials::default()
41                        .set_process_id(pid)
42                        .set_windows_sid(sid),
43                )
44            },
45            "peer credentials",
46        )
47        .await
48    }
49
50    #[cfg(not(windows))]
51    fn auth_mechanism(&self) -> crate::conn::AuthMechanism {
52        crate::conn::AuthMechanism::Anonymous
53    }
54}
55
56#[cfg(not(feature = "tokio"))]
57#[async_trait::async_trait]
58impl WriteHalf for Arc<Async<TcpStream>> {
59    async fn sendmsg(
60        &mut self,
61        buf: &[u8],
62        #[cfg(unix)] fds: &[BorrowedFd<'_>],
63    ) -> io::Result<usize> {
64        #[cfg(unix)]
65        if !fds.is_empty() {
66            return Err(io::Error::new(
67                io::ErrorKind::InvalidInput,
68                "fds cannot be sent with a tcp stream",
69            ));
70        }
71
72        futures_lite::AsyncWriteExt::write(&mut self.as_ref(), buf).await
73    }
74
75    async fn close(&mut self) -> io::Result<()> {
76        let stream = self.clone();
77        crate::Task::spawn_blocking(
78            move || stream.get_ref().shutdown(std::net::Shutdown::Both),
79            "close socket",
80        )
81        .await
82    }
83
84    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
85        ReadHalf::peer_credentials(self).await
86    }
87}
88
89#[cfg(feature = "tokio")]
90impl Socket for tokio::net::TcpStream {
91    type ReadHalf = tokio::net::tcp::OwnedReadHalf;
92    type WriteHalf = tokio::net::tcp::OwnedWriteHalf;
93
94    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
95        let (read, write) = self.into_split();
96
97        Split { read, write }
98    }
99}
100
101#[cfg(feature = "tokio")]
102#[async_trait::async_trait]
103impl ReadHalf for tokio::net::tcp::OwnedReadHalf {
104    async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
105        use tokio::io::{AsyncReadExt, ReadBuf};
106
107        let mut read_buf = ReadBuf::new(buf);
108        self.read_buf(&mut read_buf).await.map(|_| {
109            let ret = read_buf.filled().len();
110            #[cfg(unix)]
111            let ret = (ret, vec![]);
112
113            ret
114        })
115    }
116
117    #[cfg(windows)]
118    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
119        let peer_addr = self.peer_addr()?.clone();
120        crate::Task::spawn_blocking(
121            move || win32_credentials_from_addr(&peer_addr),
122            "peer credentials",
123        )
124        .await
125    }
126
127    #[cfg(not(windows))]
128    fn auth_mechanism(&self) -> crate::conn::AuthMechanism {
129        crate::conn::AuthMechanism::Anonymous
130    }
131}
132
133#[cfg(feature = "tokio")]
134#[async_trait::async_trait]
135impl WriteHalf for tokio::net::tcp::OwnedWriteHalf {
136    async fn sendmsg(
137        &mut self,
138        buf: &[u8],
139        #[cfg(unix)] fds: &[BorrowedFd<'_>],
140    ) -> io::Result<usize> {
141        use tokio::io::AsyncWriteExt;
142
143        #[cfg(unix)]
144        if !fds.is_empty() {
145            return Err(io::Error::new(
146                io::ErrorKind::InvalidInput,
147                "fds cannot be sent with a tcp stream",
148            ));
149        }
150
151        self.write(buf).await
152    }
153
154    async fn close(&mut self) -> io::Result<()> {
155        tokio::io::AsyncWriteExt::shutdown(self).await
156    }
157
158    #[cfg(windows)]
159    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
160        let peer_addr = self.peer_addr()?.clone();
161        crate::Task::spawn_blocking(
162            move || win32_credentials_from_addr(&peer_addr),
163            "peer credentials",
164        )
165        .await
166    }
167}
168
169#[cfg(feature = "tokio")]
170#[cfg(windows)]
171fn win32_credentials_from_addr(
172    addr: &std::net::SocketAddr,
173) -> io::Result<crate::fdo::ConnectionCredentials> {
174    use crate::win32::{socket_addr_get_pid, ProcessToken};
175
176    let pid = socket_addr_get_pid(addr)? as _;
177    let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None })
178        .and_then(|process_token| process_token.sid())?;
179    Ok(crate::fdo::ConnectionCredentials::default()
180        .set_process_id(pid)
181        .set_windows_sid(sid))
182}