Skip to content

Commit

Permalink
ESQL: Read from the BlockFactory (elastic#100231) (elastic#100310)
Browse files Browse the repository at this point in the history
This links the `BlockFactory` into the `Block` serialization code. With
this blocks that are deserialized from over the wire are tracked.

Co-authored-by: Nhat Nguyen <nhat.nguyen@elastic.co>
  • Loading branch information
nik9000 and dnhatn committed Oct 5, 2023
1 parent f45d69a commit cb57d48
Show file tree
Hide file tree
Showing 40 changed files with 795 additions and 467 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1844,4 +1844,9 @@ public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
public IndexScopedSettings getIndexScopedSettings() {
return indexScopedSettings;
}

// TODO move this?
public BigArrays getBigArrays() {
return bigArrays;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final void testFromXContent() throws IOException {
.randomFieldsExcludeFilter(getRandomFieldsExcludeFilter())
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(assertToXContentEquivalence())
.dispose(this::dispose)
.test();
}

Expand All @@ -61,41 +62,45 @@ public final void testConcurrentToXContent() throws IOException, InterruptedExce
() -> randomFrom(XContentType.values())
);
T testInstance = createXContextTestInstance(xContentType);
ToXContent.Params params = new ToXContent.DelegatingMapParams(
singletonMap(RestSearchAction.TYPED_KEYS_PARAM, "true"),
getToXContentParams()
);
boolean humanReadable = randomBoolean();
BytesRef firstTimeBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef();

/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 5000);
concurrentTest(() -> {
try {
for (int r = 0; r < rounds; r++) {
BytesRef thisRoundBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef();
if (firstTimeBytes.bytesEquals(thisRoundBytes)) {
continue;
}
StringBuilder error = new StringBuilder("Failed to round trip over ");
if (humanReadable) {
error.append("human readable ");
try {
ToXContent.Params params = new ToXContent.DelegatingMapParams(
singletonMap(RestSearchAction.TYPED_KEYS_PARAM, "true"),
getToXContentParams()
);
boolean humanReadable = randomBoolean();
BytesRef firstTimeBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef();

/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 5000);
concurrentTest(() -> {
try {
for (int r = 0; r < rounds; r++) {
BytesRef thisRoundBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef();
if (firstTimeBytes.bytesEquals(thisRoundBytes)) {
continue;
}
StringBuilder error = new StringBuilder("Failed to round trip over ");
if (humanReadable) {
error.append("human readable ");
}
error.append(xContentType);
error.append("\nCanonical is:\n").append(Strings.toString(asXContent(testInstance), true, true));
boolean showBytes = xContentType.xContent() == CborXContent.cborXContent;
error.append("\nWanted : ").append(showBytes ? firstTimeBytes : firstTimeBytes.utf8ToString());
error.append("\nBut got: ").append(showBytes ? thisRoundBytes : thisRoundBytes.utf8ToString());
fail(error.toString());
}
error.append(xContentType);
error.append("\nCanonical is:\n").append(Strings.toString(asXContent(testInstance), true, true));
boolean showBytes = xContentType.xContent() == CborXContent.cborXContent;
error.append("\nWanted : ").append(showBytes ? firstTimeBytes : firstTimeBytes.utf8ToString());
error.append("\nBut got: ").append(showBytes ? thisRoundBytes : thisRoundBytes.utf8ToString());
fail(error.toString());
} catch (IOException e) {
throw new AssertionError(e);
}
} catch (IOException e) {
throw new AssertionError(e);
}
});
});
} finally {
dispose(testInstance);
}
}

protected abstract ToXContent asXContent(T instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
Expand Down Expand Up @@ -54,10 +55,20 @@ public abstract class AbstractWireTestCase<T> extends ESTestCase {
*/
public final void testEqualsAndHashcode() {
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copyInstance, this::mutateInstance);
T testInstance = createTestInstance();
try {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(testInstance, this::copyInstance, this::mutateInstance, this::dispose);
} finally {
dispose(testInstance);
}
}
}

/**
* Dispose of the copy, usually {@link Releasable#close} or a noop.
*/
protected void dispose(T t) {}

/**
* Calls {@link Object#equals} on equal objects on many threads and verifies
* they all return true. Folks tend to assume this is true about
Expand All @@ -67,19 +78,27 @@ public final void testEqualsAndHashcode() {
*/
public final void testConcurrentEquals() throws IOException, InterruptedException, ExecutionException {
T testInstance = createTestInstance();
T copy = copyInstance(testInstance);

/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 5000);
concurrentTest(() -> {
for (int r = 0; r < rounds; r++) {
assertEquals(testInstance, copy);
try {
T copy = copyInstance(testInstance);
try {

/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 5000);
concurrentTest(() -> {
for (int r = 0; r < rounds; r++) {
assertEquals(testInstance, copy);
}
});
} finally {
dispose(copy);
}
});
} finally {
dispose(testInstance);
}
}

/**
Expand Down Expand Up @@ -111,25 +130,34 @@ protected void concurrentTest(Runnable r) throws InterruptedException, Execution
*/
public final void testConcurrentHashCode() throws InterruptedException, ExecutionException {
T testInstance = createTestInstance();
int firstHashCode = testInstance.hashCode();

/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 5000);
concurrentTest(() -> {
for (int r = 0; r < rounds; r++) {
assertEquals(firstHashCode, testInstance.hashCode());
}
});
try {
int firstHashCode = testInstance.hashCode();

/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 5000);
concurrentTest(() -> {
for (int r = 0; r < rounds; r++) {
assertEquals(firstHashCode, testInstance.hashCode());
}
});
} finally {
dispose(testInstance);
}
}

public void testToString() throws Exception {
final String toString = createTestInstance().toString();
assertNotNull(toString);
assertThat(toString, not(emptyString()));
T testInstance = createTestInstance();
try {
final String toString = testInstance.toString();
assertNotNull(toString);
assertThat(toString, not(emptyString()));
} finally {
dispose(testInstance);
}
}

/**
Expand All @@ -138,7 +166,11 @@ public void testToString() throws Exception {
public final void testSerialization() throws IOException {
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
T testInstance = createTestInstance();
assertSerialization(testInstance);
try {
assertSerialization(testInstance);
} finally {
dispose(testInstance);
}
}
}

Expand All @@ -155,22 +187,25 @@ public final void testSerialization() throws IOException {
*/
public final void testConcurrentSerialization() throws InterruptedException, ExecutionException {
T testInstance = createTestInstance();

/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 2000);
concurrentTest(() -> {
try {
for (int r = 0; r < rounds; r++) {
assertSerialization(testInstance);
try {
/*
* 500 rounds seems to consistently reproduce the issue on Nik's
* laptop. Larger numbers are going to be slower but more likely
* to reproduce the issue.
*/
int rounds = scaledRandomIntBetween(300, 2000);
concurrentTest(() -> {
try {
for (int r = 0; r < rounds; r++) {
assertSerialization(testInstance);
}
} catch (IOException e) {
throw new AssertionError("error serializing", e);
}
} catch (IOException e) {
throw new AssertionError("error serializing", e);
}
});
});
} finally {
dispose(testInstance);
}
}

/**
Expand All @@ -187,7 +222,11 @@ protected final void assertSerialization(T testInstance) throws IOException {
*/
protected final void assertSerialization(T testInstance, TransportVersion version) throws IOException {
T deserializedInstance = copyInstance(testInstance, version);
assertEqualInstances(testInstance, deserializedInstance);
try {
assertEqualInstances(testInstance, deserializedInstance);
} finally {
dispose(deserializedInstance);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -125,6 +126,7 @@ public static class XContentTester<T> {
assertEquals(expectedInstance.hashCode(), newInstance.hashCode());
};
private boolean assertToXContentEquivalence = true;
private Consumer<T> dispose = t -> {};

private XContentTester(
CheckedBiFunction<XContent, BytesReference, XContentParser, IOException> createParser,
Expand All @@ -142,24 +144,32 @@ public void test() throws IOException {
for (int runs = 0; runs < numberOfTestRuns; runs++) {
XContentType xContentType = randomFrom(XContentType.values()).canonical();
T testInstance = instanceSupplier.apply(xContentType);
BytesReference originalXContent = toXContent.apply(testInstance, xContentType);
BytesReference shuffledContent = insertRandomFieldsAndShuffle(
originalXContent,
xContentType,
supportsUnknownFields,
shuffleFieldsExceptions,
randomFieldsExcludeFilter,
createParser
);
XContentParser parser = createParser.apply(XContentFactory.xContent(xContentType), shuffledContent);
T parsed = fromXContent.apply(parser);
assertEqualsConsumer.accept(testInstance, parsed);
if (assertToXContentEquivalence) {
assertToXContentEquivalent(
toXContent.apply(testInstance, xContentType),
toXContent.apply(parsed, xContentType),
xContentType
try {
BytesReference originalXContent = toXContent.apply(testInstance, xContentType);
BytesReference shuffledContent = insertRandomFieldsAndShuffle(
originalXContent,
xContentType,
supportsUnknownFields,
shuffleFieldsExceptions,
randomFieldsExcludeFilter,
createParser
);
XContentParser parser = createParser.apply(XContentFactory.xContent(xContentType), shuffledContent);
T parsed = fromXContent.apply(parser);
try {
assertEqualsConsumer.accept(testInstance, parsed);
if (assertToXContentEquivalence) {
assertToXContentEquivalent(
toXContent.apply(testInstance, xContentType),
toXContent.apply(parsed, xContentType),
xContentType
);
}
} finally {
dispose.accept(parsed);
}
} finally {
dispose.accept(testInstance);
}
}
}
Expand Down Expand Up @@ -193,6 +203,11 @@ public XContentTester<T> assertToXContentEquivalence(boolean assertToXContentEqu
this.assertToXContentEquivalence = assertToXContentEquivalence;
return this;
}

public XContentTester<T> dispose(Consumer<T> dispose) {
this.dispose = dispose;
return this;
}
}

public static <T extends ToXContent> void testFromXContent(
Expand Down
Loading

0 comments on commit cb57d48

Please sign in to comment.