Skip to content

Commit

Permalink
feat: implement subscribe_once api
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Xu committed Apr 15, 2023
1 parent 263a4d2 commit 6be7a91
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions crates/xmtp-networking/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,41 @@ pub async fn publish_serialized(
Ok(json)
}

// Subscribe to a topic and get a stream of messages, but as soon as you get on message, subscribe
// the consumer will call this method again to get the next message
pub async fn subscribe_once(
host: String,
topics: Vec<String>,
) -> Result<v1::Envelope, tonic::Status> {
let mut client = v1::message_api_client::MessageApiClient::connect(host)
.await
.map_err(|e| tonic::Status::new(tonic::Code::Internal, format!("{}", e)))?;
let request = v1::SubscribeRequest {
content_topics: topics,
..Default::default()
};
let mut stream = client.subscribe(request).await?.into_inner();
// Get the first message from the stream
let response = stream.message().await;
// If Option has Envelope, return it, otherwise return an error
response
.map(|e| e.unwrap())
.map_err(|e| tonic::Status::new(tonic::Code::Internal, format!("{}", e)))
}

// Subscribe serialized version
pub async fn subscribe_once_serialized(
host: String,
topics: Vec<String>,
) -> Result<String, String> {
let response = subscribe_once(host, topics)
.await
.map_err(|e| format!("{}", e))?;
// Response is a v1::Envelope protobuf message, which we need to serialize to JSON
let json = serde_json::to_string(&response).map_err(|e| format!("{}", e))?;
Ok(json)
}

// Return the json serialization of an Envelope with bytes
pub fn test_envelope() -> String {
let envelope = v1::Envelope {
Expand Down

0 comments on commit 6be7a91

Please sign in to comment.