Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compare normalized records to expected records #876

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
Expand Down Expand Up @@ -59,6 +61,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -70,9 +74,13 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TestDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(TestDestination.class);

private TestDestinationEnv testEnv;

private Path jobRoot;
Expand Down Expand Up @@ -166,6 +174,7 @@ void setUpInternal() throws Exception {
final Path workspaceRoot = Files.createTempDirectory(testDir, "test");
jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job"));
localRoot = Files.createTempDirectory(testDir, "output");
LOGGER.info("jobRoot: {}", jobRoot);
testEnv = new TestDestinationEnv(localRoot);

setup(testEnv);
Expand Down Expand Up @@ -244,16 +253,17 @@ public void testSync(String messagesFilename, String catalogFilename) throws Exc
@ParameterizedTest
@ArgumentsSource(DataArgumentsProvider.class)
public void testSyncWithNormalization(String messagesFilename, String catalogFilename) throws Exception {
if (!implementsBasicNormalization())
if (!implementsBasicNormalization()) {
return;
}

final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfigWithBasicNormalization(), messages, catalog);

assertSameMessages(messages, retrieveRecords(testEnv, catalog.getStreams().get(0).getName()));
assertEquivalentMessages(messages, retrieveNormalizedRecords(testEnv, catalog.getStreams().get(0).getName()));
assertSameMessagesPruneAirbyteInternalFields(messages, retrieveNormalizedRecords(testEnv, catalog.getStreams().get(0).getName()));
}

/**
Expand Down Expand Up @@ -303,8 +313,10 @@ private void runSync(JsonNode config, List<AirbyteMessage> messages, AirbyteCata
target.notifyEndOfStream();
target.close();

if (!implementsBasicNormalization())
// skip if basic normalization is not configured to run (either not set or false).
if (!config.hasNonNull(WorkerConstants.BASIC_NORMALIZATION_KEY) || !config.get(WorkerConstants.BASIC_NORMALIZATION_KEY).asBoolean()) {
return;
}

final NormalizationRunner runner = NormalizationRunnerFactory.create(
getImageName(),
Expand All @@ -325,22 +337,72 @@ private void assertSameMessages(List<AirbyteMessage> expected, List<JsonNode> ac
.map(AirbyteRecordMessage::getData)
.collect(Collectors.toList());

// we want to ignore order in this comparison.
assertEquals(expectedJson.size(), actual.size());
assertTrue(expectedJson.containsAll(actual));
assertTrue(actual.containsAll(expectedJson));
assertSameData(expectedJson, actual);
}

private void assertEquivalentMessages(List<AirbyteMessage> expected, List<JsonNode> actual) {
final List<JsonNode> expectedJson = expected.stream()
private void assertSameMessagesPruneAirbyteInternalFields(List<AirbyteMessage> expected, List<JsonNode> actual) {
final List<JsonNode> expectedPruned = expected.stream()
.filter(message -> message.getType() == AirbyteMessage.Type.RECORD)
.map(AirbyteMessage::getRecord)
.map(AirbyteRecordMessage::getData)
.map(Jsons::clone)
.map(this::prune)
.collect(Collectors.toList());

final List<JsonNode> actualPruned = actual.stream().map(this::prune).collect(Collectors.toList());
assertSameData(expectedPruned, actualPruned);
}

private void assertSameData(List<JsonNode> expected, List<JsonNode> actual) {
LOGGER.info("expected: {}", expected);
LOGGER.info("actual: {}", actual);

// we want to ignore order in this comparison.
assertEquals(expectedJson.size(), actual.size());
// todo Message can be slightly different with additional columns?
assertEquals(expected.size(), actual.size());
assertTrue(expected.containsAll(actual));
assertTrue(actual.containsAll(expected));
}

/**
* Same as {@link #pruneMutate(JsonNode)}, except does a defensive copy and returns a new json node
* object instead of mutating in place.
*
* @param json - json that will be pruned.
* @return pruned json node.
*/
private JsonNode prune(JsonNode json) {
final JsonNode clone = Jsons.clone(json);
pruneMutate(clone);
return clone;
}

/**
* Prune fields that are added internally by airbyte and are not part of the original data. Used so
* that we can compare data that is persisted by an Airbyte worker to the original data. This method
* mutates the provided json in place.
*
* @param json - json that will be pruned. will be mutated in place!
*/
private void pruneMutate(JsonNode json) {
final Set<String> keys = Jsons.object(json, new TypeReference<Map<String, Object>>() {}).keySet();
for (final String key : keys) {
final JsonNode node = json.get(key);
// recursively prune all airbyte internal fields.
if (node.isObject() || node.isArray()) {
pruneMutate(node);
}

// prune the following
// - airbyte internal fields
// - fields that match what airbyte generates as hash ids
// - null values -- normalization will often return `<key>: null` but in the original data that key
// likely did not exist in the original message. the most consistent thing to do is always remove
// the null fields (this choice does decrease our ability to check that normalization creates
// columns even if all the values in that column are null)
if (Sets.newHashSet("emitted_at", "ab_id", "normalized_at").contains(key) || key.matches("^_.*_hashid$") || json.get(key).isNull()) {
((ObjectNode) json).remove(key);
}
}
}

private JsonNode getConfigWithBasicNormalization() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
},
"NZD": {
"type": "number"
},
"USD": {
"type": "number"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
Expand Down