Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream implementation (wrapper) for PaginationStream #3299

Merged
merged 10 commits into from
Jan 2, 2024
28 changes: 28 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "rcoh"

[[smithy-rs]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).

Example:

```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client"}
author = "Ploppz"

[[aws-sdk-rust]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).

Example:

```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "Ploppz"

[[smithy-rs]]
message = "Serialize 0/false in query parameters, and ignore actual default value during serialization instead of just 0/false. See [changelog discussion](https://github.com/smithy-lang/smithy-rs/discussions/3312) for details."
references = ["smithy-rs#3252", "smithy-rs#3312"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use crate::future::pagination_stream::collect::sealed::Collectable;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub mod collect;
pub mod fn_stream;
use fn_stream::FnStream;
Expand Down Expand Up @@ -60,6 +62,11 @@ impl<Item> PaginationStream<Item> {
self.0.next().await
}

/// Poll an item from the stream
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Item>> {
Pin::new(&mut self.0).poll_next(cx)
}

/// Consumes this stream and gathers elements into a collection.
pub async fn collect<T: Collectable<Item>>(self) -> T {
self.0.collect().await
Expand Down
3 changes: 3 additions & 0 deletions rust-runtime/aws-smithy-types-convert/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
convert-chrono = ["aws-smithy-types", "chrono"]
convert-time = ["aws-smithy-types", "time"]
convert-streams = ["aws-smithy-async", "futures-core"]

[dependencies]
aws-smithy-types = { path = "../aws-smithy-types", optional = true }
aws-smithy-async = {path = "../aws-smithy-async", optional = true}
chrono = { version = "0.4.26", optional = true, default-features = false, features = ["std"] }
time = { version = "0.3.4", optional = true }
futures-core = { version = "0.3.0", optional = true }

[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 2 additions & 0 deletions rust-runtime/aws-smithy-types-convert/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ allowed_external_types = [
"chrono::offset::fixed::FixedOffset",
"chrono::offset::utc::Utc",
"time::offset_date_time::OffsetDateTime",
"aws_smithy_async::future::pagination_stream::PaginationStream",
"futures_core::stream::Stream",
]
3 changes: 3 additions & 0 deletions rust-runtime/aws-smithy-types-convert/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@

#[cfg(any(feature = "convert-time", feature = "convert-chrono"))]
pub mod date_time;

#[cfg(feature = "convert-streams")]
pub mod stream;
62 changes: 62 additions & 0 deletions rust-runtime/aws-smithy-types-convert/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Conversions from Stream-like structs to implementors of `futures::Stream`

use futures_core::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

use aws_smithy_async::future::pagination_stream::PaginationStream;

/// Stream implementor wrapping `PaginationStream`
pub struct PaginationStreamImplStream<Item> {
pagination_stream: PaginationStream<Item>,
}

impl<Item> PaginationStreamImplStream<Item> {
/// Create a new Stream object wrapping a `PaginationStream`
pub fn new(pagination_stream: PaginationStream<Item>) -> Self {
PaginationStreamImplStream { pagination_stream }
}
}

impl<Item> Stream for PaginationStreamImplStream<Item> {
type Item = Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.pagination_stream.poll_next(cx)
}
}

/// Trait to convert PaginationStream into implementor of `Stream`
pub trait PaginationStreamExt<Item> {
/// Convert PaginationStream into implementor of `Stream`
///
/// # Example
/// ```no_run
/// # use aws_smithy_async::future::pagination_stream::PaginationStream;
/// use aws_smithy_types_convert::stream::PaginationStreamExt;
/// // Assuming you have obtained a pagination stream, by something like:
/// // ```
/// // let pagination_stream = s3_client
/// // .list_objects_v2()
/// // .bucket(bucket)
/// // .into_paginator()
/// // .send();
/// // ```
/// # let pagination_stream: PaginationStream<i32> = unimplemented!();
/// let futures_stream = pagination_stream.into_stream_03x();
/// ```
fn into_stream_03x(self) -> PaginationStreamImplStream<Item>;
}

impl<Item> PaginationStreamExt<Item> for PaginationStream<Item> {
fn into_stream_03x(self) -> PaginationStreamImplStream<Item> {
PaginationStreamImplStream {
pagination_stream: self,
}
}
}
Loading