Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding code for pubsub connectors #1007

Merged
merged 5 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@
## 0.11.1

### New features
* Add `tremor_value::structurize` convenience fn
* Change `qos::wal` operator to only require one of `max_elements` or `max_bytes` (using both is still possible).
* Add DNS sink
* Add syslog codec.
* Add `$udp.host` and `$udp.port` to allow controling udp packet destinations on a per event basis.
* Deprecate `udp.dst_*` config, introduce `udp.bind.*` config instead.
* Allow insights/contraflow events to traverse through multiple connected pipelines
* Add branch/hash version output to versions of tremor not built on `main` branch

- Add `tremor_value::structurize` convenience fn
- Change `qos::wal` operator to only require one of `max_elements` or `max_bytes` (using both is still possible).
- Add DNS sink
- Add syslog codec.
- Add `$udp.host` and `$udp.port` to allow controling udp packet destinations on a per event basis.
- Deprecate `udp.dst_*` config, introduce `udp.bind.*` config instead.
- Allow insights/contraflow events to traverse through multiple connected pipelines

- Add GCP Cloud Storage linked sink connector.
- Add textual-length-prefix pre and postprocessor.
- Add GCP Pubsub sink and source connector.

### Fixes

Expand Down
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ tremor-otelapis = "0.1"
gouth = {version = "0.2"}
http = "0.2.4"
reqwest = "0.11.3"
googapis = { version = "0.4.2", default-features = false, features = ["google-pubsub-v1"] }


[dependencies.tungstenite]
default-features = false
Expand Down
2 changes: 2 additions & 0 deletions src/connectors/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@
// limitations under the License.

pub(crate) mod auth;
pub(crate) mod pubsub;
pub(crate) mod pubsub_auth;
pub(crate) mod storage;
92 changes: 92 additions & 0 deletions src/connectors/gcp/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2020-2021, The Tremor Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg(not(tarpaulin_include))]

use crate::errors::{Error, Result};
use googapis::google::pubsub::v1::{
publisher_client::PublisherClient, subscriber_client::SubscriberClient,
};
use googapis::google::pubsub::v1::{
AcknowledgeRequest, PublishRequest, PubsubMessage, PullRequest, PullResponse,
};
use std::collections::HashMap;
use tonic::transport::Channel;

pub(crate) async fn send_message(
client: &mut PublisherClient<Channel>,
project_id: &str,
topic_name: &str,
data_val: &[u8],
) -> Result<String> {
let message = PubsubMessage {
data: data_val.to_vec(),
attributes: HashMap::new(),
message_id: "".into(),
publish_time: None,
ordering_key: "".to_string(),
};

let response = client
.publish(PublishRequest {
topic: format!("projects/{}/topics/{}", project_id, topic_name),
messages: vec![message],
})
.await?;
let p = response.into_inner();
let res = p
.message_ids
.get(0)
.ok_or_else(|| Error::from("Failed to get message id"))?;
Ok(res.to_string())
}

pub(crate) async fn receive_message(
client: &mut SubscriberClient<Channel>,
project_id: &str,
subscription_name: &str,
) -> Result<PullResponse> {
// TODO: Use streaming pull
#[allow(warnings)]
Licenser marked this conversation as resolved.
Show resolved Hide resolved
// to allow use of deprecated field googapis::google::pubsub::v1::PullRequest::return_immediately
let response = client
.pull(PullRequest {
subscription: format!(
"projects/{}/subscriptions/{}",
project_id, subscription_name
),
max_messages: 50,
return_immediately: false,
})
.await?;
Ok(response.into_inner())
}

pub(crate) async fn acknowledge(
client: &mut SubscriberClient<Channel>,
project_id: &str,
subscription_name: &str,
ack_ids: Vec<String>,
) -> Result<()> {
client
.acknowledge(AcknowledgeRequest {
subscription: format!(
"projects/{}/subscriptions/{}",
project_id, subscription_name
),
ack_ids,
})
.await?;
Ok(())
}
78 changes: 78 additions & 0 deletions src/connectors/gcp/pubsub_auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2020-2021, The Tremor Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Licenser marked this conversation as resolved.
Show resolved Hide resolved
#![cfg(not(tarpaulin_include))]

use crate::errors::Result;
/// Using `GOOGLE_APPLICATION_CREDENTIALS="<path to service token json file>"` this function
/// will authenticate against the google cloud platform using the authentication flow defined
/// in the file. The provided PEM file should be a current non-revoked and complete copy of the
/// required certificate chain for the Google Cloud Platform.
use googapis::google::pubsub::v1::{
publisher_client::PublisherClient, subscriber_client::SubscriberClient,
};
use googapis::CERTIFICATES;
use gouth::Token;
// use http_types::headers;
use tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig},
Request, Status,
};

pub(crate) async fn setup_publisher_client() -> Result<PublisherClient<Channel>> {
let token = Token::new()?;
let tls_config = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(CERTIFICATES))
.domain_name("pubsub.googleapis.com");

let channel = Channel::from_static(r#"https://pubsub.googleapis.com/pubsub/v1"#)
.tls_config(tls_config)?
.connect()
.await?;

let service = PublisherClient::with_interceptor(channel, move |mut req: Request<()>| {
let token = &*token
.header_value()
.map_err(|_| Status::unauthenticated("Error getting token header value"))?;
let meta = MetadataValue::from_str(token)
.map_err(|_| Status::not_found("Error getting token header value"))?;
req.metadata_mut().insert("authorization", meta);
Ok(req)
});
Ok(service)
}

pub(crate) async fn setup_subscriber_client() -> Result<SubscriberClient<Channel>> {
let token = Token::new()?;
let tls_config = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(CERTIFICATES))
.domain_name("pubsub.googleapis.com");

let channel = Channel::from_static(r#"https://pubsub.googleapis.com/pubsub/v1"#)
.tls_config(tls_config)?
.connect()
.await?;

let service = SubscriberClient::with_interceptor(channel, move |mut req: Request<()>| {
let token = &*token
.header_value()
.map_err(|_| Status::unauthenticated("Error getting token header value"))?;
let meta = MetadataValue::from_str(token)
.map_err(|_| Status::not_found("Error getting token header value"))?;
req.metadata_mut().insert("authorization", meta);
Ok(req)
});
Ok(service)
}
3 changes: 3 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ error_chain! {
GoogleAuthError(gouth::Error);
ReqwestError(reqwest::Error);
HttpHeaderError(http::header::InvalidHeaderValue);
TonicTransportError(tonic::transport::Error);
TonicStatusError(tonic::Status);

}

errors {
Expand Down
5 changes: 3 additions & 2 deletions src/offramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::permge::PriorityMerge;
use crate::pipeline;
use crate::registry::ServantId;
use crate::sink::{
self, blackhole, cb, debug, dns, elastic, exit, file, gcs, handle_response, kafka, kv, nats,
newrelic, otel, postgres, rest, stderr, stdout, tcp, udp, ws,
self, blackhole, cb, debug, dns, elastic, exit, file, gcs, gpub, handle_response, kafka, kv,
nats, newrelic, otel, postgres, rest, stderr, stdout, tcp, udp, ws,
};
use crate::source::Processors;
use crate::url::ports::{IN, METRICS};
Expand Down Expand Up @@ -128,6 +128,7 @@ pub fn lookup(name: &str, config: &Option<OpConfig>) -> Result<Box<dyn Offramp>>
"udp" => udp::Udp::from_config(config),
"ws" => ws::Ws::from_config(config),
"gcs" => gcs::GoogleCloudStorage::from_config(config),
"gpub" => gpub::GoogleCloudPubSub::from_config(config),
_ => Err(format!("Offramp {} not known", name).into()),
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/onramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::pipeline;
use crate::repository::ServantId;
use crate::source::prelude::*;
use crate::source::{
blaster, cb, crononome, discord, file, kafka, metronome, nats, otel, postgres, rest, stdin,
tcp, udp, ws,
blaster, cb, crononome, discord, file, gsub, kafka, metronome, nats, otel, postgres, rest,
stdin, tcp, udp, ws,
};
use crate::url::TremorUrl;
use async_std::task::{self, JoinHandle};
Expand Down Expand Up @@ -85,6 +85,7 @@ pub(crate) fn lookup(
"discord" => discord::Discord::from_config(id, config),
"otel" => otel::OpenTelemetry::from_config(id, config),
"nats" => nats::Nats::from_config(id, config),
"gsub" => gsub::GoogleCloudPubSub::from_config(id, config),
_ => Err(format!("[onramp:{}] Onramp type {} not known", id, name).into()),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub(crate) mod elastic;
pub(crate) mod exit;
pub(crate) mod file;
pub(crate) mod gcs;
pub(crate) mod gpub;
pub(crate) mod kafka;
pub(crate) mod kv;
pub(crate) mod nats;
Expand Down
Loading