Skip to content

Commit

Permalink
Use "binding key" lingo for super stream creation
Browse files Browse the repository at this point in the history
Instead of "routing key".
  • Loading branch information
acogoluegnes committed Nov 13, 2023
1 parent 6868fb3 commit 57376da
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 52 deletions.
6 changes: 3 additions & 3 deletions src/docs/asciidoc/super-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5
We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.

It is also possible to specify routing keys when creating a super stream:
It is also possible to specify binding keys when creating a super stream:

.Creating a super stream by specifying the routing keys
.Creating a super stream by specifying the binding keys
[source,java,indent=0]
--------
include::{test-examples}/SuperStreamUsage.java[tag=creation-routing-keys]
include::{test-examples}/SuperStreamUsage.java[tag=creation-binding-keys]
--------

The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case.
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/rabbitmq/stream/StreamCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,22 @@ interface SuperStreamConfiguration {
/**
* The number of partitions of the super stream.
*
* <p>Mutually exclusive with {@link #routingKeys(String...)}. Default is 3.
* <p>Mutually exclusive with {@link #bindingKeys(String...)}. Default is 3.
*
* @param partitions
* @return this super stream configuration instance
*/
SuperStreamConfiguration partitions(int partitions);

/**
* The routing keys to use when declaring the super stream partitions.
* The binding keys to use when declaring the super stream partitions.
*
* <p>Mutually exclusive with {@link #partitions(int)}. Default is null.
*
* @param routingKeys
* @param bindingKeys
* @return this super stream configuration instance
*/
SuperStreamConfiguration routingKeys(String... routingKeys);
SuperStreamConfiguration bindingKeys(String... bindingKeys);

/**
* Go back to the creator.
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,14 @@ public Response create(String stream, Map<String, String> arguments) {
Response createSuperStream(
String superStream,
List<String> partitions,
List<String> routingKeys,
List<String> bindingKeys,
Map<String, String> arguments) {
this.superStreamManagementCommandVersionsCheck.run();
if (partitions.isEmpty() || routingKeys.isEmpty()) {
if (partitions.isEmpty() || bindingKeys.isEmpty()) {
throw new IllegalArgumentException(
"Partitions and routing keys of a super stream cannot be empty");
}
if (partitions.size() != routingKeys.size()) {
if (partitions.size() != bindingKeys.size()) {
throw new IllegalArgumentException(
"Partitions and routing keys of a super stream must have "
+ "the same number of elements");
Expand All @@ -708,7 +708,7 @@ Response createSuperStream(
+ 2
+ superStream.length()
+ collectionSize(partitions)
+ collectionSize(routingKeys)
+ collectionSize(bindingKeys)
+ mapSize(arguments);
int correlationId = correlationSequence.incrementAndGet();
try {
Expand All @@ -720,7 +720,7 @@ Response createSuperStream(
bb.writeShort(superStream.length());
bb.writeBytes(superStream.getBytes(CHARSET));
writeCollection(bb, partitions);
writeCollection(bb, routingKeys);
writeCollection(bb, bindingKeys);
writeMap(bb, arguments);
OutstandingRequest<Response> request = outstandingRequest();
outstandingRequests.put(correlationId, request);
Expand Down
24 changes: 12 additions & 12 deletions src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,28 @@ public void create() {
Function<Client, Client.Response> function;
boolean superStream = this.superStreamConfiguration != null;
if (superStream) {
List<String> partitions, routingKeys;
if (this.superStreamConfiguration.routingKeys == null) {
List<String> partitions, bindingKeys;
if (this.superStreamConfiguration.bindingKeys == null) {
partitions =
IntStream.range(0, this.superStreamConfiguration.partitions)
.mapToObj(i -> this.name + "-" + i)
.collect(toList());
routingKeys =
bindingKeys =
IntStream.range(0, this.superStreamConfiguration.partitions)
.mapToObj(String::valueOf)
.collect(toList());
} else {
partitions =
this.superStreamConfiguration.routingKeys.stream()
this.superStreamConfiguration.bindingKeys.stream()
.map(rk -> this.name + "-" + rk)
.collect(toList());
routingKeys = this.superStreamConfiguration.routingKeys;
bindingKeys = this.superStreamConfiguration.bindingKeys;
}
function =
namedFunction(
c ->
c.createSuperStream(
this.name, partitions, routingKeys, streamParametersBuilder.build()),
this.name, partitions, bindingKeys, streamParametersBuilder.build()),
"Creation of super stream '%s'",
this.name);
} else {
Expand Down Expand Up @@ -154,7 +154,7 @@ private static class DefaultSuperStreamConfiguration implements SuperStreamConfi
private final StreamCreator creator;

private int partitions = 3;
private List<String> routingKeys = null;
private List<String> bindingKeys = null;

private DefaultSuperStreamConfiguration(StreamCreator creator) {
this.creator = creator;
Expand All @@ -166,16 +166,16 @@ public SuperStreamConfiguration partitions(int partitions) {
throw new IllegalArgumentException("The number of partitions must be greater than 0");
}
this.partitions = partitions;
this.routingKeys = null;
this.bindingKeys = null;
return this;
}

@Override
public SuperStreamConfiguration routingKeys(String... routingKeys) {
if (routingKeys == null || routingKeys.length == 0) {
throw new IllegalArgumentException("There must be at least 1 routing key");
public SuperStreamConfiguration bindingKeys(String... bindingKeys) {
if (bindingKeys == null || bindingKeys.length == 0) {
throw new IllegalArgumentException("There must be at least 1 binding key");
}
this.routingKeys = Arrays.asList(routingKeys);
this.bindingKeys = Arrays.asList(bindingKeys);
this.partitions = -1;
return this;
}
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ void creation() {
.partitions(5).creator()
.create();
// end::creation-partitions[]
// tag::creation-routing-keys[]
// tag::creation-binding-keys[]
environment.streamCreator().name("invoices")
.superStream()
.routingKeys("amer", "emea", "apac").creator()
.bindingKeys("amer", "emea", "apac").creator()
.create();
// end::creation-routing-keys[]
// end::creation-binding-keys[]
}

void producerSimple() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,23 +527,23 @@ void superStreamCreationSetPartitions(int partitionCount, TestInfo info) {

@Test
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0)
void superStreamCreationSetRoutingKeys(TestInfo info) {
List<String> routingKeys = Arrays.asList("a", "b", "c", "d", "e");
void superStreamCreationSetBindingKeys(TestInfo info) {
List<String> bindingKeys = Arrays.asList("a", "b", "c", "d", "e");
String s = streamName(info);
Client client = cf.get();
Environment env = environmentBuilder.build();
try {
env.streamCreator()
.name(s)
.superStream()
.routingKeys(routingKeys.toArray(new String[] {}))
.bindingKeys(bindingKeys.toArray(new String[] {}))
.creator()
.create();

assertThat(client.partitions(s))
.hasSize(routingKeys.size())
.containsAll(routingKeys.stream().map(rk -> s + "-" + rk).collect(toList()));
routingKeys.forEach(rk -> assertThat(client.route(rk, s)).hasSize(1).contains(s + "-" + rk));
.hasSize(bindingKeys.size())
.containsAll(bindingKeys.stream().map(rk -> s + "-" + rk).collect(toList()));
bindingKeys.forEach(bk -> assertThat(client.route(bk, s)).hasSize(1).contains(s + "-" + bk));
} finally {
env.deleteSuperStream(s);
env.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,28 @@ public class SuperStreamManagementTest {
static final int partitionCount = 3;
String s;
List<String> partitions;
List<String> routingKeys;
List<String> bindingKeys;

@BeforeEach
void init(TestInfo info) {
s = streamName(info);
partitions = partitions(s);
routingKeys = routingKeys();
bindingKeys = bindingKeys();
}

@Test
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
void createDelete() {
Client c = cf.get();
Client.Response response = c.createSuperStream(s, partitions, routingKeys, null);
Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null);
assertThat(response).is(ok());
assertThat(c.metadata(partitions))
.hasSameSizeAs(partitions)
.allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue());
assertThat(c.partitions(s)).isEqualTo(partitions);
routingKeys.forEach(rk -> assertThat(c.route(rk, s)).hasSize(1).contains(s + "-" + rk));
bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).hasSize(1).contains(s + "-" + bk));

response = c.createSuperStream(s, partitions, routingKeys, null);
response = c.createSuperStream(s, partitions, bindingKeys, null);
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_STREAM_ALREADY_EXISTS));

response = c.deleteSuperStream(s);
Expand All @@ -75,7 +75,7 @@ void createDelete() {
assertThat(streamMetadata.getResponseCode())
.isEqualTo(RESPONSE_CODE_STREAM_DOES_NOT_EXIST));
assertThat(c.partitions(s)).isEmpty();
routingKeys.forEach(rk -> assertThat(c.route(rk, s)).isEmpty());
bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).isEmpty());

response = c.deleteSuperStream(s);
assertThat(response).is(responseCode(RESPONSE_CODE_STREAM_DOES_NOT_EXIST));
Expand All @@ -85,7 +85,7 @@ void createDelete() {
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exception {
Client c = cf.get();
Client.Response response = c.createSuperStream(s, partitions, routingKeys, null);
Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null);
assertThat(response).is(ok());
Map<String, Short> notifications = new ConcurrentHashMap<>(partitions.size());
AtomicInteger notificationCount = new AtomicInteger();
Expand Down Expand Up @@ -114,37 +114,37 @@ void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exceptio
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
void authorisation() throws Exception {
String user = "stream";
// routing keys do not matter for authorisation
routingKeys = asList("1", "2", "3");
// binding keys do not matter for authorisation
bindingKeys = asList("1", "2", "3");
try {
addUser(user, user);
setPermissions(user, asList("stream|partition.*$", "partition.*$", "stream.*$"));
Client c = cf.get(new Client.ClientParameters().username(user).password(user));
Client.Response response = c.createSuperStream("not-allowed", partitions, routingKeys, null);
Client.Response response = c.createSuperStream("not-allowed", partitions, bindingKeys, null);
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED));

s = name("stream");
response = c.createSuperStream(s, asList("1", "2", "3"), routingKeys, null);
response = c.createSuperStream(s, asList("1", "2", "3"), bindingKeys, null);
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED));

partitions = range(0, partitionCount).mapToObj(i -> s + "-" + i).collect(toList());
// we can create the queues, but can't bind them, as it requires write permission
response = c.createSuperStream(s, partitions, routingKeys, null);
response = c.createSuperStream(s, partitions, bindingKeys, null);
assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED));

String partitionName = name("partition");
partitions =
range(0, partitionCount).mapToObj(i -> partitionName + "-" + i).collect(toList());
response = c.createSuperStream(s, partitions, routingKeys, null);
response = c.createSuperStream(s, partitions, bindingKeys, null);
assertThat(response).is(ok());

assertThat(c.metadata(partitions))
.hasSameSizeAs(partitions)
.allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue());
assertThat(c.partitions(s)).isEqualTo(partitions);
for (int i = 0; i < routingKeys.size(); i++) {
String rk = routingKeys.get(i);
assertThat(c.route(rk, s)).hasSize(1).contains(partitions.get(i));
for (int i = 0; i < bindingKeys.size(); i++) {
String bk = bindingKeys.get(i);
assertThat(c.route(bk, s)).hasSize(1).contains(partitions.get(i));
}

response = c.deleteSuperStream(s);
Expand All @@ -154,11 +154,11 @@ void authorisation() throws Exception {
}
}

private static List<String> routingKeys() {
return routingKeys(partitionCount);
private static List<String> bindingKeys() {
return bindingKeys(partitionCount);
}

private static List<String> routingKeys(int partitions) {
private static List<String> bindingKeys(int partitions) {
return range(0, partitions).mapToObj(String::valueOf).collect(toList());
}

Expand Down

0 comments on commit 57376da

Please sign in to comment.