Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Cluster interface config. #1290

Merged
merged 6 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ private int runTasks(final ArrayDeque<Runnable> taskQueue)
private static ChannelUriStringBuilder strippedChannelBuilder(final ChannelUri channelUri)
{
return new ChannelUriStringBuilder()
.media(channelUri.media())
.media(channelUri)
.endpoint(channelUri)
.networkInterface(channelUri)
.controlEndpoint(channelUri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ private int extend()
{
if (isMds)
{
replayDestination = "aeron:udp?endpoint=" + endpoint;
replayDestination = ChannelUri.createDestinationUri(replicationChannel, endpoint);
recordingSubscription.asyncAddDestination(replayDestination);
}

Expand Down
28 changes: 27 additions & 1 deletion aeron-client/src/main/java/io/aeron/ChannelUri.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Object2ObjectHashMap;

import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static io.aeron.CommonContext.*;
import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT;
Expand All @@ -36,6 +39,7 @@
* </pre>
* <p>
* Multiple params with the same key are allowed, the last value specified takes precedence.
*
* @see ChannelUriStringBuilder
*/
public final class ChannelUri
Expand Down Expand Up @@ -507,6 +511,28 @@ public static long getTag(final String paramValue)
AsciiEncoding.parseLongAscii(paramValue, 4, paramValue.length() - 4) : INVALID_TAG;
}

/**
* Create a channel URI for a destination, i.e. a channel that uses {@code media} and {@code interface} parameters
* of the original channel and adds specified {@code endpoint} to it. For example given the input channel is
* {@code aeron:udp?mtu=1440|ttl=0|endpoint=localhost:8090|term-length=128k|interface=eth0} and the endpoint is
* {@code 192.168.0.14} the output of this method will be {@code aeron:udp?endpoint=192.168.0.14|interface=eth0}.
*
* @param channel for which the destination is being added.
* @param endpoint for the target destination.
* @return new channel URI for a destination.
*/
public static String createDestinationUri(final String channel, final String endpoint)
mikeb01 marked this conversation as resolved.
Show resolved Hide resolved
{
final ChannelUri channelUri = ChannelUri.parse(channel);
final String uri = AERON_PREFIX + channelUri.media() + "?" + ENDPOINT_PARAM_NAME + "=" + endpoint;
final String networkInterface = channelUri.get(INTERFACE_PARAM_NAME);
if (null != networkInterface)
{
return uri + "|" + INTERFACE_PARAM_NAME + "=" + networkInterface;
}
return uri;
}

private static void validateMedia(final String media)
{
if (IPC_MEDIA.equals(media) || UDP_MEDIA.equals(media))
Expand Down
15 changes: 15 additions & 0 deletions aeron-client/src/test/java/io/aeron/ChannelUriTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
Expand Down Expand Up @@ -178,6 +179,20 @@ void hashCode(final String uri1, final String uri2, final boolean expected)
}
}

@ParameterizedTest
@CsvSource(value = {
"aeron:udp?endpoint=poison|interface=iface|mtu=4444, dest1, aeron:udp?endpoint=dest1|interface=iface",
"aeron:ipc, dest2, aeron:ipc?endpoint=dest2",
"aeron:udp, localhost, aeron:udp?endpoint=localhost",
"aeron:ipc?interface=here, there, aeron:ipc?endpoint=there|interface=here",
"aeron-spy:aeron:udp?eol=true|interface=none, abc, aeron:udp?endpoint=abc|interface=none",
"aeron:udp?interface=eth0|term-length=64k|ttl=0|endpoint=some, vm1, aeron:udp?endpoint=vm1|interface=eth0" })
void createDestinationUriTest(final String channel, final String endpoint, final String expected)
{
final String destinationUri = ChannelUri.createDestinationUri(channel, endpoint);
assertEquals(expected, destinationUri);
}

private static List<Arguments> equalityValues()
{
return asList(
Expand Down
131 changes: 130 additions & 1 deletion aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,28 @@ public static final class Configuration
*/
public static final String REPLICATION_CHANNEL_PROP_NAME = "aeron.cluster.replication.channel";

/**
* Channel template used for replaying logs to a follower using the {@link ClusterMember#catchupEndpoint()}.
*/
public static final String FOLLOWER_CATCHUP_CHANNEL_PROP_NAME = "aeron.cluster.follower.catchup.channel";

/**
* Default channel template used for replaying logs to a follower using the
* {@link ClusterMember#catchupEndpoint()}.
*/
public static final String FOLLOWER_CATCHUP_CHANNEL_DEFAULT = UDP_CHANNEL;

/**
* Channel used to build the control request channel for the leader Archive.
*/
public static final String LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME =
"aeron.cluster.leader.archive.control.channel";

/**
* Default channel used to build the control request channel for the leader Archive.
*/
public static final String LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT = "aeron:udp?term-length=64k";

/**
* Counter type id for the consensus module state.
*/
Expand Down Expand Up @@ -1041,6 +1063,30 @@ public static String replicationChannel()
return System.getProperty(REPLICATION_CHANNEL_PROP_NAME);
}

/**
* The value {@link #FOLLOWER_CATCHUP_CHANNEL_DEFAULT} or system property
* {@link #FOLLOWER_CATCHUP_CHANNEL_PROP_NAME} if set.
*
* @return {@link #FOLLOWER_CATCHUP_CHANNEL_DEFAULT} or system property
* {@link #FOLLOWER_CATCHUP_CHANNEL_PROP_NAME} if set.
*/
public static String followerCatchupChannel()
{
return System.getProperty(FOLLOWER_CATCHUP_CHANNEL_PROP_NAME, FOLLOWER_CATCHUP_CHANNEL_DEFAULT);
}

/**
* The value {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT} or system property
* {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME} if set.
*
* @return {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT} or system property
* {@link #LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME} if set.
*/
public static String leaderArchiveControlChannel()
{
return System.getProperty(LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME, LEADER_ARCHIVE_CONTROL_CHANNEL_DEFAULT);
}

/**
* The value {@link #WHEEL_TICK_RESOLUTION_DEFAULT_NS} or system property
* {@link #WHEEL_TICK_RESOLUTION_PROP_NAME} if set.
Expand Down Expand Up @@ -1139,6 +1185,7 @@ public static final class Context implements Cloneable
private String ingressChannel = AeronCluster.Configuration.ingressChannel();
private int ingressStreamId = AeronCluster.Configuration.ingressStreamId();
private int ingressFragmentLimit = Configuration.ingressFragmentLimit();
private String egressChannel = AeronCluster.Configuration.egressChannel();
private String logChannel = Configuration.logChannel();
private int logStreamId = Configuration.logStreamId();
private String memberEndpoints = Configuration.memberEndpoints();
Expand All @@ -1152,6 +1199,8 @@ public static final class Context implements Cloneable
private String consensusChannel = Configuration.consensusChannel();
private int consensusStreamId = Configuration.consensusStreamId();
private String replicationChannel = Configuration.replicationChannel();
private String followerCatchupChannel = Configuration.followerCatchupChannel();
private String leaderArchiveControlChannel = Configuration.leaderArchiveControlChannel();
private int logFragmentLimit = ClusteredServiceContainer.Configuration.logFragmentLimit();

private int serviceCount = Configuration.serviceCount();
Expand Down Expand Up @@ -1452,7 +1501,7 @@ public void conclude()

if (null == logPublisher)
{
logPublisher = new LogPublisher();
logPublisher = new LogPublisher(logChannel());
}

if (null == egressPublisher)
Expand Down Expand Up @@ -1854,6 +1903,34 @@ public int ingressFragmentLimit()
return ingressFragmentLimit;
}

/**
* Set the channel parameter for the egress channel that is used as template to define a response
* channel for a client:
* <ul>
* <li></li>
* </ul>
*
* @param channel parameter for the egress channel.
* @return this for a fluent API.
* @see io.aeron.cluster.client.AeronCluster.Configuration#EGRESS_CHANNEL_PROP_NAME
*/
public Context egressChannel(final String channel)
{
egressChannel = channel;
return this;
}

/**
* Get the channel parameter for the egress channel.
*
* @return the channel parameter for the egress channel.
* @see io.aeron.cluster.client.AeronCluster.Configuration#EGRESS_CHANNEL_PROP_NAME
*/
public String egressChannel()
{
return egressChannel;
}

/**
* Set the channel parameter for the cluster log channel.
*
Expand Down Expand Up @@ -2170,6 +2247,58 @@ public String replicationChannel()
return replicationChannel;
}

/**
* Set a channel template used for replaying logs to a follower using the
* {@link ClusterMember#catchupEndpoint()}.
*
* @param channel to do a catch replay to a follower.
* @return this for a fluent API
* @see Configuration#FOLLOWER_CATCHUP_CHANNEL_PROP_NAME
*/
public Context followerCatchupChannel(final String channel)
{
this.followerCatchupChannel = channel;
return this;
}

/**
* Gets the channel template used for replaying logs to a follower using the
* {@link ClusterMember#catchupEndpoint()}.
*
* @return channel used for replaying older data during a catchup phase.
* @see Configuration#FOLLOWER_CATCHUP_CHANNEL_PROP_NAME
*/
public String followerCatchupChannel()
{
return followerCatchupChannel;
}

/**
* Set a channel template used to build the control request channel for the leader Archive using the
* {@link ClusterMember#archiveEndpoint()}.
*
* @param channel for the Archive control requests.
* @return this for a fluent API
* @see Configuration#LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME
*/
public Context leaderArchiveControlChannel(final String channel)
{
this.leaderArchiveControlChannel = channel;
return this;
}

/**
* Gets the channel template used to build the control request channel for the leader Archive using the
* {@link ClusterMember#archiveEndpoint()}.
*
* @return channel used for replaying older data during a catchup phase.
* @see Configuration#LEADER_ARCHIVE_CONTROL_CHANNEL_PROP_NAME
*/
public String leaderArchiveControlChannel()
{
return leaderArchiveControlChannel;
}

/**
* Set the fragment limit to be used when polling the log {@link Subscription}.
*
Expand Down
Loading