1#[cfg(feature = "alloc")]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(feature = "alloc")]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
/// Converts a stream into a blocking iterator.
///
/// The returned [`BlockOn`] implements [`Iterator`]; each of its methods blocks the calling
/// thread (via `crate::future::block_on`) until the wrapped stream produces a result.
#[cfg(feature = "std")]
pub fn block_on<S>(stream: S) -> BlockOn<S>
where
    S: Stream + Unpin,
{
    BlockOn(stream)
}
56
/// Blocking iterator returned by [`block_on()`]; holds the stream it drives.
#[derive(Debug)]
pub struct BlockOn<S>(S);
60
/// Blocking [`Iterator`] facade over an `Unpin` stream.
///
/// Every method builds the corresponding `StreamExt` future on the wrapped stream and then
/// blocks the current thread on it with `crate::future::block_on`.
#[cfg(feature = "std")]
impl<S> Iterator for BlockOn<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    /// Blocks until the stream yields its next item (or ends).
    fn next(&mut self) -> Option<Self::Item> {
        let pending = self.0.next();
        crate::future::block_on(pending)
    }

    /// Forwards the wrapped stream's size hint unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }

    /// Blocks until the stream ends and returns how many items it produced.
    fn count(self) -> usize {
        let pending = self.0.count();
        crate::future::block_on(pending)
    }

    /// Blocks until the stream ends and returns its final item, if any.
    fn last(self) -> Option<Self::Item> {
        let pending = self.0.last();
        crate::future::block_on(pending)
    }

    /// Blocks on the stream's `nth(n)` future.
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let pending = self.0.nth(n);
        crate::future::block_on(pending)
    }

    /// Blocks while folding every item of the stream into an accumulator.
    fn fold<B, F>(self, init: B, f: F) -> B
    where
        F: FnMut(B, Self::Item) -> B,
    {
        let pending = self.0.fold(init, f);
        crate::future::block_on(pending)
    }

    /// Blocks on the stream's `for_each(f)` future.
    fn for_each<F>(self, f: F) -> F::Output
    where
        F: FnMut(Self::Item),
    {
        let pending = self.0.for_each(f);
        crate::future::block_on(pending)
    }

    /// Blocks on the stream's `all(f)` future.
    fn all<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.all(f);
        crate::future::block_on(pending)
    }

    /// Blocks on the stream's `any(f)` future.
    fn any<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.any(f);
        crate::future::block_on(pending)
    }

    /// Blocks on the stream's `find(predicate)` future.
    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
    where
        P: FnMut(&Self::Item) -> bool,
    {
        let pending = self.0.find(predicate);
        crate::future::block_on(pending)
    }

    /// Blocks on the stream's `find_map(f)` future.
    fn find_map<B, F>(&mut self, f: F) -> Option<B>
    where
        F: FnMut(Self::Item) -> Option<B>,
    {
        let pending = self.0.find_map(f);
        crate::future::block_on(pending)
    }

    /// Blocks on the stream's `position(predicate)` future.
    fn position<P>(&mut self, predicate: P) -> Option<usize>
    where
        P: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.position(predicate);
        crate::future::block_on(pending)
    }
}
134
135pub fn empty<T>() -> Empty<T> {
148 Empty {
149 _marker: PhantomData,
150 }
151}
152
/// Stream returned by [`empty()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Empty<T> {
    // Zero-sized marker that ties the stream to its item type `T`.
    _marker: PhantomData<T>,
}
159
// `Empty` stores only a `PhantomData`, so it is declared `Unpin` for every `T`.
impl<T> Unpin for Empty<T> {}
161
162impl<T> Stream for Empty<T> {
163 type Item = T;
164
165 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166 Poll::Ready(None)
167 }
168
169 fn size_hint(&self) -> (usize, Option<usize>) {
170 (0, Some(0))
171 }
172}
173
174pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
190 Iter {
191 iter: iter.into_iter(),
192 }
193}
194
/// Stream returned by [`iter()`]; wraps a synchronous iterator.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
    // The wrapped iterator; advanced once per `poll_next` call.
    iter: I,
}
201
// Declared `Unpin` unconditionally; `poll_next` relies on this to reach `iter` through
// the pinned reference.
impl<I> Unpin for Iter<I> {}
203
204impl<I: Iterator> Stream for Iter<I> {
205 type Item = I::Item;
206
207 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
208 Poll::Ready(self.iter.next())
209 }
210
211 fn size_hint(&self) -> (usize, Option<usize>) {
212 self.iter.size_hint()
213 }
214}
215
216pub fn once<T>(t: T) -> Once<T> {
231 Once { value: Some(t) }
232}
233
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`once()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Once<T> {
        // The item still to be yielded; becomes `None` once it has been handed out.
        value: Option<T>,
    }
}
242
243impl<T> Stream for Once<T> {
244 type Item = T;
245
246 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
247 Poll::Ready(self.project().value.take())
248 }
249
250 fn size_hint(&self) -> (usize, Option<usize>) {
251 if self.value.is_some() {
252 (1, Some(1))
253 } else {
254 (0, Some(0))
255 }
256 }
257}
258
259pub fn pending<T>() -> Pending<T> {
273 Pending {
274 _marker: PhantomData,
275 }
276}
277
/// Stream returned by [`pending()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pending<T> {
    // Zero-sized marker that ties the stream to its item type `T`.
    _marker: PhantomData<T>,
}
284
// `Pending` stores only a `PhantomData`, so it is declared `Unpin` for every `T`.
impl<T> Unpin for Pending<T> {}
286
287impl<T> Stream for Pending<T> {
288 type Item = T;
289
290 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
291 Poll::Pending
292 }
293
294 fn size_hint(&self) -> (usize, Option<usize>) {
295 (0, Some(0))
296 }
297}
298
299pub fn poll_fn<T, F>(f: F) -> PollFn<F>
316where
317 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
318{
319 PollFn { f }
320}
321
/// Stream returned by [`poll_fn()`].
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct PollFn<F> {
    // The user-supplied polling closure.
    f: F,
}
328
// Declared `Unpin` unconditionally; `poll_next` relies on this to borrow the closure mutably
// through the pinned reference.
impl<F> Unpin for PollFn<F> {}
330
331impl<F> fmt::Debug for PollFn<F> {
332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333 f.debug_struct("PollFn").finish()
334 }
335}
336
337impl<T, F> Stream for PollFn<F>
338where
339 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
340{
341 type Item = T;
342
343 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
344 (&mut self.f)(cx)
345 }
346}
347
348pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
363 Repeat { item }
364}
365
/// Stream returned by [`repeat()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Repeat<T> {
    // The prototype value; cloned on every poll.
    item: T,
}
372
// Declared `Unpin` unconditionally, for every item type `T`.
impl<T> Unpin for Repeat<T> {}
374
375impl<T: Clone> Stream for Repeat<T> {
376 type Item = T;
377
378 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379 Poll::Ready(Some(self.item.clone()))
380 }
381
382 fn size_hint(&self) -> (usize, Option<usize>) {
383 (usize::max_value(), None)
384 }
385}
386
387pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
402where
403 F: FnMut() -> T,
404{
405 RepeatWith { f: repeater }
406}
407
/// Stream returned by [`repeat_with()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct RepeatWith<F> {
    // The generator closure; called once per poll.
    f: F,
}
414
// Declared `Unpin` unconditionally; `poll_next` relies on this to borrow the closure mutably
// through the pinned reference.
impl<F> Unpin for RepeatWith<F> {}
416
417impl<T, F> Stream for RepeatWith<F>
418where
419 F: FnMut() -> T,
420{
421 type Item = T;
422
423 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
424 let item = (&mut self.f)();
425 Poll::Ready(Some(item))
426 }
427
428 fn size_hint(&self) -> (usize, Option<usize>) {
429 (usize::max_value(), None)
430 }
431}
432
433pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
455where
456 F: FnMut(T) -> Fut,
457 Fut: Future<Output = Option<(Item, T)>>,
458{
459 Unfold {
460 f,
461 state: Some(seed),
462 fut: None,
463 }
464}
465
pin_project! {
    #[derive(Clone)]
    /// Stream returned by [`unfold()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Unfold<T, F, Fut> {
        // Step function that turns a state into the next future.
        f: F,
        // State waiting to be fed to `f`; `None` while a future is in flight or after the end.
        state: Option<T>,
        // The future for the step currently being awaited, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
477
478impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
479where
480 T: fmt::Debug,
481 Fut: fmt::Debug,
482{
483 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484 f.debug_struct("Unfold")
485 .field("state", &self.state)
486 .field("fut", &self.fut)
487 .finish()
488 }
489}
490
491impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
492where
493 F: FnMut(T) -> Fut,
494 Fut: Future<Output = Option<(Item, T)>>,
495{
496 type Item = Item;
497
498 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499 let mut this = self.project();
500
501 if let Some(state) = this.state.take() {
502 this.fut.set(Some((this.f)(state)));
503 }
504
505 let step = ready!(this
506 .fut
507 .as_mut()
508 .as_pin_mut()
509 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
510 .poll(cx));
511 this.fut.set(None);
512
513 if let Some((item, next_state)) = step {
514 *this.state = Some(next_state);
515 Poll::Ready(Some(item))
516 } else {
517 Poll::Ready(None)
518 }
519 }
520}
521
522pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
544where
545 F: FnMut(T) -> Fut,
546 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
547{
548 TryUnfold {
549 f,
550 state: Some(init),
551 fut: None,
552 }
553}
554
pin_project! {
    #[derive(Clone)]
    /// Stream returned by [`try_unfold()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct TryUnfold<T, F, Fut> {
        // Step function that turns a state into the next future.
        f: F,
        // State waiting to be fed to `f`; `None` while a future is in flight or after the end.
        state: Option<T>,
        // The future for the step currently being awaited, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
566
567impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
568where
569 T: fmt::Debug,
570 Fut: fmt::Debug,
571{
572 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573 f.debug_struct("TryUnfold")
574 .field("state", &self.state)
575 .field("fut", &self.fut)
576 .finish()
577 }
578}
579
580impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
581where
582 F: FnMut(T) -> Fut,
583 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
584{
585 type Item = Result<Item, E>;
586
587 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
588 let mut this = self.project();
589
590 if let Some(state) = this.state.take() {
591 this.fut.set(Some((this.f)(state)));
592 }
593
594 match this.fut.as_mut().as_pin_mut() {
595 None => {
596 Poll::Ready(None)
598 }
599 Some(future) => {
600 let step = ready!(future.poll(cx));
601 this.fut.set(None);
602
603 match step {
604 Ok(Some((item, next_state))) => {
605 *this.state = Some(next_state);
606 Poll::Ready(Some(Ok(item)))
607 }
608 Ok(None) => Poll::Ready(None),
609 Err(e) => Poll::Ready(Some(Err(e))),
610 }
611 }
612 }
613 }
614}
615
616pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
631 OnceFuture {
632 future: Some(future),
633 }
634}
635
pin_project! {
    #[derive(Debug)]
    /// Stream returned by [`once_future()`].
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct OnceFuture<F> {
        // The wrapped future; set to `None` once it has completed.
        #[pin]
        future: Option<F>,
    }
}
645
646impl<F: Future> Stream for OnceFuture<F> {
647 type Item = F::Output;
648
649 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
650 let mut this = self.project();
651
652 match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
653 Some(Poll::Ready(t)) => {
654 this.future.set(None);
655 Poll::Ready(Some(t))
656 }
657 Some(Poll::Pending) => Poll::Pending,
658 None => Poll::Ready(None),
659 }
660 }
661}
662
/// Extension trait providing combinators and convenience futures for every [`Stream`].
///
/// Each method is a thin constructor: it wraps `self` (by value or by mutable reference) in the
/// adapter or future type named in its return type. The actual work happens in that type's
/// `Stream`/`Future` impl.
pub trait StreamExt: Stream {
    /// Polls for the next item without requiring the caller to pin the stream.
    ///
    /// Pins `self` with [`Pin::new`] (hence the `Unpin` bound) and forwards to
    /// [`Stream::poll_next`].
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
    where
        Self: Unpin,
    {
        Stream::poll_next(Pin::new(self), cx)
    }

    /// Returns a future that resolves to the next item, or `None` when the stream has ended.
    fn next(&mut self) -> NextFuture<'_, Self>
    where
        Self: Unpin,
    {
        NextFuture { stream: self }
    }

    /// Like `next()` for streams of `Result`s: the future resolves to `Ok(Some(item))`,
    /// `Ok(None)` at the end of the stream, or `Err(e)` if the next item is an error.
    fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
    where
        Self: Stream<Item = Result<T, E>> + Unpin,
    {
        TryNextFuture { stream: self }
    }

    /// Returns a future that drains the stream and resolves to the number of items it yielded.
    fn count(self) -> CountFuture<Self>
    where
        Self: Sized,
    {
        CountFuture {
            stream: self,
            count: 0,
        }
    }

    /// Transforms every item with `f`.
    fn map<T, F>(self, f: F) -> Map<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> T,
    {
        Map { stream: self, f }
    }

    /// Maps every item to a stream with `f` and yields the items of those streams one inner
    /// stream at a time, in order.
    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
    where
        Self: Sized,
        U: Stream,
        F: FnMut(Self::Item) -> U,
    {
        FlatMap {
            // Reuse `Map` to produce the inner streams.
            stream: self.map(f),
            inner_stream: None,
        }
    }

    /// Flattens a stream of streams, yielding the items of each inner stream in order.
    fn flatten(self) -> Flatten<Self>
    where
        Self: Sized,
        Self::Item: Stream,
    {
        Flatten {
            stream: self,
            inner_stream: None,
        }
    }

    /// Maps every item to a future with `f` and yields each future's output.
    ///
    /// Only one future is in flight at a time; the next item is not requested until the current
    /// future has completed.
    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> Fut,
        Fut: Future,
    {
        Then {
            stream: self,
            future: None,
            f,
        }
    }

    /// Keeps only the items for which `predicate` returns `true`.
    fn filter<P>(self, predicate: P) -> Filter<Self, P>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        Filter {
            stream: self,
            predicate,
        }
    }

    /// Applies `f` to every item and yields the `Some` results, skipping the `None`s.
    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> Option<T>,
    {
        FilterMap { stream: self, f }
    }

    /// Yields at most the first `n` items of the stream.
    fn take(self, n: usize) -> Take<Self>
    where
        Self: Sized,
    {
        Take { stream: self, n }
    }

    /// Wraps the stream in a [`TakeWhile`] adapter driven by `predicate`.
    ///
    /// NOTE(review): expected to mirror `Iterator::take_while`; confirm against the
    /// `TakeWhile` impl.
    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        TakeWhile {
            stream: self,
            predicate,
        }
    }

    /// Wraps the stream in a [`Skip`] adapter configured with `n`.
    ///
    /// NOTE(review): expected to mirror `Iterator::skip`; confirm against the `Skip` impl.
    fn skip(self, n: usize) -> Skip<Self>
    where
        Self: Sized,
    {
        Skip { stream: self, n }
    }

    /// Wraps the stream in a [`SkipWhile`] adapter driven by `predicate`.
    ///
    /// NOTE(review): expected to mirror `Iterator::skip_while`; confirm against the
    /// `SkipWhile` impl.
    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        SkipWhile {
            stream: self,
            predicate: Some(predicate),
        }
    }

    /// Wraps the stream in a [`StepBy`] adapter with the given step.
    ///
    /// NOTE(review): expected to mirror `Iterator::step_by`; confirm against the `StepBy` impl.
    ///
    /// # Panics
    ///
    /// Panics if `step` is zero.
    fn step_by(self, step: usize) -> StepBy<Self>
    where
        Self: Sized,
    {
        assert!(step > 0, "`step` must be greater than zero");
        StepBy {
            stream: self,
            step,
            i: 0,
        }
    }

    /// Combines this stream with `other` in a [`Chain`] adapter; both halves are fused first.
    ///
    /// NOTE(review): expected to yield all of `self` and then all of `other`, like
    /// `Iterator::chain`; confirm against the `Chain` impl.
    fn chain<U>(self, other: U) -> Chain<Self, U>
    where
        Self: Sized,
        U: Stream<Item = Self::Item> + Sized,
    {
        Chain {
            first: self.fuse(),
            second: other.fuse(),
        }
    }

    /// Wraps a stream of references in a [`Cloned`] adapter.
    ///
    /// NOTE(review): expected to clone each referenced item, like `Iterator::cloned`; confirm
    /// against the `Cloned` impl.
    fn cloned<'a, T>(self) -> Cloned<Self>
    where
        Self: Stream<Item = &'a T> + Sized,
        T: Clone + 'a,
    {
        Cloned { stream: self }
    }

    /// Wraps a stream of references in a [`Copied`] adapter.
    ///
    /// NOTE(review): expected to copy each referenced item, like `Iterator::copied`; confirm
    /// against the `Copied` impl.
    fn copied<'a, T>(self) -> Copied<Self>
    where
        Self: Stream<Item = &'a T> + Sized,
        T: Copy + 'a,
    {
        Copied { stream: self }
    }

    /// Returns a future that collects every item into a collection of type `C`.
    ///
    /// The collection starts as `C::default()` and each item is added through [`Extend`].
    fn collect<C>(self) -> CollectFuture<Self, C>
    where
        Self: Sized,
        C: Default + Extend<Self::Item>,
    {
        CollectFuture {
            stream: self,
            collection: Default::default(),
        }
    }

    /// Returns a future that collects the `Ok` values of a stream of `Result`s into `C`.
    ///
    /// The future resolves to `Err(e)` as soon as the stream yields an error.
    fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
    where
        Self: Stream<Item = Result<T, E>> + Sized,
        C: Default + Extend<T>,
    {
        TryCollectFuture {
            stream: self,
            items: Default::default(),
        }
    }

    /// Returns a future that splits the items into two collections.
    ///
    /// Items for which `predicate` returns `true` go into the first collection of the resulting
    /// tuple, all others into the second.
    fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
    where
        Self: Sized,
        B: Default + Extend<Self::Item>,
        P: FnMut(&Self::Item) -> bool,
    {
        PartitionFuture {
            stream: self,
            predicate,
            res: Some(Default::default()),
        }
    }

    /// Returns a future that folds every item into an accumulator, starting from `init`.
    fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
    where
        Self: Sized,
        F: FnMut(T, Self::Item) -> T,
    {
        FoldFuture {
            stream: self,
            f,
            acc: Some(init),
        }
    }

    /// Fallible fold over a stream of `Result`s.
    ///
    /// The future resolves to the first error produced by either the stream or `f`, or to the
    /// final accumulator once the stream ends. The stream is only borrowed, so it can be used
    /// again afterwards.
    fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
    where
        Self: Stream<Item = Result<T, E>> + Unpin + Sized,
        F: FnMut(B, T) -> Result<B, E>,
    {
        TryFoldFuture {
            stream: self,
            f,
            acc: Some(init),
        }
    }

    /// Maps items with a closure that also carries mutable state.
    ///
    /// `f` receives the state and the item; returning `None` makes the adapter yield `None`.
    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
    where
        Self: Sized,
        F: FnMut(&mut St, Self::Item) -> Option<B>,
    {
        Scan {
            stream: self,
            state_f: (initial_state, f),
        }
    }

    /// Makes the stream keep returning `None` once it has returned `None` for the first time,
    /// without polling the underlying stream again.
    fn fuse(self) -> Fuse<Self>
    where
        Self: Sized,
    {
        Fuse {
            stream: self,
            done: false,
        }
    }

    /// Wraps the stream in a [`Cycle`] adapter, keeping a clone of the original alongside it.
    ///
    /// NOTE(review): expected to restart from the clone when the stream ends, like
    /// `Iterator::cycle`; confirm against the `Cycle` impl.
    fn cycle(self) -> Cycle<Self>
    where
        Self: Clone + Sized,
    {
        Cycle {
            orig: self.clone(),
            stream: self,
        }
    }

    /// Wraps the stream in an [`Enumerate`] adapter with its counter starting at zero.
    ///
    /// NOTE(review): expected to pair each item with its index, like `Iterator::enumerate`;
    /// confirm against the `Enumerate` impl.
    fn enumerate(self) -> Enumerate<Self>
    where
        Self: Sized,
    {
        Enumerate { stream: self, i: 0 }
    }

    /// Wraps the stream in an [`Inspect`] adapter holding `f`.
    ///
    /// NOTE(review): expected to call `f` on a reference to each item and pass the item on
    /// unchanged, like `Iterator::inspect`; confirm against the `Inspect` impl.
    fn inspect<F>(self, f: F) -> Inspect<Self, F>
    where
        Self: Sized,
        F: FnMut(&Self::Item),
    {
        Inspect { stream: self, f }
    }

    /// Returns an [`NthFuture`] borrowing the stream and configured with `n`.
    ///
    /// NOTE(review): expected to resolve to the `n`th item (zero-based), like `Iterator::nth`;
    /// confirm against the `NthFuture` impl.
    fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
    where
        Self: Unpin,
    {
        NthFuture { stream: self, n }
    }

    /// Returns a [`LastFuture`] that consumes the stream.
    ///
    /// NOTE(review): expected to resolve to the final item, like `Iterator::last`; confirm
    /// against the `LastFuture` impl.
    fn last(self) -> LastFuture<Self>
    where
        Self: Sized,
    {
        LastFuture {
            stream: self,
            last: None,
        }
    }

    /// Returns a [`FindFuture`] borrowing the stream and holding `predicate`.
    ///
    /// NOTE(review): expected to resolve to the first matching item, like `Iterator::find`;
    /// confirm against the `FindFuture` impl.
    fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(&Self::Item) -> bool,
    {
        FindFuture {
            stream: self,
            predicate,
        }
    }

    /// Returns a [`FindMapFuture`] borrowing the stream and holding `f`.
    ///
    /// NOTE(review): expected to resolve to the first `Some` returned by `f`, like
    /// `Iterator::find_map`; confirm against the `FindMapFuture` impl.
    fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
    where
        Self: Unpin,
        F: FnMut(Self::Item) -> Option<B>,
    {
        FindMapFuture { stream: self, f }
    }

    /// Returns a [`PositionFuture`] borrowing the stream, with its index starting at zero.
    ///
    /// NOTE(review): expected to resolve to the index of the first matching item, like
    /// `Iterator::position`; confirm against the `PositionFuture` impl.
    fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(Self::Item) -> bool,
    {
        PositionFuture {
            stream: self,
            predicate,
            index: 0,
        }
    }

    /// Returns an [`AllFuture`] borrowing the stream and holding `predicate`.
    ///
    /// NOTE(review): expected to test whether every item matches, like `Iterator::all`; confirm
    /// against the `AllFuture` impl.
    fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(Self::Item) -> bool,
    {
        AllFuture {
            stream: self,
            predicate,
        }
    }

    /// Returns an [`AnyFuture`] borrowing the stream and holding `predicate`.
    ///
    /// NOTE(review): expected to test whether any item matches, like `Iterator::any`; confirm
    /// against the `AnyFuture` impl.
    fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(Self::Item) -> bool,
    {
        AnyFuture {
            stream: self,
            predicate,
        }
    }

    /// Returns a [`ForEachFuture`] that consumes the stream and holds `f`.
    ///
    /// NOTE(review): expected to call `f` on every item; confirm against the `ForEachFuture`
    /// impl.
    fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item),
    {
        ForEachFuture { stream: self, f }
    }

    /// Returns a [`TryForEachFuture`] borrowing the stream and holding the fallible closure `f`.
    ///
    /// NOTE(review): expected to stop at the first `Err` returned by `f`; confirm against the
    /// `TryForEachFuture` impl.
    fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
    where
        Self: Unpin,
        F: FnMut(Self::Item) -> Result<(), E>,
    {
        TryForEachFuture { stream: self, f }
    }

    /// Combines this stream with `other` in a [`Zip`] adapter.
    ///
    /// NOTE(review): expected to yield pairs of items, like `Iterator::zip`; confirm against
    /// the `Zip` impl.
    fn zip<U>(self, other: U) -> Zip<Self, U>
    where
        Self: Sized,
        U: Stream,
    {
        Zip {
            item_slot: None,
            first: self,
            second: other,
        }
    }

    /// Returns an [`UnzipFuture`] for a stream of pairs, starting from two default collections.
    ///
    /// NOTE(review): expected to collect the pair halves separately, like `Iterator::unzip`;
    /// confirm against the `UnzipFuture` impl.
    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
    where
        FromA: Default + Extend<A>,
        FromB: Default + Extend<B>,
        Self: Stream<Item = (A, B)> + Sized,
    {
        UnzipFuture {
            stream: self,
            res: Some(Default::default()),
        }
    }

    /// Merges this stream with `other`, preferring this one.
    ///
    /// On each poll this stream is polled first; `other` is polled only when this stream did
    /// not produce an item.
    fn or<S>(self, other: S) -> Or<Self, S>
    where
        Self: Sized,
        S: Stream<Item = Self::Item>,
    {
        Or {
            stream1: self,
            stream2: other,
        }
    }

    /// Merges this stream with `other` without a fixed preference.
    ///
    /// On each poll the order in which the two streams are polled is chosen at random.
    #[cfg(feature = "std")]
    fn race<S>(self, other: S) -> Race<Self, S>
    where
        Self: Sized,
        S: Stream<Item = Self::Item>,
    {
        Race {
            stream1: self,
            stream2: other,
        }
    }

    /// Boxes and pins the stream, erasing its concrete type while keeping it `Send`.
    #[cfg(feature = "alloc")]
    fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
    where
        Self: Send + Sized + 'a,
    {
        Box::pin(self)
    }

    /// Boxes and pins the stream, erasing its concrete type; no `Send` bound is required.
    #[cfg(feature = "alloc")]
    fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
    where
        Self: Sized + 'a,
    {
        Box::pin(self)
    }
}
1810
// Blanket impl: every `Stream`, including unsized ones, gets the `StreamExt` methods.
impl<S: Stream + ?Sized> StreamExt for S {}
1812
/// Type alias for a pinned, boxed, `Send + 'static` stream yielding items of type `T`.
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1826
/// Type alias for a pinned, boxed, `'static` stream yielding items of type `T`, without a
/// `Send` bound.
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1840
/// Future returned by `StreamExt::next()`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextFuture<'a, S: ?Sized> {
    // The stream being advanced; only borrowed, so it remains usable afterwards.
    stream: &'a mut S,
}
1847
// `NextFuture` is `Unpin` whenever the borrowed stream type is `Unpin`.
impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1849
1850impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1851 type Output = Option<S::Item>;
1852
1853 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1854 self.stream.poll_next(cx)
1855 }
1856}
1857
/// Future returned by `StreamExt::try_next()`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNextFuture<'a, S: ?Sized> {
    // The stream being advanced; only borrowed, so it remains usable afterwards.
    stream: &'a mut S,
}
1864
// `TryNextFuture` is `Unpin` whenever the borrowed stream type is `Unpin`.
impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1866
1867impl<T, E, S> Future for TryNextFuture<'_, S>
1868where
1869 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1870{
1871 type Output = Result<Option<T>, E>;
1872
1873 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1874 let res = ready!(self.stream.poll_next(cx));
1875 Poll::Ready(res.transpose())
1876 }
1877}
1878
pin_project! {
    #[derive(Debug)]
    /// Future returned by `StreamExt::count()`.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CountFuture<S: ?Sized> {
        // Number of items seen so far.
        count: usize,
        // The stream being drained. Declared last because `S` may be unsized.
        #[pin]
        stream: S,
    }
}
1889
1890impl<S: Stream + ?Sized> Future for CountFuture<S> {
1891 type Output = usize;
1892
1893 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1894 loop {
1895 match ready!(self.as_mut().project().stream.poll_next(cx)) {
1896 None => return Poll::Ready(self.count),
1897 Some(_) => *self.as_mut().project().count += 1,
1898 }
1899 }
1900 }
1901}
1902
pin_project! {
    #[derive(Debug)]
    /// Future returned by `StreamExt::collect()`.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CollectFuture<S, C> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Items gathered so far; taken out (leaving a default value) on completion.
        collection: C,
    }
}
1913
1914impl<S, C> Future for CollectFuture<S, C>
1915where
1916 S: Stream,
1917 C: Default + Extend<S::Item>,
1918{
1919 type Output = C;
1920
1921 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1922 let mut this = self.as_mut().project();
1923 loop {
1924 match ready!(this.stream.as_mut().poll_next(cx)) {
1925 Some(e) => this.collection.extend(Some(e)),
1926 None => return Poll::Ready(mem::take(self.project().collection)),
1927 }
1928 }
1929 }
1930}
1931
pin_project! {
    #[derive(Debug)]
    /// Future returned by `StreamExt::try_collect()`.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TryCollectFuture<S, C> {
        // The stream of `Result`s being drained.
        #[pin]
        stream: S,
        // `Ok` values gathered so far; taken out (leaving a default value) on completion.
        items: C,
    }
}
1942
1943impl<T, E, S, C> Future for TryCollectFuture<S, C>
1944where
1945 S: Stream<Item = Result<T, E>>,
1946 C: Default + Extend<T>,
1947{
1948 type Output = Result<C, E>;
1949
1950 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1951 let mut this = self.project();
1952 Poll::Ready(Ok(loop {
1953 match ready!(this.stream.as_mut().poll_next(cx)?) {
1954 Some(x) => this.items.extend(Some(x)),
1955 None => break mem::take(this.items),
1956 }
1957 }))
1958 }
1959}
1960
pin_project! {
    #[derive(Debug)]
    /// Future returned by `StreamExt::partition()`.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct PartitionFuture<S, P, B> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Decides which of the two collections an item goes into.
        predicate: P,
        // (matching, non-matching) collections; `None` once the result has been handed out.
        res: Option<(B, B)>,
    }
}
1972
1973impl<S, P, B> Future for PartitionFuture<S, P, B>
1974where
1975 S: Stream + Sized,
1976 P: FnMut(&S::Item) -> bool,
1977 B: Default + Extend<S::Item>,
1978{
1979 type Output = (B, B);
1980
1981 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1982 let mut this = self.project();
1983 loop {
1984 match ready!(this.stream.as_mut().poll_next(cx)) {
1985 Some(v) => {
1986 let res = this.res.as_mut().unwrap();
1987 if (this.predicate)(&v) {
1988 res.0.extend(Some(v))
1989 } else {
1990 res.1.extend(Some(v))
1991 }
1992 }
1993 None => return Poll::Ready(this.res.take().unwrap()),
1994 }
1995 }
1996 }
1997}
1998
pin_project! {
    #[derive(Debug)]
    /// Future returned by `StreamExt::fold()`.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct FoldFuture<S, F, T> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Folding function `(accumulator, item) -> accumulator`.
        f: F,
        // Current accumulator; `None` once the result has been handed out.
        acc: Option<T>,
    }
}
2010
2011impl<S, F, T> Future for FoldFuture<S, F, T>
2012where
2013 S: Stream,
2014 F: FnMut(T, S::Item) -> T,
2015{
2016 type Output = T;
2017
2018 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2019 let mut this = self.project();
2020 loop {
2021 match ready!(this.stream.as_mut().poll_next(cx)) {
2022 Some(v) => {
2023 let old = this.acc.take().unwrap();
2024 let new = (this.f)(old, v);
2025 *this.acc = Some(new);
2026 }
2027 None => return Poll::Ready(this.acc.take().unwrap()),
2028 }
2029 }
2030 }
2031}
2032
/// Future returned by `StreamExt::try_fold()`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryFoldFuture<'a, S, F, B> {
    // The stream of `Result`s being folded; only borrowed.
    stream: &'a mut S,
    // Fallible folding function `(accumulator, value) -> Result<accumulator, E>`.
    f: F,
    // Current accumulator; `None` once the result has been handed out.
    acc: Option<B>,
}
2041
// Declared `Unpin` unconditionally; `poll` relies on this to mutate fields through the
// pinned reference.
impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
2043
2044impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
2045where
2046 S: Stream<Item = Result<T, E>> + Unpin,
2047 F: FnMut(B, T) -> Result<B, E>,
2048{
2049 type Output = Result<B, E>;
2050
2051 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2052 loop {
2053 match ready!(self.stream.poll_next(cx)) {
2054 Some(Err(e)) => return Poll::Ready(Err(e)),
2055 Some(Ok(t)) => {
2056 let old = self.acc.take().unwrap();
2057 let new = (&mut self.f)(old, t);
2058
2059 match new {
2060 Ok(t) => self.acc = Some(t),
2061 Err(e) => return Poll::Ready(Err(e)),
2062 }
2063 }
2064 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2065 }
2066 }
2067 }
2068}
2069
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by `StreamExt::scan()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Scan<S, St, F> {
        // The underlying stream.
        #[pin]
        stream: S,
        // The running state together with the closure that updates it.
        state_f: (St, F),
    }
}
2080
2081impl<S, St, F, B> Stream for Scan<S, St, F>
2082where
2083 S: Stream,
2084 F: FnMut(&mut St, S::Item) -> Option<B>,
2085{
2086 type Item = B;
2087
2088 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2089 let mut this = self.project();
2090 this.stream.as_mut().poll_next(cx).map(|item| {
2091 item.and_then(|item| {
2092 let (state, f) = this.state_f;
2093 f(state, item)
2094 })
2095 })
2096 }
2097}
2098
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by `StreamExt::fuse()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Fuse<S> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Set once the underlying stream has returned `None`; it is not polled afterwards.
        done: bool,
    }
}
2109
2110impl<S: Stream> Stream for Fuse<S> {
2111 type Item = S::Item;
2112
2113 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2114 let this = self.project();
2115
2116 if *this.done {
2117 Poll::Ready(None)
2118 } else {
2119 let next = ready!(this.stream.poll_next(cx));
2120 if next.is_none() {
2121 *this.done = true;
2122 }
2123 Poll::Ready(next)
2124 }
2125 }
2126}
2127
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by `StreamExt::map()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Map<S, F> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Transformation applied to every item.
        f: F,
    }
}
2138
2139impl<S, F, T> Stream for Map<S, F>
2140where
2141 S: Stream,
2142 F: FnMut(S::Item) -> T,
2143{
2144 type Item = T;
2145
2146 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2147 let this = self.project();
2148 let next = ready!(this.stream.poll_next(cx));
2149 Poll::Ready(next.map(this.f))
2150 }
2151
2152 fn size_hint(&self) -> (usize, Option<usize>) {
2153 self.stream.size_hint()
2154 }
2155}
2156
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by `StreamExt::flat_map()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct FlatMap<S, U, F> {
        // Outer stream, already mapped so that it yields the inner streams.
        #[pin]
        stream: Map<S, F>,
        // The inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<U>,
    }
}
2168
2169impl<S, U, F> Stream for FlatMap<S, U, F>
2170where
2171 S: Stream,
2172 U: Stream,
2173 F: FnMut(S::Item) -> U,
2174{
2175 type Item = U::Item;
2176
2177 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2178 let mut this = self.project();
2179 loop {
2180 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2181 match ready!(inner.poll_next(cx)) {
2182 Some(item) => return Poll::Ready(Some(item)),
2183 None => this.inner_stream.set(None),
2184 }
2185 }
2186
2187 match ready!(this.stream.as_mut().poll_next(cx)) {
2188 Some(stream) => this.inner_stream.set(Some(stream)),
2189 None => return Poll::Ready(None),
2190 }
2191 }
2192 }
2193}
2194
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by `StreamExt::flatten()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Flatten<S: Stream> {
        // Outer stream that yields the inner streams.
        #[pin]
        stream: S,
        // The inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<S::Item>,
    }
}
2206
2207impl<S, U> Stream for Flatten<S>
2208where
2209 S: Stream<Item = U>,
2210 U: Stream,
2211{
2212 type Item = U::Item;
2213
2214 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2215 let mut this = self.project();
2216 loop {
2217 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2218 match ready!(inner.poll_next(cx)) {
2219 Some(item) => return Poll::Ready(Some(item)),
2220 None => this.inner_stream.set(None),
2221 }
2222 }
2223
2224 match ready!(this.stream.as_mut().poll_next(cx)) {
2225 Some(inner) => this.inner_stream.set(Some(inner)),
2226 None => return Poll::Ready(None),
2227 }
2228 }
2229 }
2230}
2231
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by `StreamExt::then()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Then<S, F, Fut> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Future created from the most recent item, if it has not completed yet.
        #[pin]
        future: Option<Fut>,
        // Turns each item into a future.
        f: F,
    }
}
2244
2245impl<S, F, Fut> Stream for Then<S, F, Fut>
2246where
2247 S: Stream,
2248 F: FnMut(S::Item) -> Fut,
2249 Fut: Future,
2250{
2251 type Item = Fut::Output;
2252
2253 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2254 let mut this = self.project();
2255
2256 loop {
2257 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2258 let item = ready!(fut.poll(cx));
2259 this.future.set(None);
2260 return Poll::Ready(Some(item));
2261 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2262 this.future.set(Some((this.f)(item)));
2263 } else {
2264 return Poll::Ready(None);
2265 }
2266 }
2267 }
2268
2269 fn size_hint(&self) -> (usize, Option<usize>) {
2270 let future_len = self.future.is_some() as usize;
2271 let (lower, upper) = self.stream.size_hint();
2272 let lower = lower.saturating_add(future_len);
2273 let upper = upper.and_then(|u| u.checked_add(future_len));
2274 (lower, upper)
2275 }
2276}
2277
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by `StreamExt::filter()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Filter<S, P> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Items are kept only when this returns `true`.
        predicate: P,
    }
}
2288
2289impl<S, P> Stream for Filter<S, P>
2290where
2291 S: Stream,
2292 P: FnMut(&S::Item) -> bool,
2293{
2294 type Item = S::Item;
2295
2296 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2297 let mut this = self.project();
2298 loop {
2299 match ready!(this.stream.as_mut().poll_next(cx)) {
2300 None => return Poll::Ready(None),
2301 Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2302 Some(_) => {}
2303 }
2304 }
2305 }
2306}
2307
2308pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2324where
2325 S1: Stream<Item = T>,
2326 S2: Stream<Item = T>,
2327{
2328 Or { stream1, stream2 }
2329}
2330
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`or()`] and `StreamExt::or()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Or<S1, S2> {
        // Preferred stream; polled first on every call.
        #[pin]
        stream1: S1,
        // Fallback stream; polled only when `stream1` did not produce an item.
        #[pin]
        stream2: S2,
    }
}
2342
2343impl<T, S1, S2> Stream for Or<S1, S2>
2344where
2345 S1: Stream<Item = T>,
2346 S2: Stream<Item = T>,
2347{
2348 type Item = T;
2349
2350 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2351 let mut this = self.project();
2352
2353 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2354 return Poll::Ready(Some(t));
2355 }
2356 this.stream2.as_mut().poll_next(cx)
2357 }
2358}
2359
/// Merges two streams with no preference for either.
///
/// Free-function form of `StreamExt::race`: on each poll the order in which the two streams
/// are tried is chosen at random.
#[cfg(feature = "std")]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    StreamExt::race(stream1, stream2)
}
2382
#[cfg(feature = "std")]
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`race()`] and `StreamExt::race()`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        // First contender.
        #[pin]
        stream1: S1,
        // Second contender.
        #[pin]
        stream2: S2,
    }
}
2395
#[cfg(feature = "std")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    /// Polls both streams in a randomly chosen order and yields the first item found.
    ///
    /// Returns `Poll::Ready(None)` when both streams report their end during the same poll.
    /// Returning `Poll::Pending` in that case would be wrong: neither stream has registered a
    /// waker, so the task would never be woken again.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let stream1_first = fastrand::bool();

        let first = if stream1_first {
            this.stream1.as_mut().poll_next(cx)
        } else {
            this.stream2.as_mut().poll_next(cx)
        };
        let first_ended = match first {
            Poll::Ready(Some(t)) => return Poll::Ready(Some(t)),
            Poll::Ready(None) => true,
            Poll::Pending => false,
        };

        let second = if stream1_first {
            this.stream2.as_mut().poll_next(cx)
        } else {
            this.stream1.as_mut().poll_next(cx)
        };
        match second {
            Poll::Ready(Some(t)) => Poll::Ready(Some(t)),
            // Both streams are finished: the merged stream is finished too.
            Poll::Ready(None) if first_ended => Poll::Ready(None),
            // At least one stream is still pending and has registered the waker.
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }
}
2425
2426pin_project! {
2427 #[derive(Clone, Debug)]
2429 #[must_use = "streams do nothing unless polled"]
2430 pub struct FilterMap<S, F> {
2431 #[pin]
2432 stream: S,
2433 f: F,
2434 }
2435}
2436
2437impl<S, F, T> Stream for FilterMap<S, F>
2438where
2439 S: Stream,
2440 F: FnMut(S::Item) -> Option<T>,
2441{
2442 type Item = T;
2443
2444 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2445 let mut this = self.project();
2446 loop {
2447 match ready!(this.stream.as_mut().poll_next(cx)) {
2448 None => return Poll::Ready(None),
2449 Some(v) => {
2450 if let Some(t) = (this.f)(v) {
2451 return Poll::Ready(Some(t));
2452 }
2453 }
2454 }
2455 }
2456 }
2457}
2458
2459pin_project! {
2460 #[derive(Clone, Debug)]
2462 #[must_use = "streams do nothing unless polled"]
2463 pub struct Take<S> {
2464 #[pin]
2465 stream: S,
2466 n: usize,
2467 }
2468}
2469
2470impl<S: Stream> Stream for Take<S> {
2471 type Item = S::Item;
2472
2473 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2474 let this = self.project();
2475
2476 if *this.n == 0 {
2477 Poll::Ready(None)
2478 } else {
2479 let next = ready!(this.stream.poll_next(cx));
2480 match next {
2481 Some(_) => *this.n -= 1,
2482 None => *this.n = 0,
2483 }
2484 Poll::Ready(next)
2485 }
2486 }
2487}
2488
2489pin_project! {
2490 #[derive(Clone, Debug)]
2492 #[must_use = "streams do nothing unless polled"]
2493 pub struct TakeWhile<S, P> {
2494 #[pin]
2495 stream: S,
2496 predicate: P,
2497 }
2498}
2499
2500impl<S, P> Stream for TakeWhile<S, P>
2501where
2502 S: Stream,
2503 P: FnMut(&S::Item) -> bool,
2504{
2505 type Item = S::Item;
2506
2507 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2508 let this = self.project();
2509
2510 match ready!(this.stream.poll_next(cx)) {
2511 Some(v) => {
2512 if (this.predicate)(&v) {
2513 Poll::Ready(Some(v))
2514 } else {
2515 Poll::Ready(None)
2516 }
2517 }
2518 None => Poll::Ready(None),
2519 }
2520 }
2521}
2522
2523pin_project! {
2524 #[derive(Clone, Debug)]
2526 #[must_use = "streams do nothing unless polled"]
2527 pub struct Skip<S> {
2528 #[pin]
2529 stream: S,
2530 n: usize,
2531 }
2532}
2533
2534impl<S: Stream> Stream for Skip<S> {
2535 type Item = S::Item;
2536
2537 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2538 let mut this = self.project();
2539 loop {
2540 match ready!(this.stream.as_mut().poll_next(cx)) {
2541 Some(v) => match *this.n {
2542 0 => return Poll::Ready(Some(v)),
2543 _ => *this.n -= 1,
2544 },
2545 None => return Poll::Ready(None),
2546 }
2547 }
2548 }
2549}
2550
2551pin_project! {
2552 #[derive(Clone, Debug)]
2554 #[must_use = "streams do nothing unless polled"]
2555 pub struct SkipWhile<S, P> {
2556 #[pin]
2557 stream: S,
2558 predicate: Option<P>,
2559 }
2560}
2561
2562impl<S, P> Stream for SkipWhile<S, P>
2563where
2564 S: Stream,
2565 P: FnMut(&S::Item) -> bool,
2566{
2567 type Item = S::Item;
2568
2569 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2570 let mut this = self.project();
2571 loop {
2572 match ready!(this.stream.as_mut().poll_next(cx)) {
2573 Some(v) => match this.predicate {
2574 Some(p) => {
2575 if !p(&v) {
2576 *this.predicate = None;
2577 return Poll::Ready(Some(v));
2578 }
2579 }
2580 None => return Poll::Ready(Some(v)),
2581 },
2582 None => return Poll::Ready(None),
2583 }
2584 }
2585 }
2586}
2587
2588pin_project! {
2589 #[derive(Clone, Debug)]
2591 #[must_use = "streams do nothing unless polled"]
2592 pub struct StepBy<S> {
2593 #[pin]
2594 stream: S,
2595 step: usize,
2596 i: usize,
2597 }
2598}
2599
2600impl<S: Stream> Stream for StepBy<S> {
2601 type Item = S::Item;
2602
2603 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2604 let mut this = self.project();
2605 loop {
2606 match ready!(this.stream.as_mut().poll_next(cx)) {
2607 Some(v) => {
2608 if *this.i == 0 {
2609 *this.i = *this.step - 1;
2610 return Poll::Ready(Some(v));
2611 } else {
2612 *this.i -= 1;
2613 }
2614 }
2615 None => return Poll::Ready(None),
2616 }
2617 }
2618 }
2619}
2620
2621pin_project! {
2622 #[derive(Clone, Debug)]
2624 #[must_use = "streams do nothing unless polled"]
2625 pub struct Chain<S, U> {
2626 #[pin]
2627 first: Fuse<S>,
2628 #[pin]
2629 second: Fuse<U>,
2630 }
2631}
2632
2633impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2634 type Item = S::Item;
2635
2636 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2637 let mut this = self.project();
2638
2639 if !this.first.done {
2640 let next = ready!(this.first.as_mut().poll_next(cx));
2641 if let Some(next) = next {
2642 return Poll::Ready(Some(next));
2643 }
2644 }
2645
2646 if !this.second.done {
2647 let next = ready!(this.second.as_mut().poll_next(cx));
2648 if let Some(next) = next {
2649 return Poll::Ready(Some(next));
2650 }
2651 }
2652
2653 if this.first.done && this.second.done {
2654 Poll::Ready(None)
2655 } else {
2656 Poll::Pending
2657 }
2658 }
2659}
2660
2661pin_project! {
2662 #[derive(Clone, Debug)]
2664 #[must_use = "streams do nothing unless polled"]
2665 pub struct Cloned<S> {
2666 #[pin]
2667 stream: S,
2668 }
2669}
2670
2671impl<'a, S, T: 'a> Stream for Cloned<S>
2672where
2673 S: Stream<Item = &'a T>,
2674 T: Clone,
2675{
2676 type Item = T;
2677
2678 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2679 let this = self.project();
2680 let next = ready!(this.stream.poll_next(cx));
2681 Poll::Ready(next.cloned())
2682 }
2683}
2684
2685pin_project! {
2686 #[derive(Clone, Debug)]
2688 #[must_use = "streams do nothing unless polled"]
2689 pub struct Copied<S> {
2690 #[pin]
2691 stream: S,
2692 }
2693}
2694
2695impl<'a, S, T: 'a> Stream for Copied<S>
2696where
2697 S: Stream<Item = &'a T>,
2698 T: Copy,
2699{
2700 type Item = T;
2701
2702 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2703 let this = self.project();
2704 let next = ready!(this.stream.poll_next(cx));
2705 Poll::Ready(next.copied())
2706 }
2707}
2708
2709pin_project! {
2710 #[derive(Clone, Debug)]
2712 #[must_use = "streams do nothing unless polled"]
2713 pub struct Cycle<S> {
2714 orig: S,
2715 #[pin]
2716 stream: S,
2717 }
2718}
2719
2720impl<S> Stream for Cycle<S>
2721where
2722 S: Stream + Clone,
2723{
2724 type Item = S::Item;
2725
2726 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2727 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2728 Some(item) => Poll::Ready(Some(item)),
2729 None => {
2730 let new = self.as_mut().orig.clone();
2731 self.as_mut().project().stream.set(new);
2732 self.project().stream.poll_next(cx)
2733 }
2734 }
2735 }
2736}
2737
2738pin_project! {
2739 #[derive(Clone, Debug)]
2741 #[must_use = "streams do nothing unless polled"]
2742 pub struct Enumerate<S> {
2743 #[pin]
2744 stream: S,
2745 i: usize,
2746 }
2747}
2748
2749impl<S> Stream for Enumerate<S>
2750where
2751 S: Stream,
2752{
2753 type Item = (usize, S::Item);
2754
2755 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2756 let this = self.project();
2757
2758 match ready!(this.stream.poll_next(cx)) {
2759 Some(v) => {
2760 let ret = (*this.i, v);
2761 *this.i += 1;
2762 Poll::Ready(Some(ret))
2763 }
2764 None => Poll::Ready(None),
2765 }
2766 }
2767}
2768
2769pin_project! {
2770 #[derive(Clone, Debug)]
2772 #[must_use = "streams do nothing unless polled"]
2773 pub struct Inspect<S, F> {
2774 #[pin]
2775 stream: S,
2776 f: F,
2777 }
2778}
2779
2780impl<S, F> Stream for Inspect<S, F>
2781where
2782 S: Stream,
2783 F: FnMut(&S::Item),
2784{
2785 type Item = S::Item;
2786
2787 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2788 let mut this = self.project();
2789 let next = ready!(this.stream.as_mut().poll_next(cx));
2790 if let Some(x) = &next {
2791 (this.f)(x);
2792 }
2793 Poll::Ready(next)
2794 }
2795}
2796
2797#[derive(Debug)]
2799#[must_use = "futures do nothing unless you `.await` or poll them"]
2800pub struct NthFuture<'a, S: ?Sized> {
2801 stream: &'a mut S,
2802 n: usize,
2803}
2804
2805impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2806
2807impl<'a, S> Future for NthFuture<'a, S>
2808where
2809 S: Stream + Unpin + ?Sized,
2810{
2811 type Output = Option<S::Item>;
2812
2813 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2814 loop {
2815 match ready!(self.stream.poll_next(cx)) {
2816 Some(v) => match self.n {
2817 0 => return Poll::Ready(Some(v)),
2818 _ => self.n -= 1,
2819 },
2820 None => return Poll::Ready(None),
2821 }
2822 }
2823 }
2824}
2825
2826pin_project! {
2827 #[derive(Debug)]
2829 #[must_use = "futures do nothing unless you `.await` or poll them"]
2830 pub struct LastFuture<S: Stream> {
2831 #[pin]
2832 stream: S,
2833 last: Option<S::Item>,
2834 }
2835}
2836
2837impl<S: Stream> Future for LastFuture<S> {
2838 type Output = Option<S::Item>;
2839
2840 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2841 let mut this = self.project();
2842 loop {
2843 match ready!(this.stream.as_mut().poll_next(cx)) {
2844 Some(new) => *this.last = Some(new),
2845 None => return Poll::Ready(this.last.take()),
2846 }
2847 }
2848 }
2849}
2850
2851#[derive(Debug)]
2853#[must_use = "futures do nothing unless you `.await` or poll them"]
2854pub struct FindFuture<'a, S: ?Sized, P> {
2855 stream: &'a mut S,
2856 predicate: P,
2857}
2858
2859impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2860
2861impl<'a, S, P> Future for FindFuture<'a, S, P>
2862where
2863 S: Stream + Unpin + ?Sized,
2864 P: FnMut(&S::Item) -> bool,
2865{
2866 type Output = Option<S::Item>;
2867
2868 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2869 loop {
2870 match ready!(self.stream.poll_next(cx)) {
2871 Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2872 Some(_) => {}
2873 None => return Poll::Ready(None),
2874 }
2875 }
2876 }
2877}
2878
2879#[derive(Debug)]
2881#[must_use = "futures do nothing unless you `.await` or poll them"]
2882pub struct FindMapFuture<'a, S: ?Sized, F> {
2883 stream: &'a mut S,
2884 f: F,
2885}
2886
2887impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2888
2889impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2890where
2891 S: Stream + Unpin + ?Sized,
2892 F: FnMut(S::Item) -> Option<B>,
2893{
2894 type Output = Option<B>;
2895
2896 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2897 loop {
2898 match ready!(self.stream.poll_next(cx)) {
2899 Some(v) => {
2900 if let Some(v) = (&mut self.f)(v) {
2901 return Poll::Ready(Some(v));
2902 }
2903 }
2904 None => return Poll::Ready(None),
2905 }
2906 }
2907 }
2908}
2909
2910#[derive(Debug)]
2912#[must_use = "futures do nothing unless you `.await` or poll them"]
2913pub struct PositionFuture<'a, S: ?Sized, P> {
2914 stream: &'a mut S,
2915 predicate: P,
2916 index: usize,
2917}
2918
2919impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2920
2921impl<'a, S, P> Future for PositionFuture<'a, S, P>
2922where
2923 S: Stream + Unpin + ?Sized,
2924 P: FnMut(S::Item) -> bool,
2925{
2926 type Output = Option<usize>;
2927
2928 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2929 loop {
2930 match ready!(self.stream.poll_next(cx)) {
2931 Some(v) => {
2932 if (&mut self.predicate)(v) {
2933 return Poll::Ready(Some(self.index));
2934 } else {
2935 self.index += 1;
2936 }
2937 }
2938 None => return Poll::Ready(None),
2939 }
2940 }
2941 }
2942}
2943
2944#[derive(Debug)]
2946#[must_use = "futures do nothing unless you `.await` or poll them"]
2947pub struct AllFuture<'a, S: ?Sized, P> {
2948 stream: &'a mut S,
2949 predicate: P,
2950}
2951
2952impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2953
2954impl<S, P> Future for AllFuture<'_, S, P>
2955where
2956 S: Stream + Unpin + ?Sized,
2957 P: FnMut(S::Item) -> bool,
2958{
2959 type Output = bool;
2960
2961 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2962 loop {
2963 match ready!(self.stream.poll_next(cx)) {
2964 Some(v) => {
2965 if !(&mut self.predicate)(v) {
2966 return Poll::Ready(false);
2967 }
2968 }
2969 None => return Poll::Ready(true),
2970 }
2971 }
2972 }
2973}
2974
2975#[derive(Debug)]
2977#[must_use = "futures do nothing unless you `.await` or poll them"]
2978pub struct AnyFuture<'a, S: ?Sized, P> {
2979 stream: &'a mut S,
2980 predicate: P,
2981}
2982
2983impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2984
2985impl<S, P> Future for AnyFuture<'_, S, P>
2986where
2987 S: Stream + Unpin + ?Sized,
2988 P: FnMut(S::Item) -> bool,
2989{
2990 type Output = bool;
2991
2992 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2993 loop {
2994 match ready!(self.stream.poll_next(cx)) {
2995 Some(v) => {
2996 if (&mut self.predicate)(v) {
2997 return Poll::Ready(true);
2998 }
2999 }
3000 None => return Poll::Ready(false),
3001 }
3002 }
3003 }
3004}
3005
3006pin_project! {
3007 #[derive(Debug)]
3009 #[must_use = "futures do nothing unless you `.await` or poll them"]
3010 pub struct ForEachFuture<S, F> {
3011 #[pin]
3012 stream: S,
3013 f: F,
3014 }
3015}
3016
3017impl<S, F> Future for ForEachFuture<S, F>
3018where
3019 S: Stream,
3020 F: FnMut(S::Item),
3021{
3022 type Output = ();
3023
3024 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3025 let mut this = self.project();
3026 loop {
3027 match ready!(this.stream.as_mut().poll_next(cx)) {
3028 Some(v) => (this.f)(v),
3029 None => return Poll::Ready(()),
3030 }
3031 }
3032 }
3033}
3034
3035#[derive(Debug)]
3037#[must_use = "futures do nothing unless you `.await` or poll them"]
3038pub struct TryForEachFuture<'a, S: ?Sized, F> {
3039 stream: &'a mut S,
3040 f: F,
3041}
3042
3043impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3044
3045impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
3046where
3047 S: Stream + Unpin + ?Sized,
3048 F: FnMut(S::Item) -> Result<(), E>,
3049{
3050 type Output = Result<(), E>;
3051
3052 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3053 loop {
3054 match ready!(self.stream.poll_next(cx)) {
3055 None => return Poll::Ready(Ok(())),
3056 Some(v) => (&mut self.f)(v)?,
3057 }
3058 }
3059 }
3060}
3061
3062pin_project! {
3063 #[derive(Clone, Debug)]
3065 #[must_use = "streams do nothing unless polled"]
3066 pub struct Zip<A: Stream, B> {
3067 item_slot: Option<A::Item>,
3068 #[pin]
3069 first: A,
3070 #[pin]
3071 second: B,
3072 }
3073}
3074
3075impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3076 type Item = (A::Item, B::Item);
3077
3078 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3079 let this = self.project();
3080
3081 if this.item_slot.is_none() {
3082 match this.first.poll_next(cx) {
3083 Poll::Pending => return Poll::Pending,
3084 Poll::Ready(None) => return Poll::Ready(None),
3085 Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3086 }
3087 }
3088
3089 let second_item = ready!(this.second.poll_next(cx));
3090 let first_item = this.item_slot.take().unwrap();
3091 Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3092 }
3093}
3094
3095pin_project! {
3096 #[derive(Debug)]
3098 #[must_use = "futures do nothing unless you `.await` or poll them"]
3099 pub struct UnzipFuture<S, FromA, FromB> {
3100 #[pin]
3101 stream: S,
3102 res: Option<(FromA, FromB)>,
3103 }
3104}
3105
3106impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3107where
3108 S: Stream<Item = (A, B)>,
3109 FromA: Default + Extend<A>,
3110 FromB: Default + Extend<B>,
3111{
3112 type Output = (FromA, FromB);
3113
3114 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3115 let mut this = self.project();
3116
3117 loop {
3118 match ready!(this.stream.as_mut().poll_next(cx)) {
3119 Some((a, b)) => {
3120 let res = this.res.as_mut().unwrap();
3121 res.0.extend(Some(a));
3122 res.1.extend(Some(b));
3123 }
3124 None => return Poll::Ready(this.res.take().unwrap()),
3125 }
3126 }
3127 }
3128}