Skip to content

Commit

Permalink
Make Topic a newtype (standard-ai#29)
Browse files Browse the repository at this point in the history
Turns out having anything with any sort of lifetime in stream/sink
chains will trigger nasty issues such as
rust-lang/rust#79648 which is pretty difficult
to figure out how to work around.

Making `Topic` a newtype erases the lifetime from the type, making it
significantly easier to work with in thsoe contexts.
  • Loading branch information
nagisa authored and rnarubin committed Mar 29, 2021
1 parent 9dc5917 commit ea669ac
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 22 deletions.
4 changes: 2 additions & 2 deletions examples/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ struct UserCreatedMessage {
impl<'a> EncodableMessage for &'a UserCreatedMessage {
type Error = hedwig::validators::JsonSchemaValidatorError;
type Validator = hedwig::validators::JsonSchemaValidator;
fn topic(&self) -> &'static str {
"user.created"
fn topic(&self) -> hedwig::Topic {
"user.created".into()
}
fn encode(self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
Ok(validator
Expand Down
9 changes: 3 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
//! impl<'a> hedwig::publish::EncodableMessage for &'a UserCreatedMessage {
//! type Error = hedwig::validators::JsonSchemaValidatorError;
//! type Validator = hedwig::validators::JsonSchemaValidator;
//! fn topic(&self) -> hedwig::Topic { "user.created" }
//! fn topic(&self) -> hedwig::Topic { "user.created".into() }
//! fn encode(self, validator: &Self::Validator)
//! -> Result<hedwig::ValidatedMessage, Self::Error> {
//! validator.validate(
Expand Down Expand Up @@ -107,20 +107,17 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{collections::BTreeMap, time::SystemTime};

pub use topic::Topic;
use uuid::Uuid;

#[cfg(feature = "publish")]
#[cfg_attr(docsrs, doc(cfg(feature = "publish")))]
pub mod publish;

#[cfg(test)]
mod tests;
mod topic;
pub mod validators;

/// A message queue topic name to which messages can be published
pub type Topic = &'static str;

/// All errors that may be returned when operating top level APIs.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
Expand Down
2 changes: 1 addition & 1 deletion src/publish/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ where
None => None,
Some(stream_item) => Some((
stream_item,
this.topic,
*this.topic,
this.messages
.next()
.expect("should be as many messages as publishes"),
Expand Down
3 changes: 2 additions & 1 deletion src/publish/publishers/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ impl MockPublisher {
/// be published, was indeed published
///
/// Panics if the message was not published.
pub fn assert_message_published(&self, topic: Topic, uuid: &Uuid) {
pub fn assert_message_published<T: Into<Topic>>(&self, topic: T, uuid: &Uuid) {
let topic = topic.into();
{
let lock = self.0.lock().expect("this mutex cannot get poisoned");
for (mt, msg) in &lock[..] {
Expand Down
2 changes: 1 addition & 1 deletion src/publish/publishers/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Publisher for NullPublisher {
type MessageError = std::convert::Infallible;
type PublishStream = NullPublishStream;

fn publish<'a, I>(&self, _: &'static str, messages: I) -> Self::PublishStream
fn publish<'a, I>(&self, _: crate::Topic, messages: I) -> Self::PublishStream
where
I: Iterator<Item = &'a ValidatedMessage> + ExactSizeIterator,
{
Expand Down
18 changes: 9 additions & 9 deletions src/publish/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ mod test {
type Validator = TestValidator;

fn topic(&self) -> Topic {
"test_topic"
"test_topic".into()
}

fn encode(self, _: &Self::Validator) -> Result<ValidatedMessage, Self::Error> {
Expand Down Expand Up @@ -382,7 +382,7 @@ mod test {
assert_eq!(1, sink.sink.poll_close_called);
assert_eq!(
vec![(
"test_topic",
"test_topic".into(),
TestMessage("foo").encode(&TestValidator).unwrap()
)],
sink.sink.elements
Expand Down Expand Up @@ -517,14 +517,14 @@ mod test {
assert_eq!(
Ok(()),
sink.as_mut()
.start_send(("test_topic", test_validated_message("foo")))
.start_send(("test_topic".into(), test_validated_message("foo")))
);
}

/// The publisher should start flushing when the batch size has been exceeded
#[test]
fn batching_batches() {
let topic = "test_topic";
let topic = "test_topic".into();
let batch_size = 3;
let publisher = MockPublisher::new();
let sink = publisher_sink(publisher, batch_size);
Expand Down Expand Up @@ -565,7 +565,7 @@ mod test {
/// The publisher should flush buffered elements when asked to close
#[test]
fn close_flushes_batch() {
let topic = "test_topic";
let topic = "test_topic".into();
let batch_size = 3;
let publisher = MockPublisher::new();
let sink = publisher_sink(publisher, batch_size);
Expand Down Expand Up @@ -599,7 +599,7 @@ mod test {
/// The publisher should flush buffered elements when asked to flush
#[test]
fn flush_incomplete_batch() {
let topic = "test_topic";
let topic = "test_topic".into();
let batch_size = 3;
let publisher = MockPublisher::new();
let sink = publisher_sink(publisher, batch_size);
Expand Down Expand Up @@ -635,7 +635,7 @@ mod test {
#[test]
#[should_panic]
fn panic_at_buffer_full_without_ready_check() {
let topic = "test_topic";
let topic = "test_topic".into();
let batch_size = 1;
let publisher = MockPublisher::new();
let sink = publisher_sink(publisher, batch_size);
Expand All @@ -656,7 +656,7 @@ mod test {
/// Step through flushing a non-full batch and see that yield points are respected
#[test]
fn partial_flushing_check() {
let topic = "test_topic";
let topic = "test_topic".into();
let batch_size = 3;
let (publisher, command) = ControlledPublisher::new();
let sink = publisher_sink(publisher, batch_size);
Expand Down Expand Up @@ -724,7 +724,7 @@ mod test {
/// A failed message can be re-sent to the sink and eventually succeed
#[test]
fn flushing_error_retry() {
let topic = "test_topic";
let topic = "test_topic".into();
let batch_size = 5;
let (publisher, command) = ControlledPublisher::new();
let sink = publisher_sink(publisher, batch_size);
Expand Down
4 changes: 2 additions & 2 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl<'a, I: serde::Serialize> EncodableMessage for &'a JsonUserCreatedMessage<I>
type Error = validators::JsonSchemaValidatorError;
type Validator = validators::JsonSchemaValidator;

fn topic(&self) -> &'static str {
"user.created"
fn topic(&self) -> crate::Topic {
"user.created".into()
}
fn encode(self, validator: &Self::Validator) -> Result<ValidatedMessage, Self::Error> {
validator.validate(
Expand Down
21 changes: 21 additions & 0 deletions src/topic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/// A message queue topic name to which messages can be published
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct Topic(&'static str);

impl std::fmt::Display for Topic {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
std::fmt::Display::fmt(self.0, f)
}
}

impl From<&'static str> for Topic {
fn from(s: &'static str) -> Topic {
Topic(s)
}
}

impl From<Topic> for &'static str {
fn from(s: Topic) -> &'static str {
s.0
}
}

0 comments on commit ea669ac

Please sign in to comment.