Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Publishing #34

Merged
merged 10 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions .github/workflows/hedwig.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ on:

jobs:
lint:
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
Expand All @@ -31,7 +29,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: -- -Dclippy::correctness -Dclippy::complexity -Dclippy::perf -Dunsafe_code -Dunreachable_pub -Dunused
args: --all-features -- -Dclippy::correctness -Dclippy::complexity -Dclippy::perf -Dunsafe_code -Dunreachable_pub -Dunused

doc:
runs-on: ubuntu-latest
Expand All @@ -50,14 +48,14 @@ jobs:
command: doc
args: --all-features --manifest-path=Cargo.toml
env:
RUSTDOCFLAGS: --cfg docsrs -Dmissing_docs -Dbroken_intra_doc_links
RUSTDOCFLAGS: --cfg docsrs -Dmissing_docs -Drustdoc::broken_intra_doc_links

test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
rust_toolchain: [nightly, stable, 1.49.0]
rust_toolchain: [nightly, stable, 1.53.0]
os: [ubuntu-latest, windows-latest, macOS-latest]
timeout-minutes: 20
steps:
Expand Down
48 changes: 24 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
[package]
name = "hedwig"
# TODO: When bumping to next major version, make sure to clean up the MRV and lints we allow in CI.
version = "4.1.0"
version = "5.0.0"
authors = [
"Aniruddha Maru <aniruddhamaru@gmail.com>",
"Simonas Kazlauskas <hedwig@kazlauskas.me>"
"Simonas Kazlauskas <hedwig@kazlauskas.me>",
"Renar Narubin <r.narubin@gmail.com>",
]
edition = "2018"
repository = "https://github.com/standard-ai/hedwig-rust.git"
Expand All @@ -19,50 +20,49 @@ categories = ["asynchronous", "web-programming"]
maintenance = { status = "actively-developed" }

[features]
default = ["consume", "sink"]
default = []

# Whether publishing/consuming is enabled
publish = []
consume = ["async-trait", "either"]

# Publishers
google = ["base64", "yup-oauth2", "hyper", "http", "serde_json", "serde", "serde/derive", "uuid/serde"]
# Backends
google = ["ya-gcp", "tracing", "parking_lot"]
mock = ["async-channel", "parking_lot"]

# Validators
json-schema = ["valico", "serde_json", "serde"]
protobuf = ["prost"]

# Convenience API
sink = ["futures-util/sink", "either", "publish"]

[[example]]
name = "publish"
required-features = ["google", "json-schema"]
name = "googlepubsub"
required-features = ["google", "protobuf"]

[dependencies]
async-trait = { version = "0.1" }
bytes = "1"
futures-util = { version = "0.3", features = ["std"], default-features = false }
either = { version = "1", features = ["use_std"], default-features = false }
futures-util = { version = "0.3.17", features = ["std", "sink"], default-features = false }
pin-project = "1"
thiserror = { version = "1", default-features = false }
url = { version = "2", default-features = false }
uuid = { version = "^0.8", features = ["v4"], default-features = false }

async-trait = { version = "0.1", optional = true }
either = { version = "1", optional = true, features = ["use_std"], default-features = false }
async-channel = { version = "1.6", optional = true }
serde = { version = "^1.0", optional = true, default-features = false }
serde_json = { version = "^1", features = ["std"], optional = true, default-features = false }
parking_lot = { version = "0.11", optional = true }
prost = { version = "0.8", optional = true, features = ["std"], default-features = false }
tracing = { version = "0.1.26", optional = true }
valico = { version = "^3.2", optional = true, default-features = false }
base64 = { version = "^0.13", optional = true, default-features = false }
http = { version = "^0.2", optional = true, default-features = false }
hyper = { version = "^0.14.4", optional = true, features = ["client", "stream"], default-features = false }
yup-oauth2 = { version = "5.1", optional = true, features = ["hyper-rustls"], default-features = false }
prost = { version = "0.7", optional = true, features = ["std"], default-features = false }
ya-gcp = { version = "0.6.3", features = ["pubsub"], optional = true }

[dev-dependencies]
hyper-tls = "0.5.0"
prost = { version = "0.7", features = ["std", "prost-derive"] }
async-channel = { version = "1.6" }
futures-channel = "0.3.17"
parking_lot = { version = "0.11" }
prost = { version = "0.8", features = ["std", "prost-derive"] }
tokio = { version = "1", features = ["macros", "rt"] }
tonic = "0.5"
serde = { version = "1", features = ["derive"] }
ya-gcp = { version = "0.6.3", features = ["pubsub", "emulators"] }
structopt = "0.3"

[package.metadata.docs.rs]
all-features = true
Expand Down
238 changes: 238 additions & 0 deletions examples/googlepubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
//! An example of ingesting messages from a PubSub subscription, applying a
//! transformation, then submitting those transformations to another PubSub topic.

use futures_util::{SinkExt, StreamExt, TryFutureExt};
use hedwig::{
googlepubsub::{
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubMessage, PublishError,
ServiceAccountAuth, StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName,
TopicConfig, TopicName,
},
validators, Consumer, DecodableMessage, EncodableMessage, Headers, Publisher,
};
use std::{error::Error as StdError, time::SystemTime};
use structopt::StructOpt;

const USER_CREATED_TOPIC: &str = "user.created";
const USER_UPDATED_TOPIC: &str = "user.updated";

/// The input data, representing some user being created with the given name
#[derive(PartialEq, Eq, prost::Message)]
struct UserCreatedMessage {
#[prost(string, tag = "1")]
name: String,
}

impl EncodableMessage for UserCreatedMessage {
type Error = validators::ProstValidatorError;
type Validator = validators::ProstValidator;
fn topic(&self) -> hedwig::Topic {
USER_CREATED_TOPIC.into()
}
fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
Ok(validator.validate(
uuid::Uuid::new_v4(),
SystemTime::now(),
"user.created/1.0",
Headers::new(),
self,
)?)
}
}

impl DecodableMessage for UserCreatedMessage {
type Error = validators::ProstDecodeError<validators::prost::SchemaMismatchError>;
type Decoder =
validators::ProstDecoder<validators::prost::ExactSchemaMatcher<UserCreatedMessage>>;

fn decode(msg: hedwig::ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error> {
decoder.decode(msg)
}
}

/// The output data, where the given user has now been assigned an ID and some metadata
#[derive(PartialEq, Eq, prost::Message)]
struct UserUpdatedMessage {
#[prost(string, tag = "1")]
name: String,

#[prost(int64, tag = "2")]
id: i64,

#[prost(string, tag = "3")]
metadata: String,
}

/// The output message will carry an ack token from the input message, to ack when the output is
/// successfully published, or nack on failure
#[derive(Debug)]
struct TransformedMessage(PubSubMessage<UserUpdatedMessage>);

impl EncodableMessage for TransformedMessage {
type Error = validators::ProstValidatorError;
type Validator = validators::ProstValidator;

fn topic(&self) -> hedwig::Topic {
USER_UPDATED_TOPIC.into()
}

fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
Ok(validator.validate(
uuid::Uuid::new_v4(),
SystemTime::now(),
"user.updated/1.0",
Headers::new(),
&self.0.message,
)?)
}
}

#[derive(Debug, StructOpt)]
struct Args {
/// The name of the pubsub project
#[structopt(long)]
project_name: String,
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn StdError>> {
let args = Args::from_args();

println!("Building PubSub clients");

let builder = ClientBuilder::new(
ClientBuilderConfig::new().auth_flow(AuthFlow::ServiceAccount(ServiceAccountAuth::EnvVar)),
PubSubConfig::default(),
)
.await?;

let input_topic_name = TopicName::new(USER_CREATED_TOPIC);
let subscription_name = SubscriptionName::new("user-metadata-updaters");

let output_topic_name = TopicName::new(USER_UPDATED_TOPIC);
const APP_NAME: &str = "user-metadata-updater";

let mut publisher_client = builder
.build_publisher(&args.project_name, APP_NAME)
.await?;
let mut consumer_client = builder.build_consumer(&args.project_name, APP_NAME).await?;

for topic_name in [&input_topic_name, &output_topic_name] {
println!("Creating topic {:?}", topic_name);

publisher_client
.create_topic(TopicConfig {
name: topic_name.clone(),
..TopicConfig::default()
})
.await?;
}

println!("Creating subscription {:?}", &subscription_name);

consumer_client
.create_subscription(SubscriptionConfig {
topic: input_topic_name.clone(),
name: subscription_name.clone(),
..SubscriptionConfig::default()
})
.await?;

println!(
"Synthesizing input messages for topic {:?}",
&input_topic_name
);

{
let validator = validators::ProstValidator::new();
let mut input_sink =
Publisher::<UserCreatedMessage>::publish_sink(publisher_client.publisher(), validator);

for i in 1..=10 {
let message = UserCreatedMessage {
name: format!("Example Name #{}", i),
};

input_sink.feed(message).await?;
}
input_sink.flush().await?;
}

println!("Ingesting input messages, applying transformations, and publishing to destination");

let mut read_stream = consumer_client
.stream_subscription(
subscription_name.clone(),
StreamSubscriptionConfig::default(),
)
.consume::<UserCreatedMessage>(hedwig::validators::ProstDecoder::new(
hedwig::validators::prost::ExactSchemaMatcher::new("user.created/1.0"),
));

let mut output_sink = Publisher::<TransformedMessage, _>::publish_sink_with_responses(
publisher_client.publisher(),
validators::ProstValidator::new(),
futures_util::sink::unfold((), |_, message: TransformedMessage| async move {
// if the output is successfully sent, ack the input to mark it as processed
message.0.ack().await.map(|_success| ())
}),
);

for i in 1..=10 {
let PubSubMessage { ack_token, message } = read_stream
.next()
.await
.expect("stream should have 10 elements")?;

assert_eq!(&message.name, &format!("Example Name #{}", i));

let transformed = TransformedMessage(PubSubMessage {
ack_token,
message: UserUpdatedMessage {
name: message.name,
id: random_id(),
metadata: "some metadata".into(),
},
});

output_sink
.feed(transformed)
.or_else(|publish_error| async move {
// if publishing fails, nack the failed messages to allow later retries
Err(match publish_error {
PublishError::Publish { cause, messages } => {
for failed_transform in messages {
failed_transform.0.nack().await?;
}
Box::<dyn StdError>::from(cause)
}
err => Box::<dyn StdError>::from(err),
})
})
.await?
}
output_sink.flush().await?;

println!("All messages matched and published successfully!");

println!("Deleting subscription {:?}", &subscription_name);

consumer_client
.delete_subscription(subscription_name)
.await?;

for topic_name in [input_topic_name, output_topic_name] {
println!("Deleting topic {:?}", &topic_name);

publisher_client.delete_topic(topic_name).await?;
}

println!("Done");

Ok(())
}

fn random_id() -> i64 {
4 // chosen by fair dice roll.
// guaranteed to be random.
rnarubin marked this conversation as resolved.
Show resolved Hide resolved
}
Loading