§Streams that produce elements with an associated ordering
Say you have a bunch of events that all have a timestamp, sequence number, or other ordering
attribute. If you get these events from multiple Streams, then you should be able to produce
a “composite” stream by joining each of the individual streams, so long as each originating stream
is ordered.
In practice, however, avoiding ordering inversions requires buffering at least one element from each stream whenever the sources are independent (which includes sources that merely run in different tasks). This is a problem if one of the sources rarely produces events: the composite stream cannot tell whether the slow source is about to deliver an earlier element or simply has no elements, so it must stall all the other streams to cover the first case.
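The buffering requirement can be seen in a small synchronous model. The sketch below is only an illustration using standard-library iterators in place of async streams; `merge_sorted` is a hypothetical helper and is not part of ordered_stream. It has to peek at the head of both sources before it can emit anything, which is the step that would stall on a slow source in the async case.

```rust
/// Merge two already-sorted sources of (ordering, payload) pairs.
/// Hypothetical helper for illustration only; not part of any crate.
pub fn merge_sorted<A, B>(a: A, b: B) -> Vec<(u64, &'static str)>
where
    A: Iterator<Item = (u64, &'static str)>,
    B: Iterator<Item = (u64, &'static str)>,
{
    // Each source keeps one buffered head element via `peekable`.
    let mut a = a.peekable();
    let mut b = b.peekable();
    let mut out = Vec::new();
    loop {
        // Both heads must be known before either one can be emitted safely.
        // With async sources, this is where a quiet source blocks the merge.
        let take_a = match (a.peek(), b.peek()) {
            (Some(x), Some(y)) => x.0 <= y.0,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        let next = if take_a { a.next() } else { b.next() };
        out.extend(next);
    }
    out
}
```

Calling `merge_sorted` with two sorted iterators yields one sorted vector, but only because each `peek` returns immediately; an async source with nothing to report gives no such answer.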
The OrderedStream trait provides a way to solve this problem: if you can ask a stream whether it
will ever produce an event that should be delivered before a given event, then you can often
avoid blocking the composite stream when data is ready.
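The idea of asking a source about a given ordering can be sketched with a toy synchronous model. Everything in it (`ToyPoll`, `ToySource`, the `watermark` field, `deliver`) is a hypothetical name chosen for illustration and is not the crate's API; the real trait works with pinned async streams and `poll_next_before`.

```rust
use std::collections::VecDeque;

/// Toy answer from a source; hypothetical, loosely modelled on the idea above.
#[derive(Debug, PartialEq)]
pub enum ToyPoll {
    /// An item together with its ordering value.
    Item(u64, &'static str),
    /// No item with an ordering before the one asked about will be delivered next.
    NoneBefore,
    /// The source cannot answer yet.
    Pending,
    /// The source is finished.
    Terminated,
}

/// Toy source: queued items plus a promise (`watermark`) that no future item
/// will have an ordering below that value. All fields are assumptions.
pub struct ToySource {
    pub queue: VecDeque<(u64, &'static str)>,
    pub watermark: u64,
    pub closed: bool,
}

impl ToySource {
    /// Return the next item, or say whether anything can still arrive before `before`.
    pub fn poll_before(&mut self, before: Option<u64>) -> ToyPoll {
        match (self.queue.front().copied(), before) {
            // The queued item is not earlier than `before`; leave it queued.
            (Some((ord, _)), Some(b)) if ord >= b => ToyPoll::NoneBefore,
            (Some((ord, data)), _) => {
                self.queue.pop_front();
                ToyPoll::Item(ord, data)
            }
            (None, _) if self.closed => ToyPoll::Terminated,
            // Nothing queued, but the watermark rules out anything earlier.
            (None, Some(b)) if self.watermark >= b => ToyPoll::NoneBefore,
            (None, _) => ToyPoll::Pending,
        }
    }
}

/// Given a ready `head` from one source, decide what can be delivered now.
pub fn deliver(head: (u64, &'static str), other: &mut ToySource) -> Option<(u64, &'static str)> {
    match other.poll_before(Some(head.0)) {
        // The other source has something earlier, so that goes first.
        ToyPoll::Item(ord, data) => Some((ord, data)),
        // Nothing earlier can arrive: `head` is safe to deliver immediately.
        ToyPoll::NoneBefore | ToyPoll::Terminated => Some(head),
        // An earlier item cannot be ruled out yet, so the join has to wait.
        ToyPoll::Pending => None,
    }
}
```

In this model a quiet source only stalls the join while its watermark is behind the ready item; once it can promise that nothing earlier will arrive, the ready item is delivered without waiting.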
    use futures_core::Stream;
    use ordered_stream::FromStream;
    use ordered_stream::JoinMultiple;
    use ordered_stream::OrderedStream;
    use ordered_stream::OrderedStreamExt;
    use std::pin::Pin;
    use std::time::SystemTime;

    pub struct Message {
        time: SystemTime,
        level: u8,
        data: String,
        source: String,
    }

    pub struct RemoteLogSource {
        stream: Pin<Box<dyn Stream<Item = Message>>>,
        min_level: u8,
    }

    pub async fn display_logs(logs: &mut [RemoteLogSource]) {
        // Wrap each source as an ordered stream keyed by message time,
        // drop messages below the source's minimum level, and make it peekable.
        let streams: Vec<_> = logs
            .iter_mut()
            .map(|s| {
                let min = s.min_level;
                FromStream::with_ordering(&mut s.stream, |m| m.time)
                    .filter(move |m| m.level >= min)
                    .peekable()
            })
            .collect();
        // Join all sources into one stream that yields messages in time order.
        let mut joined = JoinMultiple(streams);
        while let Some(msg) = joined.next().await {
            println!("{:?}: {}", msg.time, msg.data);
        }
    }

Structs§
- Filter: A stream for the filter function.
- FilterMap: A stream for the filter_map function.
- FromFuture: An OrderedStream wrapper around an OrderedFuture.
- FromSortedStream: An OrderedStream wrapper around a Stream.
- FromStream: An OrderedStream wrapper around a Stream.
- FromStreamDirect: An OrderedStream wrapper around a Stream.
- IntoOrdering: A Stream for the into_ordering function.
- IntoStream: A Stream for the into_stream function.
- IntoTupleStream: A Stream for the into_tuple_stream function.
- Join: A stream for the join function.
- JoinMultiple: Join a collection of OrderedStreams.
- JoinMultiplePin: Join a collection of pinned OrderedStreams.
- Map: A stream for the map function.
- MapItem: A stream for the map_item function.
- MapOrdering: A stream for the map_ordering function.
- Next: A future for the next function.
- NextBefore: A future for the next_before function.
- Peekable: A stream for the peekable function.
- Then: A stream for the then function.
Enums§
- MaybeBorrowed: A value that is either borrowed or owned.
- PollResult: The result of an OrderedStream::poll_next_before operation.
Traits§
- FusedOrderedStream: An OrderedStream that tracks if the underlying stream should be polled.
- OrderedFuture: A Future that produces an item with an associated ordering.
- OrderedStream: A stream that produces items that are ordered according to some token.
- OrderedStreamExt: Helpers for chaining OrderedStreams.
Functions§
- join: Join two streams while preserving the overall ordering of elements.