Skip to content

Commit

Permalink
Merge branch 'master' into jrhizor/py-schema-converter
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhizor committed Oct 14, 2020
2 parents 8bf9d50 + 000acdd commit 76a97e5
Show file tree
Hide file tree
Showing 55 changed files with 2,650 additions and 85 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ jobs:
with:
node-version: '14.7'

- name: Build
run: ./gradlew build --no-daemon -g ${{ env.GRADLE_PATH }}

- name: Ensure no file change
run: test -z "$(git status --porcelain)"

- name: Write Integration Test Credentials
run: ./tools/bin/ci_credentials.sh
env:
BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }}
STRIPE_INTEGRATION_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_TEST_CREDS }}

- name: Build
run: ./gradlew build --no-daemon -g ${{ env.GRADLE_PATH }}

- name: Ensure no file change
run: test -z "$(git status --porcelain)"

- name: Run Integration Tests
run: ./gradlew integrationTest --no-daemon -g ${{ env.GRADLE_PATH }}

Expand Down
6 changes: 0 additions & 6 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1089,16 +1089,13 @@ components:
required:
- workspaceId
- sourceImplementationId
- sourceSpecificationId
- connectionConfiguration
- name
properties:
sourceImplementationId:
$ref: "#/components/schemas/SourceImplementationId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
sourceSpecificationId:
$ref: "#/components/schemas/SourceSpecificationId"
connectionConfiguration:
$ref: "#/components/schemas/SourceConfiguration"
name:
Expand Down Expand Up @@ -1254,16 +1251,13 @@ components:
required:
- workspaceId
- destinationImplementationId
- destinationSpecificationId
- connectionConfiguration
- name
properties:
destinationImplementationId:
$ref: "#/components/schemas/DestinationImplementationId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
destinationSpecificationId:
$ref: "#/components/schemas/DestinationSpecificationId"
connectionConfiguration:
$ref: "#/components/schemas/DestinationConfiguration"
name:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.commons.concurrency;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchedulerShutdownHandler extends Thread {
public class GracefulShutdownHandler extends Thread {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerShutdownHandler.class);
private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownHandler.class);
private final Duration terminateWaitDuration;
private final ExecutorService[] threadPools;

public SchedulerShutdownHandler(final ExecutorService... threadPools) {
public GracefulShutdownHandler(Duration terminateWaitDuration, final ExecutorService... threadPools) {
this.terminateWaitDuration = terminateWaitDuration;
this.threadPools = threadPools;
}

Expand All @@ -44,11 +47,11 @@ public void run() {
threadPool.shutdown();

try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
LOGGER.error("Unable to kill worker threads by shutdown timeout.");
if (!threadPool.awaitTermination(terminateWaitDuration.getSeconds(), TimeUnit.SECONDS)) {
LOGGER.error("Unable to kill threads by shutdown timeout.");
}
} catch (InterruptedException e) {
LOGGER.error("Wait for graceful worker thread shutdown interrupted.", e);
LOGGER.error("Wait for graceful thread shutdown interrupted.", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.Optional;

Expand Down Expand Up @@ -109,4 +110,8 @@ public static <T> T clone(final T object) {
return (T) deserialize(serialize(object), object.getClass());
}

public static byte[] toBytes(JsonNode jsonNode) {
return serialize(jsonNode).getBytes(Charsets.UTF_8);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.lang;

import java.util.Queue;

public interface CloseableQueue<E> extends Queue<E>, AutoCloseable {

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.commons.concurrency;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import org.junit.jupiter.api.Test;

class SchedulerShutdownHandlerTest {
class GracefulShutdownHandlerTest {

@Test
public void testRun() throws InterruptedException {
final ExecutorService executorService = mock(ExecutorService.class);
final SchedulerShutdownHandler schedulerShutdownHandler = new SchedulerShutdownHandler(executorService);
schedulerShutdownHandler.start();
schedulerShutdownHandler.join();
final GracefulShutdownHandler gracefulShutdownHandler = new GracefulShutdownHandler(Duration.ofSeconds(30), executorService);
gracefulShutdownHandler.start();
gracefulShutdownHandler.join();

verify(executorService).shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.List;
Expand Down Expand Up @@ -193,6 +194,12 @@ void testMutateTypeToArrayStandard() {
Assertions.assertEquals(expectedWithoutArrayType, actualWithStringType);
}

@Test
void testToBytes() {
final String jsonString = "{\"test\":\"abc\",\"type\":[\"object\"]}";
Assertions.assertArrayEquals(jsonString.getBytes(Charsets.UTF_8), Jsons.toBytes(Jsons.deserialize(jsonString)));
}

private static class ToClass {

@JsonProperty("str")
Expand Down
14 changes: 12 additions & 2 deletions airbyte-db/src/main/java/io/airbyte/db/DatabaseHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,24 @@ public static BasicDataSource getConnectionPool(String username,
return connectionPool;
}

public static <T> T query(BasicDataSource connectionPool, ContextQueryFunction<T> transform)
throws SQLException {
public static <T> T query(BasicDataSource connectionPool, ContextQueryFunction<T> transform) throws SQLException {

try (Connection connection = connectionPool.getConnection()) {
DSLContext context = getContext(connection);
return transform.apply(context);
}
}

public static <T> T transaction(BasicDataSource connectionPool, ContextQueryFunction<T> transform) throws SQLException {
try (Connection connection = connectionPool.getConnection()) {
DSLContext context = getContext(connection);
return context.transactionResult(configuration -> {
DSLContext transactionContext = DSL.using(configuration);
return transform.apply(transactionContext);
});
}
}

private static DSLContext getContext(Connection connection) {
return DSL.using(connection, SQLDialect.POSTGRES);
}
Expand Down
10 changes: 5 additions & 5 deletions airbyte-integrations/base-java/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ WORKDIR /airbyte
COPY javabase.sh .

# airbyte base commands
ENV AIRBYTE_SPEC_CMD "./javabase.sh --spec"
ENV AIRBYTE_CHECK_CMD "./javabase.sh --check"
ENV AIRBYTE_DISCOVER_CMD "./javabase.sh --discover"
ENV AIRBYTE_READ_CMD "./javabase.sh --read"
ENV AIRBYTE_WRITE_CMD "./javabase.sh --write"
ENV AIRBYTE_SPEC_CMD "/airbyte/javabase.sh --spec"
ENV AIRBYTE_CHECK_CMD "/airbyte/javabase.sh --check"
ENV AIRBYTE_DISCOVER_CMD "/airbyte/javabase.sh --discover"
ENV AIRBYTE_READ_CMD "/airbyte/javabase.sh --read"
ENV AIRBYTE_WRITE_CMD "/airbyte/javabase.sh --write"

ENTRYPOINT ["/airbyte/base.sh"]
2 changes: 1 addition & 1 deletion airbyte-integrations/base-java/javabase.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -e
# Wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is
# set by the dockerfile that inherits base-java, so it cannot be evaluated when base-java is built.
# We also need to make sure that stdin of the script is piped to the stdin of the java application.
cat <&0 | bin/"$APPLICATION" "$@"
cat <&0 | /airbyte/bin/"$APPLICATION" "$@"
3 changes: 3 additions & 0 deletions airbyte-integrations/bigquery-destination/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
1 change: 1 addition & 0 deletions airbyte-integrations/bigquery-destination/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
config
9 changes: 9 additions & 0 deletions airbyte-integrations/bigquery-destination/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM airbyte/base-java:dev

WORKDIR /airbyte

ENV APPLICATION bigquery-destination

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1
69 changes: 69 additions & 0 deletions airbyte-integrations/bigquery-destination/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
plugins {
id 'com.bmuschko.docker-remote-api'
id 'application'
}

sourceSets {
integrationTest {
java {
srcDir 'src/test-integration/java'
}
resources {
srcDir 'src/test-integration/resources'
}
}
}
test.dependsOn('compileIntegrationTestJava')

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:base-java')
implementation project(':airbyte-queue')
implementation project(':airbyte-singer')

implementation 'com.google.cloud:google-cloud-bigquery:1.122.2'
implementation 'org.apache.commons:commons-lang3:3.11'

integrationTestImplementation project(':airbyte-integrations:integration-test-lib')

}

application {
mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestination'
}


def image = 'airbyte/airbyte-bigquery-destination:dev'

task imageName {
doLast {
println "IMAGE $image"
}
}

task buildImage(type: DockerBuildImage) {
inputDir = projectDir
images.add(image)
dependsOn ':airbyte-integrations:base-java:buildImage'
}

task integrationTest(type: Test) {
testClassesDirs += sourceSets.integrationTest.output.classesDirs
classpath += sourceSets.integrationTest.runtimeClasspath

useJUnitPlatform()
testLogging() {
events "passed", "failed"
exceptionFormat "full"
}

mustRunAfter test
dependsOn(buildImage)
}

Empty file.
Loading

0 comments on commit 76a97e5

Please sign in to comment.