futures_lite/
io.rs

1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::borrow::{Borrow, BorrowMut};
24use std::cmp;
25use std::fmt;
26use std::future::Future;
27use std::io::{IoSlice, IoSliceMut};
28use std::mem;
29use std::pin::Pin;
30use std::sync::{Arc, Mutex};
31use std::task::{Context, Poll};
32
33use futures_core::stream::Stream;
34use pin_project_lite::pin_project;
35
36use crate::future;
37use crate::ready;
38
39const DEFAULT_BUF_SIZE: usize = 8 * 1024;
40
41/// Copies the entire contents of a reader into a writer.
42///
43/// This function will read data from `reader` and write it into `writer` in a streaming fashion
44/// until `reader` returns EOF.
45///
46/// On success, returns the total number of bytes copied.
47///
48/// # Examples
49///
50/// ```
51/// use futures_lite::io::{self, BufReader, BufWriter};
52///
53/// # spin_on::spin_on(async {
54/// let input: &[u8] = b"hello";
55/// let reader = BufReader::new(input);
56///
57/// let mut output = Vec::new();
58/// let writer = BufWriter::new(&mut output);
59///
60/// io::copy(reader, writer).await?;
61/// # std::io::Result::Ok(()) });
62/// ```
63pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
64where
65    R: AsyncRead + Unpin,
66    W: AsyncWrite + Unpin,
67{
68    pin_project! {
69        struct CopyFuture<R, W> {
70            #[pin]
71            reader: R,
72            #[pin]
73            writer: W,
74            amt: u64,
75        }
76    }
77
78    impl<R, W> Future for CopyFuture<R, W>
79    where
80        R: AsyncBufRead,
81        W: AsyncWrite + Unpin,
82    {
83        type Output = Result<u64>;
84
85        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86            let mut this = self.project();
87            loop {
88                let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
89                if buffer.is_empty() {
90                    ready!(this.writer.as_mut().poll_flush(cx))?;
91                    return Poll::Ready(Ok(*this.amt));
92                }
93
94                let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
95                if i == 0 {
96                    return Poll::Ready(Err(ErrorKind::WriteZero.into()));
97                }
98                *this.amt += i as u64;
99                this.reader.as_mut().consume(i);
100            }
101        }
102    }
103
104    let future = CopyFuture {
105        reader: BufReader::new(reader),
106        writer,
107        amt: 0,
108    };
109    future.await
110}
111
112/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
113///
114/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
115/// This is usually the case for in-memory buffered I/O.
116///
117/// # Examples
118///
119/// ```
120/// use futures_lite::io::{AssertAsync, AsyncReadExt};
121///
122/// let reader: &[u8] = b"hello";
123///
124/// # spin_on::spin_on(async {
125/// let mut async_reader = AssertAsync::new(reader);
126/// let mut contents = String::new();
127///
128/// // This line works in async manner - note that there is await:
129/// async_reader.read_to_string(&mut contents).await?;
130/// # std::io::Result::Ok(()) });
131/// ```
132#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
133pub struct AssertAsync<T>(T);
134
135impl<T> Unpin for AssertAsync<T> {}
136
137impl<T> AssertAsync<T> {
138    /// Wraps an I/O handle implementing [`std::io`] traits.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// use futures_lite::io::AssertAsync;
144    ///
145    /// let reader: &[u8] = b"hello";
146    ///
147    /// let async_reader = AssertAsync::new(reader);
148    /// ```
149    #[inline(always)]
150    pub fn new(io: T) -> Self {
151        AssertAsync(io)
152    }
153
154    /// Gets a reference to the inner I/O handle.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use futures_lite::io::AssertAsync;
160    ///
161    /// let reader: &[u8] = b"hello";
162    ///
163    /// let async_reader = AssertAsync::new(reader);
164    /// let r = async_reader.get_ref();
165    /// ```
166    #[inline(always)]
167    pub fn get_ref(&self) -> &T {
168        &self.0
169    }
170
171    /// Gets a mutable reference to the inner I/O handle.
172    ///
173    /// # Examples
174    ///
175    /// ```
176    /// use futures_lite::io::AssertAsync;
177    ///
178    /// let reader: &[u8] = b"hello";
179    ///
180    /// let mut async_reader = AssertAsync::new(reader);
181    /// let r = async_reader.get_mut();
182    /// ```
183    #[inline(always)]
184    pub fn get_mut(&mut self) -> &mut T {
185        &mut self.0
186    }
187
188    /// Extracts the inner I/O handle.
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use futures_lite::io::AssertAsync;
194    ///
195    /// let reader: &[u8] = b"hello";
196    ///
197    /// let async_reader = AssertAsync::new(reader);
198    /// let inner = async_reader.into_inner();
199    /// ```
200    #[inline(always)]
201    pub fn into_inner(self) -> T {
202        self.0
203    }
204}
205
206fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
207where
208    F: FnMut() -> std::io::Result<T>,
209{
210    loop {
211        match f() {
212            Err(err) if err.kind() == ErrorKind::Interrupted => {}
213            res => return Poll::Ready(res),
214        }
215    }
216}
217
218impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
219    #[inline]
220    fn poll_read(
221        mut self: Pin<&mut Self>,
222        _: &mut Context<'_>,
223        buf: &mut [u8],
224    ) -> Poll<Result<usize>> {
225        assert_async_wrapio(move || self.0.read(buf))
226    }
227
228    #[inline]
229    fn poll_read_vectored(
230        mut self: Pin<&mut Self>,
231        _: &mut Context<'_>,
232        bufs: &mut [IoSliceMut<'_>],
233    ) -> Poll<Result<usize>> {
234        assert_async_wrapio(move || self.0.read_vectored(bufs))
235    }
236}
237
238impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
239    #[inline]
240    fn poll_write(
241        mut self: Pin<&mut Self>,
242        _: &mut Context<'_>,
243        buf: &[u8],
244    ) -> Poll<Result<usize>> {
245        assert_async_wrapio(move || self.0.write(buf))
246    }
247
248    #[inline]
249    fn poll_write_vectored(
250        mut self: Pin<&mut Self>,
251        _: &mut Context<'_>,
252        bufs: &[IoSlice<'_>],
253    ) -> Poll<Result<usize>> {
254        assert_async_wrapio(move || self.0.write_vectored(bufs))
255    }
256
257    #[inline]
258    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
259        assert_async_wrapio(move || self.0.flush())
260    }
261
262    #[inline]
263    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
264        self.poll_flush(cx)
265    }
266}
267
268impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
269    #[inline]
270    fn poll_seek(
271        mut self: Pin<&mut Self>,
272        _: &mut Context<'_>,
273        pos: SeekFrom,
274    ) -> Poll<Result<u64>> {
275        assert_async_wrapio(move || self.0.seek(pos))
276    }
277}
278
279/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
280/// polls to `WouldBlock` errors.
281///
282/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
283/// that take `Read` as a parameter.
284///
285/// # Examples
286///
287/// ```
288/// use std::io::Read;
289/// use std::task::{Poll, Context};
290///
291/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
292///     // Assume we have a library that's built around `Read` and `Write` traits.
293///     use cooltls::Session;
294///
295///     // We want to use it with our writer that implements `AsyncWrite`.
296///     let writer = Stream::new();
297///
298///     // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
299///     use futures_lite::io::AsyncAsSync;
300///     let writer = AsyncAsSync::new(cx, writer);
301///
302///     // Now, we can use it with `cooltls`.
303///     let mut session = Session::new(writer);
304///
305///     // Match on the result of `read()` and translate it to poll.
306///     match session.read(&mut [0; 1024]) {
307///         Ok(n) => Poll::Ready(n),
308///         Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
309///         Err(err) => panic!("unexpected error: {}", err),
310///     }
311/// }
312///
313/// // Usually, poll-based functions are best wrapped using `poll_fn`.
314/// use futures_lite::future::poll_fn;
315/// # futures_lite::future::block_on(async {
316/// poll_fn(|cx| poll_for_io(cx)).await;
317/// # });
318/// # struct Stream;
319/// # impl Stream {
320/// #     fn new() -> Stream {
321/// #         Stream
322/// #     }
323/// # }
324/// # impl futures_lite::io::AsyncRead for Stream {
325/// #     fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
326/// #         Poll::Ready(Ok(0))
327/// #     }
328/// # }
329/// # mod cooltls {
330/// #     pub struct Session<W> {
331/// #         reader: W,
332/// #     }
333/// #     impl<W> Session<W> {
334/// #         pub fn new(reader: W) -> Session<W> {
335/// #             Session { reader }
336/// #         }
337/// #     }
338/// #     impl<W: std::io::Read> std::io::Read for Session<W> {
339/// #         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
340/// #             self.reader.read(buf)
341/// #         }
342/// #     }
343/// # }
344/// ```
345#[derive(Debug)]
346pub struct AsyncAsSync<'r, 'ctx, T> {
347    /// The context we are using to poll the future.
348    pub context: &'r mut Context<'ctx>,
349
350    /// The actual reader/writer we are wrapping.
351    pub inner: T,
352}
353
354impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
355    /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
356    ///
357    /// # Examples
358    ///
359    /// ```
360    /// use futures_lite::io::AsyncAsSync;
361    /// use std::task::Context;
362    /// use waker_fn::waker_fn;
363    ///
364    /// let reader: &[u8] = b"hello";
365    /// let waker = waker_fn(|| {});
366    /// let mut context = Context::from_waker(&waker);
367    ///
368    /// let async_reader = AsyncAsSync::new(&mut context, reader);
369    /// ```
370    #[inline]
371    pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
372        AsyncAsSync { context, inner }
373    }
374
375    /// Attempt to shutdown the I/O handle.
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// use futures_lite::io::AsyncAsSync;
381    /// use std::task::Context;
382    /// use waker_fn::waker_fn;
383    ///
384    /// let reader: Vec<u8> = b"hello".to_vec();
385    /// let waker = waker_fn(|| {});
386    /// let mut context = Context::from_waker(&waker);
387    ///
388    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
389    /// async_reader.close().unwrap();
390    /// ```
391    #[inline]
392    pub fn close(&mut self) -> Result<()>
393    where
394        T: AsyncWrite + Unpin,
395    {
396        self.poll_with(|io, cx| io.poll_close(cx))
397    }
398
399    /// Poll this `AsyncAsSync` for some function.
400    ///
401    /// # Examples
402    ///
403    /// ```
404    /// use futures_lite::io::{AsyncAsSync, AsyncRead};
405    /// use std::task::Context;
406    /// use waker_fn::waker_fn;
407    ///
408    /// let reader: &[u8] = b"hello";
409    /// let waker = waker_fn(|| {});
410    /// let mut context = Context::from_waker(&waker);
411    ///
412    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
413    /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
414    /// assert_eq!(r.unwrap(), 5);
415    /// ```
416    #[inline]
417    pub fn poll_with<R>(
418        &mut self,
419        f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
420    ) -> Result<R>
421    where
422        T: Unpin,
423    {
424        match f(Pin::new(&mut self.inner), self.context) {
425            Poll::Ready(res) => res,
426            Poll::Pending => Err(ErrorKind::WouldBlock.into()),
427        }
428    }
429}
430
431impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
432    #[inline]
433    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
434        self.poll_with(|io, cx| io.poll_read(cx, buf))
435    }
436
437    #[inline]
438    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
439        self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
440    }
441}
442
443impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
444    #[inline]
445    fn write(&mut self, buf: &[u8]) -> Result<usize> {
446        self.poll_with(|io, cx| io.poll_write(cx, buf))
447    }
448
449    #[inline]
450    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
451        self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
452    }
453
454    #[inline]
455    fn flush(&mut self) -> Result<()> {
456        self.poll_with(|io, cx| io.poll_flush(cx))
457    }
458}
459
460impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
461    #[inline]
462    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
463        self.poll_with(|io, cx| io.poll_seek(cx, pos))
464    }
465}
466
467impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
468    #[inline]
469    fn as_ref(&self) -> &T {
470        &self.inner
471    }
472}
473
474impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
475    #[inline]
476    fn as_mut(&mut self) -> &mut T {
477        &mut self.inner
478    }
479}
480
481impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
482    #[inline]
483    fn borrow(&self) -> &T {
484        &self.inner
485    }
486}
487
488impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
489    #[inline]
490    fn borrow_mut(&mut self) -> &mut T {
491        &mut self.inner
492    }
493}
494
495/// Blocks on all async I/O operations and implements [`std::io`] traits.
496///
497/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
498/// manually all the time becomes too tedious, use this type for more convenient blocking on async
499/// I/O operations.
500///
501/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
502/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
503/// [`AsyncSeek`], respectively.
504///
505/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
506/// dropping the [`BlockOn`] handle or some buffered data might get lost.
507///
508/// # Examples
509///
510/// ```
511/// use futures_lite::io::BlockOn;
512/// use futures_lite::pin;
513/// use std::io::Read;
514///
515/// let reader: &[u8] = b"hello";
516/// pin!(reader);
517///
518/// let mut blocking_reader = BlockOn::new(reader);
519/// let mut contents = String::new();
520///
521/// // This line blocks - note that there is no await:
522/// blocking_reader.read_to_string(&mut contents)?;
523/// # std::io::Result::Ok(())
524/// ```
525#[derive(Debug)]
526pub struct BlockOn<T>(T);
527
528impl<T> BlockOn<T> {
529    /// Wraps an async I/O handle into a blocking interface.
530    ///
531    /// # Examples
532    ///
533    /// ```
534    /// use futures_lite::io::BlockOn;
535    /// use futures_lite::pin;
536    ///
537    /// let reader: &[u8] = b"hello";
538    /// pin!(reader);
539    ///
540    /// let blocking_reader = BlockOn::new(reader);
541    /// ```
542    pub fn new(io: T) -> BlockOn<T> {
543        BlockOn(io)
544    }
545
546    /// Gets a reference to the async I/O handle.
547    ///
548    /// # Examples
549    ///
550    /// ```
551    /// use futures_lite::io::BlockOn;
552    /// use futures_lite::pin;
553    ///
554    /// let reader: &[u8] = b"hello";
555    /// pin!(reader);
556    ///
557    /// let blocking_reader = BlockOn::new(reader);
558    /// let r = blocking_reader.get_ref();
559    /// ```
560    pub fn get_ref(&self) -> &T {
561        &self.0
562    }
563
564    /// Gets a mutable reference to the async I/O handle.
565    ///
566    /// # Examples
567    ///
568    /// ```
569    /// use futures_lite::io::BlockOn;
570    /// use futures_lite::pin;
571    ///
572    /// let reader: &[u8] = b"hello";
573    /// pin!(reader);
574    ///
575    /// let mut blocking_reader = BlockOn::new(reader);
576    /// let r = blocking_reader.get_mut();
577    /// ```
578    pub fn get_mut(&mut self) -> &mut T {
579        &mut self.0
580    }
581
582    /// Extracts the inner async I/O handle.
583    ///
584    /// # Examples
585    ///
586    /// ```
587    /// use futures_lite::io::BlockOn;
588    /// use futures_lite::pin;
589    ///
590    /// let reader: &[u8] = b"hello";
591    /// pin!(reader);
592    ///
593    /// let blocking_reader = BlockOn::new(reader);
594    /// let inner = blocking_reader.into_inner();
595    /// ```
596    pub fn into_inner(self) -> T {
597        self.0
598    }
599}
600
601impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
602    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
603        future::block_on(self.0.read(buf))
604    }
605}
606
607impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
608    fn fill_buf(&mut self) -> Result<&[u8]> {
609        future::block_on(self.0.fill_buf())
610    }
611
612    fn consume(&mut self, amt: usize) {
613        Pin::new(&mut self.0).consume(amt)
614    }
615}
616
617impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
618    fn write(&mut self, buf: &[u8]) -> Result<usize> {
619        future::block_on(self.0.write(buf))
620    }
621
622    fn flush(&mut self) -> Result<()> {
623        future::block_on(self.0.flush())
624    }
625}
626
627impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
628    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
629        future::block_on(self.0.seek(pos))
630    }
631}
632
633pin_project! {
634    /// Adds buffering to a reader.
635    ///
636    /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
637    /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
638    /// maintains an in-memory buffer of the incoming byte stream.
639    ///
640    /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
641    /// the same file or networking socket. It does not help when reading very large amounts at
642    /// once, or reading just once or a few times. It also provides no advantage when reading from
643    /// a source that is already in memory, like a `Vec<u8>`.
644    ///
645    /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
646    /// multiple instances of [`BufReader`] on the same reader can cause data loss.
647    ///
648    /// # Examples
649    ///
650    /// ```
651    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
652    ///
653    /// # spin_on::spin_on(async {
654    /// let input: &[u8] = b"hello";
655    /// let mut reader = BufReader::new(input);
656    ///
657    /// let mut line = String::new();
658    /// reader.read_line(&mut line).await?;
659    /// # std::io::Result::Ok(()) });
660    /// ```
661    pub struct BufReader<R> {
662        #[pin]
663        inner: R,
664        buf: Box<[u8]>,
665        pos: usize,
666        cap: usize,
667    }
668}
669
670impl<R: AsyncRead> BufReader<R> {
671    /// Creates a buffered reader with the default buffer capacity.
672    ///
673    /// The default capacity is currently 8 KB, but that may change in the future.
674    ///
675    /// # Examples
676    ///
677    /// ```
678    /// use futures_lite::io::BufReader;
679    ///
680    /// let input: &[u8] = b"hello";
681    /// let reader = BufReader::new(input);
682    /// ```
683    pub fn new(inner: R) -> BufReader<R> {
684        BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
685    }
686
687    /// Creates a buffered reader with the specified capacity.
688    ///
689    /// # Examples
690    ///
691    /// ```
692    /// use futures_lite::io::BufReader;
693    ///
694    /// let input: &[u8] = b"hello";
695    /// let reader = BufReader::with_capacity(1024, input);
696    /// ```
697    pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
698        BufReader {
699            inner,
700            buf: vec![0; capacity].into_boxed_slice(),
701            pos: 0,
702            cap: 0,
703        }
704    }
705}
706
707impl<R> BufReader<R> {
708    /// Gets a reference to the underlying reader.
709    ///
710    /// It is not advisable to directly read from the underlying reader.
711    ///
712    /// # Examples
713    ///
714    /// ```
715    /// use futures_lite::io::BufReader;
716    ///
717    /// let input: &[u8] = b"hello";
718    /// let reader = BufReader::new(input);
719    ///
720    /// let r = reader.get_ref();
721    /// ```
722    pub fn get_ref(&self) -> &R {
723        &self.inner
724    }
725
726    /// Gets a mutable reference to the underlying reader.
727    ///
728    /// It is not advisable to directly read from the underlying reader.
729    ///
730    /// # Examples
731    ///
732    /// ```
733    /// use futures_lite::io::BufReader;
734    ///
735    /// let input: &[u8] = b"hello";
736    /// let mut reader = BufReader::new(input);
737    ///
738    /// let r = reader.get_mut();
739    /// ```
740    pub fn get_mut(&mut self) -> &mut R {
741        &mut self.inner
742    }
743
744    /// Gets a pinned mutable reference to the underlying reader.
745    ///
746    /// It is not advisable to directly read from the underlying reader.
747    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
748        self.project().inner
749    }
750
751    /// Returns a reference to the internal buffer.
752    ///
753    /// This method will not attempt to fill the buffer if it is empty.
754    ///
755    /// # Examples
756    ///
757    /// ```
758    /// use futures_lite::io::BufReader;
759    ///
760    /// let input: &[u8] = b"hello";
761    /// let reader = BufReader::new(input);
762    ///
763    /// // The internal buffer is empty until the first read request.
764    /// assert_eq!(reader.buffer(), &[]);
765    /// ```
766    pub fn buffer(&self) -> &[u8] {
767        &self.buf[self.pos..self.cap]
768    }
769
770    /// Unwraps the buffered reader, returning the underlying reader.
771    ///
772    /// Note that any leftover data in the internal buffer will be lost.
773    ///
774    /// # Examples
775    ///
776    /// ```
777    /// use futures_lite::io::BufReader;
778    ///
779    /// let input: &[u8] = b"hello";
780    /// let reader = BufReader::new(input);
781    ///
782    /// assert_eq!(reader.into_inner(), input);
783    /// ```
784    pub fn into_inner(self) -> R {
785        self.inner
786    }
787
788    /// Invalidates all data in the internal buffer.
789    #[inline]
790    fn discard_buffer(self: Pin<&mut Self>) {
791        let this = self.project();
792        *this.pos = 0;
793        *this.cap = 0;
794    }
795}
796
797impl<R: AsyncRead> AsyncRead for BufReader<R> {
798    fn poll_read(
799        mut self: Pin<&mut Self>,
800        cx: &mut Context<'_>,
801        buf: &mut [u8],
802    ) -> Poll<Result<usize>> {
803        // If we don't have any buffered data and we're doing a massive read
804        // (larger than our internal buffer), bypass our internal buffer
805        // entirely.
806        if self.pos == self.cap && buf.len() >= self.buf.len() {
807            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
808            self.discard_buffer();
809            return Poll::Ready(res);
810        }
811        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
812        let nread = std::io::Read::read(&mut rem, buf)?;
813        self.consume(nread);
814        Poll::Ready(Ok(nread))
815    }
816
817    fn poll_read_vectored(
818        mut self: Pin<&mut Self>,
819        cx: &mut Context<'_>,
820        bufs: &mut [IoSliceMut<'_>],
821    ) -> Poll<Result<usize>> {
822        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
823        if self.pos == self.cap && total_len >= self.buf.len() {
824            let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
825            self.discard_buffer();
826            return Poll::Ready(res);
827        }
828        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
829        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
830        self.consume(nread);
831        Poll::Ready(Ok(nread))
832    }
833}
834
835impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
836    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
837        let mut this = self.project();
838
839        // If we've reached the end of our internal buffer then we need to fetch
840        // some more data from the underlying reader.
841        // Branch using `>=` instead of the more correct `==`
842        // to tell the compiler that the pos..cap slice is always valid.
843        if *this.pos >= *this.cap {
844            debug_assert!(*this.pos == *this.cap);
845            *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
846            *this.pos = 0;
847        }
848        Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
849    }
850
851    fn consume(self: Pin<&mut Self>, amt: usize) {
852        let this = self.project();
853        *this.pos = cmp::min(*this.pos + amt, *this.cap);
854    }
855}
856
857impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
858    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
859        f.debug_struct("BufReader")
860            .field("reader", &self.inner)
861            .field(
862                "buffer",
863                &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
864            )
865            .finish()
866    }
867}
868
869impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
870    /// Seeks to an offset, in bytes, in the underlying reader.
871    ///
872    /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
873    /// reader would be at if the [`BufReader`] had no internal buffer.
874    ///
875    /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
876    /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
877    /// immediately after a seek yields the underlying reader at the same position.
878    ///
879    /// See [`AsyncSeek`] for more details.
880    ///
881    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
882    /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
883    /// the second seek returns `Err`, the underlying reader will be left at the same position it
884    /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
885    fn poll_seek(
886        mut self: Pin<&mut Self>,
887        cx: &mut Context<'_>,
888        pos: SeekFrom,
889    ) -> Poll<Result<u64>> {
890        let result: u64;
891        if let SeekFrom::Current(n) = pos {
892            let remainder = (self.cap - self.pos) as i64;
893            // it should be safe to assume that remainder fits within an i64 as the alternative
894            // means we managed to allocate 8 exbibytes and that's absurd.
895            // But it's not out of the realm of possibility for some weird underlying reader to
896            // support seeking by i64::min_value() so we need to handle underflow when subtracting
897            // remainder.
898            if let Some(offset) = n.checked_sub(remainder) {
899                result = ready!(self
900                    .as_mut()
901                    .get_pin_mut()
902                    .poll_seek(cx, SeekFrom::Current(offset)))?;
903            } else {
904                // seek backwards by our remainder, and then by the offset
905                ready!(self
906                    .as_mut()
907                    .get_pin_mut()
908                    .poll_seek(cx, SeekFrom::Current(-remainder)))?;
909                self.as_mut().discard_buffer();
910                result = ready!(self
911                    .as_mut()
912                    .get_pin_mut()
913                    .poll_seek(cx, SeekFrom::Current(n)))?;
914            }
915        } else {
916            // Seeking with Start/End doesn't care about our buffer length.
917            result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
918        }
919        self.discard_buffer();
920        Poll::Ready(Ok(result))
921    }
922}
923
924impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
925    fn poll_write(
926        mut self: Pin<&mut Self>,
927        cx: &mut Context<'_>,
928        buf: &[u8],
929    ) -> Poll<Result<usize>> {
930        self.as_mut().get_pin_mut().poll_write(cx, buf)
931    }
932
933    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
934        self.as_mut().get_pin_mut().poll_flush(cx)
935    }
936
937    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
938        self.as_mut().get_pin_mut().poll_close(cx)
939    }
940}
941
942pin_project! {
943    /// Adds buffering to a writer.
944    ///
945    /// It can be excessively inefficient to work directly with something that implements
946    /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
947    /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
948    /// writes it to the underlying writer in large, infrequent batches.
949    ///
950    /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
951    /// the same file or networking socket. It does not help when writing very large amounts at
952    /// once, or writing just once or a few times. It also provides no advantage when writing to a
953    /// destination that is in memory, like a `Vec<u8>`.
954    ///
955    /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
956    /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
957    /// dropping the [`BufWriter`].
958    ///
959    /// # Examples
960    ///
961    /// ```
962    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
963    ///
964    /// # spin_on::spin_on(async {
965    /// let mut output = Vec::new();
966    /// let mut writer = BufWriter::new(&mut output);
967    ///
968    /// writer.write_all(b"hello").await?;
969    /// writer.flush().await?;
970    /// # std::io::Result::Ok(()) });
971    /// ```
972    pub struct BufWriter<W> {
973        #[pin]
974        inner: W,
975        buf: Vec<u8>,
976        written: usize,
977    }
978}
979
980impl<W: AsyncWrite> BufWriter<W> {
981    /// Creates a buffered writer with the default buffer capacity.
982    ///
983    /// The default capacity is currently 8 KB, but that may change in the future.
984    ///
985    /// # Examples
986    ///
987    /// ```
988    /// use futures_lite::io::BufWriter;
989    ///
990    /// let mut output = Vec::new();
991    /// let writer = BufWriter::new(&mut output);
992    /// ```
993    pub fn new(inner: W) -> BufWriter<W> {
994        BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
995    }
996
997    /// Creates a buffered writer with the specified buffer capacity.
998    ///
999    /// # Examples
1000    ///
1001    /// ```
1002    /// use futures_lite::io::BufWriter;
1003    ///
1004    /// let mut output = Vec::new();
1005    /// let writer = BufWriter::with_capacity(100, &mut output);
1006    /// ```
1007    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
1008        BufWriter {
1009            inner,
1010            buf: Vec::with_capacity(capacity),
1011            written: 0,
1012        }
1013    }
1014
1015    /// Gets a reference to the underlying writer.
1016    ///
1017    /// # Examples
1018    ///
1019    /// ```
1020    /// use futures_lite::io::BufWriter;
1021    ///
1022    /// let mut output = Vec::new();
1023    /// let writer = BufWriter::new(&mut output);
1024    ///
1025    /// let r = writer.get_ref();
1026    /// ```
1027    pub fn get_ref(&self) -> &W {
1028        &self.inner
1029    }
1030
1031    /// Gets a mutable reference to the underlying writer.
1032    ///
1033    /// It is not advisable to directly write to the underlying writer.
1034    ///
1035    /// # Examples
1036    ///
1037    /// ```
1038    /// use futures_lite::io::BufWriter;
1039    ///
1040    /// let mut output = Vec::new();
1041    /// let mut writer = BufWriter::new(&mut output);
1042    ///
1043    /// let r = writer.get_mut();
1044    /// ```
1045    pub fn get_mut(&mut self) -> &mut W {
1046        &mut self.inner
1047    }
1048
1049    /// Gets a pinned mutable reference to the underlying writer.
1050    ///
1051    /// It is not not advisable to directly write to the underlying writer.
1052    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
1053        self.project().inner
1054    }
1055
1056    /// Unwraps the buffered writer, returning the underlying writer.
1057    ///
1058    /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
1059    /// that data, flush the buffered writer before unwrapping it.
1060    ///
1061    /// # Examples
1062    ///
1063    /// ```
1064    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
1065    ///
1066    /// # spin_on::spin_on(async {
1067    /// let mut output = vec![1, 2, 3];
1068    /// let mut writer = BufWriter::new(&mut output);
1069    ///
1070    /// writer.write_all(&[4]).await?;
1071    /// writer.flush().await?;
1072    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
1073    /// # std::io::Result::Ok(()) });
1074    /// ```
1075    pub fn into_inner(self) -> W {
1076        self.inner
1077    }
1078
1079    /// Returns a reference to the internal buffer.
1080    ///
1081    /// # Examples
1082    ///
1083    /// ```
1084    /// use futures_lite::io::BufWriter;
1085    ///
1086    /// let mut output = Vec::new();
1087    /// let writer = BufWriter::new(&mut output);
1088    ///
1089    /// // The internal buffer is empty until the first write request.
1090    /// assert_eq!(writer.buffer(), &[]);
1091    /// ```
1092    pub fn buffer(&self) -> &[u8] {
1093        &self.buf
1094    }
1095
1096    /// Flush the buffer.
1097    fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1098        let mut this = self.project();
1099        let len = this.buf.len();
1100        let mut ret = Ok(());
1101
1102        while *this.written < len {
1103            match this
1104                .inner
1105                .as_mut()
1106                .poll_write(cx, &this.buf[*this.written..])
1107            {
1108                Poll::Ready(Ok(0)) => {
1109                    ret = Err(Error::new(
1110                        ErrorKind::WriteZero,
1111                        "Failed to write buffered data",
1112                    ));
1113                    break;
1114                }
1115                Poll::Ready(Ok(n)) => *this.written += n,
1116                Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
1117                Poll::Ready(Err(e)) => {
1118                    ret = Err(e);
1119                    break;
1120                }
1121                Poll::Pending => return Poll::Pending,
1122            }
1123        }
1124
1125        if *this.written > 0 {
1126            this.buf.drain(..*this.written);
1127        }
1128        *this.written = 0;
1129
1130        Poll::Ready(ret)
1131    }
1132}
1133
1134impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
1135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1136        f.debug_struct("BufWriter")
1137            .field("writer", &self.inner)
1138            .field("buf", &self.buf)
1139            .finish()
1140    }
1141}
1142
1143impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
1144    fn poll_write(
1145        mut self: Pin<&mut Self>,
1146        cx: &mut Context<'_>,
1147        buf: &[u8],
1148    ) -> Poll<Result<usize>> {
1149        if self.buf.len() + buf.len() > self.buf.capacity() {
1150            ready!(self.as_mut().poll_flush_buf(cx))?;
1151        }
1152        if buf.len() >= self.buf.capacity() {
1153            self.get_pin_mut().poll_write(cx, buf)
1154        } else {
1155            Pin::new(&mut *self.project().buf).poll_write(cx, buf)
1156        }
1157    }
1158
1159    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1160        ready!(self.as_mut().poll_flush_buf(cx))?;
1161        self.get_pin_mut().poll_flush(cx)
1162    }
1163
1164    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1165        ready!(self.as_mut().poll_flush_buf(cx))?;
1166        self.get_pin_mut().poll_close(cx)
1167    }
1168}
1169
1170impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
1171    /// Seek to the offset, in bytes, in the underlying writer.
1172    ///
1173    /// Seeking always writes out the internal buffer before seeking.
1174    fn poll_seek(
1175        mut self: Pin<&mut Self>,
1176        cx: &mut Context<'_>,
1177        pos: SeekFrom,
1178    ) -> Poll<Result<u64>> {
1179        ready!(self.as_mut().poll_flush_buf(cx))?;
1180        self.get_pin_mut().poll_seek(cx, pos)
1181    }
1182}
1183
1184/// Gives an in-memory buffer a cursor for reading and writing.
1185///
1186/// # Examples
1187///
1188/// ```
1189/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
1190///
1191/// # spin_on::spin_on(async {
1192/// let mut bytes = b"hello".to_vec();
1193/// let mut cursor = Cursor::new(&mut bytes);
1194///
1195/// // Overwrite 'h' with 'H'.
1196/// cursor.write_all(b"H").await?;
1197///
1198/// // Move the cursor one byte forward.
1199/// cursor.seek(SeekFrom::Current(1)).await?;
1200///
1201/// // Read a byte.
1202/// let mut byte = [0];
1203/// cursor.read_exact(&mut byte).await?;
1204/// assert_eq!(&byte, b"l");
1205///
1206/// // Check the final buffer.
1207/// assert_eq!(bytes, b"Hello");
1208/// # std::io::Result::Ok(()) });
1209/// ```
1210#[derive(Clone, Debug, Default)]
1211pub struct Cursor<T> {
1212    inner: std::io::Cursor<T>,
1213}
1214
1215impl<T> Cursor<T> {
1216    /// Creates a cursor for an in-memory buffer.
1217    ///
1218    /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
1219    /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
1220    /// the buffer using [`set_position()`][Cursor::set_position()`] or
1221    /// [`seek()`][`AsyncSeekExt::seek()`].
1222    ///
1223    /// # Examples
1224    ///
1225    /// ```
1226    /// use futures_lite::io::Cursor;
1227    ///
1228    /// let cursor = Cursor::new(Vec::<u8>::new());
1229    /// ```
1230    pub fn new(inner: T) -> Cursor<T> {
1231        Cursor {
1232            inner: std::io::Cursor::new(inner),
1233        }
1234    }
1235
1236    /// Gets a reference to the underlying buffer.
1237    ///
1238    /// # Examples
1239    ///
1240    /// ```
1241    /// use futures_lite::io::Cursor;
1242    ///
1243    /// let cursor = Cursor::new(Vec::<u8>::new());
1244    /// let r = cursor.get_ref();
1245    /// ```
1246    pub fn get_ref(&self) -> &T {
1247        self.inner.get_ref()
1248    }
1249
1250    /// Gets a mutable reference to the underlying buffer.
1251    ///
1252    /// # Examples
1253    ///
1254    /// ```
1255    /// use futures_lite::io::Cursor;
1256    ///
1257    /// let mut cursor = Cursor::new(Vec::<u8>::new());
1258    /// let r = cursor.get_mut();
1259    /// ```
1260    pub fn get_mut(&mut self) -> &mut T {
1261        self.inner.get_mut()
1262    }
1263
1264    /// Unwraps the cursor, returning the underlying buffer.
1265    ///
1266    /// # Examples
1267    ///
1268    /// ```
1269    /// use futures_lite::io::Cursor;
1270    ///
1271    /// let cursor = Cursor::new(vec![1, 2, 3]);
1272    /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
1273    /// ```
1274    pub fn into_inner(self) -> T {
1275        self.inner.into_inner()
1276    }
1277
1278    /// Returns the current position of this cursor.
1279    ///
1280    /// # Examples
1281    ///
1282    /// ```
1283    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
1284    ///
1285    /// # spin_on::spin_on(async {
1286    /// let mut cursor = Cursor::new(b"hello");
1287    /// assert_eq!(cursor.position(), 0);
1288    ///
1289    /// cursor.seek(SeekFrom::Start(2)).await?;
1290    /// assert_eq!(cursor.position(), 2);
1291    /// # std::io::Result::Ok(()) });
1292    /// ```
1293    pub fn position(&self) -> u64 {
1294        self.inner.position()
1295    }
1296
1297    /// Sets the position of this cursor.
1298    ///
1299    /// # Examples
1300    ///
1301    /// ```
1302    /// use futures_lite::io::Cursor;
1303    ///
1304    /// let mut cursor = Cursor::new(b"hello");
1305    /// assert_eq!(cursor.position(), 0);
1306    ///
1307    /// cursor.set_position(2);
1308    /// assert_eq!(cursor.position(), 2);
1309    /// ```
1310    pub fn set_position(&mut self, pos: u64) {
1311        self.inner.set_position(pos)
1312    }
1313}
1314
1315impl<T> AsyncSeek for Cursor<T>
1316where
1317    T: AsRef<[u8]> + Unpin,
1318{
1319    fn poll_seek(
1320        mut self: Pin<&mut Self>,
1321        _: &mut Context<'_>,
1322        pos: SeekFrom,
1323    ) -> Poll<Result<u64>> {
1324        Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1325    }
1326}
1327
1328impl<T> AsyncRead for Cursor<T>
1329where
1330    T: AsRef<[u8]> + Unpin,
1331{
1332    fn poll_read(
1333        mut self: Pin<&mut Self>,
1334        _cx: &mut Context<'_>,
1335        buf: &mut [u8],
1336    ) -> Poll<Result<usize>> {
1337        Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1338    }
1339
1340    fn poll_read_vectored(
1341        mut self: Pin<&mut Self>,
1342        _: &mut Context<'_>,
1343        bufs: &mut [IoSliceMut<'_>],
1344    ) -> Poll<Result<usize>> {
1345        Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1346    }
1347}
1348
1349impl<T> AsyncBufRead for Cursor<T>
1350where
1351    T: AsRef<[u8]> + Unpin,
1352{
1353    fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1354        Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1355    }
1356
1357    fn consume(mut self: Pin<&mut Self>, amt: usize) {
1358        std::io::BufRead::consume(&mut self.inner, amt)
1359    }
1360}
1361
1362impl AsyncWrite for Cursor<&mut [u8]> {
1363    fn poll_write(
1364        mut self: Pin<&mut Self>,
1365        _: &mut Context<'_>,
1366        buf: &[u8],
1367    ) -> Poll<Result<usize>> {
1368        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1369    }
1370
1371    fn poll_write_vectored(
1372        mut self: Pin<&mut Self>,
1373        _: &mut Context<'_>,
1374        bufs: &[IoSlice<'_>],
1375    ) -> Poll<Result<usize>> {
1376        Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1377    }
1378
1379    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1380        Poll::Ready(std::io::Write::flush(&mut self.inner))
1381    }
1382
1383    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1384        self.poll_flush(cx)
1385    }
1386}
1387
1388impl AsyncWrite for Cursor<&mut Vec<u8>> {
1389    fn poll_write(
1390        mut self: Pin<&mut Self>,
1391        _: &mut Context<'_>,
1392        buf: &[u8],
1393    ) -> Poll<Result<usize>> {
1394        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1395    }
1396
1397    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1398        self.poll_flush(cx)
1399    }
1400
1401    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1402        Poll::Ready(std::io::Write::flush(&mut self.inner))
1403    }
1404}
1405
1406impl AsyncWrite for Cursor<Vec<u8>> {
1407    fn poll_write(
1408        mut self: Pin<&mut Self>,
1409        _: &mut Context<'_>,
1410        buf: &[u8],
1411    ) -> Poll<Result<usize>> {
1412        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1413    }
1414
1415    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1416        self.poll_flush(cx)
1417    }
1418
1419    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1420        Poll::Ready(std::io::Write::flush(&mut self.inner))
1421    }
1422}
1423
1424/// Creates an empty reader.
1425///
1426/// # Examples
1427///
1428/// ```
1429/// use futures_lite::io::{self, AsyncReadExt};
1430///
1431/// # spin_on::spin_on(async {
1432/// let mut reader = io::empty();
1433///
1434/// let mut contents = Vec::new();
1435/// reader.read_to_end(&mut contents).await?;
1436/// assert!(contents.is_empty());
1437/// # std::io::Result::Ok(()) });
1438/// ```
1439pub fn empty() -> Empty {
1440    Empty { _private: () }
1441}
1442
1443/// Reader for the [`empty()`] function.
1444pub struct Empty {
1445    _private: (),
1446}
1447
1448impl fmt::Debug for Empty {
1449    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1450        f.pad("Empty { .. }")
1451    }
1452}
1453
1454impl AsyncRead for Empty {
1455    #[inline]
1456    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
1457        Poll::Ready(Ok(0))
1458    }
1459}
1460
1461impl AsyncBufRead for Empty {
1462    #[inline]
1463    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
1464        Poll::Ready(Ok(&[]))
1465    }
1466
1467    #[inline]
1468    fn consume(self: Pin<&mut Self>, _: usize) {}
1469}
1470
1471/// Creates an infinite reader that reads the same byte repeatedly.
1472///
1473/// # Examples
1474///
1475/// ```
1476/// use futures_lite::io::{self, AsyncReadExt};
1477///
1478/// # spin_on::spin_on(async {
1479/// let mut reader = io::repeat(b'a');
1480///
1481/// let mut contents = vec![0; 5];
1482/// reader.read_exact(&mut contents).await?;
1483/// assert_eq!(contents, b"aaaaa");
1484/// # std::io::Result::Ok(()) });
1485/// ```
1486pub fn repeat(byte: u8) -> Repeat {
1487    Repeat { byte }
1488}
1489
1490/// Reader for the [`repeat()`] function.
1491#[derive(Debug)]
1492pub struct Repeat {
1493    byte: u8,
1494}
1495
1496impl AsyncRead for Repeat {
1497    #[inline]
1498    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1499        for b in &mut *buf {
1500            *b = self.byte;
1501        }
1502        Poll::Ready(Ok(buf.len()))
1503    }
1504}
1505
1506/// Creates a writer that consumes and drops all data.
1507///
1508/// # Examples
1509///
1510/// ```
1511/// use futures_lite::io::{self, AsyncWriteExt};
1512///
1513/// # spin_on::spin_on(async {
1514/// let mut writer = io::sink();
1515/// writer.write_all(b"hello").await?;
1516/// # std::io::Result::Ok(()) });
1517/// ```
1518pub fn sink() -> Sink {
1519    Sink { _private: () }
1520}
1521
1522/// Writer for the [`sink()`] function.
1523#[derive(Debug)]
1524pub struct Sink {
1525    _private: (),
1526}
1527
1528impl AsyncWrite for Sink {
1529    #[inline]
1530    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
1531        Poll::Ready(Ok(buf.len()))
1532    }
1533
1534    #[inline]
1535    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1536        Poll::Ready(Ok(()))
1537    }
1538
1539    #[inline]
1540    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1541        Poll::Ready(Ok(()))
1542    }
1543}
1544
1545/// Extension trait for [`AsyncBufRead`].
1546pub trait AsyncBufReadExt: AsyncBufRead {
1547    /// Returns the contents of the internal buffer, filling it with more data if empty.
1548    ///
1549    /// If the stream has reached EOF, an empty buffer will be returned.
1550    ///
1551    /// # Examples
1552    ///
1553    /// ```
1554    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1555    /// use std::pin::Pin;
1556    ///
1557    /// # spin_on::spin_on(async {
1558    /// let input: &[u8] = b"hello world";
1559    /// let mut reader = BufReader::with_capacity(5, input);
1560    ///
1561    /// assert_eq!(reader.fill_buf().await?, b"hello");
1562    /// reader.consume(2);
1563    /// assert_eq!(reader.fill_buf().await?, b"llo");
1564    /// reader.consume(3);
1565    /// assert_eq!(reader.fill_buf().await?, b" worl");
1566    /// # std::io::Result::Ok(()) });
1567    /// ```
1568    fn fill_buf(&mut self) -> FillBuf<'_, Self>
1569    where
1570        Self: Unpin,
1571    {
1572        FillBuf { reader: Some(self) }
1573    }
1574
1575    /// Consumes `amt` buffered bytes.
1576    ///
1577    /// This method does not perform any I/O, it simply consumes some amount of bytes from the
1578    /// internal buffer.
1579    ///
1580    /// The `amt` must be <= the number of bytes in the buffer returned by
1581    /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
1582    ///
1583    /// # Examples
1584    ///
1585    /// ```
1586    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1587    /// use std::pin::Pin;
1588    ///
1589    /// # spin_on::spin_on(async {
1590    /// let input: &[u8] = b"hello";
1591    /// let mut reader = BufReader::with_capacity(4, input);
1592    ///
1593    /// assert_eq!(reader.fill_buf().await?, b"hell");
1594    /// reader.consume(2);
1595    /// assert_eq!(reader.fill_buf().await?, b"ll");
1596    /// # std::io::Result::Ok(()) });
1597    /// ```
1598    fn consume(&mut self, amt: usize)
1599    where
1600        Self: Unpin,
1601    {
1602        AsyncBufRead::consume(Pin::new(self), amt);
1603    }
1604
1605    /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
1606    ///
1607    /// This method will read bytes from the underlying stream until the delimiter or EOF is
1608    /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
1609    ///
1610    /// If successful, returns the total number of bytes read.
1611    ///
1612    /// # Examples
1613    ///
1614    /// ```
1615    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1616    ///
1617    /// # spin_on::spin_on(async {
1618    /// let input: &[u8] = b"hello";
1619    /// let mut reader = BufReader::new(input);
1620    ///
1621    /// let mut buf = Vec::new();
1622    /// let n = reader.read_until(b'\n', &mut buf).await?;
1623    /// # std::io::Result::Ok(()) });
1624    /// ```
1625    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'_, Self>
1626    where
1627        Self: Unpin,
1628    {
1629        ReadUntilFuture {
1630            reader: self,
1631            byte,
1632            buf,
1633            read: 0,
1634        }
1635    }
1636
1637    /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
1638    ///
1639    /// This method will read bytes from the underlying stream until the newline delimiter (the
1640    /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
1641    /// will be appended to `buf`.
1642    ///
1643    /// If successful, returns the total number of bytes read.
1644    ///
1645    /// # Examples
1646    ///
1647    /// ```
1648    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1649    ///
1650    /// # spin_on::spin_on(async {
1651    /// let input: &[u8] = b"hello";
1652    /// let mut reader = BufReader::new(input);
1653    ///
1654    /// let mut line = String::new();
1655    /// let n = reader.read_line(&mut line).await?;
1656    /// # std::io::Result::Ok(()) });
1657    /// ```
1658    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'_, Self>
1659    where
1660        Self: Unpin,
1661    {
1662        ReadLineFuture {
1663            reader: self,
1664            buf,
1665            bytes: Vec::new(),
1666            read: 0,
1667        }
1668    }
1669
1670    /// Returns a stream over the lines of this byte stream.
1671    ///
1672    /// The stream returned from this method yields items of type
1673    /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
1674    /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
1675    /// at the end.
1676    ///
1677    /// # Examples
1678    ///
1679    /// ```
1680    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1681    /// use futures_lite::stream::StreamExt;
1682    ///
1683    /// # spin_on::spin_on(async {
1684    /// let input: &[u8] = b"hello\nworld\n";
1685    /// let mut reader = BufReader::new(input);
1686    /// let mut lines = reader.lines();
1687    ///
1688    /// while let Some(line) = lines.next().await {
1689    ///     println!("{}", line?);
1690    /// }
1691    /// # std::io::Result::Ok(()) });
1692    /// ```
1693    fn lines(self) -> Lines<Self>
1694    where
1695        Self: Unpin + Sized,
1696    {
1697        Lines {
1698            reader: self,
1699            buf: String::new(),
1700            bytes: Vec::new(),
1701            read: 0,
1702        }
1703    }
1704
1705    /// Returns a stream over the contents of this reader split on the specified `byte`.
1706    ///
1707    /// The stream returned from this method yields items of type
1708    /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
1709    /// Each vector returned will *not* have the delimiter byte at the end.
1710    ///
1711    /// # Examples
1712    ///
1713    /// ```
1714    /// use futures_lite::io::{AsyncBufReadExt, Cursor};
1715    /// use futures_lite::stream::StreamExt;
1716    ///
1717    /// # spin_on::spin_on(async {
1718    /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
1719    /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
1720    ///
1721    /// assert_eq!(items[0], b"lorem");
1722    /// assert_eq!(items[1], b"ipsum");
1723    /// assert_eq!(items[2], b"dolor");
1724    /// # std::io::Result::Ok(()) });
1725    /// ```
1726    fn split(self, byte: u8) -> Split<Self>
1727    where
1728        Self: Sized,
1729    {
1730        Split {
1731            reader: self,
1732            buf: Vec::new(),
1733            delim: byte,
1734            read: 0,
1735        }
1736    }
1737}
1738
1739impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1740
1741/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
1742#[derive(Debug)]
1743#[must_use = "futures do nothing unless you `.await` or poll them"]
1744pub struct FillBuf<'a, R: ?Sized> {
1745    reader: Option<&'a mut R>,
1746}
1747
1748impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1749
1750impl<'a, R> Future for FillBuf<'a, R>
1751where
1752    R: AsyncBufRead + Unpin + ?Sized,
1753{
1754    type Output = Result<&'a [u8]>;
1755
1756    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1757        let this = &mut *self;
1758        let reader = this
1759            .reader
1760            .take()
1761            .expect("polled `FillBuf` after completion");
1762
1763        match Pin::new(&mut *reader).poll_fill_buf(cx) {
1764            Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
1765                Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
1766                poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
1767            },
1768            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1769            Poll::Pending => {
1770                this.reader = Some(reader);
1771                Poll::Pending
1772            }
1773        }
1774    }
1775}
1776
1777/// Future for the [`AsyncBufReadExt::read_until()`] method.
1778#[derive(Debug)]
1779#[must_use = "futures do nothing unless you `.await` or poll them"]
1780pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
1781    reader: &'a mut R,
1782    byte: u8,
1783    buf: &'a mut Vec<u8>,
1784    read: usize,
1785}
1786
1787impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1788
1789impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1790    type Output = Result<usize>;
1791
1792    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1793        let Self {
1794            reader,
1795            byte,
1796            buf,
1797            read,
1798        } = &mut *self;
1799        read_until_internal(Pin::new(reader), cx, *byte, buf, read)
1800    }
1801}
1802
1803fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1804    mut reader: Pin<&mut R>,
1805    cx: &mut Context<'_>,
1806    byte: u8,
1807    buf: &mut Vec<u8>,
1808    read: &mut usize,
1809) -> Poll<Result<usize>> {
1810    loop {
1811        let (done, used) = {
1812            let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
1813
1814            if let Some(i) = memchr::memchr(byte, available) {
1815                buf.extend_from_slice(&available[..=i]);
1816                (true, i + 1)
1817            } else {
1818                buf.extend_from_slice(available);
1819                (false, available.len())
1820            }
1821        };
1822
1823        reader.as_mut().consume(used);
1824        *read += used;
1825
1826        if done || used == 0 {
1827            return Poll::Ready(Ok(mem::replace(read, 0)));
1828        }
1829    }
1830}
1831
1832/// Future for the [`AsyncBufReadExt::read_line()`] method.
1833#[derive(Debug)]
1834#[must_use = "futures do nothing unless you `.await` or poll them"]
1835pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
1836    reader: &'a mut R,
1837    buf: &'a mut String,
1838    bytes: Vec<u8>,
1839    read: usize,
1840}
1841
1842impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1843
1844impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1845    type Output = Result<usize>;
1846
1847    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1848        let Self {
1849            reader,
1850            buf,
1851            bytes,
1852            read,
1853        } = &mut *self;
1854        read_line_internal(Pin::new(reader), cx, buf, bytes, read)
1855    }
1856}
1857
1858pin_project! {
1859    /// Stream for the [`AsyncBufReadExt::lines()`] method.
1860    #[derive(Debug)]
1861    #[must_use = "streams do nothing unless polled"]
1862    pub struct Lines<R> {
1863        #[pin]
1864        reader: R,
1865        buf: String,
1866        bytes: Vec<u8>,
1867        read: usize,
1868    }
1869}
1870
1871impl<R: AsyncBufRead> Stream for Lines<R> {
1872    type Item = Result<String>;
1873
1874    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1875        let this = self.project();
1876
1877        let n = ready!(read_line_internal(
1878            this.reader,
1879            cx,
1880            this.buf,
1881            this.bytes,
1882            this.read
1883        ))?;
1884        if n == 0 && this.buf.is_empty() {
1885            return Poll::Ready(None);
1886        }
1887
1888        if this.buf.ends_with('\n') {
1889            this.buf.pop();
1890            if this.buf.ends_with('\r') {
1891                this.buf.pop();
1892            }
1893        }
1894        Poll::Ready(Some(Ok(mem::take(this.buf))))
1895    }
1896}
1897
1898fn read_line_internal<R: AsyncBufRead + ?Sized>(
1899    reader: Pin<&mut R>,
1900    cx: &mut Context<'_>,
1901    buf: &mut String,
1902    bytes: &mut Vec<u8>,
1903    read: &mut usize,
1904) -> Poll<Result<usize>> {
1905    let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1906
1907    match String::from_utf8(mem::take(bytes)) {
1908        Ok(s) => {
1909            debug_assert!(buf.is_empty());
1910            debug_assert_eq!(*read, 0);
1911            *buf = s;
1912            Poll::Ready(ret)
1913        }
1914        Err(_) => Poll::Ready(ret.and_then(|_| {
1915            Err(Error::new(
1916                ErrorKind::InvalidData,
1917                "stream did not contain valid UTF-8",
1918            ))
1919        })),
1920    }
1921}
1922
1923pin_project! {
1924    /// Stream for the [`AsyncBufReadExt::split()`] method.
1925    #[derive(Debug)]
1926    #[must_use = "streams do nothing unless polled"]
1927    pub struct Split<R> {
1928        #[pin]
1929        reader: R,
1930        buf: Vec<u8>,
1931        read: usize,
1932        delim: u8,
1933    }
1934}
1935
1936impl<R: AsyncBufRead> Stream for Split<R> {
1937    type Item = Result<Vec<u8>>;
1938
1939    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1940        let this = self.project();
1941
1942        let n = ready!(read_until_internal(
1943            this.reader,
1944            cx,
1945            *this.delim,
1946            this.buf,
1947            this.read
1948        ))?;
1949        if n == 0 && this.buf.is_empty() {
1950            return Poll::Ready(None);
1951        }
1952
1953        if this.buf[this.buf.len() - 1] == *this.delim {
1954            this.buf.pop();
1955        }
1956        Poll::Ready(Some(Ok(mem::take(this.buf))))
1957    }
1958}
1959
1960/// Extension trait for [`AsyncRead`].
1961pub trait AsyncReadExt: AsyncRead {
1962    /// Reads some bytes from the byte stream.
1963    ///
1964    /// On success, returns the total number of bytes read.
1965    ///
1966    /// If the return value is `Ok(n)`, then it must be guaranteed that
1967    /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1968    /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1969    /// scenarios:
1970    ///
1971    /// 1. This reader has reached its "end of file" and will likely no longer be able to
1972    ///    produce bytes. Note that this does not mean that the reader will always no
1973    ///    longer be able to produce bytes.
1974    /// 2. The buffer specified was 0 bytes in length.
1975    ///
1976    /// # Examples
1977    ///
1978    /// ```
1979    /// use futures_lite::io::{AsyncReadExt, BufReader};
1980    ///
1981    /// # spin_on::spin_on(async {
1982    /// let input: &[u8] = b"hello";
1983    /// let mut reader = BufReader::new(input);
1984    ///
1985    /// let mut buf = vec![0; 1024];
1986    /// let n = reader.read(&mut buf).await?;
1987    /// # std::io::Result::Ok(()) });
1988    /// ```
1989    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
1990    where
1991        Self: Unpin,
1992    {
1993        ReadFuture { reader: self, buf }
1994    }
1995
1996    /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
1997    ///
1998    /// Data is copied to fill each buffer in order, with the final buffer possibly being
1999    /// only partially filled. This method must behave same as a single call to
2000    /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
2001    fn read_vectored<'a>(
2002        &'a mut self,
2003        bufs: &'a mut [IoSliceMut<'a>],
2004    ) -> ReadVectoredFuture<'a, Self>
2005    where
2006        Self: Unpin,
2007    {
2008        ReadVectoredFuture { reader: self, bufs }
2009    }
2010
2011    /// Reads the entire contents and appends them to a [`Vec`].
2012    ///
2013    /// On success, returns the total number of bytes read.
2014    ///
2015    /// # Examples
2016    ///
2017    /// ```
2018    /// use futures_lite::io::{AsyncReadExt, Cursor};
2019    ///
2020    /// # spin_on::spin_on(async {
2021    /// let mut reader = Cursor::new(vec![1, 2, 3]);
2022    /// let mut contents = Vec::new();
2023    ///
2024    /// let n = reader.read_to_end(&mut contents).await?;
2025    /// assert_eq!(n, 3);
2026    /// assert_eq!(contents, [1, 2, 3]);
2027    /// # std::io::Result::Ok(()) });
2028    /// ```
2029    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
2030    where
2031        Self: Unpin,
2032    {
2033        let start_len = buf.len();
2034        ReadToEndFuture {
2035            reader: self,
2036            buf,
2037            start_len,
2038        }
2039    }
2040
2041    /// Reads the entire contents and appends them to a [`String`].
2042    ///
2043    /// On success, returns the total number of bytes read.
2044    ///
2045    /// # Examples
2046    ///
2047    /// ```
2048    /// use futures_lite::io::{AsyncReadExt, Cursor};
2049    ///
2050    /// # spin_on::spin_on(async {
2051    /// let mut reader = Cursor::new(&b"hello");
2052    /// let mut contents = String::new();
2053    ///
2054    /// let n = reader.read_to_string(&mut contents).await?;
2055    /// assert_eq!(n, 5);
2056    /// assert_eq!(contents, "hello");
2057    /// # std::io::Result::Ok(()) });
2058    /// ```
2059    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
2060    where
2061        Self: Unpin,
2062    {
2063        ReadToStringFuture {
2064            reader: self,
2065            buf,
2066            bytes: Vec::new(),
2067            start_len: 0,
2068        }
2069    }
2070
2071    /// Reads the exact number of bytes required to fill `buf`.
2072    ///
2073    /// On success, returns the total number of bytes read.
2074    ///
2075    /// # Examples
2076    ///
2077    /// ```
2078    /// use futures_lite::io::{AsyncReadExt, Cursor};
2079    ///
2080    /// # spin_on::spin_on(async {
2081    /// let mut reader = Cursor::new(&b"hello");
2082    /// let mut contents = vec![0; 3];
2083    ///
2084    /// reader.read_exact(&mut contents).await?;
2085    /// assert_eq!(contents, b"hel");
2086    /// # std::io::Result::Ok(()) });
2087    /// ```
2088    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
2089    where
2090        Self: Unpin,
2091    {
2092        ReadExactFuture { reader: self, buf }
2093    }
2094
2095    /// Creates an adapter which will read at most `limit` bytes from it.
2096    ///
2097    /// This method returns a new instance of [`AsyncRead`] which will read at most
2098    /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
2099    ///
2100    /// # Examples
2101    ///
2102    /// ```
2103    /// use futures_lite::io::{AsyncReadExt, Cursor};
2104    ///
2105    /// # spin_on::spin_on(async {
2106    /// let mut reader = Cursor::new(&b"hello");
2107    /// let mut contents = String::new();
2108    ///
2109    /// let n = reader.take(3).read_to_string(&mut contents).await?;
2110    /// assert_eq!(n, 3);
2111    /// assert_eq!(contents, "hel");
2112    /// # std::io::Result::Ok(()) });
2113    /// ```
2114    fn take(self, limit: u64) -> Take<Self>
2115    where
2116        Self: Sized,
2117    {
2118        Take { inner: self, limit }
2119    }
2120
2121    /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
2122    ///
2123    /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
2124    ///
2125    /// ```
2126    /// use futures_lite::io::{AsyncReadExt, Cursor};
2127    /// use futures_lite::stream::StreamExt;
2128    ///
2129    /// # spin_on::spin_on(async {
2130    /// let reader = Cursor::new(&b"hello");
2131    /// let mut bytes = reader.bytes();
2132    ///
2133    /// while let Some(byte) = bytes.next().await {
2134    ///     println!("byte: {}", byte?);
2135    /// }
2136    /// # std::io::Result::Ok(()) });
2137    /// ```
2138    fn bytes(self) -> Bytes<Self>
2139    where
2140        Self: Sized,
2141    {
2142        Bytes { inner: self }
2143    }
2144
2145    /// Creates an adapter which will chain this stream with another.
2146    ///
2147    /// The returned [`AsyncRead`] instance will first read all bytes from this reader
2148    /// until EOF is found, and then continue with `next`.
2149    ///
2150    /// # Examples
2151    ///
2152    /// ```
2153    /// use futures_lite::io::{AsyncReadExt, Cursor};
2154    ///
2155    /// # spin_on::spin_on(async {
2156    /// let r1 = Cursor::new(&b"hello");
2157    /// let r2 = Cursor::new(&b"world");
2158    /// let mut reader = r1.chain(r2);
2159    ///
2160    /// let mut contents = String::new();
2161    /// reader.read_to_string(&mut contents).await?;
2162    /// assert_eq!(contents, "helloworld");
2163    /// # std::io::Result::Ok(()) });
2164    /// ```
2165    fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
2166    where
2167        Self: Sized,
2168    {
2169        Chain {
2170            first: self,
2171            second: next,
2172            done_first: false,
2173        }
2174    }
2175
2176    /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
2177    ///
2178    /// # Examples
2179    ///
2180    /// ```
2181    /// use futures_lite::io::AsyncReadExt;
2182    ///
2183    /// let reader = [1, 2, 3].boxed_reader();
2184    /// ```
2185    #[cfg(feature = "alloc")]
2186    fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
2187    where
2188        Self: Sized + Send + 'a,
2189    {
2190        Box::pin(self)
2191    }
2192}
2193
2194impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
2195
2196/// Future for the [`AsyncReadExt::read()`] method.
2197#[derive(Debug)]
2198#[must_use = "futures do nothing unless you `.await` or poll them"]
2199pub struct ReadFuture<'a, R: Unpin + ?Sized> {
2200    reader: &'a mut R,
2201    buf: &'a mut [u8],
2202}
2203
2204impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
2205
2206impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
2207    type Output = Result<usize>;
2208
2209    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2210        let Self { reader, buf } = &mut *self;
2211        Pin::new(reader).poll_read(cx, buf)
2212    }
2213}
2214
2215/// Future for the [`AsyncReadExt::read_vectored()`] method.
2216#[derive(Debug)]
2217#[must_use = "futures do nothing unless you `.await` or poll them"]
2218pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
2219    reader: &'a mut R,
2220    bufs: &'a mut [IoSliceMut<'a>],
2221}
2222
2223impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
2224
2225impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
2226    type Output = Result<usize>;
2227
2228    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2229        let Self { reader, bufs } = &mut *self;
2230        Pin::new(reader).poll_read_vectored(cx, bufs)
2231    }
2232}
2233
2234/// Future for the [`AsyncReadExt::read_to_end()`] method.
2235#[derive(Debug)]
2236#[must_use = "futures do nothing unless you `.await` or poll them"]
2237pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2238    reader: &'a mut R,
2239    buf: &'a mut Vec<u8>,
2240    start_len: usize,
2241}
2242
2243impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2244
2245impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2246    type Output = Result<usize>;
2247
2248    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2249        let Self {
2250            reader,
2251            buf,
2252            start_len,
2253        } = &mut *self;
2254        read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
2255    }
2256}
2257
2258/// Future for the [`AsyncReadExt::read_to_string()`] method.
2259#[derive(Debug)]
2260#[must_use = "futures do nothing unless you `.await` or poll them"]
2261pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2262    reader: &'a mut R,
2263    buf: &'a mut String,
2264    bytes: Vec<u8>,
2265    start_len: usize,
2266}
2267
2268impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2269
2270impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2271    type Output = Result<usize>;
2272
2273    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2274        let Self {
2275            reader,
2276            buf,
2277            bytes,
2278            start_len,
2279        } = &mut *self;
2280        let reader = Pin::new(reader);
2281
2282        let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2283
2284        match String::from_utf8(mem::take(bytes)) {
2285            Ok(s) => {
2286                debug_assert!(buf.is_empty());
2287                **buf = s;
2288                Poll::Ready(ret)
2289            }
2290            Err(_) => Poll::Ready(ret.and_then(|_| {
2291                Err(Error::new(
2292                    ErrorKind::InvalidData,
2293                    "stream did not contain valid UTF-8",
2294                ))
2295            })),
2296        }
2297    }
2298}
2299
2300// This uses an adaptive system to extend the vector when it fills. We want to
2301// avoid paying to allocate and zero a huge chunk of memory if the reader only
2302// has 4 bytes while still making large reads if the reader does have a ton
2303// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2304// time is 4,500 times (!) slower than this if the reader has a very small
2305// amount of data to return.
2306//
2307// Because we're extending the buffer with uninitialized data for trusted
2308// readers, we need to make sure to truncate that if any of this panics.
2309fn read_to_end_internal<R: AsyncRead + ?Sized>(
2310    mut rd: Pin<&mut R>,
2311    cx: &mut Context<'_>,
2312    buf: &mut Vec<u8>,
2313    start_len: usize,
2314) -> Poll<Result<usize>> {
2315    struct Guard<'a> {
2316        buf: &'a mut Vec<u8>,
2317        len: usize,
2318    }
2319
2320    impl Drop for Guard<'_> {
2321        fn drop(&mut self) {
2322            self.buf.resize(self.len, 0);
2323        }
2324    }
2325
2326    let mut g = Guard {
2327        len: buf.len(),
2328        buf,
2329    };
2330    let ret;
2331    loop {
2332        if g.len == g.buf.len() {
2333            g.buf.reserve(32);
2334            let capacity = g.buf.capacity();
2335            g.buf.resize(capacity, 0);
2336        }
2337
2338        match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2339            Ok(0) => {
2340                ret = Poll::Ready(Ok(g.len - start_len));
2341                break;
2342            }
2343            Ok(n) => g.len += n,
2344            Err(e) => {
2345                ret = Poll::Ready(Err(e));
2346                break;
2347            }
2348        }
2349    }
2350
2351    ret
2352}
2353
2354/// Future for the [`AsyncReadExt::read_exact()`] method.
2355#[derive(Debug)]
2356#[must_use = "futures do nothing unless you `.await` or poll them"]
2357pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2358    reader: &'a mut R,
2359    buf: &'a mut [u8],
2360}
2361
2362impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2363
2364impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2365    type Output = Result<()>;
2366
2367    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2368        let Self { reader, buf } = &mut *self;
2369
2370        while !buf.is_empty() {
2371            let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2372            let (_, rest) = mem::take(buf).split_at_mut(n);
2373            *buf = rest;
2374
2375            if n == 0 {
2376                return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2377            }
2378        }
2379
2380        Poll::Ready(Ok(()))
2381    }
2382}
2383
2384pin_project! {
2385    /// Reader for the [`AsyncReadExt::take()`] method.
2386    #[derive(Debug)]
2387    pub struct Take<R> {
2388        #[pin]
2389        inner: R,
2390        limit: u64,
2391    }
2392}
2393
2394impl<R> Take<R> {
2395    /// Returns the number of bytes before this adapter will return EOF.
2396    ///
2397    /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
2398    ///
2399    /// # Examples
2400    ///
2401    /// ```
2402    /// use futures_lite::io::{AsyncReadExt, Cursor};
2403    ///
2404    /// let reader = Cursor::new("hello");
2405    ///
2406    /// let reader = reader.take(3);
2407    /// assert_eq!(reader.limit(), 3);
2408    /// ```
2409    pub fn limit(&self) -> u64 {
2410        self.limit
2411    }
2412
2413    /// Puts a limit on the number of bytes.
2414    ///
2415    /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
2416    ///
2417    /// # Examples
2418    ///
2419    /// ```
2420    /// use futures_lite::io::{AsyncReadExt, Cursor};
2421    ///
2422    /// let reader = Cursor::new("hello");
2423    ///
2424    /// let mut reader = reader.take(10);
2425    /// assert_eq!(reader.limit(), 10);
2426    ///
2427    /// reader.set_limit(3);
2428    /// assert_eq!(reader.limit(), 3);
2429    /// ```
2430    pub fn set_limit(&mut self, limit: u64) {
2431        self.limit = limit;
2432    }
2433
2434    /// Gets a reference to the underlying reader.
2435    ///
2436    /// # Examples
2437    ///
2438    /// ```
2439    /// use futures_lite::io::{AsyncReadExt, Cursor};
2440    ///
2441    /// let reader = Cursor::new("hello");
2442    ///
2443    /// let reader = reader.take(3);
2444    /// let r = reader.get_ref();
2445    /// ```
2446    pub fn get_ref(&self) -> &R {
2447        &self.inner
2448    }
2449
2450    /// Gets a mutable reference to the underlying reader.
2451    ///
2452    /// # Examples
2453    ///
2454    /// ```
2455    /// use futures_lite::io::{AsyncReadExt, Cursor};
2456    ///
2457    /// let reader = Cursor::new("hello");
2458    ///
2459    /// let mut reader = reader.take(3);
2460    /// let r = reader.get_mut();
2461    /// ```
2462    pub fn get_mut(&mut self) -> &mut R {
2463        &mut self.inner
2464    }
2465
2466    /// Unwraps the adapter, returning the underlying reader.
2467    ///
2468    /// # Examples
2469    ///
2470    /// ```
2471    /// use futures_lite::io::{AsyncReadExt, Cursor};
2472    ///
2473    /// let reader = Cursor::new("hello");
2474    ///
2475    /// let reader = reader.take(3);
2476    /// let reader = reader.into_inner();
2477    /// ```
2478    pub fn into_inner(self) -> R {
2479        self.inner
2480    }
2481}
2482
2483impl<R: AsyncRead> AsyncRead for Take<R> {
2484    fn poll_read(
2485        self: Pin<&mut Self>,
2486        cx: &mut Context<'_>,
2487        buf: &mut [u8],
2488    ) -> Poll<Result<usize>> {
2489        let this = self.project();
2490        take_read_internal(this.inner, cx, buf, this.limit)
2491    }
2492}
2493
2494fn take_read_internal<R: AsyncRead + ?Sized>(
2495    mut rd: Pin<&mut R>,
2496    cx: &mut Context<'_>,
2497    buf: &mut [u8],
2498    limit: &mut u64,
2499) -> Poll<Result<usize>> {
2500    // Don't call into inner reader at all at EOF because it may still block
2501    if *limit == 0 {
2502        return Poll::Ready(Ok(0));
2503    }
2504
2505    let max = cmp::min(buf.len() as u64, *limit) as usize;
2506
2507    match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2508        Ok(n) => {
2509            *limit -= n as u64;
2510            Poll::Ready(Ok(n))
2511        }
2512        Err(e) => Poll::Ready(Err(e)),
2513    }
2514}
2515
2516impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2517    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2518        let this = self.project();
2519
2520        if *this.limit == 0 {
2521            return Poll::Ready(Ok(&[]));
2522        }
2523
2524        match ready!(this.inner.poll_fill_buf(cx)) {
2525            Ok(buf) => {
2526                let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2527                Poll::Ready(Ok(&buf[..cap]))
2528            }
2529            Err(e) => Poll::Ready(Err(e)),
2530        }
2531    }
2532
2533    fn consume(self: Pin<&mut Self>, amt: usize) {
2534        let this = self.project();
2535        // Don't let callers reset the limit by passing an overlarge value
2536        let amt = cmp::min(amt as u64, *this.limit) as usize;
2537        *this.limit -= amt as u64;
2538
2539        this.inner.consume(amt);
2540    }
2541}
2542
2543pin_project! {
2544    /// Reader for the [`AsyncReadExt::bytes()`] method.
2545    #[derive(Debug)]
2546    pub struct Bytes<R> {
2547        #[pin]
2548        inner: R,
2549    }
2550}
2551
2552impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2553    type Item = Result<u8>;
2554
2555    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2556        let mut byte = 0;
2557
2558        let rd = Pin::new(&mut self.inner);
2559
2560        match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2561            Ok(0) => Poll::Ready(None),
2562            Ok(..) => Poll::Ready(Some(Ok(byte))),
2563            Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2564            Err(e) => Poll::Ready(Some(Err(e))),
2565        }
2566    }
2567}
2568
2569impl<R: AsyncRead> AsyncRead for Bytes<R> {
2570    fn poll_read(
2571        self: Pin<&mut Self>,
2572        cx: &mut Context<'_>,
2573        buf: &mut [u8],
2574    ) -> Poll<Result<usize>> {
2575        self.project().inner.poll_read(cx, buf)
2576    }
2577
2578    fn poll_read_vectored(
2579        self: Pin<&mut Self>,
2580        cx: &mut Context<'_>,
2581        bufs: &mut [IoSliceMut<'_>],
2582    ) -> Poll<Result<usize>> {
2583        self.project().inner.poll_read_vectored(cx, bufs)
2584    }
2585}
2586
2587pin_project! {
2588    /// Reader for the [`AsyncReadExt::chain()`] method.
2589    pub struct Chain<R1, R2> {
2590        #[pin]
2591        first: R1,
2592        #[pin]
2593        second: R2,
2594        done_first: bool,
2595    }
2596}
2597
2598impl<R1, R2> Chain<R1, R2> {
2599    /// Gets references to the underlying readers.
2600    ///
2601    /// # Examples
2602    ///
2603    /// ```
2604    /// use futures_lite::io::{AsyncReadExt, Cursor};
2605    ///
2606    /// let r1 = Cursor::new(b"hello");
2607    /// let r2 = Cursor::new(b"world");
2608    ///
2609    /// let reader = r1.chain(r2);
2610    /// let (r1, r2) = reader.get_ref();
2611    /// ```
2612    pub fn get_ref(&self) -> (&R1, &R2) {
2613        (&self.first, &self.second)
2614    }
2615
2616    /// Gets mutable references to the underlying readers.
2617    ///
2618    /// # Examples
2619    ///
2620    /// ```
2621    /// use futures_lite::io::{AsyncReadExt, Cursor};
2622    ///
2623    /// let r1 = Cursor::new(b"hello");
2624    /// let r2 = Cursor::new(b"world");
2625    ///
2626    /// let mut reader = r1.chain(r2);
2627    /// let (r1, r2) = reader.get_mut();
2628    /// ```
2629    pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
2630        (&mut self.first, &mut self.second)
2631    }
2632
2633    /// Unwraps the adapter, returning the underlying readers.
2634    ///
2635    /// # Examples
2636    ///
2637    /// ```
2638    /// use futures_lite::io::{AsyncReadExt, Cursor};
2639    ///
2640    /// let r1 = Cursor::new(b"hello");
2641    /// let r2 = Cursor::new(b"world");
2642    ///
2643    /// let reader = r1.chain(r2);
2644    /// let (r1, r2) = reader.into_inner();
2645    /// ```
2646    pub fn into_inner(self) -> (R1, R2) {
2647        (self.first, self.second)
2648    }
2649}
2650
2651impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2653        f.debug_struct("Chain")
2654            .field("r1", &self.first)
2655            .field("r2", &self.second)
2656            .finish()
2657    }
2658}
2659
2660impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2661    fn poll_read(
2662        self: Pin<&mut Self>,
2663        cx: &mut Context<'_>,
2664        buf: &mut [u8],
2665    ) -> Poll<Result<usize>> {
2666        let this = self.project();
2667        if !*this.done_first {
2668            match ready!(this.first.poll_read(cx, buf)) {
2669                Ok(0) if !buf.is_empty() => *this.done_first = true,
2670                Ok(n) => return Poll::Ready(Ok(n)),
2671                Err(err) => return Poll::Ready(Err(err)),
2672            }
2673        }
2674
2675        this.second.poll_read(cx, buf)
2676    }
2677
2678    fn poll_read_vectored(
2679        self: Pin<&mut Self>,
2680        cx: &mut Context<'_>,
2681        bufs: &mut [IoSliceMut<'_>],
2682    ) -> Poll<Result<usize>> {
2683        let this = self.project();
2684        if !*this.done_first {
2685            match ready!(this.first.poll_read_vectored(cx, bufs)) {
2686                Ok(0) if !bufs.is_empty() => *this.done_first = true,
2687                Ok(n) => return Poll::Ready(Ok(n)),
2688                Err(err) => return Poll::Ready(Err(err)),
2689            }
2690        }
2691
2692        this.second.poll_read_vectored(cx, bufs)
2693    }
2694}
2695
2696impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2697    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2698        let this = self.project();
2699        if !*this.done_first {
2700            match ready!(this.first.poll_fill_buf(cx)) {
2701                Ok(buf) if buf.is_empty() => {
2702                    *this.done_first = true;
2703                }
2704                Ok(buf) => return Poll::Ready(Ok(buf)),
2705                Err(err) => return Poll::Ready(Err(err)),
2706            }
2707        }
2708
2709        this.second.poll_fill_buf(cx)
2710    }
2711
2712    fn consume(self: Pin<&mut Self>, amt: usize) {
2713        let this = self.project();
2714        if !*this.done_first {
2715            this.first.consume(amt)
2716        } else {
2717            this.second.consume(amt)
2718        }
2719    }
2720}
2721
2722/// Extension trait for [`AsyncSeek`].
2723pub trait AsyncSeekExt: AsyncSeek {
2724    /// Seeks to a new position in a byte stream.
2725    ///
2726    /// Returns the new position in the byte stream.
2727    ///
2728    /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
2729    ///
2730    /// # Examples
2731    ///
2732    /// ```
2733    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
2734    ///
2735    /// # spin_on::spin_on(async {
2736    /// let mut cursor = Cursor::new("hello");
2737    ///
2738    /// // Move the cursor to the end.
2739    /// cursor.seek(SeekFrom::End(0)).await?;
2740    ///
2741    /// // Check the current position.
2742    /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
2743    /// # std::io::Result::Ok(()) });
2744    /// ```
2745    fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
2746    where
2747        Self: Unpin,
2748    {
2749        SeekFuture { seeker: self, pos }
2750    }
2751}
2752
2753impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2754
2755/// Future for the [`AsyncSeekExt::seek()`] method.
2756#[derive(Debug)]
2757#[must_use = "futures do nothing unless you `.await` or poll them"]
2758pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2759    seeker: &'a mut S,
2760    pos: SeekFrom,
2761}
2762
2763impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2764
2765impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2766    type Output = Result<u64>;
2767
2768    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2769        let pos = self.pos;
2770        Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2771    }
2772}
2773
2774/// Extension trait for [`AsyncWrite`].
2775pub trait AsyncWriteExt: AsyncWrite {
2776    /// Writes some bytes into the byte stream.
2777    ///
2778    /// Returns the number of bytes written from the start of the buffer.
2779    ///
2780    /// If the return value is `Ok(n)` then it must be guaranteed that
2781    /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
2782    /// object is no longer able to accept bytes and will likely not be able to in the
2783    /// future as well, or that the provided buffer is empty.
2784    ///
2785    /// # Examples
2786    ///
2787    /// ```
2788    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2789    ///
2790    /// # spin_on::spin_on(async {
2791    /// let mut output = Vec::new();
2792    /// let mut writer = BufWriter::new(&mut output);
2793    ///
2794    /// let n = writer.write(b"hello").await?;
2795    /// # std::io::Result::Ok(()) });
2796    /// ```
2797    fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
2798    where
2799        Self: Unpin,
2800    {
2801        WriteFuture { writer: self, buf }
2802    }
2803
2804    /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
2805    ///
2806    /// Data is copied from each buffer in order, with the final buffer possibly being only
2807    /// partially consumed. This method must behave same as a call to
2808    /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
2809    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
2810    where
2811        Self: Unpin,
2812    {
2813        WriteVectoredFuture { writer: self, bufs }
2814    }
2815
2816    /// Writes an entire buffer into the byte stream.
2817    ///
2818    /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
2819    /// data to be written or an error occurs. It will not return before the entire buffer is
2820    /// successfully written or an error occurs.
2821    ///
2822    /// # Examples
2823    ///
2824    /// ```
2825    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2826    ///
2827    /// # spin_on::spin_on(async {
2828    /// let mut output = Vec::new();
2829    /// let mut writer = BufWriter::new(&mut output);
2830    ///
2831    /// let n = writer.write_all(b"hello").await?;
2832    /// # std::io::Result::Ok(()) });
2833    /// ```
2834    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
2835    where
2836        Self: Unpin,
2837    {
2838        WriteAllFuture { writer: self, buf }
2839    }
2840
2841    /// Flushes the stream to ensure that all buffered contents reach their destination.
2842    ///
2843    /// # Examples
2844    ///
2845    /// ```
2846    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2847    ///
2848    /// # spin_on::spin_on(async {
2849    /// let mut output = Vec::new();
2850    /// let mut writer = BufWriter::new(&mut output);
2851    ///
2852    /// writer.write_all(b"hello").await?;
2853    /// writer.flush().await?;
2854    /// # std::io::Result::Ok(()) });
2855    /// ```
2856    fn flush(&mut self) -> FlushFuture<'_, Self>
2857    where
2858        Self: Unpin,
2859    {
2860        FlushFuture { writer: self }
2861    }
2862
2863    /// Closes the writer.
2864    ///
2865    /// # Examples
2866    ///
2867    /// ```
2868    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2869    ///
2870    /// # spin_on::spin_on(async {
2871    /// let mut output = Vec::new();
2872    /// let mut writer = BufWriter::new(&mut output);
2873    ///
2874    /// writer.close().await?;
2875    /// # std::io::Result::Ok(()) });
2876    /// ```
2877    fn close(&mut self) -> CloseFuture<'_, Self>
2878    where
2879        Self: Unpin,
2880    {
2881        CloseFuture { writer: self }
2882    }
2883
2884    /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
2885    ///
2886    /// # Examples
2887    ///
2888    /// ```
2889    /// use futures_lite::io::AsyncWriteExt;
2890    ///
2891    /// let writer = Vec::<u8>::new().boxed_writer();
2892    /// ```
2893    #[cfg(feature = "alloc")]
2894    fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
2895    where
2896        Self: Sized + Send + 'a,
2897    {
2898        Box::pin(self)
2899    }
2900}
2901
2902impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2903
2904/// Future for the [`AsyncWriteExt::write()`] method.
2905#[derive(Debug)]
2906#[must_use = "futures do nothing unless you `.await` or poll them"]
2907pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2908    writer: &'a mut W,
2909    buf: &'a [u8],
2910}
2911
2912impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2913
2914impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2915    type Output = Result<usize>;
2916
2917    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2918        let buf = self.buf;
2919        Pin::new(&mut *self.writer).poll_write(cx, buf)
2920    }
2921}
2922
2923/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2924#[derive(Debug)]
2925#[must_use = "futures do nothing unless you `.await` or poll them"]
2926pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2927    writer: &'a mut W,
2928    bufs: &'a [IoSlice<'a>],
2929}
2930
2931impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2932
2933impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2934    type Output = Result<usize>;
2935
2936    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2937        let bufs = self.bufs;
2938        Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2939    }
2940}
2941
2942/// Future for the [`AsyncWriteExt::write_all()`] method.
2943#[derive(Debug)]
2944#[must_use = "futures do nothing unless you `.await` or poll them"]
2945pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2946    writer: &'a mut W,
2947    buf: &'a [u8],
2948}
2949
2950impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2951
2952impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2953    type Output = Result<()>;
2954
2955    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2956        let Self { writer, buf } = &mut *self;
2957
2958        while !buf.is_empty() {
2959            let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2960            let (_, rest) = mem::take(buf).split_at(n);
2961            *buf = rest;
2962
2963            if n == 0 {
2964                return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2965            }
2966        }
2967
2968        Poll::Ready(Ok(()))
2969    }
2970}
2971
2972/// Future for the [`AsyncWriteExt::flush()`] method.
2973#[derive(Debug)]
2974#[must_use = "futures do nothing unless you `.await` or poll them"]
2975pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2976    writer: &'a mut W,
2977}
2978
2979impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2980
2981impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2982    type Output = Result<()>;
2983
2984    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2985        Pin::new(&mut *self.writer).poll_flush(cx)
2986    }
2987}
2988
2989/// Future for the [`AsyncWriteExt::close()`] method.
2990#[derive(Debug)]
2991#[must_use = "futures do nothing unless you `.await` or poll them"]
2992pub struct CloseFuture<'a, W: Unpin + ?Sized> {
2993    writer: &'a mut W,
2994}
2995
2996impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2997
2998impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2999    type Output = Result<()>;
3000
3001    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3002        Pin::new(&mut *self.writer).poll_close(cx)
3003    }
3004}
3005
3006/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
3007///
3008/// # Examples
3009///
3010/// ```
3011/// use futures_lite::io::AsyncReadExt;
3012///
3013/// let reader = [1, 2, 3].boxed_reader();
3014/// ```
3015#[cfg(feature = "alloc")]
3016pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
3017
3018/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
3019///
3020/// # Examples
3021///
3022/// ```
3023/// use futures_lite::io::AsyncWriteExt;
3024///
3025/// let writer = Vec::<u8>::new().boxed_writer();
3026/// ```
3027#[cfg(feature = "alloc")]
3028pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
3029
3030/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
3031///
3032/// # Examples
3033///
3034/// ```
3035/// use futures_lite::io::{self, Cursor};
3036///
3037/// # spin_on::spin_on(async {
3038/// let stream = Cursor::new(vec![]);
3039/// let (mut reader, mut writer) = io::split(stream);
3040/// # std::io::Result::Ok(()) });
3041/// ```
3042pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
3043where
3044    T: AsyncRead + AsyncWrite + Unpin,
3045{
3046    let inner = Arc::new(Mutex::new(stream));
3047    (ReadHalf(inner.clone()), WriteHalf(inner))
3048}
3049
3050/// The read half returned by [`split()`].
3051#[derive(Debug)]
3052pub struct ReadHalf<T>(Arc<Mutex<T>>);
3053
3054/// The write half returned by [`split()`].
3055#[derive(Debug)]
3056pub struct WriteHalf<T>(Arc<Mutex<T>>);
3057
3058impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
3059    fn poll_read(
3060        self: Pin<&mut Self>,
3061        cx: &mut Context<'_>,
3062        buf: &mut [u8],
3063    ) -> Poll<Result<usize>> {
3064        let mut inner = self.0.lock().unwrap();
3065        Pin::new(&mut *inner).poll_read(cx, buf)
3066    }
3067
3068    fn poll_read_vectored(
3069        self: Pin<&mut Self>,
3070        cx: &mut Context<'_>,
3071        bufs: &mut [IoSliceMut<'_>],
3072    ) -> Poll<Result<usize>> {
3073        let mut inner = self.0.lock().unwrap();
3074        Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
3075    }
3076}
3077
3078impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
3079    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
3080        let mut inner = self.0.lock().unwrap();
3081        Pin::new(&mut *inner).poll_write(cx, buf)
3082    }
3083
3084    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3085        let mut inner = self.0.lock().unwrap();
3086        Pin::new(&mut *inner).poll_flush(cx)
3087    }
3088
3089    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3090        let mut inner = self.0.lock().unwrap();
3091        Pin::new(&mut *inner).poll_close(cx)
3092    }
3093}