diff --git a/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/ServiceDiscovererUtils.java b/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/ServiceDiscovererUtils.java index 914f118126..71dc4f8536 100644 --- a/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/ServiceDiscovererUtils.java +++ b/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/ServiceDiscovererUtils.java @@ -61,17 +61,19 @@ public static List> calculateDifference(List> availableEvents = relativeComplement(true, currentActiveAddresses, newActiveAddresses, comparator, null); + // Store nAvailable now because the List may be updated on the next step. + final int nAvailable = availableEvents == null ? 0 : availableEvents.size(); // Calculate removals (in activeAddresses, not in newAddresses). List> allEvents = relativeComplement(false, newActiveAddresses, currentActiveAddresses, comparator, availableEvents); - reportEvents(reporter, allEvents, availableEvents); + reportEvents(reporter, allEvents, nAvailable); return allEvents; } private static void reportEvents(@Nullable final TwoIntsConsumer reporter, @Nullable final List> allEvents, - @Nullable final List> availableEvents) { + final int nAvailable) { if (reporter == null) { return; } @@ -79,8 +81,7 @@ private static void reportEvents(@Nullable final TwoIntsConsumer reporter, reporter.accept(0, 0); return; } - final int available = availableEvents == null ? 0 : availableEvents.size(); - reporter.accept(available, allEvents.size() - available); + reporter.accept(nAvailable, allEvents.size() - nAvailable); } /** diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserver.java index c3e6b31810..fc16d4ded6 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserver.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserver.java @@ -88,16 +88,16 @@ interface ResolutionResult { int ttl(); /** - * Number of records that are {@link ServiceDiscovererEvent#isAvailable() available}. + * Number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() available}. * - * @return the number of records that are {@link ServiceDiscovererEvent#isAvailable() available} + * @return the number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() available} */ int nAvailable(); /** - * Number of records that are {@link ServiceDiscovererEvent#isAvailable() unavailable}. + * Number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() unavailable}. * - * @return the number of records that are {@link ServiceDiscovererEvent#isAvailable() unavailable} + * @return the number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() unavailable} */ int nUnavailable(); } diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserverTest.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserverTest.java index 0cd74cbe33..50493a76b7 100644 --- a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserverTest.java +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DnsServiceDiscovererObserverTest.java @@ -15,6 +15,7 @@ */ package io.servicetalk.dns.discovery.netty; +import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.api.CompositeCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; @@ -29,12 +30,13 @@ import org.junit.rules.Timeout; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; import java.util.function.BiFunction; +import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; @@ -47,6 +49,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -109,7 +112,7 @@ public void srvQueryTriggersNewDiscoveryObserver() throws Exception { private void testNewDiscoveryObserver(BiFunction> publisherFactory, String expectedName) throws Exception { - List newDiscoveryCalls = new ArrayList<>(); + BlockingQueue newDiscoveryCalls = new LinkedBlockingDeque<>(); DnsClient client = dnsClient(name -> { newDiscoveryCalls.add(name); return NoopDnsDiscoveryObserver.INSTANCE; @@ -125,7 +128,7 @@ private void testNewDiscoveryObserver(BiFunction @Test public void aQueryTriggersNewResolutionObserver() throws Exception { - List newResolution = new ArrayList<>(); + BlockingQueue newResolution = new LinkedBlockingDeque<>(); DnsClient client = dnsClient(__ -> name -> { newResolution.add(name); return NoopDnsResolutionObserver.INSTANCE; @@ -141,8 +144,7 @@ public void aQueryTriggersNewResolutionObserver() throws Exception { @Test public void srvQueryTriggersNewResolutionObserver() throws Exception { - System.err.println(NoopDnsResolutionObserver.INSTANCE.toString()); - List newResolution = new ArrayList<>(); + BlockingQueue newResolution = new LinkedBlockingDeque<>(); DnsClient client = dnsClient(__ -> name -> { newResolution.add(name); return NoopDnsResolutionObserver.INSTANCE; @@ -170,7 +172,7 @@ public void srvQueryFailedResolution() { } private void testFailedResolution(BiFunction> publisherFactory) { - List resolutionFailures = new ArrayList<>(); + BlockingQueue resolutionFailures = new LinkedBlockingDeque<>(); DnsClient client = dnsClient(__ -> name -> new NoopDnsResolutionObserver() { @Override public void resolutionFailed(final Throwable cause) { @@ -190,8 +192,61 @@ public void resolutionFailed(final Throwable cause) { } @Test - public void aQueryResolutionResult() throws Exception { - List results = new ArrayList<>(); + public void aQueryResolutionResultNoUpdates() throws Exception { + aQueryResolutionResult(results -> { + assertResolutionResult(results.take(), 2, 2, 0); + assertResolutionResult(results.take(), 2, 0, 0); + }); + } + + @Test + public void aQueryResolutionResultNewIPsAvailable() throws Exception { + aQueryResolutionResult(results -> { + assertResolutionResult(results.take(), 2, 2, 0); + + recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, nextIp(), nextIp()); + assertResolutionResult(results.take(), 4, 2, 0); + }); + } + + @Test + public void aQueryResolutionResultOneBecameUnavailable() throws Exception { + final String tmpIP = nextIp(); + recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP); + aQueryResolutionResult(results -> { + assertResolutionResult(results.take(), 3, 3, 0); + + recordStore.removeIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP); + assertResolutionResult(results.take(), 2, 0, 1); + }); + } + + @Test + public void aQueryResolutionResultNewAvailableOneUnavailable() throws Exception { + final String tmpIP = nextIp(); + recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP); + aQueryResolutionResult(results -> { + assertResolutionResult(results.take(), 3, 3, 0); + + recordStore.removeIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP); + recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, nextIp()); + assertResolutionResult(results.take(), 3, 1, 1); + }); + } + + @Test + public void aQueryResolutionResultAllNewIPs() throws Exception { + aQueryResolutionResult(results -> { + assertResolutionResult(results.take(), 2, 2, 0); + + recordStore.removeIPv4Addresses(HOST_NAME); + recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, nextIp(), nextIp(), nextIp()); + assertResolutionResult(results.take(), 3, 3, 2); + }); + } + + private void aQueryResolutionResult(ResultsVerifier> verifier) throws Exception { + BlockingQueue results = new LinkedBlockingDeque<>(); DnsClient client = dnsClient(__ -> name -> new NoopDnsResolutionObserver() { @Override public void resolutionCompleted(final ResolutionResult result) { @@ -199,21 +254,27 @@ public void resolutionCompleted(final ResolutionResult result) { } }); - Publisher publisher = client.dnsQuery(HOST_NAME); assertThat("Unexpected calls to resolutionCompleted", results, hasSize(0)); - // Wait until SD returns at least one address: - publisher.takeAtMost(1).ignoreElements().toFuture().get(); - assertThat("Unexpected number of calls to resolutionCompleted", results, hasSize(1)); - ResolutionResult result = results.get(0); - assertThat(result.resolvedRecords(), is(2)); - assertThat(result.ttl(), is(DEFAULT_TTL)); - assertThat(result.nAvailable(), is(2)); - assertThat(result.nUnavailable(), is(0)); + Cancellable discovery = client.dnsQuery(HOST_NAME).forEach(__ -> { }); + try { + verifier.verify(results); + } finally { + discovery.cancel(); + } + } + + private static void assertResolutionResult(@Nullable ResolutionResult result, + int resolvedRecords, int nAvailable, int nUnavailable) { + assertThat("Unexpected null ResolutionResult", result, is(notNullValue())); + assertThat("Unexpected number of resolvedRecords", result.resolvedRecords(), is(resolvedRecords)); + assertThat("Unexpected TTL value", result.ttl(), is(DEFAULT_TTL)); + assertThat("Unexpected number of nAvailable records", result.nAvailable(), is(nAvailable)); + assertThat("Unexpected number of nUnavailable records", result.nUnavailable(), is(nUnavailable)); } @Test public void srvQueryResolutionResult() throws Exception { - Map results = new HashMap<>(); + Map results = new ConcurrentHashMap<>(); DnsClient client = dnsClient(__ -> name -> new NoopDnsResolutionObserver() { @Override public void resolutionCompleted(final ResolutionResult result) { @@ -227,17 +288,8 @@ public void resolutionCompleted(final ResolutionResult result) { publisher.takeAtMost(1).ignoreElements().toFuture().get(); assertThat("Unexpected number of calls to resolutionCompleted", results.entrySet(), hasSize(2)); - ResolutionResult srvResult = results.get(SERVICE_NAME); - assertThat(srvResult.resolvedRecords(), is(1)); - assertThat(srvResult.ttl(), is(DEFAULT_TTL)); - assertThat(srvResult.nAvailable(), is(1)); - assertThat(srvResult.nUnavailable(), is(0)); - - ResolutionResult dnsResult = results.get(HOST_NAME + '.'); - assertThat(dnsResult.resolvedRecords(), is(2)); - assertThat(dnsResult.ttl(), is(DEFAULT_TTL)); - assertThat(dnsResult.nAvailable(), is(2)); - assertThat(dnsResult.nUnavailable(), is(0)); + assertResolutionResult(results.get(SERVICE_NAME), 1, 1, 0); + assertResolutionResult(results.get(HOST_NAME + '.'), 2, 2, 0); } @Test @@ -347,4 +399,9 @@ public void resolutionCompleted(final ResolutionResult result) { // noop } } + + @FunctionalInterface + private interface ResultsVerifier { + void verify(T t) throws Exception; + } } diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java index 8c39adcfa6..93dcf22861 100644 --- a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java @@ -74,6 +74,10 @@ public synchronized boolean removeIPv4Address(final String domain, final int ttl return removeAddresses(domain, A, ttl, ipAddresses); } + public synchronized boolean removeIPv4Addresses(final String domain) { + return removeAddresses(domain, A); + } + public synchronized void addIPv6Address(final String domain, final int ttl, final String... ipAddresses) { addAddress(domain, AAAA, ttl, ipAddresses); } @@ -122,6 +126,17 @@ private void addAddress(final String domain, final RecordType recordType, final } } + private boolean removeAddresses(final String domain, final RecordType recordType) { + Map> typeMap = getTypeMap(domain); + boolean removed = typeMap.remove(recordType) != null; + List recordList = getRecordList(typeMap, recordType); + recordList.clear(); + if (removed && typeMap.isEmpty()) { + recordsToReturnByDomain.remove(domain, typeMap); + } + return removed; + } + private boolean removeAddresses(final String domain, final RecordType recordType, final int ttl, final String... ipAddresses) { boolean removed = false;