Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new "Hono" connection type #1548

Merged
merged 77 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
fad16b2
Connection type 'Hono' introduced
abalarev May 30, 2022
9af8a4f
HonoConfig introduced to provide properties for the new Hono-connecti…
abalarev May 30, 2022
448d5a2
DefaultHonoConfig and conf files update
abalarev Jun 7, 2022
70b47d1
Config values added
abalarev Jun 7, 2022
1029377
Redundant file removed
abalarev Jun 8, 2022
16f3da6
HonoCredentials taken from static configuration
abalarev Jun 13, 2022
000b333
Config refactored
abalarev Jun 17, 2022
845d7b8
Config refactored
abalarev Jun 22, 2022
70386d0
Connection "enrichment"
abalarev Jun 24, 2022
c70c88b
CR-11462 Add HonoValidator impl and Junit tests
SilviaGeorgievaLyoteva Jun 29, 2022
455891a
Connection "enrichment"
abalarev Jul 5, 2022
6db9090
Merge remote-tracking branch 'origin/feature/Hono_connection_type' in…
abalarev Jul 5, 2022
7b72bee
Fixed HonoValidator merge issues
abalarev Jul 5, 2022
9426aec
Hono-connection enrichment fixed
abalarev Jul 6, 2022
fba66b9
ConfigValues renamed to HonoConfigValue, minor fixes
abalarev Jul 6, 2022
ba5b81a
Review issues fixes, unit tests fixed
abalarev Jul 7, 2022
6a5aee5
Review issues fixes, unit tests fixed
abalarev Jul 7, 2022
e675897
DefaultHonoConfig class moved from api to service, URI init fixed
abalarev Jul 13, 2022
0b11f79
Merge remote-tracking branch 'origin/master' into feature/Hono_connec…
abalarev Jul 13, 2022
0fecea7
Merge remote-tracking branch 'origin/feature/Hono_connection_type' in…
Jul 14, 2022
cda7bb5
Minor refactoring of ConfigWithFallback.
Jul 19, 2022
a843974
Streamline behaviour of DefaultScopedConfig in case of an unknown enu…
Jul 19, 2022
09907a9
Refactored HonoConfig.
Jul 19, 2022
710f818
Connection enrichment refactored, HonoConfig cleaned up.
abalarev Jul 20, 2022
c59a028
Refactored `HonoAddressAlias` and added unit tests.
Jul 21, 2022
0391f92
Refactored `HonoValidator` and adjusted unit tests.
Jul 22, 2022
c86312f
Unit tests created
abalarev Jul 25, 2022
53da031
Merge remote-tracking branch 'origin/feature/Hono_connection_type' in…
Jul 27, 2022
111c99a
Fixed compilation errors of `HonoConnectionFactoryTest`.
Jul 27, 2022
fca7c19
Cleaned up `HonoConfig`.
Aug 4, 2022
9d9705e
Refactored `HonoConnectionFactory`.
Aug 4, 2022
495f8b2
Merge branch 'master' into feature/Hono_connection_type
Aug 8, 2022
766b2dc
Reordered import.
Aug 10, 2022
059f619
Further simplified `HonoAddressAlias`.
Aug 10, 2022
6e73c74
Fixed unit test.
Aug 10, 2022
b46e333
Moved back method for getting `HonoAddressAlias` by alias value to th…
Aug 10, 2022
2c01aef
Made `HonoConnectionFactory` a `DittoExtensionPoint`.
Aug 10, 2022
0bfe490
newInstance() method added to UserPasswordCredentials class
abalarev Aug 12, 2022
5ee0c6e
Merge branch 'master' into feature/Hono_connection_type
Yannic92 Aug 29, 2022
69ee5af
replyTargetEnabled bug in ImmutableSource fixed
abalarev Aug 31, 2022
136af98
Custom `headerMappings` and custom `groupId` in `specificConfig` allo…
abalarev Sep 2, 2022
91b4804
groupId handling changed
abalarev Sep 15, 2022
fb0ae41
Hono-credentials added to connectivity-extension.conf
abalarev Oct 4, 2022
26902a5
retrieveHonoConnection piggyback command introduced.
abalarev Oct 4, 2022
b1845b6
Automatic reconnect of 'hono' connection after credentials are changed.
abalarev Oct 4, 2022
36e1e13
Copyright headers fixes
abalarev Oct 5, 2022
5079f38
Javadoc improved.
abalarev Oct 5, 2022
b790173
Merge remote-tracking branch 'origin/master' into feature/Hono_connec…
abalarev Oct 5, 2022
5b7f415
some fixes after merge
abalarev Oct 6, 2022
ec67d80
Integrate Hono connection in ditto ConnectionRoute
SilviaGeorgievaLyoteva Oct 6, 2022
f34165d
Merge remote-tracking branch 'origin/master' into feature/Hono_connec…
abalarev Oct 7, 2022
60ea7fd
Hono-connection resolving added for TestConnection (dry-run)
abalarev Oct 12, 2022
a117a57
Merge remote-tracking branch 'origin/master' into feature/Hono_connec…
abalarev Oct 12, 2022
5c5564e
HubParamsModified renamed to HubInstanceInfoModified, few logs added
abalarev Oct 13, 2022
29fd5a3
CR-11683 Bug fixed in handling of 'Event'
abalarev Oct 13, 2022
1886e85
Review issues fixes.
abalarev Oct 17, 2022
5f2ec1c
Automatic reconnect fixed to filter hono-connections only
abalarev Oct 19, 2022
3ff0fe9
Auto-reconnect review issues fixed.
abalarev Oct 21, 2022
b4a0e76
Revert ConnectionRoute to origin/master and change REST API documenta…
SilviaGeorgievaLyoteva Oct 21, 2022
6f7adeb
Reverted ConnectionRoute to origin/master
SilviaGeorgievaLyoteva Oct 21, 2022
57703cc
Javadoc fixed
abalarev Oct 21, 2022
f49fb66
More review issues fixed.
abalarev Oct 24, 2022
9b38b8d
Changed ConnectionId.generateRandom()
SilviaGeorgievaLyoteva Oct 24, 2022
d5c9add
Introduce dedicated implementation of HonoConnection
SilviaGeorgievaLyoteva Oct 25, 2022
79c237b
Merge remote-tracking branch 'origin/master' into feature/Hono_connec…
abalarev Oct 31, 2022
e649ad0
Fixed HonoConnection implementation and added HonoConnection test
SilviaGeorgievaLyoteva Nov 2, 2022
83ca606
Introduce AbstractConnection and AbstractConnectionBuilder
SilviaGeorgievaLyoteva Nov 24, 2022
37cdff1
Hono connection default config kafka parameters changed
abalarev Nov 24, 2022
ca276e5
Merge remote-tracking branch 'origin/master' into feature/Hono_connec…
abalarev Nov 24, 2022
ef5d05c
Unit test fixed
abalarev Nov 24, 2022
baecbb1
Removed empty rows, checked for hono conn. type in getConnectionTypeO…
SilviaGeorgievaLyoteva Dec 1, 2022
83315d6
Documentation updated regarding Hono connection
abalarev Dec 2, 2022
100fe3d
Fixed copyright headers
abalarev Dec 9, 2022
e4369c9
Fixed review issues in connectivity-protocol-bindings-hono.md
abalarev Dec 12, 2022
2c4663d
Merge remote-tracking branch 'origin/master' into feature/Hono_connec…
abalarev Dec 12, 2022
4444087
Copyright headers
abalarev Dec 12, 2022
ff46662
adjust since tags in javadoc to Ditto version 3.2.0
thjaeckle Dec 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ private static MappingStrategies getConnectivityMappingStrategies() {
.add(GlobalErrorRegistry.getInstance())
.add(Connection.class, ConnectivityModelFactory::connectionFromJson)
.add("ImmutableConnection", ConnectivityModelFactory::connectionFromJson)
.add("HonoConnection", ConnectivityModelFactory::connectionFromJson)
.add(ResourceStatus.class, ConnectivityModelFactory::resourceStatusFromJson)
.add("ImmutableResourceStatus", ConnectivityModelFactory::resourceStatusFromJson)
.add(ConnectivityStatus.class, ConnectivityStatus::fromJson)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.model;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkArgument;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

/**
* Abstract implementation for common aspects of {@link ConnectionBuilder}.
*
* @since 3.2.0
*/

@Immutable
abstract class AbstractConnectionBuilder implements ConnectionBuilder {

private static final String MIGRATED_MAPPER_ID = "javascript";

// required but changeable:
@Nullable ConnectionId id;
@Nullable ConnectivityStatus connectionStatus;
String uri;
// optional:
@Nullable String name = null;
@Nullable Credentials credentials;
@Nullable MappingContext mappingContext = null;
@Nullable String trustedCertificates;
@Nullable ConnectionLifecycle lifecycle = null;
@Nullable SshTunnel sshTunnel = null;

// optional with default:
Set<String> tags = new LinkedHashSet<>();
boolean failOverEnabled = true;
boolean validateCertificate = true;
final List<Source> sources = new ArrayList<>();
final List<Target> targets = new ArrayList<>();
int clientCount = 1;
int processorPoolSize = 1;
PayloadMappingDefinition payloadMappingDefinition =
ConnectivityModelFactory.emptyPayloadMappingDefinition();
final Map<String, String> specificConfig = new HashMap<>();
ConnectionType connectionType;

AbstractConnectionBuilder(final ConnectionType connectionType) {
this.connectionType = connectionType;
}

private static boolean isBlankOrNull(@Nullable final String toTest) {
return null == toTest || toTest.trim().isEmpty();
}

@Override
public ConnectionBuilder id(final ConnectionId id) {
this.id = checkNotNull(id, "id");
return this;
}

@Override
public ConnectionBuilder name(@Nullable final String name) {
this.name = name;
return this;
}

@Override
public ConnectionBuilder credentials(@Nullable final Credentials credentials) {
this.credentials = credentials;
return this;
}

@Override
public ConnectionBuilder trustedCertificates(@Nullable final String trustedCertificates) {
if (isBlankOrNull(trustedCertificates)) {
this.trustedCertificates = null;
} else {
this.trustedCertificates = trustedCertificates;
}
return this;
}

@Override
public ConnectionBuilder uri(final String uri) {
this.uri = checkNotNull(uri, "uri");
return this;
}

@Override
public ConnectionBuilder connectionStatus(final ConnectivityStatus connectionStatus) {
this.connectionStatus = checkNotNull(connectionStatus, "connectionStatus");
return this;
}

@Override
public ConnectionBuilder failoverEnabled(final boolean failOverEnabled) {
this.failOverEnabled = failOverEnabled;
return this;
}

@Override
public ConnectionBuilder validateCertificate(final boolean validateCertificate) {
this.validateCertificate = validateCertificate;
return this;
}

@Override
public ConnectionBuilder processorPoolSize(final int processorPoolSize) {
checkArgument(processorPoolSize, ps -> ps > 0, () -> "The processor pool size must be positive!");
this.processorPoolSize = processorPoolSize;
return this;
}

@Override
public ConnectionBuilder sources(final List<Source> sources) {
this.sources.addAll(checkNotNull(sources, "sources"));
return this;
}

@Override
public ConnectionBuilder targets(final List<Target> targets) {
this.targets.addAll(checkNotNull(targets, "targets"));
return this;
}

@Override
public ConnectionBuilder setSources(final List<Source> sources) {
this.sources.clear();
return sources(sources);
}

@Override
public ConnectionBuilder setTargets(final List<Target> targets) {
this.targets.clear();
return targets(targets);
}

@Override
public ConnectionBuilder clientCount(final int clientCount) {
checkArgument(clientCount, ps -> ps > 0, () -> "The client count must be positive!");
this.clientCount = clientCount;
return this;
}

@Override
public ConnectionBuilder specificConfig(final Map<String, String> specificConfig) {
this.specificConfig.putAll(checkNotNull(specificConfig, "specificConfig"));
return this;
}

@Override
public ConnectionBuilder mappingContext(@Nullable final MappingContext mappingContext) {
this.mappingContext = mappingContext;
return this;
}

@Override
public ConnectionBuilder tags(final Collection<String> tags) {
this.tags = new LinkedHashSet<>(checkNotNull(tags, "tags to set"));
return this;
}

@Override
public ConnectionBuilder tag(final String tag) {
tags.add(checkNotNull(tag, "tag to set"));
return this;
}

@Override
public ConnectionBuilder lifecycle(@Nullable final ConnectionLifecycle lifecycle) {
this.lifecycle = lifecycle;
return this;
}

@Override
public ConnectionBuilder sshTunnel(@Nullable final SshTunnel sshTunnel) {
this.sshTunnel = sshTunnel;
return this;
}

@Override
public ConnectionBuilder payloadMappingDefinition(final PayloadMappingDefinition payloadMappingDefinition) {
this.payloadMappingDefinition = payloadMappingDefinition;
return this;
}

private boolean shouldMigrateMappingContext() {
return mappingContext != null;
}

void migrateLegacyConfigurationOnTheFly() {
if (shouldMigrateMappingContext()) {
this.payloadMappingDefinition =
payloadMappingDefinition.withDefinition(MIGRATED_MAPPER_ID, mappingContext);
}
setSources(sources.stream().map(this::migrateSource).collect(Collectors.toList()));
setTargets(targets.stream().map(this::migrateTarget).collect(Collectors.toList()));
}

private Source migrateSource(final Source source) {
final Source sourceAfterReplyTargetMigration = ImmutableSource.migrateReplyTarget(source, connectionType);
if (shouldMigrateMappingContext()) {
return new ImmutableSource.Builder(sourceAfterReplyTargetMigration)
.payloadMapping(addMigratedPayloadMappings(source.getPayloadMapping()))
.build();
} else {
return sourceAfterReplyTargetMigration;
}
}

private Target migrateTarget(final Target target) {
final boolean shouldAddHeaderMapping = shouldAddDefaultHeaderMappingToTarget(connectionType);
final boolean shouldMigrateMappingContext = shouldMigrateMappingContext();
if (shouldMigrateMappingContext || shouldAddHeaderMapping) {
final TargetBuilder builder = new ImmutableTarget.Builder(target);
if (shouldMigrateMappingContext) {
builder.payloadMapping(addMigratedPayloadMappings(target.getPayloadMapping()));
}
if (shouldAddHeaderMapping) {
builder.headerMapping(target.getHeaderMapping());
}
return builder.build();
} else {
return target;
}
}

private boolean shouldAddDefaultHeaderMappingToTarget(final ConnectionType connectionType) {
switch (connectionType) {
case AMQP_091:
case AMQP_10:
case KAFKA:
case MQTT_5:
case HONO:
return true;
case MQTT:
case HTTP_PUSH:
default:
return false;
}
}


private PayloadMapping addMigratedPayloadMappings(final PayloadMapping payloadMapping) {
final ArrayList<String> merged = new ArrayList<>(payloadMapping.getMappings());
merged.add(MIGRATED_MAPPER_ID);
return ConnectivityModelFactory.newPayloadMapping(merged);
}

void checkSourceAndTargetAreValid() {
if (sources.isEmpty() && targets.isEmpty()) {
throw ConnectionConfigurationInvalidException.newBuilder("Either a source or a target must be " +
"specified in the configuration of a connection!").build();
}
}

/**
* If no context is set on connection level each target and source must have its own context.
*/
void checkAuthorizationContextsAreValid() {
// if the auth context on connection level is empty,
// an auth context is required to be set on each source/target
final Set<String> sourcesWithoutAuthContext = sources.stream()
.filter(source -> source.getAuthorizationContext().isEmpty())
.flatMap(source -> source.getAddresses().stream())
.collect(Collectors.toCollection(LinkedHashSet::new));
final Set<String> targetsWithoutAuthContext = targets.stream()
.filter(target -> target.getAuthorizationContext().isEmpty())
.map(Target::getAddress)
.collect(Collectors.toCollection(LinkedHashSet::new));

if (!sourcesWithoutAuthContext.isEmpty() || !targetsWithoutAuthContext.isEmpty()) {
final StringBuilder message = new StringBuilder("The ");
if (!sourcesWithoutAuthContext.isEmpty()) {
message.append("Sources ").append(sourcesWithoutAuthContext);
}
if (!sourcesWithoutAuthContext.isEmpty() && !targetsWithoutAuthContext.isEmpty()) {
message.append(" and ");
}
if (!targetsWithoutAuthContext.isEmpty()) {
message.append("Targets ").append(targetsWithoutAuthContext);
}
message.append(" are missing an authorization context.");
throw ConnectionConfigurationInvalidException.newBuilder(message.toString()).build();
}
}

void checkConnectionAnnouncementsOnlySetIfClientCount1() {
if (clientCount > 1 && containsTargetWithConnectionAnnouncementsTopic()) {
final String message = MessageFormat.format("Connection announcements (topic {0}) can" +
" only be used with client count 1.", Topic.CONNECTION_ANNOUNCEMENTS.getName());
throw ConnectionConfigurationInvalidException.newBuilder(message)
.build();
}
}

private boolean containsTargetWithConnectionAnnouncementsTopic() {
return targets.stream()
.map(Target::getTopics)
.flatMap(Set::stream)
.map(FilteredTopic::getTopic)
.anyMatch(Topic.CONNECTION_ANNOUNCEMENTS::equals);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.eclipse.ditto.base.model.json.JsonParsableException;

/**
* Thrown if the the configuration of a {@link Connection} was invalid.
* Thrown if the configuration of a {@link Connection} was invalid.
*/
@Immutable
@JsonParsableException(errorCode = ConnectionConfigurationInvalidException.ERROR_CODE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ public enum ConnectionType implements CharSequence {
/**
* Indicates a MQTT 5 connection.
*/
MQTT_5("mqtt-5");
MQTT_5("mqtt-5"),

/**
* Indicates a connection to Eclipse Hono.
* @since 3.2.0
*/
HONO("hono");

private final String name;

Expand Down
Loading