Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect numbers reported by DnsResolutionObserver#resolutionCompleted #1261

Merged
merged 1 commit into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,27 @@ public static <T> List<ServiceDiscovererEvent<T>> calculateDifference(List<? ext
// Calculate additions (in newAddresses, not in activeAddresses).
List<ServiceDiscovererEvent<T>> availableEvents =
relativeComplement(true, currentActiveAddresses, newActiveAddresses, comparator, null);
// Store nAvailable now because the List may be updated on the next step.
final int nAvailable = availableEvents == null ? 0 : availableEvents.size();
// Calculate removals (in activeAddresses, not in newAddresses).
List<ServiceDiscovererEvent<T>> allEvents =
relativeComplement(false, newActiveAddresses, currentActiveAddresses, comparator, availableEvents);

reportEvents(reporter, allEvents, availableEvents);
reportEvents(reporter, allEvents, nAvailable);
return allEvents;
}

private static <T> void reportEvents(@Nullable final TwoIntsConsumer reporter,
@Nullable final List<ServiceDiscovererEvent<T>> allEvents,
@Nullable final List<ServiceDiscovererEvent<T>> availableEvents) {
final int nAvailable) {
if (reporter == null) {
return;
}
if (allEvents == null) {
reporter.accept(0, 0);
return;
}
final int available = availableEvents == null ? 0 : availableEvents.size();
reporter.accept(available, allEvents.size() - available);
reporter.accept(nAvailable, allEvents.size() - nAvailable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ interface ResolutionResult {
int ttl();

/**
* Number of records that are {@link ServiceDiscovererEvent#isAvailable() available}.
* Number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() available}.
*
* @return the number of records that are {@link ServiceDiscovererEvent#isAvailable() available}
* @return the number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() available}
*/
int nAvailable();

/**
* Number of records that are {@link ServiceDiscovererEvent#isAvailable() unavailable}.
* Number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() unavailable}.
*
* @return the number of records that are {@link ServiceDiscovererEvent#isAvailable() unavailable}
* @return the number of resolved records that became {@link ServiceDiscovererEvent#isAvailable() unavailable}
*/
int nUnavailable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.servicetalk.dns.discovery.netty;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
Expand All @@ -29,12 +30,13 @@
import org.junit.rules.Timeout;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.BiFunction;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
Expand All @@ -47,6 +49,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -109,7 +112,7 @@ public void srvQueryTriggersNewDiscoveryObserver() throws Exception {

private void testNewDiscoveryObserver(BiFunction<DnsClient, String, Publisher<?>> publisherFactory,
String expectedName) throws Exception {
List<String> newDiscoveryCalls = new ArrayList<>();
BlockingQueue<String> newDiscoveryCalls = new LinkedBlockingDeque<>();
DnsClient client = dnsClient(name -> {
newDiscoveryCalls.add(name);
return NoopDnsDiscoveryObserver.INSTANCE;
Expand All @@ -125,7 +128,7 @@ private void testNewDiscoveryObserver(BiFunction<DnsClient, String, Publisher<?>

@Test
public void aQueryTriggersNewResolutionObserver() throws Exception {
List<String> newResolution = new ArrayList<>();
BlockingQueue<String> newResolution = new LinkedBlockingDeque<>();
DnsClient client = dnsClient(__ -> name -> {
newResolution.add(name);
return NoopDnsResolutionObserver.INSTANCE;
Expand All @@ -141,8 +144,7 @@ public void aQueryTriggersNewResolutionObserver() throws Exception {

@Test
public void srvQueryTriggersNewResolutionObserver() throws Exception {
System.err.println(NoopDnsResolutionObserver.INSTANCE.toString());
List<String> newResolution = new ArrayList<>();
BlockingQueue<String> newResolution = new LinkedBlockingDeque<>();
DnsClient client = dnsClient(__ -> name -> {
newResolution.add(name);
return NoopDnsResolutionObserver.INSTANCE;
Expand Down Expand Up @@ -170,7 +172,7 @@ public void srvQueryFailedResolution() {
}

private void testFailedResolution(BiFunction<DnsClient, String, Publisher<?>> publisherFactory) {
List<Throwable> resolutionFailures = new ArrayList<>();
BlockingQueue<Throwable> resolutionFailures = new LinkedBlockingDeque<>();
DnsClient client = dnsClient(__ -> name -> new NoopDnsResolutionObserver() {
@Override
public void resolutionFailed(final Throwable cause) {
Expand All @@ -190,30 +192,89 @@ public void resolutionFailed(final Throwable cause) {
}

@Test
public void aQueryResolutionResult() throws Exception {
List<ResolutionResult> results = new ArrayList<>();
public void aQueryResolutionResultNoUpdates() throws Exception {
aQueryResolutionResult(results -> {
assertResolutionResult(results.take(), 2, 2, 0);
assertResolutionResult(results.take(), 2, 0, 0);
});
}

@Test
public void aQueryResolutionResultNewIPsAvailable() throws Exception {
aQueryResolutionResult(results -> {
assertResolutionResult(results.take(), 2, 2, 0);

recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, nextIp(), nextIp());
assertResolutionResult(results.take(), 4, 2, 0);
});
}

@Test
public void aQueryResolutionResultOneBecameUnavailable() throws Exception {
final String tmpIP = nextIp();
recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP);
aQueryResolutionResult(results -> {
assertResolutionResult(results.take(), 3, 3, 0);

recordStore.removeIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP);
assertResolutionResult(results.take(), 2, 0, 1);
});
}

@Test
public void aQueryResolutionResultNewAvailableOneUnavailable() throws Exception {
final String tmpIP = nextIp();
recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP);
aQueryResolutionResult(results -> {
assertResolutionResult(results.take(), 3, 3, 0);

recordStore.removeIPv4Address(HOST_NAME, DEFAULT_TTL, tmpIP);
recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, nextIp());
assertResolutionResult(results.take(), 3, 1, 1);
});
}

@Test
public void aQueryResolutionResultAllNewIPs() throws Exception {
aQueryResolutionResult(results -> {
assertResolutionResult(results.take(), 2, 2, 0);

recordStore.removeIPv4Addresses(HOST_NAME);
recordStore.addIPv4Address(HOST_NAME, DEFAULT_TTL, nextIp(), nextIp(), nextIp());
assertResolutionResult(results.take(), 3, 3, 2);
});
}

private void aQueryResolutionResult(ResultsVerifier<BlockingQueue<ResolutionResult>> verifier) throws Exception {
BlockingQueue<ResolutionResult> results = new LinkedBlockingDeque<>();
DnsClient client = dnsClient(__ -> name -> new NoopDnsResolutionObserver() {
@Override
public void resolutionCompleted(final ResolutionResult result) {
results.add(result);
}
});

Publisher<?> publisher = client.dnsQuery(HOST_NAME);
assertThat("Unexpected calls to resolutionCompleted", results, hasSize(0));
// Wait until SD returns at least one address:
publisher.takeAtMost(1).ignoreElements().toFuture().get();
assertThat("Unexpected number of calls to resolutionCompleted", results, hasSize(1));
ResolutionResult result = results.get(0);
assertThat(result.resolvedRecords(), is(2));
assertThat(result.ttl(), is(DEFAULT_TTL));
assertThat(result.nAvailable(), is(2));
assertThat(result.nUnavailable(), is(0));
Cancellable discovery = client.dnsQuery(HOST_NAME).forEach(__ -> { });
try {
verifier.verify(results);
} finally {
discovery.cancel();
}
}

private static void assertResolutionResult(@Nullable ResolutionResult result,
int resolvedRecords, int nAvailable, int nUnavailable) {
assertThat("Unexpected null ResolutionResult", result, is(notNullValue()));
assertThat("Unexpected number of resolvedRecords", result.resolvedRecords(), is(resolvedRecords));
assertThat("Unexpected TTL value", result.ttl(), is(DEFAULT_TTL));
assertThat("Unexpected number of nAvailable records", result.nAvailable(), is(nAvailable));
assertThat("Unexpected number of nUnavailable records", result.nUnavailable(), is(nUnavailable));
}

@Test
public void srvQueryResolutionResult() throws Exception {
Map<String, ResolutionResult> results = new HashMap<>();
Map<String, ResolutionResult> results = new ConcurrentHashMap<>();
DnsClient client = dnsClient(__ -> name -> new NoopDnsResolutionObserver() {
@Override
public void resolutionCompleted(final ResolutionResult result) {
Expand All @@ -227,17 +288,8 @@ public void resolutionCompleted(final ResolutionResult result) {
publisher.takeAtMost(1).ignoreElements().toFuture().get();
assertThat("Unexpected number of calls to resolutionCompleted", results.entrySet(), hasSize(2));

ResolutionResult srvResult = results.get(SERVICE_NAME);
assertThat(srvResult.resolvedRecords(), is(1));
assertThat(srvResult.ttl(), is(DEFAULT_TTL));
assertThat(srvResult.nAvailable(), is(1));
assertThat(srvResult.nUnavailable(), is(0));

ResolutionResult dnsResult = results.get(HOST_NAME + '.');
assertThat(dnsResult.resolvedRecords(), is(2));
assertThat(dnsResult.ttl(), is(DEFAULT_TTL));
assertThat(dnsResult.nAvailable(), is(2));
assertThat(dnsResult.nUnavailable(), is(0));
assertResolutionResult(results.get(SERVICE_NAME), 1, 1, 0);
assertResolutionResult(results.get(HOST_NAME + '.'), 2, 2, 0);
}

@Test
Expand Down Expand Up @@ -347,4 +399,9 @@ public void resolutionCompleted(final ResolutionResult result) {
// noop
}
}

@FunctionalInterface
private interface ResultsVerifier<T> {
void verify(T t) throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public synchronized boolean removeIPv4Address(final String domain, final int ttl
return removeAddresses(domain, A, ttl, ipAddresses);
}

public synchronized boolean removeIPv4Addresses(final String domain) {
return removeAddresses(domain, A);
}

public synchronized void addIPv6Address(final String domain, final int ttl, final String... ipAddresses) {
addAddress(domain, AAAA, ttl, ipAddresses);
}
Expand Down Expand Up @@ -122,6 +126,17 @@ private void addAddress(final String domain, final RecordType recordType, final
}
}

private boolean removeAddresses(final String domain, final RecordType recordType) {
Map<RecordType, List<ResourceRecord>> typeMap = getTypeMap(domain);
boolean removed = typeMap.remove(recordType) != null;
List<ResourceRecord> recordList = getRecordList(typeMap, recordType);
recordList.clear();
if (removed && typeMap.isEmpty()) {
recordsToReturnByDomain.remove(domain, typeMap);
}
return removed;
}

private boolean removeAddresses(final String domain, final RecordType recordType, final int ttl,
final String... ipAddresses) {
boolean removed = false;
Expand Down