🐛 Fixed test for gcs, csv, json-local, mongodb and meilisearch destination (#6134)

* updated gcs, csv and localJson tests

* added emitted_at to MeiliSearchDestination

* updated testSyncWithLargeRecordBatch test

* added comment to MeiliSearch destination test

* addressed review remarks
andriikorotkov authored Sep 17, 2021
1 parent 13e8be5 commit 0a027a1
Showing 9 changed files with 23 additions and 8 deletions.
@@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.9",
"dockerImageTag": "0.2.10",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
@@ -58,7 +58,7 @@
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
-  dockerImageTag: 0.2.9
+  dockerImageTag: 0.2.10
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
@@ -383,10 +383,9 @@ public void testSyncWithLargeRecordBatch(String messagesFilename, String catalog
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());

-    final List<AirbyteMessage> largeNumberRecords = Collections.nCopies(1000, messages).stream().flatMap(List::stream).collect(Collectors.toList());
+    final List<AirbyteMessage> largeNumberRecords = Collections.nCopies(400, messages).stream().flatMap(List::stream).collect(Collectors.toList());

final JsonNode config = getConfig();
-    final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
}
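For context, this stress test amplifies its input by replicating the fixture messages rather than reading a larger file; dropping the multiplier from 1000 to 400 keeps the batch large enough to exercise batching while cutting runtime. A minimal sketch of the nCopies/flatMap idiom, with hypothetical stand-in records:

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class ReplicateMessages {

  public static void main(String[] args) {
    // stand-in for the deserialized AirbyteMessage fixtures
    final List<String> messages = List.of("record-1", "record-2");

    // nCopies produces 400 references to the same list; flatMap flattens
    // them into one 800-element list without copying the elements themselves.
    final List<String> largeNumberRecords = Collections.nCopies(400, messages).stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());

    System.out.println(largeNumberRecords.size()); // 800
  }
}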

@@ -80,8 +80,10 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
JsonNode streamSchema)
throws Exception {
    final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
+
    final Optional<Path> streamOutput =
-        allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
+        allOutputs.stream()
+            .filter(path -> path.getFileName().toString().endsWith(new StandardNameTransformer().getRawTableName(streamName) + ".csv"))
            .findFirst();

assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName);
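The move from contains() to endsWith() guards against one stream name being a prefix of another, in which case contains() can pick up the wrong file. A small illustration under assumed file names (the "_airbyte_raw_" prefix mirrors StandardNameTransformer.getRawTableName but is hard-coded here):

import java.util.List;
import java.util.Optional;

public class SuffixMatch {

  public static void main(String[] args) {
    // hypothetical raw output files for two streams, "users" and "users_daily"
    final List<String> files = List.of("_airbyte_raw_users_daily.csv", "_airbyte_raw_users.csv");
    final String rawTableName = "_airbyte_raw_users";

    // contains() matches the colliding stream's file first...
    final Optional<String> byContains = files.stream()
        .filter(f -> f.contains(rawTableName))
        .findFirst(); // _airbyte_raw_users_daily.csv

    // ...while endsWith() pins the match to the exact stream.
    final Optional<String> byEndsWith = files.stream()
        .filter(f -> f.endsWith(rawTableName + ".csv"))
        .findFirst(); // _airbyte_raw_users.csv

    System.out.println(byContains.orElseThrow());
    System.out.println(byEndsWith.orElseThrow());
  }
}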
@@ -47,6 +47,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER;
+
/**
* When adding a new GCS destination acceptance test, extend this class and do the following:
* <li>Implement {@link #getFormatConfig} that returns a {@link S3FormatConfig}</li>
@@ -107,6 +109,7 @@ protected List<S3ObjectSummary> getAllSyncedObjects(String streamName, String na
.listObjects(config.getBucketName(), outputPrefix)
.getObjectSummaries()
.stream()
+        .filter(o -> o.getKey().contains(NAME_TRANSFORMER.convertStreamName(streamName) + "/"))
.sorted(Comparator.comparingLong(o -> o.getLastModified().getTime()))
.collect(Collectors.toList());
LOGGER.info(
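The added key filter plays the same role on the GCS side: appending "/" to the converted stream name confines the listing to that stream's directory, so "users" no longer matches objects under "users_daily/". A rough sketch over plain key strings, with an assumed bucket layout:

import java.util.List;
import java.util.stream.Collectors;

public class KeyFilter {

  public static void main(String[] args) {
    // hypothetical object keys under one bucket prefix
    final List<String> keys = List.of(
        "test_output/users/2021_09_17_file_0.csv",
        "test_output/users_daily/2021_09_17_file_0.csv");

    final String streamName = "users";

    // contains(streamName + "/") keeps only keys inside the stream's own directory
    final List<String> matched = keys.stream()
        .filter(k -> k.contains(streamName + "/"))
        .collect(Collectors.toList());

    System.out.println(matched); // [test_output/users/2021_09_17_file_0.csv]
  }
}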
@@ -77,7 +77,7 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
throws Exception {
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
final Optional<Path> streamOutput = allOutputs.stream()
-        .filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
+        .filter(path -> path.getFileName().toString().endsWith(new StandardNameTransformer().getRawTableName(streamName) + ".jsonl"))
.findFirst();

assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName);
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.2.9
+LABEL io.airbyte.version=0.2.10
LABEL io.airbyte.name=airbyte/destination-meilisearch
@@ -45,12 +45,15 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -83,8 +86,10 @@ public class MeiliSearchDestination extends BaseConnector implements Destination
private static final Logger LOGGER = LoggerFactory.getLogger(MeiliSearchDestination.class);

private static final int MAX_BATCH_SIZE = 10000;
+  private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS");

public static final String AB_PK_COLUMN = "_ab_pk";
+  public static final String AB_EMITTED_AT_COLUMN = "_ab_emitted_at";

@Override
public AirbyteConnectionStatus check(JsonNode config) {
@@ -164,6 +169,7 @@ private static RecordWriter recordWriterFunction(final Map<String, Index> indexN
.stream()
.map(AirbyteRecordMessage::getData)
.peek(o -> ((ObjectNode) o).put(AB_PK_COLUMN, Names.toAlphanumericAndUnderscore(UUID.randomUUID().toString())))
+            .peek(o -> ((ObjectNode) o).put(AB_EMITTED_AT_COLUMN, LocalDateTime.now().format(FORMATTER)))
.collect(Collectors.toList()));
final String s = index.addDocuments(json);
LOGGER.info("add docs response {}", s);
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.testcontainers.containers.GenericContainer;
@@ -101,10 +102,14 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv env,
final Index index = meiliSearchClient.index(Names.toAlphanumericAndUnderscore(streamName));
final String responseString = index.getDocuments();
final JsonNode response = Jsons.deserialize(responseString);
-    return MoreStreams.toStream(response.iterator())
+    return MoreStreams.toStream(response.iterator())
        // strip out the airbyte primary key because the test cases only expect the data, no the airbyte
        // metadata column.
+        // We also sort the data by "emitted_at" and then remove that column, because the test cases only expect data,
+        // not the airbyte metadata column.
        .peek(r -> ((ObjectNode) r).remove(MeiliSearchDestination.AB_PK_COLUMN))
+        .sorted(Comparator.comparing(o -> o.get(MeiliSearchDestination.AB_EMITTED_AT_COLUMN).asText()))
+        .peek(r -> ((ObjectNode) r).remove(MeiliSearchDestination.AB_EMITTED_AT_COLUMN))
.collect(Collectors.toList());
}
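The sort-then-strip pattern above can be tried in isolation: order the Jackson nodes by the metadata column as text, then remove the column so assertions compare only record data. A self-contained sketch, assuming the "_ab_emitted_at" column name from the destination:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortThenStrip {

  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final List<JsonNode> docs = List.of(
        mapper.readTree("{\"id\":2,\"_ab_emitted_at\":\"2021-09-17T10:00:01.000000000\"}"),
        mapper.readTree("{\"id\":1,\"_ab_emitted_at\":\"2021-09-17T10:00:00.000000000\"}"));

    final List<JsonNode> sorted = docs.stream()
        // order by the emitted-at string, which is fixed-width and therefore sortable as text
        .sorted(Comparator.comparing(o -> o.get("_ab_emitted_at").asText()))
        // then drop the metadata column so only record data remains
        .peek(r -> ((ObjectNode) r).remove("_ab_emitted_at"))
        .collect(Collectors.toList());

    System.out.println(sorted); // [{"id":1}, {"id":2}]
  }
}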
