zbus/connection/socket/
tcp.rs1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use std::io;
4#[cfg(unix)]
5use std::os::fd::BorrowedFd;
6#[cfg(not(feature = "tokio"))]
7use std::{net::TcpStream, sync::Arc};
8
9use super::{ReadHalf, RecvmsgResult, WriteHalf};
10#[cfg(feature = "tokio")]
11use super::{Socket, Split};
12
13#[cfg(not(feature = "tokio"))]
14#[async_trait::async_trait]
15impl ReadHalf for Arc<Async<TcpStream>> {
16 async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
17 match futures_lite::AsyncReadExt::read(&mut self.as_ref(), buf).await {
18 Err(e) => Err(e),
19 Ok(len) => {
20 #[cfg(unix)]
21 let ret = (len, vec![]);
22 #[cfg(not(unix))]
23 let ret = len;
24 Ok(ret)
25 }
26 }
27 }
28
29 #[cfg(windows)]
30 async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
31 let stream = self.clone();
32 crate::Task::spawn_blocking(
33 move || {
34 use crate::win32::{tcp_stream_get_peer_pid, ProcessToken};
35
36 let pid = tcp_stream_get_peer_pid(stream.get_ref())? as _;
37 let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None })
38 .and_then(|process_token| process_token.sid())?;
39 io::Result::Ok(
40 crate::fdo::ConnectionCredentials::default()
41 .set_process_id(pid)
42 .set_windows_sid(sid),
43 )
44 },
45 "peer credentials",
46 )
47 .await
48 }
49
50 #[cfg(not(windows))]
51 fn auth_mechanism(&self) -> crate::conn::AuthMechanism {
52 crate::conn::AuthMechanism::Anonymous
53 }
54}
55
56#[cfg(not(feature = "tokio"))]
57#[async_trait::async_trait]
58impl WriteHalf for Arc<Async<TcpStream>> {
59 async fn sendmsg(
60 &mut self,
61 buf: &[u8],
62 #[cfg(unix)] fds: &[BorrowedFd<'_>],
63 ) -> io::Result<usize> {
64 #[cfg(unix)]
65 if !fds.is_empty() {
66 return Err(io::Error::new(
67 io::ErrorKind::InvalidInput,
68 "fds cannot be sent with a tcp stream",
69 ));
70 }
71
72 futures_lite::AsyncWriteExt::write(&mut self.as_ref(), buf).await
73 }
74
75 async fn close(&mut self) -> io::Result<()> {
76 let stream = self.clone();
77 crate::Task::spawn_blocking(
78 move || stream.get_ref().shutdown(std::net::Shutdown::Both),
79 "close socket",
80 )
81 .await
82 }
83
84 async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
85 ReadHalf::peer_credentials(self).await
86 }
87}
88
#[cfg(feature = "tokio")]
impl Socket for tokio::net::TcpStream {
    type ReadHalf = tokio::net::tcp::OwnedReadHalf;
    type WriteHalf = tokio::net::tcp::OwnedWriteHalf;

    /// Consumes the stream and returns its owned read and write halves.
    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
        let (reader, writer) = tokio::net::TcpStream::into_split(self);

        Split {
            read: reader,
            write: writer,
        }
    }
}
100
#[cfg(feature = "tokio")]
#[async_trait::async_trait]
impl ReadHalf for tokio::net::tcp::OwnedReadHalf {
    /// Reads from the TCP read half into `buf`.
    ///
    /// The reported length is the number of bytes filled into `buf`. On unix it is paired with
    /// an empty vector (presumably the list of received file descriptors, of which a TCP stream
    /// yields none). I/O errors are propagated unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
        use tokio::io::{AsyncReadExt, ReadBuf};

        let mut read_buf = ReadBuf::new(buf);
        self.read_buf(&mut read_buf).await?;

        let received = read_buf.filled().len();
        #[cfg(unix)]
        let received = (received, vec![]);

        Ok(received)
    }

    /// Resolves the peer's credentials from its socket address on a blocking task.
    ///
    /// Fails early if the peer address cannot be obtained.
    #[cfg(windows)]
    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
        // `peer_addr` already yields an owned, `Copy` address, so no clone is needed before
        // moving it into the blocking closure.
        let peer_addr = self.peer_addr()?;
        crate::Task::spawn_blocking(
            move || win32_credentials_from_addr(&peer_addr),
            "peer credentials",
        )
        .await
    }

    /// Reports `AuthMechanism::Anonymous` for this socket type on non-Windows targets.
    #[cfg(not(windows))]
    fn auth_mechanism(&self) -> crate::conn::AuthMechanism {
        crate::conn::AuthMechanism::Anonymous
    }
}
132
#[cfg(feature = "tokio")]
#[async_trait::async_trait]
impl WriteHalf for tokio::net::tcp::OwnedWriteHalf {
    /// Writes `buf` to the TCP write half and returns the number of bytes written.
    ///
    /// On unix, a non-empty `fds` slice is rejected with `io::ErrorKind::InvalidInput` before
    /// anything is written.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[BorrowedFd<'_>],
    ) -> io::Result<usize> {
        use tokio::io::AsyncWriteExt;

        #[cfg(unix)]
        if !fds.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "fds cannot be sent with a tcp stream",
            ));
        }

        self.write(buf).await
    }

    /// Shuts down the write half via `AsyncWriteExt::shutdown`.
    async fn close(&mut self) -> io::Result<()> {
        tokio::io::AsyncWriteExt::shutdown(self).await
    }

    /// Resolves the peer's credentials from its socket address on a blocking task.
    ///
    /// Fails early if the peer address cannot be obtained.
    #[cfg(windows)]
    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
        // `peer_addr` already yields an owned, `Copy` address, so no clone is needed before
        // moving it into the blocking closure.
        let peer_addr = self.peer_addr()?;
        crate::Task::spawn_blocking(
            move || win32_credentials_from_addr(&peer_addr),
            "peer credentials",
        )
        .await
    }
}
168
/// Builds connection credentials (process ID and Windows SID) for the peer at `addr`.
///
/// Any error from the pid lookup, from opening the process token, or from reading its SID is
/// returned to the caller.
#[cfg(all(feature = "tokio", windows))]
fn win32_credentials_from_addr(
    addr: &std::net::SocketAddr,
) -> io::Result<crate::fdo::ConnectionCredentials> {
    use crate::win32::{socket_addr_get_pid, ProcessToken};

    let pid = socket_addr_get_pid(addr)? as _;
    // A pid of 0 is handed to `ProcessToken::open` as `None`.
    let token_pid = if pid != 0 { Some(pid as _) } else { None };
    let sid = ProcessToken::open(token_pid).and_then(|token| token.sid())?;
    let credentials = crate::fdo::ConnectionCredentials::default()
        .set_process_id(pid)
        .set_windows_sid(sid);

    Ok(credentials)
}