Skip to content

Commit

Permalink
[Java] Remove MDS channel transports when endpoint is closing.
Browse files Browse the repository at this point in the history
Fixes #1683
  • Loading branch information
vyazelenko committed Nov 22, 2024
1 parent 235499b commit 2aa1ab1
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ final class MultiRcvDestination

private ReceiveDestinationTransport[] transports = EMPTY_TRANSPORTS;

void closeTransports(final DataTransportPoller poller)
void closeTransports(final ReceiveChannelEndpoint endpoint, final DataTransportPoller poller)
{
for (final ReceiveDestinationTransport transport : transports)
{
if (null != transport)
{
poller.cancelRead(endpoint, transport);
transport.closeTransport();
if (null != poller)
{
poller.selectNowWithoutProcessing();
}
poller.selectNowWithoutProcessing();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public void closeMultiRcvDestinationTransports(final DataTransportPoller poller)
{
if (null != multiRcvDestination)
{
multiRcvDestination.closeTransports(poller);
multiRcvDestination.closeTransports(this, poller);
}
}

Expand Down Expand Up @@ -1135,6 +1135,7 @@ public String toString()
", udpChannel=" + udpChannel +
", connectAddress=" + connectAddress +
", isClosed=" + isClosed +
", multiRcvDestination=" + multiRcvDestination +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.driver.status.ReceiveLocalSocketAddress;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.exceptions.RegistrationException;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.logbuffer.LogBufferDescriptor;
Expand All @@ -30,22 +33,29 @@
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.collections.MutableInteger;
import org.agrona.collections.MutableLong;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

import static io.aeron.AeronCounters.DRIVER_RECEIVE_CHANNEL_STATUS_TYPE_ID;
import static io.aeron.ChannelUri.SPY_QUALIFIER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

@ExtendWith(InterruptingTestCallback.class)
Expand Down Expand Up @@ -764,6 +774,109 @@ void shouldMergeStreamsFromMultiplePublicationsWithSameParams()
verifyFragments(fragmentHandler, numMessagesToSend);
}

@ParameterizedTest
@ValueSource(strings = {
"aeron:udp?endpoint=localhost:8889" })
void shouldNotReuseEndpointAcrossMultipleSubscriptionsIfAtLeastOneIsMds(final String channel)
{
testWatcher.ignoreErrorsMatching((err) -> err.contains("Address already in use"));
launch(mock(ErrorHandler.class));

try (Subscription sub1 = clientA.addSubscription(channel, STREAM_ID);
Subscription mdsSubscription = clientA.addSubscription("aeron:udp?control-mode=manual", STREAM_ID))
{
final RegistrationException exception =
assertThrowsExactly(RegistrationException.class, () -> mdsSubscription.addDestination(sub1.channel()));
assertThat(exception.getMessage(), containsString("Address already in use"));

Tests.await(() -> 1 == clientA.countersReader().getCounterValue(SystemCounterDescriptor.ERRORS.id()));
}

try (Subscription mdsSubscription = clientA.addSubscription("aeron:udp?control-mode=manual", STREAM_ID))
{
mdsSubscription.addDestination(channel);

final RegistrationException exception =
assertThrowsExactly(RegistrationException.class, () -> clientA.addSubscription(channel, STREAM_ID));
assertThat(exception.getMessage(), containsString("Address already in use"));

Tests.await(() -> 2 == clientA.countersReader().getCounterValue(SystemCounterDescriptor.ERRORS.id()));
}

try (Subscription mdsSubscription1 = clientA.addSubscription("aeron:udp?control-mode=manual", STREAM_ID);
Subscription mdsSubscription2 = clientA.addSubscription("aeron:udp?control-mode=manual", STREAM_ID))
{
mdsSubscription1.addDestination(channel);
final RegistrationException exception =
assertThrowsExactly(RegistrationException.class, () -> mdsSubscription2.addDestination(channel));
assertThat(exception.getMessage(), containsString("Address already in use"));

Tests.await(() -> 3 == clientA.countersReader().getCounterValue(SystemCounterDescriptor.ERRORS.id()));
}
}

@Test
void shouldCleanupMdcDestinationWhenSubscriptionIsClosed()
{
launch(mock(ErrorHandler.class));

final CountersReader countersReader = clientA.countersReader();
final MutableInteger receiveSocketCount = new MutableInteger();
final CountersReader.MetaData socketAddressCapture = (counterId, typeId, keyBuffer, label) ->
{
if (AeronCounters.DRIVER_LOCAL_SOCKET_ADDRESS_STATUS_TYPE_ID == typeId &&
label.startsWith(ReceiveLocalSocketAddress.NAME))
{
receiveSocketCount.increment();
}
};

try (Publication pub1 = clientA.addExclusivePublication("aeron:udp?endpoint=localhost:8889", STREAM_ID);
Publication pub2 = clientA.addExclusivePublication(
"aeron:udp?control=localhost:5555|control-mode=dynamic", STREAM_ID))
{
final long registrationId;
try (Subscription mdsSubscription = clientA.addSubscription("aeron:udp?control-mode=manual", STREAM_ID))
{
registrationId = mdsSubscription.registrationId();
mdsSubscription.addDestination("aeron:udp?endpoint=localhost:8889");
mdsSubscription.addDestination("aeron:udp?control=localhost:5555|endpoint=localhost:0");

Tests.awaitConnected(pub1);
Tests.awaitConnected(pub2);
Tests.awaitConnected(mdsSubscription);

final int length = ThreadLocalRandom.current().nextInt(1, buffer.capacity());
while (pub1.offer(buffer, 0, length) < 0)
{
Tests.yield();
}

final int length2 = ThreadLocalRandom.current().nextInt(1, buffer.capacity());
while (pub2.offer(buffer, 0, length2) < 0)
{
Tests.yield();
}

countersReader.forEach(socketAddressCapture);
assertEquals(2, receiveSocketCount.intValue());
assertNotEquals(CountersReader.NULL_COUNTER_ID, countersReader.findByTypeIdAndRegistrationId(
DRIVER_RECEIVE_CHANNEL_STATUS_TYPE_ID, registrationId));
}

Tests.await(() ->
{
Tests.sleep(10);
receiveSocketCount.set(0);
countersReader.forEach(socketAddressCapture);
return 0 == receiveSocketCount.intValue();
});

assertEquals(CountersReader.NULL_COUNTER_ID, countersReader.findByTypeIdAndRegistrationId(
DRIVER_RECEIVE_CHANNEL_STATUS_TYPE_ID, registrationId));
}
}

private void pollForFragment(final Subscription subscription, final FragmentHandler handler)
{
while (0 == subscription.poll(handler, 1))
Expand Down

0 comments on commit 2aa1ab1

Please sign in to comment.