Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1748 Add honoTenantId configuration for HonoConnection. #1788

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import java.text.MessageFormat;
import java.util.Set;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.HonoAddressAlias;
import org.eclipse.ditto.connectivity.model.UserPasswordCredentials;
import org.eclipse.ditto.connectivity.service.config.DefaultHonoConfig;
Expand All @@ -35,8 +33,6 @@ public final class DefaultHonoConnectionFactory extends HonoConnectionFactory {

private final HonoConfig honoConfig;

private ConnectionId connectionId;

/**
* Constructs a {@code DefaultHonoConnectionFactory} for the specified arguments.
*
Expand All @@ -48,11 +44,6 @@ public DefaultHonoConnectionFactory(final ActorSystem actorSystem, final Config
honoConfig = new DefaultHonoConfig(actorSystem);
}

@Override
protected void preConversion(final Connection honoConnection) {
connectionId = honoConnection.getId();
}

@Override
public URI getBaseUri() {
return honoConfig.getBaseUri();
Expand Down Expand Up @@ -86,13 +77,13 @@ protected UserPasswordCredentials getCredentials() {
@Override
protected String resolveSourceAddress(final HonoAddressAlias honoAddressAlias) {
return MessageFormat.format("hono.{0}.{1}",
honoAddressAlias.getAliasValue(), connectionId);
honoAddressAlias.getAliasValue(), getHonoTenantId());
}

@Override
protected String resolveTargetAddress(final HonoAddressAlias honoAddressAlias) {
return MessageFormat.format("hono.{0}.{1}/'{{thing:id}}'",
honoAddressAlias.getAliasValue(), connectionId);
honoAddressAlias.getAliasValue(), getHonoTenantId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@
*/
public abstract class HonoConnectionFactory implements DittoExtensionPoint {

/**
* The name of the property in the {@code specificConfig} object containing the Hono tenant identifier.
*/
protected static final String SPEC_CONFIG_HONO_TENANT_ID = "honoTenantId";

private String honoTenantId;

/**
* Constructs a {@code HonoConnectionFactory}.
*/
Expand Down Expand Up @@ -112,6 +119,9 @@ public Connection getHonoConnection(final Connection connection) {
connection.getConnectionType())
);

honoTenantId = connection.getSpecificConfig()
.getOrDefault(SPEC_CONFIG_HONO_TENANT_ID, connection.getId().toString());

preConversion(connection);

return ConnectivityModelFactory.newConnectionBuilder(connection)
Expand All @@ -134,6 +144,18 @@ protected void preConversion(final Connection honoConnection) {
// Do nothing by default.
}

/**
* Get the Hono tenant identifier associated with the connection.
*
* @return The Hono tenant identifier.
*/
protected String getHonoTenantId() {
if (honoTenantId == null) {
throw new IllegalStateException("getHonoTenantId invoked before tenant id got determined");
}
return honoTenantId;
}

protected abstract URI getBaseUri();

protected abstract boolean isValidateCertificates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import org.assertj.core.api.Assertions;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.HonoAddressAlias;
import org.eclipse.ditto.connectivity.model.ReplyTarget;
Expand Down Expand Up @@ -58,7 +57,7 @@ public final class DefaultHonoConnectionFactoryTest {

private HonoConfig honoConfig;

private static Connection generateConnectionObjectFromJsonFile( String fileName) throws IOException {
private static Connection generateConnectionObjectFromJsonFile(final String fileName) throws IOException {
final var testClassLoader = DefaultHonoConnectionFactoryTest.class.getClassLoader();
try (final var connectionJsonFileStreamReader = new InputStreamReader(
testClassLoader.getResourceAsStream(fileName)
Expand All @@ -85,13 +84,26 @@ public void newInstanceWithNullActorSystemThrowsException() {
public void getHonoConnectionWithCustomMappingsReturnsExpected() throws IOException {
final var userProvidedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-test.json");
final var expectedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-expected.json");
final var expectedAdaptedConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-expected-after-adaptation.json");

final var underTest =
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedHonoConnection);
assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedAdaptedConnection);
}

@Test
public void getHonoConnectionWithImplicitTenantIdAndCustomMappingsReturnsExpected() throws IOException {
final var userProvidedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-implicit-tenant-custom-test.json");
final var expectedAdaptedConnection =
generateConnectionObjectFromJsonFile("hono-connection-implicit-tenant-custom-expected-after-adaptation.json");

final var underTest =
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedAdaptedConnection);
}

@Test
Expand All @@ -103,11 +115,11 @@ public void getHonoConnectionWithDefaultMappingReturnsExpected() throws IOExcept
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection))
.isEqualTo(getExpectedHonoConnection(userProvidedHonoConnection));
.isEqualTo(getExpectedAdaptedHonoConnection(userProvidedHonoConnection));
}

@SuppressWarnings("unchecked")
private Connection getExpectedHonoConnection(final Connection originalConnection) {
private Connection getExpectedAdaptedHonoConnection(final Connection originalConnection) {
final var sourcesByAddress = getSourcesByAddress(originalConnection.getSources());
final var commandReplyTargetHeaderMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"correlation-id", "{{ header:correlation-id }}",
Expand All @@ -122,6 +134,9 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
"subject", "{{ header:subject | fn:default(topic:action-subject) }}"
);
final var connectionId = originalConnection.getId();
final String honoTenantId = originalConnection.getSpecificConfig()
.getOrDefault(DefaultHonoConnectionFactory.SPEC_CONFIG_HONO_TENANT_ID, connectionId.toString());
final String expectedResolvedCommandTargetAddress = getExpectedResolvedCommandTargetAddress(honoTenantId);
return ConnectivityModelFactory.newConnectionBuilder(originalConnection)
.uri(honoConfig.getBaseUri().toString().replaceFirst("(\\S+://)(\\S+)",
"$1" + URLEncoder.encode(honoConfig.getUserPasswordCredentials().getUsername(), StandardCharsets.UTF_8)
Expand All @@ -135,22 +150,22 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
)
.setSources(List.of(
ConnectivityModelFactory.newSourceBuilder(sourcesByAddress.get(TELEMETRY.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(TELEMETRY, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(TELEMETRY, honoTenantId)))
.replyTarget(ReplyTarget.newBuilder()
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.headerMapping(commandReplyTargetHeaderMapping)
.build())
.build(),
ConnectivityModelFactory.newSourceBuilder(sourcesByAddress.get(EVENT.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(EVENT, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(EVENT, honoTenantId)))
.replyTarget(ReplyTarget.newBuilder()
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.headerMapping(commandReplyTargetHeaderMapping)
.build())
.build(),
ConnectivityModelFactory.newSourceBuilder(
sourcesByAddress.get(COMMAND_RESPONSE.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(COMMAND_RESPONSE, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(COMMAND_RESPONSE, honoTenantId)))
.headerMapping(ConnectivityModelFactory.newHeaderMapping(Map.of(
"correlation-id", "{{ header:correlation-id }}",
"status", "{{ header:status }}"
Expand All @@ -159,8 +174,8 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
))
.setTargets(List.of(
ConnectivityModelFactory.newTargetBuilder(targets.get(0))
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.originalAddress(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.originalAddress(expectedResolvedCommandTargetAddress)
.headerMapping(ConnectivityModelFactory.newHeaderMapping(
Stream.concat(
basicAdditionalTargetHeaderMappingEntries.entrySet().stream(),
Expand All @@ -170,8 +185,8 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
))
.build(),
ConnectivityModelFactory.newTargetBuilder(targets.get(1))
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.originalAddress(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.originalAddress(expectedResolvedCommandTargetAddress)
.headerMapping(ConnectivityModelFactory.newHeaderMapping(
basicAdditionalTargetHeaderMappingEntries
))
Expand All @@ -186,12 +201,12 @@ private static Map<String, Source> getSourcesByAddress(final Iterable<Source> so
return result;
}

private static String getExpectedResolvedSourceAddress(final HonoAddressAlias honoAddressAlias, final ConnectionId connectionId) {
return "hono." + honoAddressAlias.getAliasValue() + "." + connectionId;
private static String getExpectedResolvedSourceAddress(final HonoAddressAlias honoAddressAlias, final String honoTenantId) {
return "hono." + honoAddressAlias.getAliasValue() + "." + honoTenantId;
}

private static String getExpectedResolvedCommandTargetAddress(final ConnectionId connectionId) {
return "hono." + HonoAddressAlias.COMMAND.getAliasValue() + "." + connectionId + "/{{thing:id}}";
private static String getExpectedResolvedCommandTargetAddress(final String honoTenantId) {
return "hono." + HonoAddressAlias.COMMAND.getAliasValue() + "." + honoTenantId + "/{{thing:id}}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testConnectionTypeHono() throws IOException {
.toBuilder()
.id(connectionId)
.build();
final var expectedHonoConnection = generateConnectionObjectFromJsonFile("hono-connection-custom-expected.json", connectionId)
final var expectedHonoConnection = generateConnectionObjectFromJsonFile("hono-connection-custom-expected-after-adaptation.json", connectionId)
.toBuilder()
.id(connectionId)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"sources": [
{
"addresses": [
"hono.telemetry.test-connection-id"
"hono.telemetry.hono-tenant-id"
],
"consumerCount": 1,
"qos": 0,
Expand All @@ -32,7 +32,7 @@
"implicitStandaloneThingCreation"
],
"replyTarget": {
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"headerMapping": {
"device_id": "custom_value1",
"user_key1": "user_value1",
Expand All @@ -48,7 +48,7 @@
},
{
"addresses": [
"hono.event.test-connection-id"
"hono.event.hono-tenant-id"
],
"consumerCount": 1,
"qos": 1,
Expand All @@ -72,7 +72,7 @@
"implicitStandaloneThingCreation"
],
"replyTarget": {
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"headerMapping": {
"device_id": "{{ thing:id }}",
"subject": "custom_value2",
Expand All @@ -88,7 +88,7 @@
},
{
"addresses": [
"hono.command_response.test-connection-id"
"hono.command_response.hono-tenant-id"
],
"consumerCount": 1,
"qos": 0,
Expand Down Expand Up @@ -120,7 +120,7 @@
],
"targets": [
{
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"topics": [
"_/_/things/live/messages",
"_/_/things/live/commands"
Expand All @@ -137,7 +137,7 @@
}
},
{
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/events"
Expand All @@ -158,6 +158,7 @@
"validateCertificates": false,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id",
"saslMechanism": "plain",
"bootstrapServers": "tcp://server1:port1,tcp://server2:port2,tcp://server3:port3",
"groupId": "custom_groupId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
"validateCertificates": true,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id",
"groupId": "custom_groupId"
},
"mappingDefinitions": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@
"failoverEnabled": true,
"validateCertificates": true,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id"
},
"mappingDefinitions": {
"implicitEdgeThingCreation": {
"mappingEngine": "ImplicitThingCreation",
Expand Down
Loading
Loading