From 75f5f780c8e177a8759bc0badbb04511a70aacd1 Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 21 Feb 2022 14:20:33 +0100 Subject: [PATCH 1/6] [Java] Replace all ad hoc channel creations in the Cluster with either the dedicated channels or channel templates. --- .../src/main/java/io/aeron/ChannelUri.java | 28 +++- .../test/java/io/aeron/ChannelUriTest.java | 15 ++ .../io/aeron/cluster/ConsensusModule.java | 131 +++++++++++++++++- .../aeron/cluster/ConsensusModuleAgent.java | 54 +++++--- .../java/io/aeron/cluster/DynamicJoin.java | 15 +- .../main/java/io/aeron/cluster/Election.java | 47 ++++--- .../java/io/aeron/cluster/LogPublisher.java | 21 ++- .../java/io/aeron/cluster/LogReplication.java | 7 +- 8 files changed, 266 insertions(+), 52 deletions(-) diff --git a/aeron-client/src/main/java/io/aeron/ChannelUri.java b/aeron-client/src/main/java/io/aeron/ChannelUri.java index 982dc2cf05..61dfbe4224 100644 --- a/aeron-client/src/main/java/io/aeron/ChannelUri.java +++ b/aeron-client/src/main/java/io/aeron/ChannelUri.java @@ -20,7 +20,10 @@ import org.agrona.collections.ArrayUtil; import org.agrona.collections.Object2ObjectHashMap; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; import static io.aeron.CommonContext.*; import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; @@ -36,6 +39,7 @@ * *

* Multiple params with the same key are allowed, the last value specified takes precedence. + * * @see ChannelUriStringBuilder */ public final class ChannelUri @@ -507,6 +511,28 @@ public static long getTag(final String paramValue) AsciiEncoding.parseLongAscii(paramValue, 4, paramValue.length() - 4) : INVALID_TAG; } + /** + * Create a channel URI for a destination, i.e. a channel that uses {@code media} and {@code interface} parameters + * of the original channel and adds specified {@code endpoint} to it. For example given the input channel is + * {@code aeron:udp?mtu=1440|ttl=0|endpoint=localhost:8090|term-length=128k|interface=eth0} and the endpoint is + * {@code 192.168.0.14} the output of this method will be {@code aeron:udp?endpoint=192.168.0.14|interface=eth0}. + * + * @param channel for which the destination is being added. + * @param endpoint for the target destination. + * @return new channel URI for a destination. + */ + public static String createDestinationUri(final String channel, final String endpoint) + { + final ChannelUri channelUri = ChannelUri.parse(channel); + final String uri = AERON_PREFIX + channelUri.media() + "?" + ENDPOINT_PARAM_NAME + "=" + endpoint; + final String networkInterface = channelUri.get(INTERFACE_PARAM_NAME); + if (null != networkInterface) + { + return uri + "|" + INTERFACE_PARAM_NAME + "=" + networkInterface; + } + return uri; + } + private static void validateMedia(final String media) { if (IPC_MEDIA.equals(media) || UDP_MEDIA.equals(media)) diff --git a/aeron-client/src/test/java/io/aeron/ChannelUriTest.java b/aeron-client/src/test/java/io/aeron/ChannelUriTest.java index 9d8f176357..0fe838a75b 100644 --- a/aeron-client/src/test/java/io/aeron/ChannelUriTest.java +++ b/aeron-client/src/test/java/io/aeron/ChannelUriTest.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import java.util.List; @@ -178,6 +179,20 @@ void hashCode(final String uri1, final String uri2, final boolean expected) } } + @ParameterizedTest + @CsvSource(value = { + "aeron:udp?endpoint=poison|interface=iface|mtu=4444, dest1, aeron:udp?endpoint=dest1|interface=iface", + "aeron:ipc, dest2, aeron:ipc?endpoint=dest2", + "aeron:udp, localhost, aeron:udp?endpoint=localhost", + "aeron:ipc?interface=here, there, aeron:ipc?endpoint=there|interface=here", + "aeron-spy:aeron:udp?eol=true|interface=none, abc, aeron:udp?endpoint=abc|interface=none", + "aeron:udp?interface=eth0|term-length=64k|ttl=0|endpoint=some, vm1, aeron:udp?endpoint=vm1|interface=eth0" }) + void createDestinationUriTest(final String channel, final String endpoint, final String expected) + { + final String destinationUri = ChannelUri.createDestinationUri(channel, endpoint); + assertEquals(expected, destinationUri); + } + private static List equalityValues() { return asList( diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java index 870e7f9057..c7e91b5b73 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java @@ -442,6 +442,28 @@ public static final class Configuration */ public static final String REPLICATION_CHANNEL_PROP_NAME = "aeron.cluster.replication.channel"; + /** + * Channel template used for replaying logs to a follower using the {@link ClusterMember#catchupEndpoint()}. + */ + public static final String FOLLOWER_CATCHUP_CHANNEL_PROP_NAME = "aeron.cluster.follower.catchup.channel"; + + /** + * Default channel template used for replaying logs to a follower using the + * {@link ClusterMember#catchupEndpoint()}. + */ + public static final String FOLLOWER_CATCHUP_CHANNEL_DEFAULT = UDP_CHANNEL; + + /** + * Channel used to build the control request channel for the leader Archive. + */ + public static final String LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME = + "aeron.cluster.leader.archive.control.channel"; + + /** + * Default channel used to build the control request channel for the leader Archive. + */ + public static final String LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT = "aeron:udp?term-length=64k"; + /** * Counter type id for the consensus module state. */ @@ -1041,6 +1063,30 @@ public static String replicationChannel() return System.getProperty(REPLICATION_CHANNEL_PROP_NAME); } + /** + * The value {@link #FOLLOWER_CATCHUP_CHANNEL_DEFAULT} or system property + * {@link #FOLLOWER_CATCHUP_CHANNEL_PROP_NAME} if set. + * + * @return {@link #FOLLOWER_CATCHUP_CHANNEL_DEFAULT} or system property + * {@link #FOLLOWER_CATCHUP_CHANNEL_PROP_NAME} if set. + */ + public static String followerCatchupChannel() + { + return System.getProperty(FOLLOWER_CATCHUP_CHANNEL_PROP_NAME, FOLLOWER_CATCHUP_CHANNEL_DEFAULT); + } + + /** + * The value {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT} or system property + * {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME} if set. + * + * @return {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT} or system property + * {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME} if set. + */ + public static String leaderArchiveControlChannel() + { + return System.getProperty(LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME, LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT); + } + /** * The value {@link #WHEEL_TICK_RESOLUTION_DEFAULT_NS} or system property * {@link #WHEEL_TICK_RESOLUTION_PROP_NAME} if set. @@ -1139,6 +1185,7 @@ public static final class Context implements Cloneable private String ingressChannel = AeronCluster.Configuration.ingressChannel(); private int ingressStreamId = AeronCluster.Configuration.ingressStreamId(); private int ingressFragmentLimit = Configuration.ingressFragmentLimit(); + private String egressChannel = AeronCluster.Configuration.egressChannel(); private String logChannel = Configuration.logChannel(); private int logStreamId = Configuration.logStreamId(); private String memberEndpoints = Configuration.memberEndpoints(); @@ -1152,6 +1199,8 @@ public static final class Context implements Cloneable private String consensusChannel = Configuration.consensusChannel(); private int consensusStreamId = Configuration.consensusStreamId(); private String replicationChannel = Configuration.replicationChannel(); + private String followerCatchupChannel = Configuration.followerCatchupChannel(); + private String leaderArchiveControlChannel = Configuration.leaderArchiveControlChannel(); private int logFragmentLimit = ClusteredServiceContainer.Configuration.logFragmentLimit(); private int serviceCount = Configuration.serviceCount(); @@ -1452,7 +1501,7 @@ public void conclude() if (null == logPublisher) { - logPublisher = new LogPublisher(); + logPublisher = new LogPublisher(logChannel()); } if (null == egressPublisher) @@ -1854,6 +1903,34 @@ public int ingressFragmentLimit() return ingressFragmentLimit; } + /** + * Set the channel parameter for the egress channel that is used as template to define a response + * channel for a client: + *

+ * + * @param channel parameter for the egress channel. + * @return this for a fluent API. + * @see io.aeron.cluster.client.AeronCluster.Configuration#EGRESS_CHANNEL_PROP_NAME + */ + public Context egressChannel(final String channel) + { + egressChannel = channel; + return this; + } + + /** + * Get the channel parameter for the egress channel. + * + * @return the channel parameter for the egress channel. + * @see io.aeron.cluster.client.AeronCluster.Configuration#EGRESS_CHANNEL_PROP_NAME + */ + public String egressChannel() + { + return egressChannel; + } + /** * Set the channel parameter for the cluster log channel. * @@ -2170,6 +2247,58 @@ public String replicationChannel() return replicationChannel; } + /** + * Set a channel template used for replaying logs to a follower using the + * {@link ClusterMember#catchupEndpoint()}. + * + * @param channel to do a catch replay to a follower. + * @return this for a fluent API + * @see Configuration#FOLLOWER_CATCHUP_CHANNEL_PROP_NAME + */ + public Context followerCatchupChannel(final String channel) + { + this.followerCatchupChannel = channel; + return this; + } + + /** + * Gets the channel template used for replaying logs to a follower using the + * {@link ClusterMember#catchupEndpoint()}. + * + * @return channel used for replaying older data during a catchup phase. + * @see Configuration#FOLLOWER_CATCHUP_CHANNEL_PROP_NAME + */ + public String followerCatchupChannel() + { + return followerCatchupChannel; + } + + /** + * Set a channel template used to build the control request channel for the leader Archive using the + * {@link ClusterMember#archiveEndpoint()}. + * + * @param channel for the Archive control requests. + * @return this for a fluent API + * @see Configuration#LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME + */ + public Context leaderArchiveControlChannel(final String channel) + { + this.leaderArchiveControlChannel = channel; + return this; + } + + /** + * Gets the channel template used to build the control request channel for the leader Archive using the + * {@link ClusterMember#archiveEndpoint()}. + * + * @return channel used for replaying older data during a catchup phase. + * @see Configuration#LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME + */ + public String leaderArchiveControlChannel() + { + return leaderArchiveControlChannel; + } + /** * Set the fragment limit to be used when polling the log {@link Subscription}. * diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java index ccf01a743c..add8d115e3 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java @@ -383,7 +383,8 @@ void onSessionConnect( final byte[] encodedCredentials) { final long clusterSessionId = Cluster.Role.LEADER == role ? nextSessionId++ : NULL_VALUE; - final ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, responseChannel); + final ClusterSession session = new ClusterSession( + clusterSessionId, responseStreamId, createResponseChannel(responseChannel)); session.asyncConnect(aeron); final long now = clusterClock.time(); @@ -743,16 +744,14 @@ void onCatchupPosition( final ClusterMember follower = clusterMemberByIdMap.get(followerMemberId); if (null != follower && follower.catchupReplaySessionId() == NULL_VALUE) { - final String channel = new ChannelUriStringBuilder() - .media(CommonContext.UDP_MEDIA) - .endpoint(catchupEndpoint) - .sessionId(logPublisher.sessionId()) - .linger(0L) - .eos(Boolean.FALSE) - .build(); + final ChannelUri channel = ChannelUri.parse(ctx.followerCatchupChannel()); + channel.put(ENDPOINT_PARAM_NAME, catchupEndpoint); + channel.put(SESSION_ID_PARAM_NAME, Integer.toString(logPublisher.sessionId())); + channel.put(LINGER_PARAM_NAME, "0"); + channel.put(EOS_PARAM_NAME, "false"); follower.catchupReplaySessionId(archive.startReplay( - logRecordingId, logPosition, Long.MAX_VALUE, channel, ctx.logStreamId())); + logRecordingId, logPosition, Long.MAX_VALUE, channel.toString(), ctx.logStreamId())); follower.catchupReplayCorrelationId(archive.lastCorrelationId()); } } @@ -901,7 +900,8 @@ void onBackupQuery( } else if (state == ConsensusModule.State.ACTIVE || state == ConsensusModule.State.SUSPENDED) { - final ClusterSession session = new ClusterSession(NULL_VALUE, responseStreamId, responseChannel); + final ClusterSession session = new ClusterSession( + NULL_VALUE, responseStreamId, createResponseChannel(responseChannel)); session.markAsBackupSession(); session.asyncConnect(aeron); @@ -1205,7 +1205,8 @@ void onReplaySessionOpen( final int responseStreamId, final String responseChannel) { - final ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, responseChannel); + final ClusterSession session = new ClusterSession( + clusterSessionId, responseStreamId, createResponseChannel(responseChannel)); session.open(logPosition); session.lastActivityNs(clusterTimeUnit.toNanos(timestamp), correlationId); @@ -1353,7 +1354,7 @@ void onLoadSession( openedPosition, timeOfLastActivity, responseStreamId, - responseChannel, + createResponseChannel(responseChannel), closeReason)); if (clusterSessionId >= nextSessionId) @@ -1445,6 +1446,7 @@ int addLogPublication() final String channel = channelUri.toString(); final ExclusivePublication publication = aeron.addExclusivePublication(channel, ctx.logStreamId()); + logPublisher.publication(publication); if (ctx.isLogMdc()) { @@ -1452,18 +1454,16 @@ int addLogPublication() { if (member.id() != memberId) { - publication.asyncAddDestination("aeron:udp?endpoint=" + member.logEndpoint()); + logPublisher.addDestination(true, member.logEndpoint()); } } for (final ClusterMember member : passiveMembers) { - publication.asyncAddDestination("aeron:udp?endpoint=" + member.logEndpoint()); + logPublisher.addDestination(true, member.logEndpoint()); } } - logPublisher.publication(publication); - return publication.sessionId(); } @@ -2738,7 +2738,7 @@ LogReplication newLogReplication( leaderRecordingId, logRecordingId, stopPosition, - leaderArchiveEndpoint, + ChannelUri.createDestinationUri(ctx.leaderArchiveControlChannel(), leaderArchiveEndpoint), ctx.replicationChannel(), ctx.leaderHeartbeatTimeoutNs(), ctx.leaderHeartbeatIntervalNs(), @@ -3296,6 +3296,26 @@ private void runTerminationHook() } } + private String createResponseChannel(final String responseChannel) + { + final String egressChannel = ctx.egressChannel(); + if (null == egressChannel) + { + return responseChannel; // legacy behavior + } + else if (responseChannel.contains(ENDPOINT_PARAM_NAME)) + { + final String responseEndpoint = ChannelUri.parse(responseChannel).get(ENDPOINT_PARAM_NAME); + final ChannelUri channel = ChannelUri.parse(egressChannel); + channel.put(ENDPOINT_PARAM_NAME, responseEndpoint); + return channel.toString(); + } + else + { + return egressChannel; + } + } + public String toString() { return "ConsensusModuleAgent{" + diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/DynamicJoin.java b/aeron-cluster/src/main/java/io/aeron/cluster/DynamicJoin.java index 7943551c16..6f5b7126b5 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/DynamicJoin.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/DynamicJoin.java @@ -15,8 +15,10 @@ */ package io.aeron.cluster; -import io.aeron.*; -import io.aeron.archive.client.*; +import io.aeron.ChannelUri; +import io.aeron.Counter; +import io.aeron.ExclusivePublication; +import io.aeron.archive.client.AeronArchive; import io.aeron.archive.codecs.RecordingSignal; import io.aeron.archive.status.RecordingPos; import io.aeron.cluster.codecs.SnapshotRecordingsDecoder; @@ -283,7 +285,7 @@ private int snapshotRetrieve() RecordingPos.NULL_RECORDING_ID, AeronArchive.NULL_LENGTH, ctx.archiveContext().controlRequestStreamId(), - "aeron:udp?term-length=64k|endpoint=" + leaderMember.archiveEndpoint(), + leaderArchiveControlRequestChannel(), null, ctx.replicationChannel()); @@ -315,7 +317,7 @@ private int snapshotRetrieve() snapshotReplication.recordingId(), AeronArchive.NULL_LENGTH, ctx.archiveContext().controlRequestStreamId(), - "aeron:udp?term-length=64k|endpoint=" + leaderMember.archiveEndpoint(), + leaderArchiveControlRequestChannel(), null, ctx.replicationChannel()); @@ -328,6 +330,11 @@ private int snapshotRetrieve() return workCount; } + private String leaderArchiveControlRequestChannel() + { + return ChannelUri.createDestinationUri(ctx.leaderArchiveControlChannel(), leaderMember.archiveEndpoint()); + } + private int snapshotLoad(final long nowNs) { int workCount = 0; diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/Election.java b/aeron-cluster/src/main/java/io/aeron/cluster/Election.java index 0430242056..b2ffcc2fb3 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/Election.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/Election.java @@ -15,7 +15,10 @@ */ package io.aeron.cluster; -import io.aeron.*; +import io.aeron.Aeron; +import io.aeron.ChannelUri; +import io.aeron.Image; +import io.aeron.Subscription; import io.aeron.archive.codecs.RecordingSignal; import io.aeron.cluster.client.ClusterEvent; import io.aeron.cluster.client.ClusterException; @@ -34,6 +37,7 @@ import java.util.concurrent.TimeUnit; import static io.aeron.Aeron.NULL_VALUE; +import static io.aeron.CommonContext.*; import static io.aeron.archive.client.AeronArchive.NULL_POSITION; import static io.aeron.cluster.ClusterMember.compareLog; import static io.aeron.cluster.ElectionState.*; @@ -47,7 +51,7 @@ class Election private boolean isFirstInit = true; private boolean isLeaderStartup; private boolean isExtendedCanvass; - private int logSessionId = CommonContext.NULL_SESSION_ID; + private int logSessionId = NULL_SESSION_ID; private long timeOfLastStateChangeNs; private long timeOfLastUpdateNs; private long timeOfLastCommitPositionUpdateNs; @@ -556,7 +560,7 @@ private int init(final long nowNs) resetCatchup(); appendPosition = consensusModuleAgent.prepareForNewLeadership(logPosition); - logSessionId = CommonContext.NULL_SESSION_ID; + logSessionId = NULL_SESSION_ID; cleanupReplay(); CloseHelper.close(logSubscription); logSubscription = null; @@ -956,7 +960,7 @@ private int followerLogInit(final long nowNs) { if (null == logSubscription) { - if (CommonContext.NULL_SESSION_ID != logSessionId) + if (NULL_SESSION_ID != logSessionId) { logSubscription = addFollowerSubscription(); addLiveLogDestination(); @@ -1075,7 +1079,7 @@ private void publishNewLeadershipTerm(final long timestamp) private void publishNewLeadershipTerm( final ClusterMember member, final long logLeadershipTermId, final long timestamp) { - if (member.id() != thisMember.id() && CommonContext.NULL_SESSION_ID != logSessionId) + if (member.id() != thisMember.id() && NULL_SESSION_ID != logSessionId) { final RecordingLog.Entry logNextTermEntry = ctx.recordingLog().findTermEntry(logLeadershipTermId + 1); @@ -1132,14 +1136,22 @@ private boolean sendCatchupPosition(final String catchupEndpoint) private void addCatchupLogDestination() { - final String destination = "aeron:udp?endpoint=" + thisMember.catchupEndpoint(); + final String destination = ChannelUri.createDestinationUri(ctx.logChannel(), thisMember.catchupEndpoint()); logSubscription.addDestination(destination); consensusModuleAgent.catchupLogDestination(destination); } private void addLiveLogDestination() { - final String destination = ctx.isLogMdc() ? "aeron:udp?endpoint=" + thisMember.logEndpoint() : ctx.logChannel(); + final String destination; + if (ctx.isLogMdc()) + { + destination = ChannelUri.createDestinationUri(ctx.logChannel(), thisMember.logEndpoint()); + } + else + { + destination = ctx.logChannel(); + } logSubscription.addDestination(destination); consensusModuleAgent.liveLogDestination(destination); } @@ -1147,17 +1159,16 @@ private void addLiveLogDestination() private Subscription addFollowerSubscription() { final Aeron aeron = ctx.aeron(); - final String channel = new ChannelUriStringBuilder() - .media(CommonContext.UDP_MEDIA) - .tags(aeron.nextCorrelationId() + "," + aeron.nextCorrelationId()) - .controlMode(CommonContext.MDC_CONTROL_MODE_MANUAL) - .sessionId(logSessionId) - .group(Boolean.TRUE) - .rejoin(Boolean.FALSE) - .alias("log") - .build(); - - return aeron.addSubscription(channel, ctx.logStreamId()); + final ChannelUri channel = ChannelUri.parse(ctx.logChannel()); + + channel.put(TAGS_PARAM_NAME, aeron.nextCorrelationId() + "," + aeron.nextCorrelationId()); + channel.put(MDC_CONTROL_MODE_PARAM_NAME, MDC_CONTROL_MODE_MANUAL); + channel.put(SESSION_ID_PARAM_NAME, Integer.toString(logSessionId)); + channel.put(GROUP_PARAM_NAME, "true"); + channel.put(REJOIN_PARAM_NAME, "false"); + channel.put(ALIAS_PARAM_NAME, "log"); + + return aeron.addSubscription(channel.toString(), ctx.logStreamId()); } private void state(final ElectionState newState, final long nowNs) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/LogPublisher.java b/aeron-cluster/src/main/java/io/aeron/cluster/LogPublisher.java index 02a5881d73..d90430ba2e 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/LogPublisher.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/LogPublisher.java @@ -15,13 +15,18 @@ */ package io.aeron.cluster; -import io.aeron.*; -import io.aeron.cluster.service.ClusterClock; +import io.aeron.ChannelUri; +import io.aeron.ExclusivePublication; +import io.aeron.Publication; import io.aeron.cluster.codecs.*; +import io.aeron.cluster.service.ClusterClock; import io.aeron.exceptions.AeronException; import io.aeron.logbuffer.BufferClaim; import io.aeron.protocol.DataHeaderFlyweight; -import org.agrona.*; +import org.agrona.CloseHelper; +import org.agrona.DirectBuffer; +import org.agrona.ErrorHandler; +import org.agrona.ExpandableArrayBuffer; import org.agrona.concurrent.UnsafeBuffer; import java.util.concurrent.TimeUnit; @@ -47,10 +52,13 @@ final class LogPublisher private final ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer(); private final BufferClaim bufferClaim = new BufferClaim(); + private final String destinationChannel; + private ExclusivePublication publication; - LogPublisher() + LogPublisher(final String destinationChannel) { + this.destinationChannel = destinationChannel; sessionHeaderEncoder.wrapAndApplyHeader(sessionHeaderBuffer, 0, new MessageHeaderEncoder()); } @@ -101,7 +109,7 @@ void addDestination(final boolean isLogChannelMultiDestination, final String fol { if (isLogChannelMultiDestination && null != publication) { - publication.asyncAddDestination("aeron:udp?endpoint=" + followerLogEndpoint); + publication.asyncAddDestination(ChannelUri.createDestinationUri(destinationChannel, followerLogEndpoint)); } } @@ -109,7 +117,8 @@ void removeDestination(final boolean isLogChannelMultiDestination, final String { if (isLogChannelMultiDestination && null != publication) { - publication.asyncRemoveDestination("aeron:udp?endpoint=" + followerLogEndpoint); + publication.asyncRemoveDestination( + ChannelUri.createDestinationUri(destinationChannel, followerLogEndpoint)); } } diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/LogReplication.java b/aeron-cluster/src/main/java/io/aeron/cluster/LogReplication.java index a68c2e283b..b6bc5dcc67 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/LogReplication.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/LogReplication.java @@ -48,7 +48,7 @@ final class LogReplication final long srcRecordingId, final long dstRecordingId, final long stopPosition, - final String srcArchiveEndpoint, + final String srcArchiveChannel, final String replicationChannel, final long progressCheckTimeoutNs, final long progressCheckIntervalNs, @@ -61,14 +61,11 @@ final class LogReplication this.progressDeadlineNs = nowNs + progressCheckTimeoutNs; this.progressCheckDeadlineNs = nowNs + progressCheckIntervalNs; - final String srcArchiveChannel = "aeron:udp?endpoint=" + srcArchiveEndpoint; - final int srcControlStreamId = archive.context().controlRequestStreamId(); - replicationId = archive.replicate( srcRecordingId, dstRecordingId, stopPosition, - srcControlStreamId, + archive.context().controlRequestStreamId(), srcArchiveChannel, null, replicationChannel); From 8d6e03cc9ba292297943cfa38c810599c41e773d Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 21 Feb 2022 14:21:57 +0100 Subject: [PATCH 2/6] [Java] Use `replicationChannel` to create a `replayDestination` to avoid an ad hoc channel creation and re-use the `interface` param and the `media` of the source channel. --- .../src/main/java/io/aeron/archive/ReplicationSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java b/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java index aa797b48ad..f2197dba0a 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ReplicationSession.java @@ -440,7 +440,7 @@ private int extend() { if (isMds) { - replayDestination = "aeron:udp?endpoint=" + endpoint; + replayDestination = ChannelUri.createDestinationUri(replicationChannel, endpoint); recordingSubscription.asyncAddDestination(replayDestination); } From 8d4159cd1a93759823174cefddfd7d4c0af92f68 Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 21 Feb 2022 15:09:09 +0100 Subject: [PATCH 3/6] [Java] Add a test for the responseChannel. --- .../cluster/ConsensusModuleAgentTest.java | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java b/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java index bd0942c3eb..65e5f0c69d 100644 --- a/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java +++ b/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java @@ -17,7 +17,9 @@ import io.aeron.*; import io.aeron.archive.client.AeronArchive; -import io.aeron.cluster.codecs.*; +import io.aeron.cluster.codecs.CloseReason; +import io.aeron.cluster.codecs.ClusterAction; +import io.aeron.cluster.codecs.EventCode; import io.aeron.cluster.service.Cluster; import io.aeron.cluster.service.ClusterMarkFile; import io.aeron.cluster.service.ClusterTerminationException; @@ -27,13 +29,18 @@ import io.aeron.test.Tests; import io.aeron.test.cluster.TestClusterClock; import org.agrona.collections.MutableLong; -import org.agrona.concurrent.*; +import org.agrona.concurrent.AgentInvoker; +import org.agrona.concurrent.CountedErrorHandler; +import org.agrona.concurrent.NoOpIdleStrategy; import org.agrona.concurrent.status.AtomicCounter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; + import java.util.concurrent.TimeUnit; import java.util.function.LongConsumer; @@ -42,7 +49,8 @@ import static io.aeron.cluster.ConsensusModuleAgent.SLOW_TICK_INTERVAL_NS; import static io.aeron.cluster.client.AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION; import static java.lang.Boolean.TRUE; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.*; public class ConsensusModuleAgentTest @@ -330,4 +338,35 @@ public void shouldThrowClusterTerminationExceptionUponShutdown() assertThrows(ClusterTerminationException.class, () -> agent.onServiceAck(1024, 100, 0, 55, 0)); } + + @ParameterizedTest + @CsvSource(value = { + "null, aeron:udp?endpoint=acme:2040, aeron:udp?endpoint=acme:2040", + ", aeron:ipc, aeron:ipc", + "aeron:udp?endpoint=host1:5050|interface=eth0|mtu=1440, aeron:udp?endpoint=localhost:8080|mtu=8000, " + + "aeron:udp?endpoint=localhost:8080|interface=eth0|mtu=1440", + "aeron:udp?endpoint=node0:21300|eos=false, aeron:udp?mtu=8000|interface=if1|eos=true|ttl=100, " + + "aeron:udp?endpoint=node0:21300|eos=false" + }, nullValues = "null") + void responseChannelIsBuiltBasedOnTheEgressChannel( + final String egressChannel, final String responseChannel, final String expectedResponseChannel) + { + final TestClusterClock clock = new TestClusterClock(TimeUnit.MILLISECONDS); + ctx.epochClock(clock).clusterClock(clock); + ctx.egressChannel(egressChannel); + + final ConsensusModuleAgent agent = new ConsensusModuleAgent(ctx); + agent.state(ConsensusModule.State.ACTIVE); + agent.role(Cluster.Role.LEADER); + + final long correlationId = 1L; + final int responseStreamId = 42; + agent.onSessionConnect( + correlationId, responseStreamId, PROTOCOL_SEMANTIC_VERSION, responseChannel, new byte[0]); + + final ArgumentCaptor channelCaptor = ArgumentCaptor.forClass(String.class); + verify(mockAeron).asyncAddPublication(channelCaptor.capture(), eq(responseStreamId)); + + assertEquals(ChannelUri.parse(expectedResponseChannel), ChannelUri.parse(channelCaptor.getValue())); + } } From c11ad17b72a12ccdad42da798c931d5731bac65c Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 21 Feb 2022 15:44:02 +0100 Subject: [PATCH 4/6] [Java] Use media/interface from the logChannel when creating follower log Subscription. --- .../main/java/io/aeron/cluster/Election.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/Election.java b/aeron-cluster/src/main/java/io/aeron/cluster/Election.java index b2ffcc2fb3..4a8fea8a30 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/Election.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/Election.java @@ -17,6 +17,7 @@ import io.aeron.Aeron; import io.aeron.ChannelUri; +import io.aeron.ChannelUriStringBuilder; import io.aeron.Image; import io.aeron.Subscription; import io.aeron.archive.codecs.RecordingSignal; @@ -37,7 +38,8 @@ import java.util.concurrent.TimeUnit; import static io.aeron.Aeron.NULL_VALUE; -import static io.aeron.CommonContext.*; +import static io.aeron.CommonContext.MDC_CONTROL_MODE_MANUAL; +import static io.aeron.CommonContext.NULL_SESSION_ID; import static io.aeron.archive.client.AeronArchive.NULL_POSITION; import static io.aeron.cluster.ClusterMember.compareLog; import static io.aeron.cluster.ElectionState.*; @@ -1159,16 +1161,20 @@ private void addLiveLogDestination() private Subscription addFollowerSubscription() { final Aeron aeron = ctx.aeron(); - final ChannelUri channel = ChannelUri.parse(ctx.logChannel()); - - channel.put(TAGS_PARAM_NAME, aeron.nextCorrelationId() + "," + aeron.nextCorrelationId()); - channel.put(MDC_CONTROL_MODE_PARAM_NAME, MDC_CONTROL_MODE_MANUAL); - channel.put(SESSION_ID_PARAM_NAME, Integer.toString(logSessionId)); - channel.put(GROUP_PARAM_NAME, "true"); - channel.put(REJOIN_PARAM_NAME, "false"); - channel.put(ALIAS_PARAM_NAME, "log"); - - return aeron.addSubscription(channel.toString(), ctx.logStreamId()); + final ChannelUri logChannel = ChannelUri.parse(ctx.logChannel()); + + final String channel = new ChannelUriStringBuilder() + .media(logChannel) + .networkInterface(logChannel) + .tags(aeron.nextCorrelationId() + "," + aeron.nextCorrelationId()) + .controlMode(MDC_CONTROL_MODE_MANUAL) + .sessionId(logSessionId) + .group(Boolean.TRUE) + .rejoin(Boolean.FALSE) + .alias("log") + .build(); + + return aeron.addSubscription(channel, ctx.logStreamId()); } private void state(final ElectionState newState, final long nowNs) From 2a042708b5919b2173901f16643414d087162d68 Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 21 Feb 2022 16:12:54 +0100 Subject: [PATCH 5/6] [Java] Do not use the logChannel when creating follower log Subscription as this results in an exception being thrown, e.g. `java.lang.IllegalArgumentException: matching tag=35 has explicit endpoint or control: aeron:udp?tags=35,36|interface=10.42.0.0/24|control-mode=manual|session-id=-197116438|alias=log|group=true|rejoin=false <> aeron:udp?tags=35,36|interface=10.42.0.0/24|control-mode=manual|session-id=-197116438|alias=log|group=true|rejoin=false`. --- .../src/main/java/io/aeron/cluster/Election.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/Election.java b/aeron-cluster/src/main/java/io/aeron/cluster/Election.java index 4a8fea8a30..a87c6c06d9 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/Election.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/Election.java @@ -38,8 +38,7 @@ import java.util.concurrent.TimeUnit; import static io.aeron.Aeron.NULL_VALUE; -import static io.aeron.CommonContext.MDC_CONTROL_MODE_MANUAL; -import static io.aeron.CommonContext.NULL_SESSION_ID; +import static io.aeron.CommonContext.*; import static io.aeron.archive.client.AeronArchive.NULL_POSITION; import static io.aeron.cluster.ClusterMember.compareLog; import static io.aeron.cluster.ElectionState.*; @@ -1161,11 +1160,8 @@ private void addLiveLogDestination() private Subscription addFollowerSubscription() { final Aeron aeron = ctx.aeron(); - final ChannelUri logChannel = ChannelUri.parse(ctx.logChannel()); - final String channel = new ChannelUriStringBuilder() - .media(logChannel) - .networkInterface(logChannel) + .media(UDP_MEDIA) .tags(aeron.nextCorrelationId() + "," + aeron.nextCorrelationId()) .controlMode(MDC_CONTROL_MODE_MANUAL) .sessionId(logSessionId) From 5ddc89db3a4c6ff08299003004fda3cf173e742c Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 21 Feb 2022 16:39:10 +0100 Subject: [PATCH 6/6] [Java] Simplify. --- .../src/main/java/io/aeron/archive/ArchiveConductor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java index bcabe3dfcc..37d8cb832f 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java @@ -1466,7 +1466,7 @@ private int runTasks(final ArrayDeque taskQueue) private static ChannelUriStringBuilder strippedChannelBuilder(final ChannelUri channelUri) { return new ChannelUriStringBuilder() - .media(channelUri.media()) + .media(channelUri) .endpoint(channelUri) .networkInterface(channelUri) .controlEndpoint(channelUri)