Skip to content

Commit

Permalink
Use futures-core rather than tokio-stream, add better description,
Browse files Browse the repository at this point in the history
remove unnecessary trait impl bounds
  • Loading branch information
Ploppz committed Dec 14, 2023
1 parent dc6c4c9 commit 11f1e64
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
28 changes: 28 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,31 @@ message = "Add support for constructing [`SdkBody`] and [`ByteStream`] from `htt
references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "rcoh"

[[smithy-rs]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).
Example:
```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client"}
author = "Erlend Langseth <erlend.langseth@openanalytics.eu>"

[[aws-sdk-rust]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).
Example:
```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client"}
author = "Erlend Langseth <erlend.langseth@openanalytics.eu>"
4 changes: 2 additions & 2 deletions rust-runtime/aws-smithy-types-convert/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
convert-chrono = ["aws-smithy-types", "chrono"]
convert-time = ["aws-smithy-types", "time"]
convert-streams = ["aws-smithy-async", "tokio-stream"]
convert-streams = ["aws-smithy-async", "futures-core"]

[dependencies]
aws-smithy-types = { path = "../aws-smithy-types", optional = true }
aws-smithy-async = {path = "../aws-smithy-async", optional = true}
chrono = { version = "0.4.26", optional = true, default-features = false, features = ["std"] }
time = { version = "0.3.4", optional = true }
tokio-stream = { version = "0.1.14", optional = true }
futures-core = { version = "0.3.0", optional = true }

[package.metadata.docs.rs]
all-features = true
Expand Down
32 changes: 21 additions & 11 deletions rust-runtime/aws-smithy-types-convert/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

//! Conversions from Stream-like structs to implementors of `futures::Stream`

use futures_core::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream;

use aws_smithy_async::future::pagination_stream::PaginationStream;

Expand All @@ -16,20 +16,14 @@ pub struct PaginationStreamImplStream<Item> {
pagination_stream: PaginationStream<Item>,
}

impl<Item> PaginationStreamImplStream<Item>
where
Item: Send + 'static,
{
impl<Item> PaginationStreamImplStream<Item> {
/// Create a new Stream object wrapping a `PaginationStream`
pub fn new(pagination_stream: PaginationStream<Item>) -> Self {
PaginationStreamImplStream { pagination_stream }
}
}

impl<Item> Stream for PaginationStreamImplStream<Item>
where
Item: Send + 'static,
{
impl<Item> Stream for PaginationStreamImplStream<Item> {
type Item = Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -40,11 +34,27 @@ where
/// Trait to convert PaginationStream into implementor of `Stream`
pub trait PaginationStreamExt<Item> {
/// Convert PaginationStream into implementor of `Stream`
fn as_stream_01x(self) -> PaginationStreamImplStream<Item>;
///
/// # Example
/// ```no_run
/// # use aws_smithy_async::future::pagination_stream::PaginationStream;
/// use aws_smithy_types_convert::stream::PaginationStreamExt;
/// // Assuming you have obtained a pagination stream, by something like:
/// // ```
/// // let pagination_stream = s3_client
/// // .list_objects_v2()
/// // .bucket(bucket)
/// // .into_paginator()
/// // .send();
/// // ```
/// # let pagination_stream: PaginationStream<i32> = unimplemented!();
/// let futures_stream = pagination_stream.into_stream_03x();
/// ```
fn into_stream_03x(self) -> PaginationStreamImplStream<Item>;
}

impl<Item> PaginationStreamExt<Item> for PaginationStream<Item> {
fn as_stream_01x(self) -> PaginationStreamImplStream<Item> {
fn into_stream_03x(self) -> PaginationStreamImplStream<Item> {
PaginationStreamImplStream {
pagination_stream: self,
}
Expand Down

0 comments on commit 11f1e64

Please sign in to comment.