futures_lite/
stream.rs

1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(feature = "alloc")]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(feature = "alloc")]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
38/// Converts a stream into a blocking iterator.
39///
40/// # Examples
41///
42/// ```
43/// use futures_lite::{pin, stream};
44///
45/// let stream = stream::once(7);
46/// pin!(stream);
47///
48/// let mut iter = stream::block_on(stream);
49/// assert_eq!(iter.next(), Some(7));
50/// assert_eq!(iter.next(), None);
51/// ```
52#[cfg(feature = "std")]
53pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
54    BlockOn(stream)
55}
56
57/// Iterator for the [`block_on()`] function.
58#[derive(Debug)]
59pub struct BlockOn<S>(S);
60
61#[cfg(feature = "std")]
62impl<S: Stream + Unpin> Iterator for BlockOn<S> {
63    type Item = S::Item;
64
65    fn next(&mut self) -> Option<Self::Item> {
66        crate::future::block_on(self.0.next())
67    }
68
69    fn size_hint(&self) -> (usize, Option<usize>) {
70        self.0.size_hint()
71    }
72
73    fn count(self) -> usize {
74        crate::future::block_on(self.0.count())
75    }
76
77    fn last(self) -> Option<Self::Item> {
78        crate::future::block_on(self.0.last())
79    }
80
81    fn nth(&mut self, n: usize) -> Option<Self::Item> {
82        crate::future::block_on(self.0.nth(n))
83    }
84
85    fn fold<B, F>(self, init: B, f: F) -> B
86    where
87        F: FnMut(B, Self::Item) -> B,
88    {
89        crate::future::block_on(self.0.fold(init, f))
90    }
91
92    fn for_each<F>(self, f: F) -> F::Output
93    where
94        F: FnMut(Self::Item),
95    {
96        crate::future::block_on(self.0.for_each(f))
97    }
98
99    fn all<F>(&mut self, f: F) -> bool
100    where
101        F: FnMut(Self::Item) -> bool,
102    {
103        crate::future::block_on(self.0.all(f))
104    }
105
106    fn any<F>(&mut self, f: F) -> bool
107    where
108        F: FnMut(Self::Item) -> bool,
109    {
110        crate::future::block_on(self.0.any(f))
111    }
112
113    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
114    where
115        P: FnMut(&Self::Item) -> bool,
116    {
117        crate::future::block_on(self.0.find(predicate))
118    }
119
120    fn find_map<B, F>(&mut self, f: F) -> Option<B>
121    where
122        F: FnMut(Self::Item) -> Option<B>,
123    {
124        crate::future::block_on(self.0.find_map(f))
125    }
126
127    fn position<P>(&mut self, predicate: P) -> Option<usize>
128    where
129        P: FnMut(Self::Item) -> bool,
130    {
131        crate::future::block_on(self.0.position(predicate))
132    }
133}
134
135/// Creates an empty stream.
136///
137/// # Examples
138///
139/// ```
140/// use futures_lite::stream::{self, StreamExt};
141///
142/// # spin_on::spin_on(async {
143/// let mut s = stream::empty::<i32>();
144/// assert_eq!(s.next().await, None);
145/// # })
146/// ```
147pub fn empty<T>() -> Empty<T> {
148    Empty {
149        _marker: PhantomData,
150    }
151}
152
153/// Stream for the [`empty()`] function.
154#[derive(Clone, Debug)]
155#[must_use = "streams do nothing unless polled"]
156pub struct Empty<T> {
157    _marker: PhantomData<T>,
158}
159
160impl<T> Unpin for Empty<T> {}
161
162impl<T> Stream for Empty<T> {
163    type Item = T;
164
165    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166        Poll::Ready(None)
167    }
168
169    fn size_hint(&self) -> (usize, Option<usize>) {
170        (0, Some(0))
171    }
172}
173
174/// Creates a stream from an iterator.
175///
176/// # Examples
177///
178/// ```
179/// use futures_lite::stream::{self, StreamExt};
180///
181/// # spin_on::spin_on(async {
182/// let mut s = stream::iter(vec![1, 2]);
183///
184/// assert_eq!(s.next().await, Some(1));
185/// assert_eq!(s.next().await, Some(2));
186/// assert_eq!(s.next().await, None);
187/// # })
188/// ```
189pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
190    Iter {
191        iter: iter.into_iter(),
192    }
193}
194
195/// Stream for the [`iter()`] function.
196#[derive(Clone, Debug)]
197#[must_use = "streams do nothing unless polled"]
198pub struct Iter<I> {
199    iter: I,
200}
201
202impl<I> Unpin for Iter<I> {}
203
204impl<I: Iterator> Stream for Iter<I> {
205    type Item = I::Item;
206
207    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
208        Poll::Ready(self.iter.next())
209    }
210
211    fn size_hint(&self) -> (usize, Option<usize>) {
212        self.iter.size_hint()
213    }
214}
215
216/// Creates a stream that yields a single item.
217///
218/// # Examples
219///
220/// ```
221/// use futures_lite::stream::{self, StreamExt};
222///
223/// # spin_on::spin_on(async {
224/// let mut s = stream::once(7);
225///
226/// assert_eq!(s.next().await, Some(7));
227/// assert_eq!(s.next().await, None);
228/// # })
229/// ```
230pub fn once<T>(t: T) -> Once<T> {
231    Once { value: Some(t) }
232}
233
234pin_project! {
235    /// Stream for the [`once()`] function.
236    #[derive(Clone, Debug)]
237    #[must_use = "streams do nothing unless polled"]
238    pub struct Once<T> {
239        value: Option<T>,
240    }
241}
242
243impl<T> Stream for Once<T> {
244    type Item = T;
245
246    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
247        Poll::Ready(self.project().value.take())
248    }
249
250    fn size_hint(&self) -> (usize, Option<usize>) {
251        if self.value.is_some() {
252            (1, Some(1))
253        } else {
254            (0, Some(0))
255        }
256    }
257}
258
259/// Creates a stream that is always pending.
260///
261/// # Examples
262///
263/// ```no_run
264/// use futures_lite::stream::{self, StreamExt};
265///
266/// # spin_on::spin_on(async {
267/// let mut s = stream::pending::<i32>();
268/// s.next().await;
269/// unreachable!();
270/// # })
271/// ```
272pub fn pending<T>() -> Pending<T> {
273    Pending {
274        _marker: PhantomData,
275    }
276}
277
278/// Stream for the [`pending()`] function.
279#[derive(Clone, Debug)]
280#[must_use = "streams do nothing unless polled"]
281pub struct Pending<T> {
282    _marker: PhantomData<T>,
283}
284
285impl<T> Unpin for Pending<T> {}
286
287impl<T> Stream for Pending<T> {
288    type Item = T;
289
290    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
291        Poll::Pending
292    }
293
294    fn size_hint(&self) -> (usize, Option<usize>) {
295        (0, Some(0))
296    }
297}
298
299/// Creates a stream from a function returning [`Poll`].
300///
301/// # Examples
302///
303/// ```
304/// use futures_lite::stream::{self, StreamExt};
305/// use std::task::{Context, Poll};
306///
307/// # spin_on::spin_on(async {
308/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
309///     Poll::Ready(Some(7))
310/// }
311///
312/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
313/// # })
314/// ```
315pub fn poll_fn<T, F>(f: F) -> PollFn<F>
316where
317    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
318{
319    PollFn { f }
320}
321
322/// Stream for the [`poll_fn()`] function.
323#[derive(Clone)]
324#[must_use = "streams do nothing unless polled"]
325pub struct PollFn<F> {
326    f: F,
327}
328
329impl<F> Unpin for PollFn<F> {}
330
331impl<F> fmt::Debug for PollFn<F> {
332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333        f.debug_struct("PollFn").finish()
334    }
335}
336
337impl<T, F> Stream for PollFn<F>
338where
339    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
340{
341    type Item = T;
342
343    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
344        (&mut self.f)(cx)
345    }
346}
347
348/// Creates an infinite stream that yields the same item repeatedly.
349///
350/// # Examples
351///
352/// ```
353/// use futures_lite::stream::{self, StreamExt};
354///
355/// # spin_on::spin_on(async {
356/// let mut s = stream::repeat(7);
357///
358/// assert_eq!(s.next().await, Some(7));
359/// assert_eq!(s.next().await, Some(7));
360/// # })
361/// ```
362pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
363    Repeat { item }
364}
365
366/// Stream for the [`repeat()`] function.
367#[derive(Clone, Debug)]
368#[must_use = "streams do nothing unless polled"]
369pub struct Repeat<T> {
370    item: T,
371}
372
373impl<T> Unpin for Repeat<T> {}
374
375impl<T: Clone> Stream for Repeat<T> {
376    type Item = T;
377
378    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379        Poll::Ready(Some(self.item.clone()))
380    }
381
382    fn size_hint(&self) -> (usize, Option<usize>) {
383        (usize::max_value(), None)
384    }
385}
386
387/// Creates an infinite stream from a closure that generates items.
388///
389/// # Examples
390///
391/// ```
392/// use futures_lite::stream::{self, StreamExt};
393///
394/// # spin_on::spin_on(async {
395/// let mut s = stream::repeat_with(|| 7);
396///
397/// assert_eq!(s.next().await, Some(7));
398/// assert_eq!(s.next().await, Some(7));
399/// # })
400/// ```
401pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
402where
403    F: FnMut() -> T,
404{
405    RepeatWith { f: repeater }
406}
407
408/// Stream for the [`repeat_with()`] function.
409#[derive(Clone, Debug)]
410#[must_use = "streams do nothing unless polled"]
411pub struct RepeatWith<F> {
412    f: F,
413}
414
415impl<F> Unpin for RepeatWith<F> {}
416
417impl<T, F> Stream for RepeatWith<F>
418where
419    F: FnMut() -> T,
420{
421    type Item = T;
422
423    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
424        let item = (&mut self.f)();
425        Poll::Ready(Some(item))
426    }
427
428    fn size_hint(&self) -> (usize, Option<usize>) {
429        (usize::max_value(), None)
430    }
431}
432
433/// Creates a stream from a seed value and an async closure operating on it.
434///
435/// # Examples
436///
437/// ```
438/// use futures_lite::stream::{self, StreamExt};
439///
440/// # spin_on::spin_on(async {
441/// let s = stream::unfold(0, |mut n| async move {
442///     if n < 2 {
443///         let m = n + 1;
444///         Some((n, m))
445///     } else {
446///         None
447///     }
448/// });
449///
450/// let v: Vec<i32> = s.collect().await;
451/// assert_eq!(v, [0, 1]);
452/// # })
453/// ```
454pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
455where
456    F: FnMut(T) -> Fut,
457    Fut: Future<Output = Option<(Item, T)>>,
458{
459    Unfold {
460        f,
461        state: Some(seed),
462        fut: None,
463    }
464}
465
466pin_project! {
467    /// Stream for the [`unfold()`] function.
468    #[derive(Clone)]
469    #[must_use = "streams do nothing unless polled"]
470    pub struct Unfold<T, F, Fut> {
471        f: F,
472        state: Option<T>,
473        #[pin]
474        fut: Option<Fut>,
475    }
476}
477
478impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
479where
480    T: fmt::Debug,
481    Fut: fmt::Debug,
482{
483    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484        f.debug_struct("Unfold")
485            .field("state", &self.state)
486            .field("fut", &self.fut)
487            .finish()
488    }
489}
490
491impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
492where
493    F: FnMut(T) -> Fut,
494    Fut: Future<Output = Option<(Item, T)>>,
495{
496    type Item = Item;
497
498    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499        let mut this = self.project();
500
501        if let Some(state) = this.state.take() {
502            this.fut.set(Some((this.f)(state)));
503        }
504
505        let step = ready!(this
506            .fut
507            .as_mut()
508            .as_pin_mut()
509            .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
510            .poll(cx));
511        this.fut.set(None);
512
513        if let Some((item, next_state)) = step {
514            *this.state = Some(next_state);
515            Poll::Ready(Some(item))
516        } else {
517            Poll::Ready(None)
518        }
519    }
520}
521
522/// Creates a stream from a seed value and a fallible async closure operating on it.
523///
524/// # Examples
525///
526/// ```
527/// use futures_lite::stream::{self, StreamExt};
528///
529/// # spin_on::spin_on(async {
530/// let s = stream::try_unfold(0, |mut n| async move {
531///     if n < 2 {
532///         let m = n + 1;
533///         Ok(Some((n, m)))
534///     } else {
535///         std::io::Result::Ok(None)
536///     }
537/// });
538///
539/// let v: Vec<i32> = s.try_collect().await?;
540/// assert_eq!(v, [0, 1]);
541/// # std::io::Result::Ok(()) });
542/// ```
543pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
544where
545    F: FnMut(T) -> Fut,
546    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
547{
548    TryUnfold {
549        f,
550        state: Some(init),
551        fut: None,
552    }
553}
554
555pin_project! {
556    /// Stream for the [`try_unfold()`] function.
557    #[derive(Clone)]
558    #[must_use = "streams do nothing unless polled"]
559    pub struct TryUnfold<T, F, Fut> {
560        f: F,
561        state: Option<T>,
562        #[pin]
563        fut: Option<Fut>,
564    }
565}
566
567impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
568where
569    T: fmt::Debug,
570    Fut: fmt::Debug,
571{
572    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573        f.debug_struct("TryUnfold")
574            .field("state", &self.state)
575            .field("fut", &self.fut)
576            .finish()
577    }
578}
579
580impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
581where
582    F: FnMut(T) -> Fut,
583    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
584{
585    type Item = Result<Item, E>;
586
587    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
588        let mut this = self.project();
589
590        if let Some(state) = this.state.take() {
591            this.fut.set(Some((this.f)(state)));
592        }
593
594        match this.fut.as_mut().as_pin_mut() {
595            None => {
596                // The future previously errored
597                Poll::Ready(None)
598            }
599            Some(future) => {
600                let step = ready!(future.poll(cx));
601                this.fut.set(None);
602
603                match step {
604                    Ok(Some((item, next_state))) => {
605                        *this.state = Some(next_state);
606                        Poll::Ready(Some(Ok(item)))
607                    }
608                    Ok(None) => Poll::Ready(None),
609                    Err(e) => Poll::Ready(Some(Err(e))),
610                }
611            }
612        }
613    }
614}
615
616/// Creates a stream that invokes the given future as its first item, and then
617/// produces no more items.
618///
619/// # Example
620///
621/// ```
622/// use futures_lite::{stream, prelude::*};
623///
624/// # spin_on::spin_on(async {
625/// let mut stream = Box::pin(stream::once_future(async { 1 }));
626/// assert_eq!(stream.next().await, Some(1));
627/// assert_eq!(stream.next().await, None);
628/// # });
629/// ```
630pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
631    OnceFuture {
632        future: Some(future),
633    }
634}
635
636pin_project! {
637    /// Stream for the [`once_future()`] method.
638    #[derive(Debug)]
639    #[must_use = "futures do nothing unless you `.await` or poll them"]
640    pub struct OnceFuture<F> {
641        #[pin]
642        future: Option<F>,
643    }
644}
645
646impl<F: Future> Stream for OnceFuture<F> {
647    type Item = F::Output;
648
649    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
650        let mut this = self.project();
651
652        match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
653            Some(Poll::Ready(t)) => {
654                this.future.set(None);
655                Poll::Ready(Some(t))
656            }
657            Some(Poll::Pending) => Poll::Pending,
658            None => Poll::Ready(None),
659        }
660    }
661}
662
663/// Extension trait for [`Stream`].
664pub trait StreamExt: Stream {
665    /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
666    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
667    where
668        Self: Unpin,
669    {
670        Stream::poll_next(Pin::new(self), cx)
671    }
672
673    /// Retrieves the next item in the stream.
674    ///
675    /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
676    /// resume iteration after that.
677    ///
678    /// # Examples
679    ///
680    /// ```
681    /// use futures_lite::stream::{self, StreamExt};
682    ///
683    /// # spin_on::spin_on(async {
684    /// let mut s = stream::iter(1..=3);
685    ///
686    /// assert_eq!(s.next().await, Some(1));
687    /// assert_eq!(s.next().await, Some(2));
688    /// assert_eq!(s.next().await, Some(3));
689    /// assert_eq!(s.next().await, None);
690    /// # });
691    /// ```
692    fn next(&mut self) -> NextFuture<'_, Self>
693    where
694        Self: Unpin,
695    {
696        NextFuture { stream: self }
697    }
698
699    /// Retrieves the next item in the stream.
700    ///
701    /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
702    /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
703    ///
704    /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
705    ///
706    /// # Examples
707    ///
708    /// ```
709    /// use futures_lite::stream::{self, StreamExt};
710    ///
711    /// # spin_on::spin_on(async {
712    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
713    ///
714    /// assert_eq!(s.try_next().await, Ok(Some(1)));
715    /// assert_eq!(s.try_next().await, Ok(Some(2)));
716    /// assert_eq!(s.try_next().await, Err("error"));
717    /// assert_eq!(s.try_next().await, Ok(None));
718    /// # });
719    /// ```
720    fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
721    where
722        Self: Stream<Item = Result<T, E>> + Unpin,
723    {
724        TryNextFuture { stream: self }
725    }
726
727    /// Counts the number of items in the stream.
728    ///
729    /// # Examples
730    ///
731    /// ```
732    /// use futures_lite::stream::{self, StreamExt};
733    ///
734    /// # spin_on::spin_on(async {
735    /// let s1 = stream::iter(vec![0]);
736    /// let s2 = stream::iter(vec![1, 2, 3]);
737    ///
738    /// assert_eq!(s1.count().await, 1);
739    /// assert_eq!(s2.count().await, 3);
740    /// # });
741    /// ```
742    fn count(self) -> CountFuture<Self>
743    where
744        Self: Sized,
745    {
746        CountFuture {
747            stream: self,
748            count: 0,
749        }
750    }
751
752    /// Maps items of the stream to new values using a closure.
753    ///
754    /// # Examples
755    ///
756    /// ```
757    /// use futures_lite::stream::{self, StreamExt};
758    ///
759    /// # spin_on::spin_on(async {
760    /// let s = stream::iter(vec![1, 2, 3]);
761    /// let mut s = s.map(|x| 2 * x);
762    ///
763    /// assert_eq!(s.next().await, Some(2));
764    /// assert_eq!(s.next().await, Some(4));
765    /// assert_eq!(s.next().await, Some(6));
766    /// assert_eq!(s.next().await, None);
767    /// # });
768    /// ```
769    fn map<T, F>(self, f: F) -> Map<Self, F>
770    where
771        Self: Sized,
772        F: FnMut(Self::Item) -> T,
773    {
774        Map { stream: self, f }
775    }
776
777    /// Maps items to streams and then concatenates them.
778    ///
779    /// # Examples
780    ///
781    /// ```
782    /// use futures_lite::stream::{self, StreamExt};
783    ///
784    /// # spin_on::spin_on(async {
785    /// let words = stream::iter(vec!["one", "two"]);
786    ///
787    /// let s: String = words
788    ///     .flat_map(|s| stream::iter(s.chars()))
789    ///     .collect()
790    ///     .await;
791    ///
792    /// assert_eq!(s, "onetwo");
793    /// # });
794    /// ```
795    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
796    where
797        Self: Sized,
798        U: Stream,
799        F: FnMut(Self::Item) -> U,
800    {
801        FlatMap {
802            stream: self.map(f),
803            inner_stream: None,
804        }
805    }
806
807    /// Concatenates inner streams.
808    ///
809    /// # Examples
810    ///
811    /// ```
812    /// use futures_lite::stream::{self, StreamExt};
813    ///
814    /// # spin_on::spin_on(async {
815    /// let s1 = stream::iter(vec![1, 2, 3]);
816    /// let s2 = stream::iter(vec![4, 5]);
817    ///
818    /// let s = stream::iter(vec![s1, s2]);
819    /// let v: Vec<_> = s.flatten().collect().await;
820    /// assert_eq!(v, [1, 2, 3, 4, 5]);
821    /// # });
822    /// ```
823    fn flatten(self) -> Flatten<Self>
824    where
825        Self: Sized,
826        Self::Item: Stream,
827    {
828        Flatten {
829            stream: self,
830            inner_stream: None,
831        }
832    }
833
834    /// Maps items of the stream to new values using an async closure.
835    ///
836    /// # Examples
837    ///
838    /// ```
839    /// use futures_lite::pin;
840    /// use futures_lite::stream::{self, StreamExt};
841    ///
842    /// # spin_on::spin_on(async {
843    /// let s = stream::iter(vec![1, 2, 3]);
844    /// let mut s = s.then(|x| async move { 2 * x });
845    ///
846    /// pin!(s);
847    /// assert_eq!(s.next().await, Some(2));
848    /// assert_eq!(s.next().await, Some(4));
849    /// assert_eq!(s.next().await, Some(6));
850    /// assert_eq!(s.next().await, None);
851    /// # });
852    /// ```
853    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
854    where
855        Self: Sized,
856        F: FnMut(Self::Item) -> Fut,
857        Fut: Future,
858    {
859        Then {
860            stream: self,
861            future: None,
862            f,
863        }
864    }
865
866    /// Keeps items of the stream for which `predicate` returns `true`.
867    ///
868    /// # Examples
869    ///
870    /// ```
871    /// use futures_lite::stream::{self, StreamExt};
872    ///
873    /// # spin_on::spin_on(async {
874    /// let s = stream::iter(vec![1, 2, 3, 4]);
875    /// let mut s = s.filter(|i| i % 2 == 0);
876    ///
877    /// assert_eq!(s.next().await, Some(2));
878    /// assert_eq!(s.next().await, Some(4));
879    /// assert_eq!(s.next().await, None);
880    /// # });
881    /// ```
882    fn filter<P>(self, predicate: P) -> Filter<Self, P>
883    where
884        Self: Sized,
885        P: FnMut(&Self::Item) -> bool,
886    {
887        Filter {
888            stream: self,
889            predicate,
890        }
891    }
892
893    /// Filters and maps items of the stream using a closure.
894    ///
895    /// # Examples
896    ///
897    /// ```
898    /// use futures_lite::stream::{self, StreamExt};
899    ///
900    /// # spin_on::spin_on(async {
901    /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
902    /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
903    ///
904    /// assert_eq!(s.next().await, Some(1));
905    /// assert_eq!(s.next().await, Some(3));
906    /// assert_eq!(s.next().await, Some(5));
907    /// assert_eq!(s.next().await, None);
908    /// # });
909    /// ```
910    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
911    where
912        Self: Sized,
913        F: FnMut(Self::Item) -> Option<T>,
914    {
915        FilterMap { stream: self, f }
916    }
917
918    /// Takes only the first `n` items of the stream.
919    ///
920    /// # Examples
921    ///
922    /// ```
923    /// use futures_lite::stream::{self, StreamExt};
924    ///
925    /// # spin_on::spin_on(async {
926    /// let mut s = stream::repeat(7).take(2);
927    ///
928    /// assert_eq!(s.next().await, Some(7));
929    /// assert_eq!(s.next().await, Some(7));
930    /// assert_eq!(s.next().await, None);
931    /// # });
932    /// ```
933    fn take(self, n: usize) -> Take<Self>
934    where
935        Self: Sized,
936    {
937        Take { stream: self, n }
938    }
939
940    /// Takes items while `predicate` returns `true`.
941    ///
942    /// # Examples
943    ///
944    /// ```
945    /// use futures_lite::stream::{self, StreamExt};
946    ///
947    /// # spin_on::spin_on(async {
948    /// let s = stream::iter(vec![1, 2, 3, 4]);
949    /// let mut s = s.take_while(|x| *x < 3);
950    ///
951    /// assert_eq!(s.next().await, Some(1));
952    /// assert_eq!(s.next().await, Some(2));
953    /// assert_eq!(s.next().await, None);
954    /// # });
955    /// ```
956    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
957    where
958        Self: Sized,
959        P: FnMut(&Self::Item) -> bool,
960    {
961        TakeWhile {
962            stream: self,
963            predicate,
964        }
965    }
966
967    /// Skips the first `n` items of the stream.
968    ///
969    /// # Examples
970    ///
971    /// ```
972    /// use futures_lite::stream::{self, StreamExt};
973    ///
974    /// # spin_on::spin_on(async {
975    /// let s = stream::iter(vec![1, 2, 3]);
976    /// let mut s = s.skip(2);
977    ///
978    /// assert_eq!(s.next().await, Some(3));
979    /// assert_eq!(s.next().await, None);
980    /// # });
981    /// ```
982    fn skip(self, n: usize) -> Skip<Self>
983    where
984        Self: Sized,
985    {
986        Skip { stream: self, n }
987    }
988
989    /// Skips items while `predicate` returns `true`.
990    ///
991    /// # Examples
992    ///
993    /// ```
994    /// use futures_lite::stream::{self, StreamExt};
995    ///
996    /// # spin_on::spin_on(async {
997    /// let s = stream::iter(vec![-1i32, 0, 1]);
998    /// let mut s = s.skip_while(|x| x.is_negative());
999    ///
1000    /// assert_eq!(s.next().await, Some(0));
1001    /// assert_eq!(s.next().await, Some(1));
1002    /// assert_eq!(s.next().await, None);
1003    /// # });
1004    /// ```
1005    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1006    where
1007        Self: Sized,
1008        P: FnMut(&Self::Item) -> bool,
1009    {
1010        SkipWhile {
1011            stream: self,
1012            predicate: Some(predicate),
1013        }
1014    }
1015
1016    /// Yields every `step`th item.
1017    ///
1018    /// # Panics
1019    ///
1020    /// This method will panic if the `step` is 0.
1021    ///
1022    /// # Examples
1023    ///
1024    /// ```
1025    /// use futures_lite::stream::{self, StreamExt};
1026    ///
1027    /// # spin_on::spin_on(async {
1028    /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1029    /// let mut s = s.step_by(2);
1030    ///
1031    /// assert_eq!(s.next().await, Some(0));
1032    /// assert_eq!(s.next().await, Some(2));
1033    /// assert_eq!(s.next().await, Some(4));
1034    /// assert_eq!(s.next().await, None);
1035    /// # });
1036    /// ```
1037    fn step_by(self, step: usize) -> StepBy<Self>
1038    where
1039        Self: Sized,
1040    {
1041        assert!(step > 0, "`step` must be greater than zero");
1042        StepBy {
1043            stream: self,
1044            step,
1045            i: 0,
1046        }
1047    }
1048
1049    /// Appends another stream to the end of this one.
1050    ///
1051    /// # Examples
1052    ///
1053    /// ```
1054    /// use futures_lite::stream::{self, StreamExt};
1055    ///
1056    /// # spin_on::spin_on(async {
1057    /// let s1 = stream::iter(vec![1, 2]);
1058    /// let s2 = stream::iter(vec![7, 8]);
1059    /// let mut s = s1.chain(s2);
1060    ///
1061    /// assert_eq!(s.next().await, Some(1));
1062    /// assert_eq!(s.next().await, Some(2));
1063    /// assert_eq!(s.next().await, Some(7));
1064    /// assert_eq!(s.next().await, Some(8));
1065    /// assert_eq!(s.next().await, None);
1066    /// # });
1067    /// ```
1068    fn chain<U>(self, other: U) -> Chain<Self, U>
1069    where
1070        Self: Sized,
1071        U: Stream<Item = Self::Item> + Sized,
1072    {
1073        Chain {
1074            first: self.fuse(),
1075            second: other.fuse(),
1076        }
1077    }
1078
1079    /// Clones all items.
1080    ///
1081    /// # Examples
1082    ///
1083    /// ```
1084    /// use futures_lite::stream::{self, StreamExt};
1085    ///
1086    /// # spin_on::spin_on(async {
1087    /// let s = stream::iter(vec![&1, &2]);
1088    /// let mut s = s.cloned();
1089    ///
1090    /// assert_eq!(s.next().await, Some(1));
1091    /// assert_eq!(s.next().await, Some(2));
1092    /// assert_eq!(s.next().await, None);
1093    /// # });
1094    /// ```
1095    fn cloned<'a, T>(self) -> Cloned<Self>
1096    where
1097        Self: Stream<Item = &'a T> + Sized,
1098        T: Clone + 'a,
1099    {
1100        Cloned { stream: self }
1101    }
1102
1103    /// Copies all items.
1104    ///
1105    /// # Examples
1106    ///
1107    /// ```
1108    /// use futures_lite::stream::{self, StreamExt};
1109    ///
1110    /// # spin_on::spin_on(async {
1111    /// let s = stream::iter(vec![&1, &2]);
1112    /// let mut s = s.copied();
1113    ///
1114    /// assert_eq!(s.next().await, Some(1));
1115    /// assert_eq!(s.next().await, Some(2));
1116    /// assert_eq!(s.next().await, None);
1117    /// # });
1118    /// ```
1119    fn copied<'a, T>(self) -> Copied<Self>
1120    where
1121        Self: Stream<Item = &'a T> + Sized,
1122        T: Copy + 'a,
1123    {
1124        Copied { stream: self }
1125    }
1126
1127    /// Collects all items in the stream into a collection.
1128    ///
1129    /// # Examples
1130    ///
1131    /// ```
1132    /// use futures_lite::stream::{self, StreamExt};
1133    ///
1134    /// # spin_on::spin_on(async {
1135    /// let mut s = stream::iter(1..=3);
1136    ///
1137    /// let items: Vec<_> = s.collect().await;
1138    /// assert_eq!(items, [1, 2, 3]);
1139    /// # });
1140    /// ```
1141    fn collect<C>(self) -> CollectFuture<Self, C>
1142    where
1143        Self: Sized,
1144        C: Default + Extend<Self::Item>,
1145    {
1146        CollectFuture {
1147            stream: self,
1148            collection: Default::default(),
1149        }
1150    }
1151
1152    /// Collects all items in the fallible stream into a collection.
1153    ///
1154    /// ```
1155    /// use futures_lite::stream::{self, StreamExt};
1156    ///
1157    /// # spin_on::spin_on(async {
1158    /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1159    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1160    /// assert_eq!(res, Err(2));
1161    ///
1162    /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1163    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1164    /// assert_eq!(res, Ok(vec![1, 2, 3]));
1165    /// # })
1166    /// ```
1167    fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1168    where
1169        Self: Stream<Item = Result<T, E>> + Sized,
1170        C: Default + Extend<T>,
1171    {
1172        TryCollectFuture {
1173            stream: self,
1174            items: Default::default(),
1175        }
1176    }
1177
1178    /// Partitions items into those for which `predicate` is `true` and those for which it is
1179    /// `false`, and then collects them into two collections.
1180    ///
1181    /// # Examples
1182    ///
1183    /// ```
1184    /// use futures_lite::stream::{self, StreamExt};
1185    ///
1186    /// # spin_on::spin_on(async {
1187    /// let s = stream::iter(vec![1, 2, 3]);
1188    /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1189    ///
1190    /// assert_eq!(even, &[2]);
1191    /// assert_eq!(odd, &[1, 3]);
1192    /// # })
1193    /// ```
1194    fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1195    where
1196        Self: Sized,
1197        B: Default + Extend<Self::Item>,
1198        P: FnMut(&Self::Item) -> bool,
1199    {
1200        PartitionFuture {
1201            stream: self,
1202            predicate,
1203            res: Some(Default::default()),
1204        }
1205    }
1206
1207    /// Accumulates a computation over the stream.
1208    ///
1209    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1210    /// the accumulator and each item in the stream. The final accumulator value is returned.
1211    ///
1212    /// # Examples
1213    ///
1214    /// ```
1215    /// use futures_lite::stream::{self, StreamExt};
1216    ///
1217    /// # spin_on::spin_on(async {
1218    /// let s = stream::iter(vec![1, 2, 3]);
1219    /// let sum = s.fold(0, |acc, x| acc + x).await;
1220    ///
1221    /// assert_eq!(sum, 6);
1222    /// # })
1223    /// ```
1224    fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1225    where
1226        Self: Sized,
1227        F: FnMut(T, Self::Item) -> T,
1228    {
1229        FoldFuture {
1230            stream: self,
1231            f,
1232            acc: Some(init),
1233        }
1234    }
1235
1236    /// Accumulates a fallible computation over the stream.
1237    ///
1238    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1239    /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1240    /// error if `f` failed the computation.
1241    ///
1242    /// # Examples
1243    ///
1244    /// ```
1245    /// use futures_lite::stream::{self, StreamExt};
1246    ///
1247    /// # spin_on::spin_on(async {
1248    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1249    ///
1250    /// let sum = s.try_fold(0, |acc, v| {
1251    ///     if (acc + v) % 2 == 1 {
1252    ///         Ok(acc + v)
1253    ///     } else {
1254    ///         Err("fail")
1255    ///     }
1256    /// })
1257    /// .await;
1258    ///
1259    /// assert_eq!(sum, Err("fail"));
1260    /// # })
1261    /// ```
1262    fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1263    where
1264        Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1265        F: FnMut(B, T) -> Result<B, E>,
1266    {
1267        TryFoldFuture {
1268            stream: self,
1269            f,
1270            acc: Some(init),
1271        }
1272    }
1273
1274    /// Maps items of the stream to new values using a state value and a closure.
1275    ///
1276    /// Scanning begins with the inital state set to `initial_state`, and then applies `f` to the
1277    /// state and each item in the stream. The stream stops when `f` returns `None`.
1278    ///
1279    /// # Examples
1280    ///
1281    /// ```
1282    /// use futures_lite::stream::{self, StreamExt};
1283    ///
1284    /// # spin_on::spin_on(async {
1285    /// let s = stream::iter(vec![1, 2, 3]);
1286    /// let mut s = s.scan(1, |state, x| {
1287    ///     *state = *state * x;
1288    ///     Some(-*state)
1289    /// });
1290    ///
1291    /// assert_eq!(s.next().await, Some(-1));
1292    /// assert_eq!(s.next().await, Some(-2));
1293    /// assert_eq!(s.next().await, Some(-6));
1294    /// assert_eq!(s.next().await, None);
1295    /// # })
1296    /// ```
1297    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1298    where
1299        Self: Sized,
1300        F: FnMut(&mut St, Self::Item) -> Option<B>,
1301    {
1302        Scan {
1303            stream: self,
1304            state_f: (initial_state, f),
1305        }
1306    }
1307
1308    /// Fuses the stream so that it stops yielding items after the first [`None`].
1309    ///
1310    /// # Examples
1311    ///
1312    /// ```
1313    /// use futures_lite::stream::{self, StreamExt};
1314    ///
1315    /// # spin_on::spin_on(async {
1316    /// let mut s = stream::once(1).fuse();
1317    ///
1318    /// assert_eq!(s.next().await, Some(1));
1319    /// assert_eq!(s.next().await, None);
1320    /// assert_eq!(s.next().await, None);
1321    /// # })
1322    /// ```
1323    fn fuse(self) -> Fuse<Self>
1324    where
1325        Self: Sized,
1326    {
1327        Fuse {
1328            stream: self,
1329            done: false,
1330        }
1331    }
1332
1333    /// Repeats the stream from beginning to end, forever.
1334    ///
1335    /// # Examples
1336    ///
1337    /// ```
1338    /// use futures_lite::stream::{self, StreamExt};
1339    ///
1340    /// # spin_on::spin_on(async {
1341    /// let mut s = stream::iter(vec![1, 2]).cycle();
1342    ///
1343    /// assert_eq!(s.next().await, Some(1));
1344    /// assert_eq!(s.next().await, Some(2));
1345    /// assert_eq!(s.next().await, Some(1));
1346    /// assert_eq!(s.next().await, Some(2));
1347    /// # });
1348    /// ```
1349    fn cycle(self) -> Cycle<Self>
1350    where
1351        Self: Clone + Sized,
1352    {
1353        Cycle {
1354            orig: self.clone(),
1355            stream: self,
1356        }
1357    }
1358
1359    /// Enumerates items, mapping them to `(index, item)`.
1360    ///
1361    /// # Examples
1362    ///
1363    /// ```
1364    /// use futures_lite::stream::{self, StreamExt};
1365    ///
1366    /// # spin_on::spin_on(async {
1367    /// let s = stream::iter(vec!['a', 'b', 'c']);
1368    /// let mut s = s.enumerate();
1369    ///
1370    /// assert_eq!(s.next().await, Some((0, 'a')));
1371    /// assert_eq!(s.next().await, Some((1, 'b')));
1372    /// assert_eq!(s.next().await, Some((2, 'c')));
1373    /// assert_eq!(s.next().await, None);
1374    /// # });
1375    /// ```
1376    fn enumerate(self) -> Enumerate<Self>
1377    where
1378        Self: Sized,
1379    {
1380        Enumerate { stream: self, i: 0 }
1381    }
1382
1383    /// Calls a closure on each item and passes it on.
1384    ///
1385    /// # Examples
1386    ///
1387    /// ```
1388    /// use futures_lite::stream::{self, StreamExt};
1389    ///
1390    /// # spin_on::spin_on(async {
1391    /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1392    ///
1393    /// let sum = s
1394    ///    .inspect(|x| println!("about to filter {}", x))
1395    ///    .filter(|x| x % 2 == 0)
1396    ///    .inspect(|x| println!("made it through filter: {}", x))
1397    ///    .fold(0, |sum, i| sum + i)
1398    ///    .await;
1399    /// # });
1400    /// ```
1401    fn inspect<F>(self, f: F) -> Inspect<Self, F>
1402    where
1403        Self: Sized,
1404        F: FnMut(&Self::Item),
1405    {
1406        Inspect { stream: self, f }
1407    }
1408
1409    /// Gets the `n`th item of the stream.
1410    ///
1411    /// In the end, `n+1` items of the stream will be consumed.
1412    ///
1413    /// # Examples
1414    ///
1415    /// ```
1416    /// use futures_lite::stream::{self, StreamExt};
1417    ///
1418    /// # spin_on::spin_on(async {
1419    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1420    ///
1421    /// assert_eq!(s.nth(2).await, Some(2));
1422    /// assert_eq!(s.nth(2).await, Some(5));
1423    /// assert_eq!(s.nth(2).await, None);
1424    /// # });
1425    /// ```
1426    fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1427    where
1428        Self: Unpin,
1429    {
1430        NthFuture { stream: self, n }
1431    }
1432
1433    /// Returns the last item in the stream.
1434    ///
1435    /// # Examples
1436    ///
1437    /// ```
1438    /// use futures_lite::stream::{self, StreamExt};
1439    ///
1440    /// # spin_on::spin_on(async {
1441    /// let s = stream::iter(vec![1, 2, 3, 4]);
1442    /// assert_eq!(s.last().await, Some(4));
1443    ///
1444    /// let s = stream::empty::<i32>();
1445    /// assert_eq!(s.last().await, None);
1446    /// # });
1447    /// ```
1448    fn last(self) -> LastFuture<Self>
1449    where
1450        Self: Sized,
1451    {
1452        LastFuture {
1453            stream: self,
1454            last: None,
1455        }
1456    }
1457
1458    /// Finds the first item of the stream for which `predicate` returns `true`.
1459    ///
1460    /// # Examples
1461    ///
1462    /// ```
1463    /// use futures_lite::stream::{self, StreamExt};
1464    ///
1465    /// # spin_on::spin_on(async {
1466    /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1467    ///
1468    /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1469    /// assert_eq!(s.next().await, Some(13));
1470    /// # });
1471    /// ```
1472    fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1473    where
1474        Self: Unpin,
1475        P: FnMut(&Self::Item) -> bool,
1476    {
1477        FindFuture {
1478            stream: self,
1479            predicate,
1480        }
1481    }
1482
1483    /// Applies a closure to items in the stream and returns the first [`Some`] result.
1484    ///
1485    /// # Examples
1486    ///
1487    /// ```
1488    /// use futures_lite::stream::{self, StreamExt};
1489    ///
1490    /// # spin_on::spin_on(async {
1491    /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1492    /// let number = s.find_map(|s| s.parse().ok()).await;
1493    ///
1494    /// assert_eq!(number, Some(2));
1495    /// # });
1496    /// ```
1497    fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1498    where
1499        Self: Unpin,
1500        F: FnMut(Self::Item) -> Option<B>,
1501    {
1502        FindMapFuture { stream: self, f }
1503    }
1504
1505    /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1506    ///
1507    /// # Examples
1508    ///
1509    /// ```
1510    /// use futures_lite::stream::{self, StreamExt};
1511    ///
1512    /// # spin_on::spin_on(async {
1513    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1514    ///
1515    /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1516    /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1517    /// assert_eq!(s.position(|x| x == 9).await, None);
1518    /// # });
1519    /// ```
1520    fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1521    where
1522        Self: Unpin,
1523        P: FnMut(Self::Item) -> bool,
1524    {
1525        PositionFuture {
1526            stream: self,
1527            predicate,
1528            index: 0,
1529        }
1530    }
1531
1532    /// Tests if `predicate` returns `true` for all items in the stream.
1533    ///
1534    /// The result is `true` for an empty stream.
1535    ///
1536    /// # Examples
1537    ///
1538    /// ```
1539    /// use futures_lite::stream::{self, StreamExt};
1540    ///
1541    /// # spin_on::spin_on(async {
1542    /// let mut s = stream::iter(vec![1, 2, 3]);
1543    /// assert!(!s.all(|x| x % 2 == 0).await);
1544    ///
1545    /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1546    /// assert!(s.all(|x| x % 2 == 0).await);
1547    ///
1548    /// let mut s = stream::empty::<i32>();
1549    /// assert!(s.all(|x| x % 2 == 0).await);
1550    /// # });
1551    /// ```
1552    fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1553    where
1554        Self: Unpin,
1555        P: FnMut(Self::Item) -> bool,
1556    {
1557        AllFuture {
1558            stream: self,
1559            predicate,
1560        }
1561    }
1562
1563    /// Tests if `predicate` returns `true` for any item in the stream.
1564    ///
1565    /// The result is `false` for an empty stream.
1566    ///
1567    /// # Examples
1568    ///
1569    /// ```
1570    /// use futures_lite::stream::{self, StreamExt};
1571    ///
1572    /// # spin_on::spin_on(async {
1573    /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1574    /// assert!(!s.any(|x| x % 2 == 0).await);
1575    ///
1576    /// let mut s = stream::iter(vec![1, 2, 3]);
1577    /// assert!(s.any(|x| x % 2 == 0).await);
1578    ///
1579    /// let mut s = stream::empty::<i32>();
1580    /// assert!(!s.any(|x| x % 2 == 0).await);
1581    /// # });
1582    /// ```
1583    fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1584    where
1585        Self: Unpin,
1586        P: FnMut(Self::Item) -> bool,
1587    {
1588        AnyFuture {
1589            stream: self,
1590            predicate,
1591        }
1592    }
1593
1594    /// Calls a closure on each item of the stream.
1595    ///
1596    /// # Examples
1597    ///
1598    /// ```
1599    /// use futures_lite::stream::{self, StreamExt};
1600    ///
1601    /// # spin_on::spin_on(async {
1602    /// let mut s = stream::iter(vec![1, 2, 3]);
1603    /// s.for_each(|s| println!("{}", s)).await;
1604    /// # });
1605    /// ```
1606    fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1607    where
1608        Self: Sized,
1609        F: FnMut(Self::Item),
1610    {
1611        ForEachFuture { stream: self, f }
1612    }
1613
1614    /// Calls a fallible closure on each item of the stream, stopping on first error.
1615    ///
1616    /// # Examples
1617    ///
1618    /// ```
1619    /// use futures_lite::stream::{self, StreamExt};
1620    ///
1621    /// # spin_on::spin_on(async {
1622    /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1623    ///
1624    /// let mut v = vec![];
1625    /// let res = s
1626    ///     .try_for_each(|n| {
1627    ///         if n < 2 {
1628    ///             v.push(n);
1629    ///             Ok(())
1630    ///         } else {
1631    ///             Err("too big")
1632    ///         }
1633    ///     })
1634    ///     .await;
1635    ///
1636    /// assert_eq!(v, &[0, 1]);
1637    /// assert_eq!(res, Err("too big"));
1638    /// # });
1639    /// ```
1640    fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1641    where
1642        Self: Unpin,
1643        F: FnMut(Self::Item) -> Result<(), E>,
1644    {
1645        TryForEachFuture { stream: self, f }
1646    }
1647
1648    /// Zips up two streams into a single stream of pairs.
1649    ///
1650    /// The stream of pairs stops when either of the original two streams is exhausted.
1651    ///
1652    /// # Examples
1653    ///
1654    /// ```
1655    /// use futures_lite::stream::{self, StreamExt};
1656    ///
1657    /// # spin_on::spin_on(async {
1658    /// let l = stream::iter(vec![1, 2, 3]);
1659    /// let r = stream::iter(vec![4, 5, 6, 7]);
1660    /// let mut s = l.zip(r);
1661    ///
1662    /// assert_eq!(s.next().await, Some((1, 4)));
1663    /// assert_eq!(s.next().await, Some((2, 5)));
1664    /// assert_eq!(s.next().await, Some((3, 6)));
1665    /// assert_eq!(s.next().await, None);
1666    /// # });
1667    /// ```
1668    fn zip<U>(self, other: U) -> Zip<Self, U>
1669    where
1670        Self: Sized,
1671        U: Stream,
1672    {
1673        Zip {
1674            item_slot: None,
1675            first: self,
1676            second: other,
1677        }
1678    }
1679
1680    /// Collects a stream of pairs into a pair of collections.
1681    ///
1682    /// # Examples
1683    ///
1684    /// ```
1685    /// use futures_lite::stream::{self, StreamExt};
1686    ///
1687    /// # spin_on::spin_on(async {
1688    /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1689    /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1690    ///
1691    /// assert_eq!(left, [1, 3]);
1692    /// assert_eq!(right, [2, 4]);
1693    /// # });
1694    /// ```
1695    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1696    where
1697        FromA: Default + Extend<A>,
1698        FromB: Default + Extend<B>,
1699        Self: Stream<Item = (A, B)> + Sized,
1700    {
1701        UnzipFuture {
1702            stream: self,
1703            res: Some(Default::default()),
1704        }
1705    }
1706
1707    /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1708    ///
1709    /// # Examples
1710    ///
1711    /// ```
1712    /// use futures_lite::stream::{self, StreamExt};
1713    /// use futures_lite::stream::{once, pending};
1714    ///
1715    /// # spin_on::spin_on(async {
1716    /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1717    /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1718    ///
1719    /// // The first future wins.
1720    /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1721    /// # })
1722    /// ```
1723    fn or<S>(self, other: S) -> Or<Self, S>
1724    where
1725        Self: Sized,
1726        S: Stream<Item = Self::Item>,
1727    {
1728        Or {
1729            stream1: self,
1730            stream2: other,
1731        }
1732    }
1733
1734    /// Merges with `other` stream, with no preference for either stream when both are ready.
1735    ///
1736    /// # Examples
1737    ///
1738    /// ```
1739    /// use futures_lite::stream::{self, StreamExt};
1740    /// use futures_lite::stream::{once, pending};
1741    ///
1742    /// # spin_on::spin_on(async {
1743    /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1744    /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1745    ///
1746    /// // One of the two stream is randomly chosen as the winner.
1747    /// let res = once(1).race(once(2)).next().await;
1748    /// # })
1749    /// ```
1750    #[cfg(feature = "std")]
1751    fn race<S>(self, other: S) -> Race<Self, S>
1752    where
1753        Self: Sized,
1754        S: Stream<Item = Self::Item>,
1755    {
1756        Race {
1757            stream1: self,
1758            stream2: other,
1759        }
1760    }
1761
1762    /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1763    ///
1764    /// # Examples
1765    ///
1766    /// ```
1767    /// use futures_lite::stream::{self, StreamExt};
1768    ///
1769    /// # spin_on::spin_on(async {
1770    /// let a = stream::once(1);
1771    /// let b = stream::empty();
1772    ///
1773    /// // Streams of different types can be stored in
1774    /// // the same collection when they are boxed:
1775    /// let streams = vec![a.boxed(), b.boxed()];
1776    /// # })
1777    /// ```
1778    #[cfg(feature = "alloc")]
1779    fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
1780    where
1781        Self: Send + Sized + 'a,
1782    {
1783        Box::pin(self)
1784    }
1785
1786    /// Boxes the stream and changes its type to `dyn Stream + 'a`.
1787    ///
1788    /// # Examples
1789    ///
1790    /// ```
1791    /// use futures_lite::stream::{self, StreamExt};
1792    ///
1793    /// # spin_on::spin_on(async {
1794    /// let a = stream::once(1);
1795    /// let b = stream::empty();
1796    ///
1797    /// // Streams of different types can be stored in
1798    /// // the same collection when they are boxed:
1799    /// let streams = vec![a.boxed_local(), b.boxed_local()];
1800    /// # })
1801    /// ```
1802    #[cfg(feature = "alloc")]
1803    fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
1804    where
1805        Self: Sized + 'a,
1806    {
1807        Box::pin(self)
1808    }
1809}
1810
1811impl<S: Stream + ?Sized> StreamExt for S {}
1812
1813/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
1814///
1815/// # Examples
1816///
1817/// ```
1818/// use futures_lite::stream::{self, StreamExt};
1819///
1820/// // These two lines are equivalent:
1821/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
1822/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
1823/// ```
1824#[cfg(feature = "alloc")]
1825pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1826
1827/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
1828///
1829/// # Examples
1830///
1831/// ```
1832/// use futures_lite::stream::{self, StreamExt};
1833///
1834/// // These two lines are equivalent:
1835/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
1836/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
1837/// ```
1838#[cfg(feature = "alloc")]
1839pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1840
1841/// Future for the [`StreamExt::next()`] method.
1842#[derive(Debug)]
1843#[must_use = "futures do nothing unless you `.await` or poll them"]
1844pub struct NextFuture<'a, S: ?Sized> {
1845    stream: &'a mut S,
1846}
1847
1848impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1849
1850impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1851    type Output = Option<S::Item>;
1852
1853    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1854        self.stream.poll_next(cx)
1855    }
1856}
1857
1858/// Future for the [`StreamExt::try_next()`] method.
1859#[derive(Debug)]
1860#[must_use = "futures do nothing unless you `.await` or poll them"]
1861pub struct TryNextFuture<'a, S: ?Sized> {
1862    stream: &'a mut S,
1863}
1864
1865impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1866
1867impl<T, E, S> Future for TryNextFuture<'_, S>
1868where
1869    S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1870{
1871    type Output = Result<Option<T>, E>;
1872
1873    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1874        let res = ready!(self.stream.poll_next(cx));
1875        Poll::Ready(res.transpose())
1876    }
1877}
1878
1879pin_project! {
1880    /// Future for the [`StreamExt::count()`] method.
1881    #[derive(Debug)]
1882    #[must_use = "futures do nothing unless you `.await` or poll them"]
1883    pub struct CountFuture<S: ?Sized> {
1884        count: usize,
1885        #[pin]
1886        stream: S,
1887    }
1888}
1889
1890impl<S: Stream + ?Sized> Future for CountFuture<S> {
1891    type Output = usize;
1892
1893    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1894        loop {
1895            match ready!(self.as_mut().project().stream.poll_next(cx)) {
1896                None => return Poll::Ready(self.count),
1897                Some(_) => *self.as_mut().project().count += 1,
1898            }
1899        }
1900    }
1901}
1902
1903pin_project! {
1904    /// Future for the [`StreamExt::collect()`] method.
1905    #[derive(Debug)]
1906    #[must_use = "futures do nothing unless you `.await` or poll them"]
1907    pub struct CollectFuture<S, C> {
1908        #[pin]
1909        stream: S,
1910        collection: C,
1911    }
1912}
1913
1914impl<S, C> Future for CollectFuture<S, C>
1915where
1916    S: Stream,
1917    C: Default + Extend<S::Item>,
1918{
1919    type Output = C;
1920
1921    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1922        let mut this = self.as_mut().project();
1923        loop {
1924            match ready!(this.stream.as_mut().poll_next(cx)) {
1925                Some(e) => this.collection.extend(Some(e)),
1926                None => return Poll::Ready(mem::take(self.project().collection)),
1927            }
1928        }
1929    }
1930}
1931
1932pin_project! {
1933    /// Future for the [`StreamExt::try_collect()`] method.
1934    #[derive(Debug)]
1935    #[must_use = "futures do nothing unless you `.await` or poll them"]
1936    pub struct TryCollectFuture<S, C> {
1937        #[pin]
1938        stream: S,
1939        items: C,
1940    }
1941}
1942
1943impl<T, E, S, C> Future for TryCollectFuture<S, C>
1944where
1945    S: Stream<Item = Result<T, E>>,
1946    C: Default + Extend<T>,
1947{
1948    type Output = Result<C, E>;
1949
1950    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1951        let mut this = self.project();
1952        Poll::Ready(Ok(loop {
1953            match ready!(this.stream.as_mut().poll_next(cx)?) {
1954                Some(x) => this.items.extend(Some(x)),
1955                None => break mem::take(this.items),
1956            }
1957        }))
1958    }
1959}
1960
1961pin_project! {
1962    /// Future for the [`StreamExt::partition()`] method.
1963    #[derive(Debug)]
1964    #[must_use = "futures do nothing unless you `.await` or poll them"]
1965    pub struct PartitionFuture<S, P, B> {
1966        #[pin]
1967        stream: S,
1968        predicate: P,
1969        res: Option<(B, B)>,
1970    }
1971}
1972
1973impl<S, P, B> Future for PartitionFuture<S, P, B>
1974where
1975    S: Stream + Sized,
1976    P: FnMut(&S::Item) -> bool,
1977    B: Default + Extend<S::Item>,
1978{
1979    type Output = (B, B);
1980
1981    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1982        let mut this = self.project();
1983        loop {
1984            match ready!(this.stream.as_mut().poll_next(cx)) {
1985                Some(v) => {
1986                    let res = this.res.as_mut().unwrap();
1987                    if (this.predicate)(&v) {
1988                        res.0.extend(Some(v))
1989                    } else {
1990                        res.1.extend(Some(v))
1991                    }
1992                }
1993                None => return Poll::Ready(this.res.take().unwrap()),
1994            }
1995        }
1996    }
1997}
1998
1999pin_project! {
2000    /// Future for the [`StreamExt::fold()`] method.
2001    #[derive(Debug)]
2002    #[must_use = "futures do nothing unless you `.await` or poll them"]
2003    pub struct FoldFuture<S, F, T> {
2004        #[pin]
2005        stream: S,
2006        f: F,
2007        acc: Option<T>,
2008    }
2009}
2010
2011impl<S, F, T> Future for FoldFuture<S, F, T>
2012where
2013    S: Stream,
2014    F: FnMut(T, S::Item) -> T,
2015{
2016    type Output = T;
2017
2018    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2019        let mut this = self.project();
2020        loop {
2021            match ready!(this.stream.as_mut().poll_next(cx)) {
2022                Some(v) => {
2023                    let old = this.acc.take().unwrap();
2024                    let new = (this.f)(old, v);
2025                    *this.acc = Some(new);
2026                }
2027                None => return Poll::Ready(this.acc.take().unwrap()),
2028            }
2029        }
2030    }
2031}
2032
2033/// Future for the [`StreamExt::try_fold()`] method.
2034#[derive(Debug)]
2035#[must_use = "futures do nothing unless you `.await` or poll them"]
2036pub struct TryFoldFuture<'a, S, F, B> {
2037    stream: &'a mut S,
2038    f: F,
2039    acc: Option<B>,
2040}
2041
2042impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
2043
2044impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
2045where
2046    S: Stream<Item = Result<T, E>> + Unpin,
2047    F: FnMut(B, T) -> Result<B, E>,
2048{
2049    type Output = Result<B, E>;
2050
2051    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2052        loop {
2053            match ready!(self.stream.poll_next(cx)) {
2054                Some(Err(e)) => return Poll::Ready(Err(e)),
2055                Some(Ok(t)) => {
2056                    let old = self.acc.take().unwrap();
2057                    let new = (&mut self.f)(old, t);
2058
2059                    match new {
2060                        Ok(t) => self.acc = Some(t),
2061                        Err(e) => return Poll::Ready(Err(e)),
2062                    }
2063                }
2064                None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2065            }
2066        }
2067    }
2068}
2069
2070pin_project! {
2071    /// Stream for the [`StreamExt::scan()`] method.
2072    #[derive(Clone, Debug)]
2073    #[must_use = "streams do nothing unless polled"]
2074    pub struct Scan<S, St, F> {
2075        #[pin]
2076        stream: S,
2077        state_f: (St, F),
2078    }
2079}
2080
2081impl<S, St, F, B> Stream for Scan<S, St, F>
2082where
2083    S: Stream,
2084    F: FnMut(&mut St, S::Item) -> Option<B>,
2085{
2086    type Item = B;
2087
2088    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2089        let mut this = self.project();
2090        this.stream.as_mut().poll_next(cx).map(|item| {
2091            item.and_then(|item| {
2092                let (state, f) = this.state_f;
2093                f(state, item)
2094            })
2095        })
2096    }
2097}
2098
2099pin_project! {
2100    /// Stream for the [`StreamExt::fuse()`] method.
2101    #[derive(Clone, Debug)]
2102    #[must_use = "streams do nothing unless polled"]
2103    pub struct Fuse<S> {
2104        #[pin]
2105        stream: S,
2106        done: bool,
2107    }
2108}
2109
2110impl<S: Stream> Stream for Fuse<S> {
2111    type Item = S::Item;
2112
2113    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2114        let this = self.project();
2115
2116        if *this.done {
2117            Poll::Ready(None)
2118        } else {
2119            let next = ready!(this.stream.poll_next(cx));
2120            if next.is_none() {
2121                *this.done = true;
2122            }
2123            Poll::Ready(next)
2124        }
2125    }
2126}
2127
2128pin_project! {
2129    /// Stream for the [`StreamExt::map()`] method.
2130    #[derive(Clone, Debug)]
2131    #[must_use = "streams do nothing unless polled"]
2132    pub struct Map<S, F> {
2133        #[pin]
2134        stream: S,
2135        f: F,
2136    }
2137}
2138
2139impl<S, F, T> Stream for Map<S, F>
2140where
2141    S: Stream,
2142    F: FnMut(S::Item) -> T,
2143{
2144    type Item = T;
2145
2146    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2147        let this = self.project();
2148        let next = ready!(this.stream.poll_next(cx));
2149        Poll::Ready(next.map(this.f))
2150    }
2151
2152    fn size_hint(&self) -> (usize, Option<usize>) {
2153        self.stream.size_hint()
2154    }
2155}
2156
2157pin_project! {
2158    /// Stream for the [`StreamExt::flat_map()`] method.
2159    #[derive(Clone, Debug)]
2160    #[must_use = "streams do nothing unless polled"]
2161    pub struct FlatMap<S, U, F> {
2162        #[pin]
2163        stream: Map<S, F>,
2164        #[pin]
2165        inner_stream: Option<U>,
2166    }
2167}
2168
2169impl<S, U, F> Stream for FlatMap<S, U, F>
2170where
2171    S: Stream,
2172    U: Stream,
2173    F: FnMut(S::Item) -> U,
2174{
2175    type Item = U::Item;
2176
2177    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2178        let mut this = self.project();
2179        loop {
2180            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2181                match ready!(inner.poll_next(cx)) {
2182                    Some(item) => return Poll::Ready(Some(item)),
2183                    None => this.inner_stream.set(None),
2184                }
2185            }
2186
2187            match ready!(this.stream.as_mut().poll_next(cx)) {
2188                Some(stream) => this.inner_stream.set(Some(stream)),
2189                None => return Poll::Ready(None),
2190            }
2191        }
2192    }
2193}
2194
2195pin_project! {
2196    /// Stream for the [`StreamExt::flatten()`] method.
2197    #[derive(Clone, Debug)]
2198    #[must_use = "streams do nothing unless polled"]
2199    pub struct Flatten<S: Stream> {
2200        #[pin]
2201        stream: S,
2202        #[pin]
2203        inner_stream: Option<S::Item>,
2204    }
2205}
2206
2207impl<S, U> Stream for Flatten<S>
2208where
2209    S: Stream<Item = U>,
2210    U: Stream,
2211{
2212    type Item = U::Item;
2213
2214    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2215        let mut this = self.project();
2216        loop {
2217            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2218                match ready!(inner.poll_next(cx)) {
2219                    Some(item) => return Poll::Ready(Some(item)),
2220                    None => this.inner_stream.set(None),
2221                }
2222            }
2223
2224            match ready!(this.stream.as_mut().poll_next(cx)) {
2225                Some(inner) => this.inner_stream.set(Some(inner)),
2226                None => return Poll::Ready(None),
2227            }
2228        }
2229    }
2230}
2231
2232pin_project! {
2233    /// Stream for the [`StreamExt::then()`] method.
2234    #[derive(Clone, Debug)]
2235    #[must_use = "streams do nothing unless polled"]
2236    pub struct Then<S, F, Fut> {
2237        #[pin]
2238        stream: S,
2239        #[pin]
2240        future: Option<Fut>,
2241        f: F,
2242    }
2243}
2244
2245impl<S, F, Fut> Stream for Then<S, F, Fut>
2246where
2247    S: Stream,
2248    F: FnMut(S::Item) -> Fut,
2249    Fut: Future,
2250{
2251    type Item = Fut::Output;
2252
2253    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2254        let mut this = self.project();
2255
2256        loop {
2257            if let Some(fut) = this.future.as_mut().as_pin_mut() {
2258                let item = ready!(fut.poll(cx));
2259                this.future.set(None);
2260                return Poll::Ready(Some(item));
2261            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2262                this.future.set(Some((this.f)(item)));
2263            } else {
2264                return Poll::Ready(None);
2265            }
2266        }
2267    }
2268
2269    fn size_hint(&self) -> (usize, Option<usize>) {
2270        let future_len = self.future.is_some() as usize;
2271        let (lower, upper) = self.stream.size_hint();
2272        let lower = lower.saturating_add(future_len);
2273        let upper = upper.and_then(|u| u.checked_add(future_len));
2274        (lower, upper)
2275    }
2276}
2277
2278pin_project! {
2279    /// Stream for the [`StreamExt::filter()`] method.
2280    #[derive(Clone, Debug)]
2281    #[must_use = "streams do nothing unless polled"]
2282    pub struct Filter<S, P> {
2283        #[pin]
2284        stream: S,
2285        predicate: P,
2286    }
2287}
2288
2289impl<S, P> Stream for Filter<S, P>
2290where
2291    S: Stream,
2292    P: FnMut(&S::Item) -> bool,
2293{
2294    type Item = S::Item;
2295
2296    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2297        let mut this = self.project();
2298        loop {
2299            match ready!(this.stream.as_mut().poll_next(cx)) {
2300                None => return Poll::Ready(None),
2301                Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2302                Some(_) => {}
2303            }
2304        }
2305    }
2306}
2307
2308/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2309///
2310/// # Examples
2311///
2312/// ```
2313/// use futures_lite::stream::{self, once, pending, StreamExt};
2314///
2315/// # spin_on::spin_on(async {
2316/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2317/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2318///
2319/// // The first stream wins.
2320/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2321/// # })
2322/// ```
2323pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2324where
2325    S1: Stream<Item = T>,
2326    S2: Stream<Item = T>,
2327{
2328    Or { stream1, stream2 }
2329}
2330
2331pin_project! {
2332    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2333    #[derive(Clone, Debug)]
2334    #[must_use = "streams do nothing unless polled"]
2335    pub struct Or<S1, S2> {
2336        #[pin]
2337        stream1: S1,
2338        #[pin]
2339        stream2: S2,
2340    }
2341}
2342
2343impl<T, S1, S2> Stream for Or<S1, S2>
2344where
2345    S1: Stream<Item = T>,
2346    S2: Stream<Item = T>,
2347{
2348    type Item = T;
2349
2350    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2351        let mut this = self.project();
2352
2353        if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2354            return Poll::Ready(Some(t));
2355        }
2356        this.stream2.as_mut().poll_next(cx)
2357    }
2358}
2359
2360/// Merges two streams, with no preference for either stream when both are ready.
2361///
2362/// # Examples
2363///
2364/// ```
2365/// use futures_lite::stream::{self, once, pending, StreamExt};
2366///
2367/// # spin_on::spin_on(async {
2368/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2369/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2370///
2371/// // One of the two stream is randomly chosen as the winner.
2372/// let res = stream::race(once(1), once(2)).next().await;
2373/// # })
2374#[cfg(feature = "std")]
2375pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2376where
2377    S1: Stream<Item = T>,
2378    S2: Stream<Item = T>,
2379{
2380    Race { stream1, stream2 }
2381}
2382
2383#[cfg(feature = "std")]
2384pin_project! {
2385    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2386    #[derive(Clone, Debug)]
2387    #[must_use = "streams do nothing unless polled"]
2388    pub struct Race<S1, S2> {
2389        #[pin]
2390        stream1: S1,
2391        #[pin]
2392        stream2: S2,
2393    }
2394}
2395
2396#[cfg(feature = "std")]
2397impl<T, S1, S2> Stream for Race<S1, S2>
2398where
2399    S1: Stream<Item = T>,
2400    S2: Stream<Item = T>,
2401{
2402    type Item = T;
2403
2404    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2405        let mut this = self.project();
2406
2407        if fastrand::bool() {
2408            if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2409                return Poll::Ready(Some(t));
2410            }
2411            if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2412                return Poll::Ready(Some(t));
2413            }
2414        } else {
2415            if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2416                return Poll::Ready(Some(t));
2417            }
2418            if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2419                return Poll::Ready(Some(t));
2420            }
2421        }
2422        Poll::Pending
2423    }
2424}
2425
2426pin_project! {
2427    /// Stream for the [`StreamExt::filter_map()`] method.
2428    #[derive(Clone, Debug)]
2429    #[must_use = "streams do nothing unless polled"]
2430    pub struct FilterMap<S, F> {
2431        #[pin]
2432        stream: S,
2433        f: F,
2434    }
2435}
2436
2437impl<S, F, T> Stream for FilterMap<S, F>
2438where
2439    S: Stream,
2440    F: FnMut(S::Item) -> Option<T>,
2441{
2442    type Item = T;
2443
2444    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2445        let mut this = self.project();
2446        loop {
2447            match ready!(this.stream.as_mut().poll_next(cx)) {
2448                None => return Poll::Ready(None),
2449                Some(v) => {
2450                    if let Some(t) = (this.f)(v) {
2451                        return Poll::Ready(Some(t));
2452                    }
2453                }
2454            }
2455        }
2456    }
2457}
2458
2459pin_project! {
2460    /// Stream for the [`StreamExt::take()`] method.
2461    #[derive(Clone, Debug)]
2462    #[must_use = "streams do nothing unless polled"]
2463    pub struct Take<S> {
2464        #[pin]
2465        stream: S,
2466        n: usize,
2467    }
2468}
2469
2470impl<S: Stream> Stream for Take<S> {
2471    type Item = S::Item;
2472
2473    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2474        let this = self.project();
2475
2476        if *this.n == 0 {
2477            Poll::Ready(None)
2478        } else {
2479            let next = ready!(this.stream.poll_next(cx));
2480            match next {
2481                Some(_) => *this.n -= 1,
2482                None => *this.n = 0,
2483            }
2484            Poll::Ready(next)
2485        }
2486    }
2487}
2488
2489pin_project! {
2490    /// Stream for the [`StreamExt::take_while()`] method.
2491    #[derive(Clone, Debug)]
2492    #[must_use = "streams do nothing unless polled"]
2493    pub struct TakeWhile<S, P> {
2494        #[pin]
2495        stream: S,
2496        predicate: P,
2497    }
2498}
2499
2500impl<S, P> Stream for TakeWhile<S, P>
2501where
2502    S: Stream,
2503    P: FnMut(&S::Item) -> bool,
2504{
2505    type Item = S::Item;
2506
2507    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2508        let this = self.project();
2509
2510        match ready!(this.stream.poll_next(cx)) {
2511            Some(v) => {
2512                if (this.predicate)(&v) {
2513                    Poll::Ready(Some(v))
2514                } else {
2515                    Poll::Ready(None)
2516                }
2517            }
2518            None => Poll::Ready(None),
2519        }
2520    }
2521}
2522
2523pin_project! {
2524    /// Stream for the [`StreamExt::skip()`] method.
2525    #[derive(Clone, Debug)]
2526    #[must_use = "streams do nothing unless polled"]
2527    pub struct Skip<S> {
2528        #[pin]
2529        stream: S,
2530        n: usize,
2531    }
2532}
2533
2534impl<S: Stream> Stream for Skip<S> {
2535    type Item = S::Item;
2536
2537    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2538        let mut this = self.project();
2539        loop {
2540            match ready!(this.stream.as_mut().poll_next(cx)) {
2541                Some(v) => match *this.n {
2542                    0 => return Poll::Ready(Some(v)),
2543                    _ => *this.n -= 1,
2544                },
2545                None => return Poll::Ready(None),
2546            }
2547        }
2548    }
2549}
2550
2551pin_project! {
2552    /// Stream for the [`StreamExt::skip_while()`] method.
2553    #[derive(Clone, Debug)]
2554    #[must_use = "streams do nothing unless polled"]
2555    pub struct SkipWhile<S, P> {
2556        #[pin]
2557        stream: S,
2558        predicate: Option<P>,
2559    }
2560}
2561
2562impl<S, P> Stream for SkipWhile<S, P>
2563where
2564    S: Stream,
2565    P: FnMut(&S::Item) -> bool,
2566{
2567    type Item = S::Item;
2568
2569    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2570        let mut this = self.project();
2571        loop {
2572            match ready!(this.stream.as_mut().poll_next(cx)) {
2573                Some(v) => match this.predicate {
2574                    Some(p) => {
2575                        if !p(&v) {
2576                            *this.predicate = None;
2577                            return Poll::Ready(Some(v));
2578                        }
2579                    }
2580                    None => return Poll::Ready(Some(v)),
2581                },
2582                None => return Poll::Ready(None),
2583            }
2584        }
2585    }
2586}
2587
2588pin_project! {
2589    /// Stream for the [`StreamExt::step_by()`] method.
2590    #[derive(Clone, Debug)]
2591    #[must_use = "streams do nothing unless polled"]
2592    pub struct StepBy<S> {
2593        #[pin]
2594        stream: S,
2595        step: usize,
2596        i: usize,
2597    }
2598}
2599
2600impl<S: Stream> Stream for StepBy<S> {
2601    type Item = S::Item;
2602
2603    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2604        let mut this = self.project();
2605        loop {
2606            match ready!(this.stream.as_mut().poll_next(cx)) {
2607                Some(v) => {
2608                    if *this.i == 0 {
2609                        *this.i = *this.step - 1;
2610                        return Poll::Ready(Some(v));
2611                    } else {
2612                        *this.i -= 1;
2613                    }
2614                }
2615                None => return Poll::Ready(None),
2616            }
2617        }
2618    }
2619}
2620
2621pin_project! {
2622    /// Stream for the [`StreamExt::chain()`] method.
2623    #[derive(Clone, Debug)]
2624    #[must_use = "streams do nothing unless polled"]
2625    pub struct Chain<S, U> {
2626        #[pin]
2627        first: Fuse<S>,
2628        #[pin]
2629        second: Fuse<U>,
2630    }
2631}
2632
2633impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2634    type Item = S::Item;
2635
2636    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2637        let mut this = self.project();
2638
2639        if !this.first.done {
2640            let next = ready!(this.first.as_mut().poll_next(cx));
2641            if let Some(next) = next {
2642                return Poll::Ready(Some(next));
2643            }
2644        }
2645
2646        if !this.second.done {
2647            let next = ready!(this.second.as_mut().poll_next(cx));
2648            if let Some(next) = next {
2649                return Poll::Ready(Some(next));
2650            }
2651        }
2652
2653        if this.first.done && this.second.done {
2654            Poll::Ready(None)
2655        } else {
2656            Poll::Pending
2657        }
2658    }
2659}
2660
2661pin_project! {
2662    /// Stream for the [`StreamExt::cloned()`] method.
2663    #[derive(Clone, Debug)]
2664    #[must_use = "streams do nothing unless polled"]
2665    pub struct Cloned<S> {
2666        #[pin]
2667        stream: S,
2668    }
2669}
2670
2671impl<'a, S, T: 'a> Stream for Cloned<S>
2672where
2673    S: Stream<Item = &'a T>,
2674    T: Clone,
2675{
2676    type Item = T;
2677
2678    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2679        let this = self.project();
2680        let next = ready!(this.stream.poll_next(cx));
2681        Poll::Ready(next.cloned())
2682    }
2683}
2684
2685pin_project! {
2686    /// Stream for the [`StreamExt::copied()`] method.
2687    #[derive(Clone, Debug)]
2688    #[must_use = "streams do nothing unless polled"]
2689    pub struct Copied<S> {
2690        #[pin]
2691        stream: S,
2692    }
2693}
2694
2695impl<'a, S, T: 'a> Stream for Copied<S>
2696where
2697    S: Stream<Item = &'a T>,
2698    T: Copy,
2699{
2700    type Item = T;
2701
2702    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2703        let this = self.project();
2704        let next = ready!(this.stream.poll_next(cx));
2705        Poll::Ready(next.copied())
2706    }
2707}
2708
2709pin_project! {
2710    /// Stream for the [`StreamExt::cycle()`] method.
2711    #[derive(Clone, Debug)]
2712    #[must_use = "streams do nothing unless polled"]
2713    pub struct Cycle<S> {
2714        orig: S,
2715        #[pin]
2716        stream: S,
2717    }
2718}
2719
2720impl<S> Stream for Cycle<S>
2721where
2722    S: Stream + Clone,
2723{
2724    type Item = S::Item;
2725
2726    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2727        match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2728            Some(item) => Poll::Ready(Some(item)),
2729            None => {
2730                let new = self.as_mut().orig.clone();
2731                self.as_mut().project().stream.set(new);
2732                self.project().stream.poll_next(cx)
2733            }
2734        }
2735    }
2736}
2737
2738pin_project! {
2739    /// Stream for the [`StreamExt::enumerate()`] method.
2740    #[derive(Clone, Debug)]
2741    #[must_use = "streams do nothing unless polled"]
2742    pub struct Enumerate<S> {
2743        #[pin]
2744        stream: S,
2745        i: usize,
2746    }
2747}
2748
2749impl<S> Stream for Enumerate<S>
2750where
2751    S: Stream,
2752{
2753    type Item = (usize, S::Item);
2754
2755    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2756        let this = self.project();
2757
2758        match ready!(this.stream.poll_next(cx)) {
2759            Some(v) => {
2760                let ret = (*this.i, v);
2761                *this.i += 1;
2762                Poll::Ready(Some(ret))
2763            }
2764            None => Poll::Ready(None),
2765        }
2766    }
2767}
2768
2769pin_project! {
2770    /// Stream for the [`StreamExt::inspect()`] method.
2771    #[derive(Clone, Debug)]
2772    #[must_use = "streams do nothing unless polled"]
2773    pub struct Inspect<S, F> {
2774        #[pin]
2775        stream: S,
2776        f: F,
2777    }
2778}
2779
2780impl<S, F> Stream for Inspect<S, F>
2781where
2782    S: Stream,
2783    F: FnMut(&S::Item),
2784{
2785    type Item = S::Item;
2786
2787    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2788        let mut this = self.project();
2789        let next = ready!(this.stream.as_mut().poll_next(cx));
2790        if let Some(x) = &next {
2791            (this.f)(x);
2792        }
2793        Poll::Ready(next)
2794    }
2795}
2796
2797/// Future for the [`StreamExt::nth()`] method.
2798#[derive(Debug)]
2799#[must_use = "futures do nothing unless you `.await` or poll them"]
2800pub struct NthFuture<'a, S: ?Sized> {
2801    stream: &'a mut S,
2802    n: usize,
2803}
2804
2805impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2806
2807impl<'a, S> Future for NthFuture<'a, S>
2808where
2809    S: Stream + Unpin + ?Sized,
2810{
2811    type Output = Option<S::Item>;
2812
2813    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2814        loop {
2815            match ready!(self.stream.poll_next(cx)) {
2816                Some(v) => match self.n {
2817                    0 => return Poll::Ready(Some(v)),
2818                    _ => self.n -= 1,
2819                },
2820                None => return Poll::Ready(None),
2821            }
2822        }
2823    }
2824}
2825
2826pin_project! {
2827    /// Future for the [`StreamExt::last()`] method.
2828    #[derive(Debug)]
2829    #[must_use = "futures do nothing unless you `.await` or poll them"]
2830    pub struct LastFuture<S: Stream> {
2831        #[pin]
2832        stream: S,
2833        last: Option<S::Item>,
2834    }
2835}
2836
2837impl<S: Stream> Future for LastFuture<S> {
2838    type Output = Option<S::Item>;
2839
2840    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2841        let mut this = self.project();
2842        loop {
2843            match ready!(this.stream.as_mut().poll_next(cx)) {
2844                Some(new) => *this.last = Some(new),
2845                None => return Poll::Ready(this.last.take()),
2846            }
2847        }
2848    }
2849}
2850
2851/// Future for the [`StreamExt::find()`] method.
2852#[derive(Debug)]
2853#[must_use = "futures do nothing unless you `.await` or poll them"]
2854pub struct FindFuture<'a, S: ?Sized, P> {
2855    stream: &'a mut S,
2856    predicate: P,
2857}
2858
2859impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2860
2861impl<'a, S, P> Future for FindFuture<'a, S, P>
2862where
2863    S: Stream + Unpin + ?Sized,
2864    P: FnMut(&S::Item) -> bool,
2865{
2866    type Output = Option<S::Item>;
2867
2868    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2869        loop {
2870            match ready!(self.stream.poll_next(cx)) {
2871                Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2872                Some(_) => {}
2873                None => return Poll::Ready(None),
2874            }
2875        }
2876    }
2877}
2878
2879/// Future for the [`StreamExt::find_map()`] method.
2880#[derive(Debug)]
2881#[must_use = "futures do nothing unless you `.await` or poll them"]
2882pub struct FindMapFuture<'a, S: ?Sized, F> {
2883    stream: &'a mut S,
2884    f: F,
2885}
2886
2887impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2888
2889impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2890where
2891    S: Stream + Unpin + ?Sized,
2892    F: FnMut(S::Item) -> Option<B>,
2893{
2894    type Output = Option<B>;
2895
2896    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2897        loop {
2898            match ready!(self.stream.poll_next(cx)) {
2899                Some(v) => {
2900                    if let Some(v) = (&mut self.f)(v) {
2901                        return Poll::Ready(Some(v));
2902                    }
2903                }
2904                None => return Poll::Ready(None),
2905            }
2906        }
2907    }
2908}
2909
2910/// Future for the [`StreamExt::position()`] method.
2911#[derive(Debug)]
2912#[must_use = "futures do nothing unless you `.await` or poll them"]
2913pub struct PositionFuture<'a, S: ?Sized, P> {
2914    stream: &'a mut S,
2915    predicate: P,
2916    index: usize,
2917}
2918
2919impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2920
2921impl<'a, S, P> Future for PositionFuture<'a, S, P>
2922where
2923    S: Stream + Unpin + ?Sized,
2924    P: FnMut(S::Item) -> bool,
2925{
2926    type Output = Option<usize>;
2927
2928    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2929        loop {
2930            match ready!(self.stream.poll_next(cx)) {
2931                Some(v) => {
2932                    if (&mut self.predicate)(v) {
2933                        return Poll::Ready(Some(self.index));
2934                    } else {
2935                        self.index += 1;
2936                    }
2937                }
2938                None => return Poll::Ready(None),
2939            }
2940        }
2941    }
2942}
2943
2944/// Future for the [`StreamExt::all()`] method.
2945#[derive(Debug)]
2946#[must_use = "futures do nothing unless you `.await` or poll them"]
2947pub struct AllFuture<'a, S: ?Sized, P> {
2948    stream: &'a mut S,
2949    predicate: P,
2950}
2951
2952impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2953
2954impl<S, P> Future for AllFuture<'_, S, P>
2955where
2956    S: Stream + Unpin + ?Sized,
2957    P: FnMut(S::Item) -> bool,
2958{
2959    type Output = bool;
2960
2961    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2962        loop {
2963            match ready!(self.stream.poll_next(cx)) {
2964                Some(v) => {
2965                    if !(&mut self.predicate)(v) {
2966                        return Poll::Ready(false);
2967                    }
2968                }
2969                None => return Poll::Ready(true),
2970            }
2971        }
2972    }
2973}
2974
2975/// Future for the [`StreamExt::any()`] method.
2976#[derive(Debug)]
2977#[must_use = "futures do nothing unless you `.await` or poll them"]
2978pub struct AnyFuture<'a, S: ?Sized, P> {
2979    stream: &'a mut S,
2980    predicate: P,
2981}
2982
2983impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2984
2985impl<S, P> Future for AnyFuture<'_, S, P>
2986where
2987    S: Stream + Unpin + ?Sized,
2988    P: FnMut(S::Item) -> bool,
2989{
2990    type Output = bool;
2991
2992    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2993        loop {
2994            match ready!(self.stream.poll_next(cx)) {
2995                Some(v) => {
2996                    if (&mut self.predicate)(v) {
2997                        return Poll::Ready(true);
2998                    }
2999                }
3000                None => return Poll::Ready(false),
3001            }
3002        }
3003    }
3004}
3005
3006pin_project! {
3007    /// Future for the [`StreamExt::for_each()`] method.
3008    #[derive(Debug)]
3009    #[must_use = "futures do nothing unless you `.await` or poll them"]
3010    pub struct ForEachFuture<S, F> {
3011        #[pin]
3012        stream: S,
3013        f: F,
3014    }
3015}
3016
3017impl<S, F> Future for ForEachFuture<S, F>
3018where
3019    S: Stream,
3020    F: FnMut(S::Item),
3021{
3022    type Output = ();
3023
3024    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3025        let mut this = self.project();
3026        loop {
3027            match ready!(this.stream.as_mut().poll_next(cx)) {
3028                Some(v) => (this.f)(v),
3029                None => return Poll::Ready(()),
3030            }
3031        }
3032    }
3033}
3034
3035/// Future for the [`StreamExt::try_for_each()`] method.
3036#[derive(Debug)]
3037#[must_use = "futures do nothing unless you `.await` or poll them"]
3038pub struct TryForEachFuture<'a, S: ?Sized, F> {
3039    stream: &'a mut S,
3040    f: F,
3041}
3042
3043impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3044
3045impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
3046where
3047    S: Stream + Unpin + ?Sized,
3048    F: FnMut(S::Item) -> Result<(), E>,
3049{
3050    type Output = Result<(), E>;
3051
3052    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3053        loop {
3054            match ready!(self.stream.poll_next(cx)) {
3055                None => return Poll::Ready(Ok(())),
3056                Some(v) => (&mut self.f)(v)?,
3057            }
3058        }
3059    }
3060}
3061
3062pin_project! {
3063    /// Stream for the [`StreamExt::zip()`] method.
3064    #[derive(Clone, Debug)]
3065    #[must_use = "streams do nothing unless polled"]
3066    pub struct Zip<A: Stream, B> {
3067        item_slot: Option<A::Item>,
3068        #[pin]
3069        first: A,
3070        #[pin]
3071        second: B,
3072    }
3073}
3074
3075impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3076    type Item = (A::Item, B::Item);
3077
3078    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3079        let this = self.project();
3080
3081        if this.item_slot.is_none() {
3082            match this.first.poll_next(cx) {
3083                Poll::Pending => return Poll::Pending,
3084                Poll::Ready(None) => return Poll::Ready(None),
3085                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3086            }
3087        }
3088
3089        let second_item = ready!(this.second.poll_next(cx));
3090        let first_item = this.item_slot.take().unwrap();
3091        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3092    }
3093}
3094
3095pin_project! {
3096    /// Future for the [`StreamExt::unzip()`] method.
3097    #[derive(Debug)]
3098    #[must_use = "futures do nothing unless you `.await` or poll them"]
3099    pub struct UnzipFuture<S, FromA, FromB> {
3100        #[pin]
3101        stream: S,
3102        res: Option<(FromA, FromB)>,
3103    }
3104}
3105
3106impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3107where
3108    S: Stream<Item = (A, B)>,
3109    FromA: Default + Extend<A>,
3110    FromB: Default + Extend<B>,
3111{
3112    type Output = (FromA, FromB);
3113
3114    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3115        let mut this = self.project();
3116
3117        loop {
3118            match ready!(this.stream.as_mut().poll_next(cx)) {
3119                Some((a, b)) => {
3120                    let res = this.res.as_mut().unwrap();
3121                    res.0.extend(Some(a));
3122                    res.1.extend(Some(b));
3123                }
3124                None => return Poll::Ready(this.res.take().unwrap()),
3125            }
3126        }
3127    }
3128}