diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 84cd5e79274b0..f968e607d95be 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -24,6 +24,12 @@ dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.com/integrations/destinations/doris releaseStage: alpha +- name: Apache Iceberg + destinationDefinitionId: df65a8f3-9908-451b-aa9b-445462803560 + dockerRepository: airbyte/destination-iceberg + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.com/integrations/destinations/iceberg + releaseStage: alpha - name: AWS Datalake destinationDefinitionId: 99878c90-0fbd-46d3-9d98-ffde879d17fc dockerRepository: airbyte/destination-aws-datalake diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 68b7d036f3c55..60ded4ebaf5b9 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -249,6 +249,281 @@ supported_destination_sync_modes: - "append" - "overwrite" +- dockerImage: "airbyte/destination-iceberg:0.1.0" + spec: + documentationUrl: "https://docs.airbyte.com/integrations/destinations/iceberg" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "Iceberg Destination Spec" + type: "object" + required: + - "catalog_config" + - "storage_config" + - "format_config" + properties: + catalog_config: + title: "Iceberg catalog config" + type: "object" + description: "Catalog config of Iceberg." + oneOf: + - title: "HiveCatalog: Use Apache Hive MetaStore" + required: + - "catalog_type" + - "hive_thrift_uri" + properties: + catalog_type: + title: "Catalog Type" + type: "string" + default: "Hive" + enum: + - "Hive" + order: 0 + hive_thrift_uri: + title: "Hive Metastore thrift uri" + type: "string" + description: "Hive MetaStore thrift server uri of iceberg catalog." + examples: + - "host:port" + order: 1 + database: + title: "Default database" + description: "The default database tables are written to if the source\ + \ does not specify a namespace. The usual value for this field is\ + \ \"default\"." + type: "string" + default: "default" + examples: + - "default" + order: 2 + - title: "HadoopCatalog: Use hierarchical file systems as same as storage\ + \ config" + description: "A Hadoop catalog doesn’t need to connect to a Hive MetaStore,\ + \ but can only be used with HDFS or similar file systems that support\ + \ atomic rename." + required: + - "catalog_type" + properties: + catalog_type: + title: "Catalog Type" + type: "string" + default: "Hadoop" + enum: + - "Hadoop" + order: 0 + database: + title: "Default database" + description: "The default database tables are written to if the source\ + \ does not specify a namespace. The usual value for this field is\ + \ \"default\"." + type: "string" + default: "default" + examples: + - "default" + order: 1 + - title: "JdbcCatalog: Use relational database" + description: "Using a table in a relational database to manage Iceberg\ + \ tables through JDBC. Read more here. 
Supporting: PostgreSQL" + required: + - "catalog_type" + properties: + catalog_type: + title: "Catalog Type" + type: "string" + default: "Jdbc" + enum: + - "Jdbc" + order: 0 + database: + title: "Default schema" + description: "The default schema tables are written to if the source\ + \ does not specify a namespace. The usual value for this field is\ + \ \"public\"." + type: "string" + default: "public" + examples: + - "public" + order: 1 + jdbc_url: + title: "Jdbc url" + type: "string" + examples: + - "jdbc:postgresql://{host}:{port}/{database}" + order: 2 + username: + title: "User" + description: "Username to use to access the database." + type: "string" + order: 3 + password: + title: "Password" + description: "Password associated with the username." + type: "string" + airbyte_secret: true + order: 4 + ssl: + title: "SSL Connection" + description: "Encrypt data using SSL. When activating SSL, please\ + \ select one of the connection modes." + type: "boolean" + default: false + order: 5 + catalog_schema: + title: "schema for Iceberg catalog" + description: "Iceberg catalog metadata tables are written to catalog\ + \ schema. The usual value for this field is \"public\"." + type: "string" + default: "public" + examples: + - "public" + order: 6 + order: 0 + storage_config: + title: "Storage config" + type: "object" + description: "Storage config of Iceberg." + oneOf: + - title: "S3" + type: "object" + description: "S3 object storage" + required: + - "storage_type" + - "access_key_id" + - "secret_access_key" + - "s3_warehouse_uri" + properties: + storage_type: + title: "Storage Type" + type: "string" + default: "S3" + enum: + - "S3" + order: 0 + access_key_id: + type: "string" + description: "The access key ID to access the S3 bucket. Airbyte requires\ + \ Read and Write permissions to the given bucket. Read more here." + title: "S3 Key ID" + airbyte_secret: true + examples: + - "A012345678910EXAMPLE" + order: 0 + secret_access_key: + type: "string" + description: "The corresponding secret to the access key ID. Read\ + \ more here" + title: "S3 Access Key" + airbyte_secret: true + examples: + - "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" + order: 1 + s3_warehouse_uri: + title: "S3 Warehouse Uri for Iceberg" + type: "string" + description: "The Warehouse Uri for Iceberg" + examples: + - "s3a://my-bucket/path/to/warehouse" + - "s3://my-bucket/path/to/warehouse" + order: 2 + s3_bucket_region: + title: "S3 Bucket Region" + type: "string" + default: "" + description: "The region of the S3 bucket. See here for all region codes." + enum: + - "" + - "us-east-1" + - "us-east-2" + - "us-west-1" + - "us-west-2" + - "af-south-1" + - "ap-east-1" + - "ap-south-1" + - "ap-northeast-1" + - "ap-northeast-2" + - "ap-northeast-3" + - "ap-southeast-1" + - "ap-southeast-2" + - "ca-central-1" + - "cn-north-1" + - "cn-northwest-1" + - "eu-central-1" + - "eu-north-1" + - "eu-south-1" + - "eu-west-1" + - "eu-west-2" + - "eu-west-3" + - "sa-east-1" + - "me-south-1" + - "us-gov-east-1" + - "us-gov-west-1" + order: 3 + s3_endpoint: + title: "Endpoint" + type: "string" + default: "" + description: "Your S3 endpoint url. Read more here" + examples: + - "http://localhost:9000" + - "localhost:9000" + order: 4 + s3_path_style_access: + type: "boolean" + description: "Use path style access" + examples: + - true + - false + default: true + order: 5 + order: 1 + format_config: + title: "File format" + type: "object" + required: + - "format" + description: "File format of Iceberg storage." 
+          properties:
+            format:
+              title: "File storage format"
+              type: "string"
+              default: "Parquet"
+              description: ""
+              enum:
+              - "Parquet"
+              - "Avro"
+              order: 0
+            flush_batch_size:
+              title: "Data file flushing batch size"
+              description: "Iceberg data file flush batch size. Incoming rows are\
+                \ cached first; when the cache reaches this batch size, it is flushed\
+                \ into an Iceberg data file."
+              type: "integer"
+              default: 10000
+              order: 1
+            auto_compact:
+              title: "Auto compact data files"
+              description: "Automatically compact data files when a stream closes"
+              type: "boolean"
+              default: false
+              order: 2
+            compact_target_file_size_in_mb:
+              title: "Target size of compacted data file"
+              description: "Specify the target file size for Iceberg data files when\
+                \ performing a compaction action."
+              type: "integer"
+              default: 100
+              order: 3
+          order: 2
+  supportsNormalization: false
+  supportsDBT: false
+  supported_destination_sync_modes:
+  - "overwrite"
+  - "append"
 - dockerImage: "airbyte/destination-aws-datalake:0.1.1"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/destinations/aws-datalake"
diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md
index b59dd842895d6..cd2a9cb9f1e46 100644
--- a/airbyte-integrations/builds.md
+++ b/airbyte-integrations/builds.md
@@ -164,7 +164,8 @@
 | Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) |
 | Google Firestore | [![destination-firestore](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-firestore%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-firestore) |
 | Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) |
 | Google Sheets | [![destination-sheets](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-sheets%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-sheets) |
+| Apache Iceberg | [![destination-iceberg](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-iceberg%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-iceberg) |
 | Kafka | [![destination-kafka](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-kafka%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-kafka) |
 | Keen (Chargify) | [![destination-keen](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-keen%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-keen) |
 | Local CSV | [![destination-csv](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-csv%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-csv) |
diff --git a/airbyte-integrations/connectors/destination-iceberg/.dockerignore b/airbyte-integrations/connectors/destination-iceberg/.dockerignore
new file mode 100644
index 0000000000000..65c7d0ad3e73c
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/.dockerignore
@@ -0,0 +1,3 @@
+*
+!Dockerfile
+!build
diff --git a/airbyte-integrations/connectors/destination-iceberg/Dockerfile b/airbyte-integrations/connectors/destination-iceberg/Dockerfile
new file mode 100644
index 0000000000000..2ef9968d0405a
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/Dockerfile
@@ -0,0 +1,25 @@
+FROM airbyte/integration-base-java:dev AS build
+
+WORKDIR /airbyte
+ENV APPLICATION destination-iceberg
+
+COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
+
+RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar
+
+FROM airbyte/integration-base-java:dev
+
+WORKDIR /airbyte
+ENV APPLICATION destination-iceberg
+
+ENV JAVA_OPTS="--add-opens java.base/java.lang=ALL-UNNAMED \
+    --add-opens java.base/java.util=ALL-UNNAMED \
+    --add-opens java.base/java.lang.reflect=ALL-UNNAMED \
+    --add-opens java.base/java.text=ALL-UNNAMED \
+    --add-opens java.base/sun.nio.ch=ALL-UNNAMED \
+    --add-opens java.base/java.nio=ALL-UNNAMED "
+
+COPY --from=build /airbyte /airbyte
+
+LABEL io.airbyte.version=0.1.0
+LABEL io.airbyte.name=airbyte/destination-iceberg
diff --git a/airbyte-integrations/connectors/destination-iceberg/README.md b/airbyte-integrations/connectors/destination-iceberg/README.md
new file mode 100644
index 0000000000000..d995d3a0e0833
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/README.md
@@ -0,0 +1,94 @@
+# Destination Iceberg
+
+This is the repository for the Iceberg destination connector in Java.
+For information about how to use this connector within Airbyte,
+see [the User Documentation](https://docs.airbyte.io/integrations/destinations/iceberg).
+
+## Local development
+
+#### Building via Gradle
+
+From the Airbyte repository root, run:
+
+```
+./gradlew :airbyte-integrations:connectors:destination-iceberg:build
+```
+
+#### Create credentials
+
+**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json`,
+conforming to the spec file in `src/main/resources/spec.json`.
+Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive
+information.
+Example configurations for each catalog type can be found in the `secrets-examples` directory.
+
+**If you are an Airbyte core member**, follow
+the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
+
+### Locally running the connector docker image
+
+#### Build
+
+Build the connector image via Gradle:
+
+```
+./gradlew :airbyte-integrations:connectors:destination-iceberg:airbyteDocker
+```
+
+When building via Gradle, the docker image name and tag are, respectively, the values of the `io.airbyte.name`
+and `io.airbyte.version` `LABEL`s in the Dockerfile. A local build is tagged `dev`, which is what the commands
+below use.
+
+#### Run
+
+Then run any of the connector commands as follows:
+
+```
+docker run --rm airbyte/destination-iceberg:dev spec
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-iceberg:dev check --config /secrets/config.json
+cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-iceberg:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
+```
+
+(As a destination, this connector implements `spec`, `check`, and `write`; `discover` and `read` are source-connector commands.)
+
+## Testing
+
+We use `JUnit` for Java tests.
+
+### Unit and Integration Tests
+
+Place unit tests under `src/test/java/io/airbyte/integrations/destination/iceberg`.
+
+#### Acceptance Tests
+
+Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
+`src/test-integration/java/io/airbyte/integrations/destination/iceberg/IcebergDestinationAcceptanceTest.java`.
+
+### Using gradle to run tests
+
+All commands should be run from the Airbyte project root.
+To run unit tests:
+
+```
+./gradlew :airbyte-integrations:connectors:destination-iceberg:unitTest
+```
+
+To run acceptance and custom integration tests:
+
+```
+./gradlew :airbyte-integrations:connectors:destination-iceberg:integrationTest
+```
+
+## Dependency Management
+
+### Publishing a new version of the connector
+
+You've checked out the repo, implemented a million-dollar feature, and you're ready to share your changes with the
+world. Now what?
+
+1. Make sure your changes are passing unit and integration tests.
+2. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version`
+   appropriately (we use [SemVer](https://semver.org/)).
+3. Create a Pull Request.
+4. Pat yourself on the back for being an awesome contributor.
+5. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
diff --git a/airbyte-integrations/connectors/destination-iceberg/bootstrap.md b/airbyte-integrations/connectors/destination-iceberg/bootstrap.md
new file mode 100644
index 0000000000000..c9ae78bceb385
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/bootstrap.md
@@ -0,0 +1,48 @@
+# Apache Iceberg
+
+## Overview
+
+Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including
+Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table.
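+
+The connector drives Iceberg through its Java catalog API (see `IcebergCatalogConfig#check` in this connector). The
+snippet below is a minimal sketch of that API: it creates and then drops a table through a `HadoopCatalog`. The class
+name, warehouse location, and table name are illustrative placeholders; the schema mirrors the three raw columns this
+connector writes for every stream.
+
+```java
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.types.Types;
+
+public class IcebergApiSketch {
+
+  public static void main(String[] args) {
+    // Placeholder warehouse location; the connector derives the real one from its storage config.
+    HadoopCatalog catalog = new HadoopCatalog();
+    catalog.setConf(new Configuration());
+    catalog.initialize("iceberg", Map.of(CatalogProperties.WAREHOUSE_LOCATION, "file:///tmp/iceberg-warehouse"));
+
+    // The same three raw columns the connector writes for every stream.
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "_airbyte_ab_id", Types.StringType.get()),
+        Types.NestedField.optional(1, "_airbyte_emitted_at", Types.TimestampType.withZone()),
+        Types.NestedField.required(2, "_airbyte_data", Types.StringType.get()));
+
+    TableIdentifier id = TableIdentifier.of("default", "airbyte_raw_example");
+    Table table = catalog.createTable(id, schema); // create an Iceberg table in the catalog
+    System.out.println(table.schema());
+    catalog.dropTable(id, true); // drop the table and purge its data files
+  }
+}
+```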
+ +## API Reference + +The Iceberg reference +documents: [https://iceberg.apache.org/docs/latest/api/](https://iceberg.apache.org/docs/latest/api/) + diff --git a/airbyte-integrations/connectors/destination-iceberg/build.gradle b/airbyte-integrations/connectors/destination-iceberg/build.gradle new file mode 100644 index 0000000000000..13fe5ff1d9235 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/build.gradle @@ -0,0 +1,54 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.iceberg.IcebergDestination' +} + +dependencies { + implementation project(':airbyte-config:config-models') + implementation project(':airbyte-protocol:protocol-models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + implementation('org.apache.spark:spark-sql_2.13:3.3.0') { + exclude(group: 'org.apache.hadoop', module: 'hadoop-common') + } + implementation('org.apache.spark:spark-hive_2.13:3.3.0') { + exclude(group: 'org.apache.hadoop', module: 'hadoop-common') + } + implementation 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.0.0' + implementation "software.amazon.awssdk:bundle:2.17.131" + implementation "software.amazon.awssdk:url-connection-client:2.17.131" + implementation "org.apache.hadoop:hadoop-aws:3.3.2" + implementation "org.apache.hadoop:hadoop-client-api:3.3.2" + implementation "org.apache.hadoop:hadoop-client-runtime:3.3.2" + implementation "org.postgresql:postgresql:42.5.0" + implementation "commons-collections:commons-collections:3.2.2" +// implementation "software.amazon.awssdk:utils:2.17.131" + + testImplementation libs.connectors.testcontainers.postgresql + integrationTestJavaImplementation libs.connectors.testcontainers.postgresql + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-iceberg') + + compileOnly 'org.projectlombok:lombok:1.18.24' + annotationProcessor 'org.projectlombok:lombok:1.18.24' + + testCompileOnly 'org.projectlombok:lombok:1.18.24' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.24' + + testImplementation 'org.mockito:mockito-inline:4.7.0' +} + +test { + jvmArgs = ['--add-opens=java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens=java.base/java.nio=ALL-UNNAMED'] +} + +task prepareKotlinBuildScriptModel { + +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-iceberg/secrets-examples/hadoop-catalog.json b/airbyte-integrations/connectors/destination-iceberg/secrets-examples/hadoop-catalog.json new file mode 100644 index 0000000000000..7000393abf0f5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/secrets-examples/hadoop-catalog.json @@ -0,0 +1,20 @@ +{ + "catalog_config": { + "catalog_type": "Hadoop", + "database": "default" + }, + "storage_config": { + "storage_type": "S3", + "access_key_id": "xxxxxxxxxxx", + "secret_access_key": "yyyyyyyyyyyy", + "s3_warehouse_uri": "s3a://warehouse/hive", + "s3_bucket_region": "us-east-1", + "s3_endpoint": "your-own-minio-host:9000" + }, + "format_config": { + "format": "Parquet", + "flush_batch_size": 10000, + "auto_compact": true, + "compact_target_file_size_in_mb": 100 + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg/secrets-examples/hive-catalog.json 
b/airbyte-integrations/connectors/destination-iceberg/secrets-examples/hive-catalog.json new file mode 100644 index 0000000000000..011eaa5c4deac --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/secrets-examples/hive-catalog.json @@ -0,0 +1,21 @@ +{ + "catalog_config": { + "catalog_type": "Hive", + "hive_thrift_uri": "thrift://xxxx:9083", + "database": "default" + }, + "storage_config": { + "storage_type": "S3", + "access_key_id": "xxxxxxxxxxx", + "secret_access_key": "yyyyyyyyyyyy", + "s3_warehouse_uri": "s3a://warehouse/hive", + "s3_bucket_region": "us-east-1", + "s3_endpoint": "your-own-minio-host:9000" + }, + "format_config": { + "format": "Parquet", + "flush_batch_size": 10000, + "auto_compact": true, + "compact_target_file_size_in_mb": 100 + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg/secrets-examples/jdbc-catalog.json b/airbyte-integrations/connectors/destination-iceberg/secrets-examples/jdbc-catalog.json new file mode 100644 index 0000000000000..38147b65dbf20 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/secrets-examples/jdbc-catalog.json @@ -0,0 +1,25 @@ +{ + "catalog_config": { + "catalog_type": "Jdbc", + "database": "public", + "jdbc_url": "jdbc:postgresql://host:port/database", + "username": "username", + "password": "password", + "ssl": false, + "catalog_schema": "public" + }, + "storage_config": { + "storage_type": "S3", + "access_key_id": "xxxxxxxxxxx", + "secret_access_key": "yyyyyyyyyyyy", + "s3_warehouse_uri": "s3a://warehouse/hive", + "s3_bucket_region": "us-east-1", + "s3_endpoint": "your-own-minio-host:9000" + }, + "format_config": { + "format": "Parquet", + "flush_batch_size": 10000, + "auto_compact": true, + "compact_target_file_size_in_mb": 100 + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConstants.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConstants.java new file mode 100644 index 0000000000000..858f020cc29e7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConstants.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg; + +/** + * @author Leibniz on 2022/10/26. 
+ */ +public class IcebergConstants { + + /** + * Root Config keys + */ + public static final String ICEBERG_CATALOG_CONFIG_KEY = "catalog_config"; + public static final String ICEBERG_STORAGE_CONFIG_KEY = "storage_config"; + public static final String ICEBERG_FORMAT_CONFIG_KEY = "format_config"; + + /** + * Catalog Config keys + */ + public static final String ICEBERG_CATALOG_TYPE_CONFIG_KEY = "catalog_type"; + public static final String HIVE_THRIFT_URI_CONFIG_KEY = "hive_thrift_uri"; + public static final String DEFAULT_DATABASE_CONFIG_KEY = "database"; + public static final String JDBC_URL_CONFIG_KEY = "jdbc_url"; + public static final String JDBC_USERNAME_CONFIG_KEY = "username"; + public static final String JDBC_PASSWORD_CONFIG_KEY = "password"; + public static final String JDBC_SSL_CONFIG_KEY = "ssl"; + public static final String JDBC_CATALOG_SCHEMA_CONFIG_KEY = "catalog_schema"; + + /** + * Storage Config keys + */ + public static final String ICEBERG_STORAGE_TYPE_CONFIG_KEY = "storage_type"; + public static final String S3_ACCESS_KEY_ID_CONFIG_KEY = "access_key_id"; + public static final String S3_SECRET_KEY_CONFIG_KEY = "secret_access_key"; + public static final String S3_WAREHOUSE_URI_CONFIG_KEY = "s3_warehouse_uri"; + public static final String S3_BUCKET_REGION_CONFIG_KEY = "s3_bucket_region"; + public static final String S3_ENDPOINT_CONFIG_KEY = "s3_endpoint"; + public static final String S3_PATH_STYLE_ACCESS_CONFIG_KEY = "s3_path_style_access"; + + /** + * Format Config keys + */ + public static final String FORMAT_TYPE_CONFIG_KEY = "format"; + public static final String FLUSH_BATCH_SIZE_CONFIG_KEY = "flush_batch_size"; + public static final String AUTO_COMPACT_CONFIG_KEY = "auto_compact"; + public static final String COMPACT_TARGET_FILE_SIZE_IN_MB_CONFIG_KEY = "compact_target_file_size_in_mb"; + + /** + * default values + */ + public static final String CATALOG_NAME = "iceberg"; + public static final String DEFAULT_DATABASE = "default"; + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConsumer.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConsumer.java new file mode 100644 index 0000000000000..3c76b5a8ed86e --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConsumer.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.integrations.destination.iceberg;
+
+import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID;
+import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;
+import static io.airbyte.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT;
+import static org.apache.logging.log4j.util.Strings.isNotBlank;
+
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
+import io.airbyte.integrations.base.CommitOnStateAirbyteMessageConsumer;
+import io.airbyte.integrations.destination.iceberg.config.WriteConfig;
+import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfig;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteMessage.Type;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.DestinationSyncMode;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType$;
+
+/**
+ * @author Leibniz on 2022/10/26.
+ */
+@Slf4j
+public class IcebergConsumer extends CommitOnStateAirbyteMessageConsumer {
+
+  private final SparkSession spark;
+  private final ConfiguredAirbyteCatalog catalog;
+  private final IcebergCatalogConfig catalogConfig;
+
+  private Map<AirbyteStreamNameNamespacePair, WriteConfig> writeConfigs;
+
+  private final StructType normalizationSchema;
+
+  public IcebergConsumer(SparkSession spark,
+                         Consumer<AirbyteMessage> outputRecordCollector,
+                         ConfiguredAirbyteCatalog catalog,
+                         IcebergCatalogConfig catalogConfig) {
+    super(outputRecordCollector);
+    this.spark = spark;
+    this.catalog = catalog;
+    this.catalogConfig = catalogConfig;
+    this.normalizationSchema = new StructType().add(COLUMN_NAME_AB_ID, StringType$.MODULE$)
+        .add(COLUMN_NAME_EMITTED_AT, TimestampType$.MODULE$)
+        .add(COLUMN_NAME_DATA, StringType$.MODULE$);
+  }
+
+  /**
+   * call this method to initialize any resources that need to be created BEFORE the consumer consumes
+   * any messages
+   */
+  @Override
+  protected void startTracked() throws Exception {
+    Map<AirbyteStreamNameNamespacePair, WriteConfig> configs = new HashMap<>();
+    Set<String> namespaceSet = new HashSet<>();
+    for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
+      final String streamName = stream.getStream().getName().toLowerCase();
+      String namespace = (isNotBlank(stream.getStream().getNamespace()) ? stream.getStream().getNamespace()
+          : catalogConfig.defaultOutputDatabase()).toLowerCase();
+      if (!namespaceSet.contains(namespace)) {
+        namespaceSet.add(namespace);
+        try {
+          spark.sql("CREATE DATABASE IF NOT EXISTS " + namespace);
+        } catch (Exception e) {
+          log.warn("Failed to create database {}: {}", namespace, e.getMessage(), e);
+        }
+      }
+      final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
+      if (syncMode == null) {
+        throw new IllegalStateException("Undefined destination sync mode");
+      }
+      final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
+      AirbyteStreamNameNamespacePair nameNamespacePair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream());
+      Integer flushBatchSize = catalogConfig.getFormatConfig().getFlushBatchSize();
+      WriteConfig writeConfig = new WriteConfig(namespace, streamName, isAppendMode, flushBatchSize);
+      configs.put(nameNamespacePair, writeConfig);
+      try {
+        spark.sql("DROP TABLE IF EXISTS " + writeConfig.getFullTempTableName());
+      } catch (Exception e) {
+        log.warn("Failed to drop existing temp table: {}", e.getMessage(), e);
+      }
+    }
+    this.writeConfigs = configs;
+  }
+
+  /**
+   * call this method when a non-STATE {@link AirbyteMessage} is received
+   */
+  @Override
+  protected void acceptTracked(AirbyteMessage msg) throws Exception {
+    if (msg.getType() != Type.RECORD) {
+      // ignore other message types
+      return;
+    }
+    final AirbyteRecordMessage recordMessage = msg.getRecord();
+
+    AirbyteStreamNameNamespacePair nameNamespacePair = AirbyteStreamNameNamespacePair.fromRecordMessage(
+        recordMessage);
+    WriteConfig writeConfig = writeConfigs.get(nameNamespacePair);
+    if (writeConfig == null) {
+      throw new IllegalArgumentException(String.format(
+          "Message contained record from a stream that was not in the catalog. namespace: %s, stream: %s",
+          recordMessage.getNamespace(),
+          recordMessage.getStream()));
+    }
+
+    // write data
+    Row row = new GenericRow(new Object[] {UUID.randomUUID().toString(), new Timestamp(recordMessage.getEmittedAt()),
+        Jsons.serialize(recordMessage.getData())});
+    boolean needInsert = writeConfig.addData(row);
+    if (needInsert) {
+      appendToTempTable(writeConfig);
+    }
+  }
+
+  private void appendToTempTable(WriteConfig writeConfig) {
+    String tableName = writeConfig.getFullTempTableName();
+    List<Row> rows = writeConfig.fetchDataCache();
+    // saveAsTable even if rows is empty, to ensure the table is created;
+    // otherwise the table would be missing and close() would throw an exception
+    log.info("=> Flushing {} rows into {}", rows.size(), tableName);
+    spark.createDataFrame(rows, normalizationSchema).write()
+        // append data to the temp table
+        .mode(SaveMode.Append)
+        // TODO compression config
+        .option("write-format", catalogConfig.getFormatConfig().getFormat().getFormatName()).saveAsTable(tableName);
+  }
+
+  /**
+   * call this method when a STATE AirbyteMessage is received; it is the last message
+   */
+  @Override
+  public void commit() throws Exception {}
+
+  @Override
+  protected void close(boolean hasFailed) throws Exception {
+    log.info("close {}, hasFailed={}", this.getClass().getSimpleName(), hasFailed);
+    Catalog icebergCatalog = catalogConfig.genCatalog();
+    try {
+      if (!hasFailed) {
+        log.info("==> Migration finished with no explicit errors. Copying data from temp tables to permanent tables");
+        for (WriteConfig writeConfig : writeConfigs.values()) {
+          // flush any rows still cached for this stream, then copy the temp table into the final table
+          appendToTempTable(writeConfig);
+          String tempTableName = writeConfig.getFullTempTableName();
+          String finalTableName = writeConfig.getFullTableName();
+          log.info("=> Migrating ({}) data from {} to {}",
+              writeConfig.isAppendMode() ? "append" : "overwrite",
+              tempTableName,
+              finalTableName);
+          spark.sql("SELECT * FROM %s".formatted(tempTableName))
+              .write()
+              .mode(writeConfig.isAppendMode() ? SaveMode.Append : SaveMode.Overwrite)
+              .saveAsTable(finalTableName);
+          if (catalogConfig.getFormatConfig().isAutoCompact()) {
+            tryCompactTable(icebergCatalog, writeConfig);
+          }
+        }
+        log.info("==> Copying temp tables finished...");
+      } else {
+        log.error("Had errors during migration");
+      }
+    } finally {
+      log.info("Removing temp tables...");
+      for (Entry<AirbyteStreamNameNamespacePair, WriteConfig> entry : writeConfigs.entrySet()) {
+        tryDropTempTable(icebergCatalog, entry.getValue());
+      }
+      log.info("Closing Spark Session...");
+      this.spark.close();
+      log.info("Finished destination process");
+    }
+  }
+
+  private void tryDropTempTable(Catalog icebergCatalog, WriteConfig writeConfig) {
+    try {
+      log.info("Trying to drop temp table: {}", writeConfig.getFullTempTableName());
+      TableIdentifier tempTableIdentifier = TableIdentifier.of(writeConfig.getNamespace(),
+          writeConfig.getTempTableName());
+      boolean dropSuccess = icebergCatalog.dropTable(tempTableIdentifier, true);
+      log.info("Drop temp table: {}, success: {}", writeConfig.getFullTempTableName(), dropSuccess);
+    } catch (Exception e) {
+      String errMsg = e.getMessage();
+      log.error("Drop temp table caught exception: {}", errMsg, e);
+    }
+  }
+
+  private void tryCompactTable(Catalog icebergCatalog, WriteConfig writeConfig) {
+    log.info("=> Auto-compact is enabled, trying to compact Iceberg data files");
+    int compactTargetFileSizeBytes =
+        catalogConfig.getFormatConfig().getCompactTargetFileSizeInMb() * 1024 * 1024;
+    try {
+      TableIdentifier tableIdentifier = TableIdentifier.of(writeConfig.getNamespace(),
+          writeConfig.getTableName());
+      SparkActions.get()
+          .rewriteDataFiles(icebergCatalog.loadTable(tableIdentifier))
+          .option("target-file-size-bytes", String.valueOf(compactTargetFileSizeBytes))
+          .execute();
+    } catch (Exception e) {
+      log.warn("Compacting Iceberg data files failed: {}", e.getMessage(), e);
+    }
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergDestination.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergDestination.java
new file mode 100644
index 0000000000000..1872637caa35a
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergDestination.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.destination.iceberg; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfig; +import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfigFactory; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.SparkSession.Builder; + +@Slf4j +public class IcebergDestination extends BaseConnector implements Destination { + + private final IcebergCatalogConfigFactory icebergCatalogConfigFactory; + + public IcebergDestination() { + this.icebergCatalogConfigFactory = new IcebergCatalogConfigFactory(); + } + + @VisibleForTesting + public IcebergDestination(IcebergCatalogConfigFactory icebergCatalogConfigFactory) { + this.icebergCatalogConfigFactory = Objects.requireNonNullElseGet(icebergCatalogConfigFactory, + IcebergCatalogConfigFactory::new); + } + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new IcebergDestination()).run(args); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + try { + IcebergCatalogConfig icebergCatalogConfig = icebergCatalogConfigFactory.fromJsonNodeConfig(config); + icebergCatalogConfig.check(); + + // getting here means Iceberg catalog check success + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (final Exception e) { + log.error("Exception attempting to access the Iceberg catalog: ", e); + Throwable rootCause = getRootCause(e); + String errMessage = + "Could not connect to the Iceberg catalog with the provided configuration. 
\n" + e.getMessage() + + ", root cause: " + rootCause.getClass().getSimpleName() + "(" + rootCause.getMessage() + ")"; + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(errMessage); + } + } + + private Throwable getRootCause(Throwable exp) { + Throwable curCause = exp.getCause(); + if (curCause == null) { + return exp; + } else { + return getRootCause(curCause); + } + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog catalog, + Consumer outputRecordCollector) { + final IcebergCatalogConfig icebergCatalogConfig = this.icebergCatalogConfigFactory.fromJsonNodeConfig(config); + Map sparkConfMap = icebergCatalogConfig.sparkConfigMap(); + + Builder sparkBuilder = SparkSession.builder() + .master("local") + .appName("Airbyte->Iceberg-" + System.currentTimeMillis()); + sparkConfMap.forEach(sparkBuilder::config); + SparkSession spark = sparkBuilder.getOrCreate(); + + return new IcebergConsumer(spark, outputRecordCollector, catalog, icebergCatalogConfig); + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/WriteConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/WriteConfig.java new file mode 100644 index 0000000000000..04032374a5109 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/WriteConfig.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.config; + +import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.iceberg.IcebergConstants; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import lombok.Data; +import org.apache.spark.sql.Row; + +/** + * Write config for each stream + * + * @author Leibniz on 2022/10/26. 
+ */
+@Data
+public class WriteConfig implements Serializable {
+
+  private static final NamingConventionTransformer namingResolver = new StandardNameTransformer();
+  private static final String AIRBYTE_RAW_TABLE_PREFIX = "airbyte_raw_";
+  private static final String AIRBYTE_TMP_TABLE_PREFIX = "_airbyte_tmp_";
+
+  private final String namespace;
+  private final String tableName;
+  private final String tempTableName;
+  private final String fullTableName;
+  private final String fullTempTableName;
+  private final boolean isAppendMode;
+  private final Integer flushBatchSize;
+
+  // TODO perf: use stageFile to do cache, see
+  // io.airbyte.integrations.destination.bigquery.BigQueryWriteConfig.addStagedFile
+  private final List<Row> dataCache;
+
+  public WriteConfig(String namespace, String streamName, boolean isAppendMode, Integer flushBatchSize) {
+    this.namespace = namingResolver.convertStreamName(namespace);
+    this.tableName = namingResolver.convertStreamName(AIRBYTE_RAW_TABLE_PREFIX + streamName);
+    this.tempTableName = namingResolver.convertStreamName(AIRBYTE_TMP_TABLE_PREFIX + streamName);
+    // fully qualified names include the catalog and the converted namespace
+    this.fullTableName = genTableName(namespace, AIRBYTE_RAW_TABLE_PREFIX + streamName);
+    this.fullTempTableName = genTableName(namespace, AIRBYTE_TMP_TABLE_PREFIX + streamName);
+    this.isAppendMode = isAppendMode;
+    this.flushBatchSize = flushBatchSize;
+    this.dataCache = new ArrayList<>(flushBatchSize);
+  }
+
+  public List<Row> fetchDataCache() {
+    List<Row> copied = new ArrayList<>(this.dataCache);
+    this.dataCache.clear();
+    return copied;
+  }
+
+  public boolean addData(Row row) {
+    this.dataCache.add(row);
+    return this.dataCache.size() >= flushBatchSize;
+  }
+
+  private String genTableName(String database, String tableName) {
+    return "%s.`%s`.`%s`".formatted(
+        IcebergConstants.CATALOG_NAME,
+        namingResolver.convertStreamName(database),
+        namingResolver.convertStreamName(tableName));
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/CatalogType.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/CatalogType.java
new file mode 100644
index 0000000000000..042ccfb6f95d8
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/CatalogType.java
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.iceberg.config.catalog;
+
+/**
+ * @author Leibniz on 2022/10/31.
+ */
+public enum CatalogType {
+  HIVE,
+  HADOOP,
+  JDBC
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/HadoopCatalogConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/HadoopCatalogConfig.java
new file mode 100644
index 0000000000000..242b12bfaaf51
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/HadoopCatalogConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.destination.iceberg.config.catalog; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.CATALOG_NAME; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.jetbrains.annotations.NotNull; + +/** + * @author Leibniz on 2022/11/1. + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = false) +public class HadoopCatalogConfig extends IcebergCatalogConfig { + + public static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop."; + + public HadoopCatalogConfig(@NotNull JsonNode catalogConfigJson) {} + + @Override + public Map sparkConfigMap() { + Map configMap = new HashMap<>(); + configMap.put("spark.network.timeout", "300000"); + configMap.put("spark.sql.defaultCatalog", CATALOG_NAME); + configMap.put("spark.sql.catalog." + CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog"); + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".type", "hadoop"); + configMap.put("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"); + configMap.put("spark.driver.extraJavaOptions", "-Dpackaging.type=jar -Djava.io.tmpdir=/tmp"); + + configMap.putAll(this.storageConfig.sparkConfigMap(CATALOG_NAME)); + return configMap; + } + + @Override + public Catalog genCatalog() { + Configuration conf = new Configuration(); + for (Entry entry : this.storageConfig.sparkConfigMap(CATALOG_NAME).entrySet()) { + String key = entry.getKey(); + if (key.startsWith(SPARK_HADOOP_CONFIG_PREFIX + "fs.")) { + conf.set(key.substring(SPARK_HADOOP_CONFIG_PREFIX.length()), entry.getValue()); + } + } + + HadoopCatalog catalog = new HadoopCatalog(); + catalog.setConf(conf); + Map properties = new HashMap<>(this.storageConfig.catalogInitializeProperties()); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, this.storageConfig.getWarehouseUri()); + catalog.initialize(CATALOG_NAME, properties); + return catalog; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/HiveCatalogConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/HiveCatalogConfig.java new file mode 100644 index 0000000000000..8fe92b8257061 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/HiveCatalogConfig.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.iceberg.config.catalog; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.CATALOG_NAME; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.HIVE_THRIFT_URI_CONFIG_KEY; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.HashMap; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hive.HiveCatalog; + +/** + * @author Leibniz on 2022/10/26. + */ +@Data +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = false) +public class HiveCatalogConfig extends IcebergCatalogConfig { + + private final String thriftUri; + + public HiveCatalogConfig(JsonNode catalogConfig) { + String thriftUri = catalogConfig.get(HIVE_THRIFT_URI_CONFIG_KEY).asText(); + if (!thriftUri.startsWith("thrift://")) { + throw new IllegalArgumentException(HIVE_THRIFT_URI_CONFIG_KEY + " must start with 'thrift://'"); + } + this.thriftUri = thriftUri; + } + + @Override + public Map sparkConfigMap() { + Map configMap = new HashMap<>(); + configMap.put("spark.network.timeout", "300000"); + configMap.put("spark.sql.defaultCatalog", CATALOG_NAME); + configMap.put("spark.sql.catalog." + CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog"); + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".type", "hive"); + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".uri", this.thriftUri); + configMap.put("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"); + configMap.put("spark.driver.extraJavaOptions", "-Dpackaging.type=jar -Djava.io.tmpdir=/tmp"); + + configMap.putAll(this.storageConfig.sparkConfigMap(CATALOG_NAME)); + return configMap; + } + + @Override + public Catalog genCatalog() { + HiveCatalog catalog = new HiveCatalog(); + Map properties = new HashMap<>(); + properties.put(CatalogProperties.URI, thriftUri); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, this.storageConfig.getWarehouseUri()); + properties.putAll(this.storageConfig.catalogInitializeProperties()); + catalog.initialize("hive", properties); + return catalog; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/IcebergCatalogConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/IcebergCatalogConfig.java new file mode 100644 index 0000000000000..3999f6b5732de --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/IcebergCatalogConfig.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.integrations.destination.iceberg.config.catalog;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+import io.airbyte.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.destination.iceberg.IcebergConstants;
+import io.airbyte.integrations.destination.iceberg.config.format.FormatConfig;
+import io.airbyte.integrations.destination.iceberg.config.storage.StorageConfig;
+import java.util.Map;
+import lombok.Data;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+
+/**
+ * @author Leibniz on 2022/10/26.
+ */
+@Data
+@ToString
+@Slf4j
+public abstract class IcebergCatalogConfig {
+
+  protected StorageConfig storageConfig;
+  protected FormatConfig formatConfig;
+
+  private String defaultOutputDatabase;
+
+  public void check() throws Exception {
+    // Catalog check, only checks catalog metadata
+    Catalog catalog = genCatalog();
+    String tempTableName = "temp_" + System.currentTimeMillis();
+    TableIdentifier tempTableId = TableIdentifier.of(defaultOutputDatabase(), tempTableName);
+    Schema schema = new Schema(
+        NestedField.required(0, JavaBaseConstants.COLUMN_NAME_AB_ID, Types.StringType.get()),
+        NestedField.optional(1, JavaBaseConstants.COLUMN_NAME_EMITTED_AT, Types.TimestampType.withZone()),
+        NestedField.required(2, JavaBaseConstants.COLUMN_NAME_DATA, Types.StringType.get()));
+    Table tempTable = catalog.createTable(tempTableId, schema);
+    TableScan tableScan = tempTable.newScan();
+    log.info("Created temp table: {}", tempTableName);
+    log.info("Temp table's schema: {}", tableScan.schema());
+
+    try (CloseableIterable<Record> records = IcebergGenerics.read(tempTable).build()) {
+      for (Record record : records) {
+        // a freshly created table has no data, so this loop is never entered
+        log.info("Record in temp table: {}", record);
+      }
+    }
+
+    boolean dropSuccess = catalog.dropTable(tempTableId);
+    log.info("Dropped temp table: {}, success: {}", tempTableName, dropSuccess);
+
+    // storage check
+    this.storageConfig.check();
+  }
+
+  public abstract Map<String, String> sparkConfigMap();
+
+  public abstract Catalog genCatalog();
+
+  public String defaultOutputDatabase() {
+    return isBlank(defaultOutputDatabase) ? IcebergConstants.DEFAULT_DATABASE : defaultOutputDatabase;
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/IcebergCatalogConfigFactory.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/IcebergCatalogConfigFactory.java
new file mode 100644
index 0000000000000..53413f6f7c224
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/IcebergCatalogConfigFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.destination.iceberg.config.catalog; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.DEFAULT_DATABASE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_FORMAT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_TYPE_CONFIG_KEY; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.iceberg.config.format.FormatConfig; +import io.airbyte.integrations.destination.iceberg.config.storage.S3Config; +import io.airbyte.integrations.destination.iceberg.config.storage.StorageConfig; +import io.airbyte.integrations.destination.iceberg.config.storage.StorageType; +import javax.annotation.Nonnull; +import org.jetbrains.annotations.NotNull; + +/** + * @author Leibniz on 2022/10/31. + */ +public class IcebergCatalogConfigFactory { + + public IcebergCatalogConfig fromJsonNodeConfig(@Nonnull final JsonNode config) { + // storage config + final JsonNode storageConfigJson = config.get(ICEBERG_STORAGE_CONFIG_KEY); + StorageConfig storageConfig = genStorageConfig(storageConfigJson); + + // format config + final JsonNode formatConfigJson = config.get(ICEBERG_FORMAT_CONFIG_KEY); + FormatConfig formatConfig = new FormatConfig(formatConfigJson); + + // catalog config and make final IcebergCatalogConfig Object + final JsonNode catalogConfigJson = config.get(ICEBERG_CATALOG_CONFIG_KEY); + IcebergCatalogConfig icebergCatalogConfig = genIcebergCatalogConfig(catalogConfigJson); + icebergCatalogConfig.formatConfig = formatConfig; + icebergCatalogConfig.storageConfig = storageConfig; + icebergCatalogConfig.setDefaultOutputDatabase(catalogConfigJson.get(DEFAULT_DATABASE_CONFIG_KEY).asText()); + + return icebergCatalogConfig; + } + + private StorageConfig genStorageConfig(JsonNode storageConfigJson) { + String storageTypeStr = storageConfigJson.get(ICEBERG_STORAGE_TYPE_CONFIG_KEY).asText(); + if (storageTypeStr == null) { + throw new IllegalArgumentException(ICEBERG_STORAGE_TYPE_CONFIG_KEY + " cannot be null"); + } + StorageType storageType = StorageType.valueOf(storageTypeStr.toUpperCase()); + switch (storageType) { + case S3: + return S3Config.fromDestinationConfig(storageConfigJson); + case HDFS: + default: + throw new RuntimeException("Unexpected storage config: " + storageTypeStr); + } + } + + @NotNull + private static IcebergCatalogConfig genIcebergCatalogConfig(@NotNull JsonNode catalogConfigJson) { + String catalogTypeStr = catalogConfigJson.get(ICEBERG_CATALOG_TYPE_CONFIG_KEY).asText(); + if (catalogTypeStr == null) { + throw new IllegalArgumentException(ICEBERG_CATALOG_TYPE_CONFIG_KEY + " cannot be null"); + } + CatalogType catalogType = CatalogType.valueOf(catalogTypeStr.toUpperCase()); + + return switch (catalogType) { + case HIVE -> new HiveCatalogConfig(catalogConfigJson); + case HADOOP -> new HadoopCatalogConfig(catalogConfigJson); + case JDBC -> new JdbcCatalogConfig(catalogConfigJson); + default -> throw new RuntimeException("Unexpected catalog config: " + catalogTypeStr); + }; + } + + public static String getProperty(@Nonnull final JsonNode config, @Nonnull final String key) { + final 
JsonNode node = config.get(key); + if (node == null) { + return null; + } + return node.asText(); + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/JdbcCatalogConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/JdbcCatalogConfig.java new file mode 100644 index 0000000000000..6a49fc9157cce --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/JdbcCatalogConfig.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.config.catalog; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.CATALOG_NAME; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_CATALOG_SCHEMA_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_PASSWORD_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_SSL_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_URL_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_USERNAME_CONFIG_KEY; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.HashMap; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.jetbrains.annotations.NotNull; + +/** + * @author Leibniz on 2022/11/1. + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = false) +public class JdbcCatalogConfig extends IcebergCatalogConfig { + + private final String jdbcUrl; + private final String user; + private final String password; + private final boolean verifyServerCertificate; + private final boolean useSSL; + private final String catalogSchema; + + public JdbcCatalogConfig(@NotNull JsonNode catalogConfig) { + this.jdbcUrl = catalogConfig.get(JDBC_URL_CONFIG_KEY).asText(); + this.user = catalogConfig.get(JDBC_USERNAME_CONFIG_KEY).asText(); + this.password = catalogConfig.get(JDBC_PASSWORD_CONFIG_KEY).asText(); + // TODO + this.verifyServerCertificate = false; + this.useSSL = catalogConfig.get(JDBC_SSL_CONFIG_KEY).asBoolean(); + this.catalogSchema = catalogConfig.get(JDBC_CATALOG_SCHEMA_CONFIG_KEY).asText(); + } + + @Override + public Map sparkConfigMap() { + Map configMap = new HashMap<>(); + configMap.put("spark.network.timeout", "300000"); + configMap.put("spark.sql.defaultCatalog", CATALOG_NAME); + configMap.put("spark.sql.catalog." + CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog"); + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog"); + configMap.put("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"); + configMap.put("spark.driver.extraJavaOptions", "-Dpackaging.type=jar -Djava.io.tmpdir=/tmp"); + + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".uri", this.jdbcUrl); + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".jdbc.verifyServerCertificate", + String.valueOf(this.verifyServerCertificate)); + configMap.put("spark.sql.catalog." 
+ CATALOG_NAME + ".jdbc.useSSL", String.valueOf(this.useSSL)); + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".jdbc.user", this.user); + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".jdbc.password", this.password); + if (isNotBlank(this.catalogSchema)) { + configMap.put("spark.sql.catalog." + CATALOG_NAME + ".jdbc.currentSchema", this.catalogSchema); + } + + configMap.putAll(this.storageConfig.sparkConfigMap(CATALOG_NAME)); + return configMap; + } + + @Override + public Catalog genCatalog() { + JdbcCatalog catalog = new JdbcCatalog(); + Map properties = new HashMap<>(this.storageConfig.catalogInitializeProperties()); + properties.put(CatalogProperties.URI, this.jdbcUrl); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", this.user); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", this.password); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "useSSL", String.valueOf(this.useSSL)); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "verifyServerCertificate", + String.valueOf(this.verifyServerCertificate)); + if (isNotBlank(this.catalogSchema)) { + properties.put(JdbcCatalog.PROPERTY_PREFIX + "currentSchema", this.catalogSchema); + } + properties.put(CatalogProperties.WAREHOUSE_LOCATION, this.storageConfig.getWarehouseUri()); + catalog.initialize(CATALOG_NAME, properties); + return catalog; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/format/DataFileFormat.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/format/DataFileFormat.java new file mode 100644 index 0000000000000..296b45c02f0dd --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/format/DataFileFormat.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.config.format; + +import lombok.Getter; + +/** + * @author Leibniz on 2022/10/31. + */ +public enum DataFileFormat { + + AVRO("avro", "Avro"), + PARQUET("parquet", "Parquet"), + // ORC("orc"), + ; + + @Getter + private final String formatName; + @Getter + private final String configValue; + + DataFileFormat(final String formatName, String configValue) { + this.formatName = formatName; + this.configValue = configValue; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/format/FormatConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/format/FormatConfig.java new file mode 100644 index 0000000000000..483f134797f73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/format/FormatConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.iceberg.config.format; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.AUTO_COMPACT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.COMPACT_TARGET_FILE_SIZE_IN_MB_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.FLUSH_BATCH_SIZE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.FORMAT_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfigFactory.getProperty; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; + +/** + * @author Leibniz on 2022/10/31. + */ +@Data +public class FormatConfig { + + public static final int DEFAULT_FLUSH_BATCH_SIZE = 10000; + public static final boolean DEFAULT_AUTO_COMPACT = false; + public static final int DEFAULT_COMPACT_TARGET_FILE_SIZE_IN_MB = 100; + + private DataFileFormat format; + private Integer flushBatchSize; + private boolean autoCompact; + private Integer compactTargetFileSizeInMb; + + // TODO compression config + + public FormatConfig(JsonNode formatConfigJson) { + // format + String formatStr = getProperty(formatConfigJson, FORMAT_TYPE_CONFIG_KEY); + if (formatStr == null) { + throw new IllegalArgumentException(FORMAT_TYPE_CONFIG_KEY + " cannot be null"); + } + this.format = DataFileFormat.valueOf(formatStr.toUpperCase()); + + // flushBatchSize + if (formatConfigJson.has(FLUSH_BATCH_SIZE_CONFIG_KEY)) { + this.flushBatchSize = formatConfigJson.get(FLUSH_BATCH_SIZE_CONFIG_KEY).asInt(DEFAULT_FLUSH_BATCH_SIZE); + } else { + this.flushBatchSize = DEFAULT_FLUSH_BATCH_SIZE; + } + + // autoCompact + if (formatConfigJson.has(AUTO_COMPACT_CONFIG_KEY)) { + this.autoCompact = formatConfigJson.get(AUTO_COMPACT_CONFIG_KEY).asBoolean(DEFAULT_AUTO_COMPACT); + } else { + this.autoCompact = DEFAULT_AUTO_COMPACT; + } + + // compactTargetFileSizeInMb + if (formatConfigJson.has(COMPACT_TARGET_FILE_SIZE_IN_MB_CONFIG_KEY)) { + this.compactTargetFileSizeInMb = formatConfigJson.get(COMPACT_TARGET_FILE_SIZE_IN_MB_CONFIG_KEY) + .asInt(DEFAULT_COMPACT_TARGET_FILE_SIZE_IN_MB); + } else { + this.compactTargetFileSizeInMb = DEFAULT_COMPACT_TARGET_FILE_SIZE_IN_MB; + } + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/S3Config.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/S3Config.java new file mode 100644 index 0000000000000..01202054f4d52 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/S3Config.java @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.iceberg.config.storage; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ACCESS_KEY_ID_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_BUCKET_REGION_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ENDPOINT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_PATH_STYLE_ACCESS_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_SECRET_KEY_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_WAREHOUSE_URI_CONFIG_KEY; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.iceberg.config.storage.credential.S3AWSDefaultProfileCredentialConfig; +import io.airbyte.integrations.destination.iceberg.config.storage.credential.S3AccessKeyCredentialConfig; +import io.airbyte.integrations.destination.iceberg.config.storage.credential.S3CredentialConfig; +import io.airbyte.integrations.destination.iceberg.config.storage.credential.S3CredentialType; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import javax.annotation.Nonnull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.iceberg.CatalogProperties; + +/** + * @author Leibniz on 2022/10/26. 
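+ *
+ * <p>A minimal construction sketch (hedged: the key and endpoint values below are
+ * placeholders taken from the connector spec examples, not real credentials):
+ * <pre>{@code
+ * JsonNode storageJson = Jsons.jsonNode(Map.of(
+ *     "storage_type", "S3",
+ *     "access_key_id", "A012345678910EXAMPLE",
+ *     "secret_access_key", "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY",
+ *     "s3_warehouse_uri", "s3a://my-bucket/path/to/warehouse",
+ *     "s3_endpoint", "http://localhost:9000"));
+ * S3Config s3Config = S3Config.fromDestinationConfig(storageJson);
+ * AmazonS3 client = s3Config.getS3Client();
+ * }</pre>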
+ */
+@Slf4j
+@Data
+@Builder
+@AllArgsConstructor
+public class S3Config implements StorageConfig {
+
+  private static final String SCHEMA_SUFFIX = "://";
+
+  /**
+   * Guards lazy creation and reset of {@link #s3Client}.
+   */
+  private final Object lock = new Object();
+
+  /**
+   * Properties from Destination Config
+   */
+  private final String endpoint;
+  private final String endpointWithSchema;
+  private final String warehouseUri;
+  private final String bucketRegion;
+  private final String accessKeyId;
+  private final String secretKey;
+  private final S3CredentialConfig credentialConfig;
+  private final boolean pathStyleAccess;
+  private final boolean sslEnabled;
+
+  private AmazonS3 s3Client;
+
+  public static S3Config fromDestinationConfig(@Nonnull final JsonNode config) {
+    S3ConfigBuilder builder = new S3ConfigBuilder().bucketRegion(getProperty(config, S3_BUCKET_REGION_CONFIG_KEY));
+
+    String warehouseUri = getProperty(config, S3_WAREHOUSE_URI_CONFIG_KEY);
+    if (isBlank(warehouseUri)) {
+      throw new IllegalArgumentException(S3_WAREHOUSE_URI_CONFIG_KEY + " cannot be null");
+    }
+    if (!warehouseUri.startsWith("s3://") && !warehouseUri.startsWith("s3n://")
+        && !warehouseUri.startsWith("s3a://")) {
+      throw new IllegalArgumentException(
+          S3_WAREHOUSE_URI_CONFIG_KEY + " must start with 's3://', 's3n://' or 's3a://'");
+    }
+    builder.warehouseUri(warehouseUri);
+
+    String endpointStr = getProperty(config, S3_ENDPOINT_CONFIG_KEY);
+    if (isBlank(endpointStr)) {
+      // no custom endpoint configured: use Amazon S3, which always speaks HTTPS
+      builder.sslEnabled(true);
+    } else {
+      boolean sslEnabled = !endpointStr.startsWith("http://");
+      String pureEndpoint = removeSchemaSuffix(endpointStr);
+      builder.sslEnabled(sslEnabled);
+      builder.endpoint(pureEndpoint);
+      if (sslEnabled) {
+        builder.endpointWithSchema("https://" + pureEndpoint);
+      } else {
+        builder.endpointWithSchema("http://" + pureEndpoint);
+      }
+    }
+
+    if (config.has(S3_ACCESS_KEY_ID_CONFIG_KEY)) {
+      String accessKeyId = getProperty(config, S3_ACCESS_KEY_ID_CONFIG_KEY);
+      String secretAccessKey = getProperty(config, S3_SECRET_KEY_CONFIG_KEY);
+      builder.credentialConfig(new S3AccessKeyCredentialConfig(accessKeyId, secretAccessKey))
+          .accessKeyId(accessKeyId)
+          .secretKey(secretAccessKey);
+    } else {
+      builder.credentialConfig(new S3AWSDefaultProfileCredentialConfig()).accessKeyId("").secretKey("");
+    }
+
+    if (config.has(S3_PATH_STYLE_ACCESS_CONFIG_KEY)) {
+      builder.pathStyleAccess(config.get(S3_PATH_STYLE_ACCESS_CONFIG_KEY).booleanValue());
+    } else {
+      builder.pathStyleAccess(true);
+    }
+
+    return builder.build().setProperty();
+  }
+
+  private S3Config setProperty() {
+    // the region key is optional in the config; System.setProperty rejects null values
+    if (bucketRegion != null) {
+      System.setProperty("aws.region", bucketRegion);
+    }
+    System.setProperty("aws.accessKeyId", accessKeyId);
+    System.setProperty("aws.secretAccessKey", secretKey);
+    return this;
+  }
+
+  private static String getProperty(@Nonnull final JsonNode config, @Nonnull final String key) {
+    final JsonNode node = config.get(key);
+    if (node == null) {
+      return null;
+    }
+    return node.asText();
+  }
+
+  public AmazonS3 getS3Client() {
+    synchronized (lock) {
+      if (s3Client == null) {
+        return resetS3Client();
+      }
+      return s3Client;
+    }
+  }
+
+  private AmazonS3 resetS3Client() {
+    synchronized (lock) {
+      if (s3Client != null) {
+        s3Client.shutdown();
+      }
+      s3Client = createS3Client();
+      return s3Client;
+    }
+  }
+
+  private AmazonS3 createS3Client() {
+    log.info("Creating S3 client...");
+
+    final AWSCredentialsProvider credentialsProvider = credentialConfig.getS3CredentialsProvider();
+    final S3CredentialType credentialType = credentialConfig.getCredentialType();
+
+    if
(S3CredentialType.DEFAULT_PROFILE == credentialType) { + return AmazonS3ClientBuilder.standard() + .withRegion(bucketRegion) + .withCredentials(credentialsProvider) + .build(); + } + + if (isEmpty(endpoint)) { + return AmazonS3ClientBuilder.standard() + .withCredentials(credentialsProvider) + .withRegion(bucketRegion) + .build(); + } + + final ClientConfiguration clientConfiguration = new ClientConfiguration().withProtocol(Protocol.HTTPS); + clientConfiguration.setSignerOverride("AWSS3V4SignerType"); + + return AmazonS3ClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointWithSchema, bucketRegion)) + .withPathStyleAccessEnabled(true) + .withClientConfiguration(clientConfiguration) + .withCredentials(credentialsProvider) + .build(); + } + + public void check() { + final AmazonS3 s3Client = this.getS3Client(); + + // normalize path + String prefix = this.warehouseUri.replaceAll("^s3[an]?://.+?/(.+?)/?$", "$1/"); + String tempObjectName = prefix + "_airbyte_connection_test_" + + UUID.randomUUID().toString().replaceAll("-", ""); + String bucket = this.warehouseUri.replaceAll("^s3[an]?://(.+?)/.+$", "$1"); + + // check bucket exists + if (!s3Client.doesBucketExistV2(bucket)) { + log.info("Bucket {} does not exist; creating...", bucket); + s3Client.createBucket(bucket); + log.info("Bucket {} has been created.", bucket); + } + + // try puts temp object + s3Client.putObject(bucket, tempObjectName, "check-content"); + + // check listObjects + log.info("Started testing if IAM user can call listObjects on the destination bucket"); + final ListObjectsRequest request = new ListObjectsRequest().withBucketName(bucket).withMaxKeys(1); + s3Client.listObjects(request); + log.info("Finished checking for listObjects permission"); + + // delete temp object + s3Client.deleteObject(bucket, tempObjectName); + } + + private static String removeSchemaSuffix(String endpoint) { + if (endpoint.contains(SCHEMA_SUFFIX)) { + int schemaSuffixIndex = endpoint.indexOf(SCHEMA_SUFFIX) + SCHEMA_SUFFIX.length(); + return endpoint.substring(schemaSuffixIndex); + } else { + return endpoint; + } + } + + @Override + public Map sparkConfigMap(String catalogName) { + Map sparkConfig = new HashMap<>(); + sparkConfig.put("spark.sql.catalog." + catalogName + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + sparkConfig.put("spark.sql.catalog." + catalogName + ".warehouse", this.warehouseUri); + sparkConfig.put("spark.sql.catalog." + catalogName + ".s3.endpoint", this.endpointWithSchema); + sparkConfig.put("spark.sql.catalog." + catalogName + ".s3.access-key-id", this.accessKeyId); + sparkConfig.put("spark.sql.catalog." + catalogName + ".s3.secret-access-key", this.secretKey); + sparkConfig.put("spark.sql.catalog." 
+ catalogName + ".s3.path-style-access",
+        String.valueOf(this.pathStyleAccess));
+    sparkConfig.put("spark.hadoop.fs.s3a.access.key", this.accessKeyId);
+    sparkConfig.put("spark.hadoop.fs.s3a.secret.key", this.secretKey);
+    sparkConfig.put("spark.hadoop.fs.s3a.path.style.access", String.valueOf(this.pathStyleAccess));
+    sparkConfig.put("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    sparkConfig.put("spark.hadoop.fs.s3a.endpoint", this.endpoint);
+    sparkConfig.put("spark.hadoop.fs.s3a.connection.ssl.enabled", String.valueOf(this.sslEnabled));
+    sparkConfig.put("spark.hadoop.fs.s3a.aws.credentials.provider",
+        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+    return sparkConfig;
+  }
+
+  @Override
+  public Map<String, String> catalogInitializeProperties() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
+    properties.put("s3.endpoint", this.endpointWithSchema);
+    properties.put("s3.access-key-id", this.accessKeyId);
+    properties.put("s3.secret-access-key", this.secretKey);
+    properties.put("s3.path-style-access", String.valueOf(this.pathStyleAccess));
+    return properties;
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/StorageConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/StorageConfig.java
new file mode 100644
index 0000000000000..b4aa660248727
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/StorageConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.iceberg.config.storage;
+
+import java.util.Map;
+
+/**
+ * @author Leibniz on 2022/10/31.
+ */
+public interface StorageConfig {
+
+  /**
+   * Checks read and write privileges on the underlying storage.
+   *
+   * @throws Exception on a failed check, e.g. an IOException
+   */
+  void check() throws Exception;
+
+  String getWarehouseUri();
+
+  /**
+   * Appends Spark storage configurations for Iceberg, including (but not limited to):
+   * <ol>
+   * <li>spark.sql.catalog.{catalogName}.xxx = yyy</li>
+   * <li>spark.hadoop.fs.xxx = yyy</li>
+   * </ol>
+   *
+   * @param catalogName name of the Iceberg catalog
+   * @return a configuration Map used to build the Spark session
+   */
+  Map<String, String> sparkConfigMap(String catalogName);
+
+  /**
+   * Appends storage configurations for the Iceberg catalog, used when calling
+   * org.apache.iceberg.catalog.Catalog#initialize()
+   *
+   * @return a configuration Map used to build the Catalog (org.apache.iceberg.catalog.Catalog)
+   */
+  Map<String, String> catalogInitializeProperties();
+
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/StorageType.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/StorageType.java
new file mode 100644
index 0000000000000..ae586d2eb1324
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/StorageType.java
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.iceberg.config.storage;
+
+/**
+ * @author Leibniz on 2022/10/31.
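+ *
+ * <p>Resolved case-insensitively from the {@code storage_type} field of the storage
+ * config, e.g. {@code StorageType.valueOf("S3".toUpperCase())}. Only {@link #S3} is
+ * wired up in this change; HDFS is declared but not yet supported by the factory.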
+ */ +public enum StorageType { + S3, + HDFS; +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3AWSDefaultProfileCredentialConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3AWSDefaultProfileCredentialConfig.java new file mode 100644 index 0000000000000..78ba9863b98c2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3AWSDefaultProfileCredentialConfig.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.config.storage.credential; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; + +/** + * @author Leibniz on 2022/10/26. + */ +public class S3AWSDefaultProfileCredentialConfig implements S3CredentialConfig { + + @Override + public S3CredentialType getCredentialType() { + return S3CredentialType.DEFAULT_PROFILE; + } + + @Override + public AWSCredentialsProvider getS3CredentialsProvider() { + return new DefaultAWSCredentialsProviderChain(); + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3AccessKeyCredentialConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3AccessKeyCredentialConfig.java new file mode 100644 index 0000000000000..1c194981459df --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3AccessKeyCredentialConfig.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.config.storage.credential; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; + +/** + * @author Leibniz on 2022/10/26. 
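+ *
+ * <p>Usage sketch (placeholder keys, not real credentials):
+ * <pre>{@code
+ * S3CredentialConfig credential =
+ *     new S3AccessKeyCredentialConfig("A012345678910EXAMPLE", "exampleSecretKey");
+ * AWSCredentialsProvider provider = credential.getS3CredentialsProvider();
+ * }</pre>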
+ */ +public class S3AccessKeyCredentialConfig implements S3CredentialConfig { + + private final String accessKeyId; + private final String secretAccessKey; + + public S3AccessKeyCredentialConfig(final String accessKeyId, final String secretAccessKey) { + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + } + + @Override + public S3CredentialType getCredentialType() { + return S3CredentialType.ACCESS_KEY; + } + + @Override + public AWSCredentialsProvider getS3CredentialsProvider() { + final AWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + return new AWSStaticCredentialsProvider(awsCreds); + } + + public String getAccessKeyId() { + return accessKeyId; + } + + public String getSecretAccessKey() { + return secretAccessKey; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3CredentialConfig.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3CredentialConfig.java new file mode 100644 index 0000000000000..69ad328d0f29c --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3CredentialConfig.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.config.storage.credential; + +import com.amazonaws.auth.AWSCredentialsProvider; + +/** + * @author Leibniz on 2022/10/26. + */ +public interface S3CredentialConfig { + + S3CredentialType getCredentialType(); + + AWSCredentialsProvider getS3CredentialsProvider(); + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3CredentialType.java b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3CredentialType.java new file mode 100644 index 0000000000000..01b82bf2d8a43 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/storage/credential/S3CredentialType.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.config.storage.credential; + +/** + * @author Leibniz on 2022/10/26. 
+ */ +public enum S3CredentialType { + + ACCESS_KEY, + DEFAULT_PROFILE +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-iceberg/src/main/resources/spec.json new file mode 100644 index 0000000000000..8b95191c41b0a --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/main/resources/spec.json @@ -0,0 +1,269 @@ +{ + "documentationUrl": "https://docs.airbyte.com/integrations/destinations/iceberg", + "supportsNormalization": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Iceberg Destination Spec", + "type": "object", + "required": ["catalog_config", "storage_config", "format_config"], + "properties": { + "catalog_config": { + "title": "Iceberg catalog config", + "type": "object", + "description": "Catalog config of Iceberg.", + "oneOf": [ + { + "title": "HiveCatalog: Use Apache Hive MetaStore", + "required": ["catalog_type", "hive_thrift_uri"], + "properties": { + "catalog_type": { + "title": "Catalog Type", + "type": "string", + "default": "Hive", + "enum": ["Hive"], + "order": 0 + }, + "hive_thrift_uri": { + "title": "Hive Metastore thrift uri", + "type": "string", + "description": "Hive MetaStore thrift server uri of iceberg catalog.", + "examples": ["host:port"], + "order": 1 + }, + "database": { + "title": "Default database", + "description": "The default database tables are written to if the source does not specify a namespace. The usual value for this field is \"default\".", + "type": "string", + "default": "default", + "examples": ["default"], + "order": 2 + } + } + }, + { + "title": "HadoopCatalog: Use hierarchical file systems as same as storage config", + "description": "A Hadoop catalog doesn’t need to connect to a Hive MetaStore, but can only be used with HDFS or similar file systems that support atomic rename.", + "required": ["catalog_type"], + "properties": { + "catalog_type": { + "title": "Catalog Type", + "type": "string", + "default": "Hadoop", + "enum": ["Hadoop"], + "order": 0 + }, + "database": { + "title": "Default database", + "description": "The default database tables are written to if the source does not specify a namespace. The usual value for this field is \"default\".", + "type": "string", + "default": "default", + "examples": ["default"], + "order": 1 + } + } + }, + { + "title": "JdbcCatalog: Use relational database", + "description": "Using a table in a relational database to manage Iceberg tables through JDBC. Read more here. Supporting: PostgreSQL", + "required": ["catalog_type"], + "properties": { + "catalog_type": { + "title": "Catalog Type", + "type": "string", + "default": "Jdbc", + "enum": ["Jdbc"], + "order": 0 + }, + "database": { + "title": "Default schema", + "description": "The default schema tables are written to if the source does not specify a namespace. 
The usual value for this field is \"public\".", + "type": "string", + "default": "public", + "examples": ["public"], + "order": 1 + }, + "jdbc_url": { + "title": "Jdbc url", + "type": "string", + "examples": ["jdbc:postgresql://{host}:{port}/{database}"], + "order": 2 + }, + "username": { + "title": "User", + "description": "Username to use to access the database.", + "type": "string", + "order": 3 + }, + "password": { + "title": "Password", + "description": "Password associated with the username.", + "type": "string", + "airbyte_secret": true, + "order": 4 + }, + "ssl": { + "title": "SSL Connection", + "description": "Encrypt data using SSL. When activating SSL, please select one of the connection modes.", + "type": "boolean", + "default": false, + "order": 5 + }, + "catalog_schema": { + "title": "schema for Iceberg catalog", + "description": "Iceberg catalog metadata tables are written to catalog schema. The usual value for this field is \"public\".", + "type": "string", + "default": "public", + "examples": ["public"], + "order": 6 + } + } + } + ], + "order": 0 + }, + "storage_config": { + "title": "Storage config", + "type": "object", + "description": "Storage config of Iceberg.", + "oneOf": [ + { + "title": "S3", + "type": "object", + "description": "S3 object storage", + "required": [ + "storage_type", + "access_key_id", + "secret_access_key", + "s3_warehouse_uri" + ], + "properties": { + "storage_type": { + "title": "Storage Type", + "type": "string", + "default": "S3", + "enum": ["S3"], + "order": 0 + }, + "access_key_id": { + "type": "string", + "description": "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here.", + "title": "S3 Key ID", + "airbyte_secret": true, + "examples": ["A012345678910EXAMPLE"], + "order": 0 + }, + "secret_access_key": { + "type": "string", + "description": "The corresponding secret to the access key ID. Read more here", + "title": "S3 Access Key", + "airbyte_secret": true, + "examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"], + "order": 1 + }, + "s3_warehouse_uri": { + "title": "S3 Warehouse Uri for Iceberg", + "type": "string", + "description": "The Warehouse Uri for Iceberg", + "examples": [ + "s3a://my-bucket/path/to/warehouse", + "s3://my-bucket/path/to/warehouse" + ], + "order": 2 + }, + "s3_bucket_region": { + "title": "S3 Bucket Region", + "type": "string", + "default": "", + "description": "The region of the S3 bucket. See here for all region codes.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1", + "us-gov-east-1", + "us-gov-west-1" + ], + "order": 3 + }, + "s3_endpoint": { + "title": "Endpoint", + "type": "string", + "default": "", + "description": "Your S3 endpoint url. 
Read more here", + "examples": ["http://localhost:9000", "localhost:9000"], + "order": 4 + }, + "s3_path_style_access": { + "type": "boolean", + "description": "Use path style access", + "examples": [true, false], + "default": true, + "order": 5 + } + } + } + ], + "order": 1 + }, + "format_config": { + "title": "File format", + "type": "object", + "required": ["format"], + "description": "File format of Iceberg storage.", + "properties": { + "format": { + "title": "File storage format", + "type": "string", + "default": "Parquet", + "description": "", + "enum": ["Parquet", "Avro"], + "order": 0 + }, + "flush_batch_size": { + "title": "Data file flushing batch size", + "description": "Iceberg data file flush batch size. Incoming rows write to cache firstly; When cache size reaches this 'batch size', flush into real Iceberg data file.", + "type": "integer", + "default": 10000, + "order": 1 + }, + "auto_compact": { + "title": "Auto compact data files", + "description": "Auto compact data files when stream close", + "type": "boolean", + "default": false, + "order": 2 + }, + "compact_target_file_size_in_mb": { + "title": "Target size of compacted data file", + "description": "Specify the target size of Iceberg data file when performing a compaction action. ", + "type": "integer", + "default": 100, + "order": 3 + } + }, + "order": 2 + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/IcebergIntegrationTestUtil.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/IcebergIntegrationTestUtil.java new file mode 100644 index 0000000000000..878f20f8d7e6e --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/IcebergIntegrationTestUtil.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.iceberg; + +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_ACCESS_KEY; +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_SECRET_KEY; +import static org.sparkproject.jetty.util.StringUtil.isNotBlank; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.Bucket; +import com.fasterxml.jackson.databind.JsonNode; +import com.github.dockerjava.api.model.ContainerNetwork; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfig; +import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfigFactory; +import io.airbyte.integrations.destination.iceberg.config.storage.S3Config; +import io.airbyte.integrations.destination.iceberg.container.MinioContainer; +import io.airbyte.integrations.destination.iceberg.container.MinioContainer.CredentialsProvider; +import java.io.IOException; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.glassfish.jersey.internal.guava.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.lifecycle.Startable; + +/** + * @author Leibniz on 2022/11/3. 
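+ *
+ * <p>Typical wiring in an acceptance test (a sketch; {@code config} must be a JSON node
+ * matching the connector spec, as assembled by the container helpers in this package):
+ * <pre>{@code
+ * MinioContainer minio = IcebergIntegrationTestUtil.createAndStartMinioContainer(null);
+ * IcebergIntegrationTestUtil.createS3WarehouseBucket(config);
+ * // ... run a sync against the destination ...
+ * List<JsonNode> records = IcebergIntegrationTestUtil.retrieveRecords(config, "my_namespace", "my_stream");
+ * IcebergIntegrationTestUtil.stopAndCloseContainer(minio, "Minio");
+ * }</pre>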
+ */
+public class IcebergIntegrationTestUtil {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergIntegrationTestUtil.class);
+
+  public static final String ICEBERG_IMAGE_NAME = "airbyte/destination-iceberg:dev";
+
+  public static final String WAREHOUSE_BUCKET_NAME = "warehouse";
+  private static final NamingConventionTransformer namingResolver = new StandardNameTransformer();
+  private static final IcebergCatalogConfigFactory icebergCatalogConfigFactory = new IcebergCatalogConfigFactory();
+
+  public static MinioContainer createAndStartMinioContainer(Integer bindPort) {
+    CredentialsProvider credentialsProvider = new CredentialsProvider(DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY);
+    String minioImage = "minio/minio:RELEASE.2022-10-29T06-21-33Z.fips";
+    MinioContainer container = new MinioContainer(minioImage, credentialsProvider, bindPort);
+    container.start();
+    LOGGER.info("==> Started Minio docker container...");
+    return container;
+  }
+
+  public static void stopAndCloseContainer(Startable container, String name) {
+    container.stop();
+    container.close();
+    LOGGER.info("<== Closed {} docker container...", name);
+  }
+
+  public static void createS3WarehouseBucket(JsonNode config) {
+    IcebergCatalogConfig catalogConfig = icebergCatalogConfigFactory.fromJsonNodeConfig(config);
+    AmazonS3 client = ((S3Config) catalogConfig.getStorageConfig()).getS3Client();
+    Bucket bucket = client.createBucket(WAREHOUSE_BUCKET_NAME);
+    LOGGER.info("Created s3 bucket: {}", bucket.getName());
+    List<Bucket> buckets = client.listBuckets();
+    LOGGER.info("All s3 buckets: {}", buckets);
+  }
+
+  public static List<JsonNode> retrieveRecords(JsonNode config, String namespace, String streamName)
+      throws IOException {
+    IcebergCatalogConfig catalogConfig = icebergCatalogConfigFactory.fromJsonNodeConfig(config);
+    Catalog catalog = catalogConfig.genCatalog();
+    String dbName = namingResolver.getNamespace(
+        isNotBlank(namespace) ? namespace : catalogConfig.defaultOutputDatabase()).toLowerCase();
+    String tableName = namingResolver.getIdentifier("airbyte_raw_" + streamName).toLowerCase();
+    LOGGER.info("Selecting data from: {}", tableName);
+    Table table = catalog.loadTable(TableIdentifier.of(dbName, tableName));
+    try (CloseableIterable<Record> recordItr = IcebergGenerics.read(table).build()) {
+      ArrayList<Record> records = Lists.newArrayList(recordItr);
+      return records.stream()
+          .sorted(Comparator.comparingLong(r -> offsetDateTimeToTimestamp((OffsetDateTime) r.getField(
+              JavaBaseConstants.COLUMN_NAME_EMITTED_AT))))
+          .map(r -> Jsons.deserialize((String) r.getField(JavaBaseConstants.COLUMN_NAME_DATA)))
+          .collect(Collectors.toList());
+    }
+  }
+
+  private static long offsetDateTimeToTimestamp(OffsetDateTime offsetDateTime) {
+    return Timestamp.valueOf(offsetDateTime.atZoneSameInstant(ZoneOffset.UTC).toLocalDateTime()).getTime();
+  }
+
+  public static String getContainerIpAddr(GenericContainer<?> container) {
+    for (Entry<String, ContainerNetwork> entry : container.getContainerInfo()
+        .getNetworkSettings()
+        .getNetworks()
+        .entrySet()) {
+      return entry.getValue().getIpAddress();
+    }
+    return container.getContainerIpAddress();
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/container/HiveMetastoreS3PostgresCompose.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/container/HiveMetastoreS3PostgresCompose.java
new file mode 100644
index 0000000000000..11ec693788ca0
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/container/HiveMetastoreS3PostgresCompose.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.destination.iceberg.container; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.DEFAULT_DATABASE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.HIVE_THRIFT_URI_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_FORMAT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ACCESS_KEY_ID_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_BUCKET_REGION_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ENDPOINT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_SECRET_KEY_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_WAREHOUSE_URI_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.WAREHOUSE_BUCKET_NAME; +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_ACCESS_KEY; +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_SECRET_KEY; +import static java.util.Map.entry; +import static java.util.Map.ofEntries; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; +import io.airbyte.integrations.destination.iceberg.hive.IcebergHiveCatalogS3ParquetIntegrationTest; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +/** + * @author Leibniz on 2022/11/4. 
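+ *
+ * <p>Lifecycle sketch, mirroring how the Hive catalog tests in this change use it:
+ * <pre>{@code
+ * HiveMetastoreS3PostgresCompose compose = new HiveMetastoreS3PostgresCompose();
+ * compose.start();
+ * JsonNode config = compose.getComposeConfig(DataFileFormat.PARQUET);
+ * // the destination then connects to compose.thriftUri() and compose.s3Endpoint()
+ * compose.stop();
+ * }</pre>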
+ */
+public class HiveMetastoreS3PostgresCompose extends DockerComposeContainer<HiveMetastoreS3PostgresCompose> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HiveMetastoreS3PostgresCompose.class);
+
+  private static final String LOCAL_RELATIVE_PATH = "src/test-integration/resources/";
+  private static final String METASTORE_COMPOSE_PATH = LOCAL_RELATIVE_PATH + "hive-metastore-compose.yml";
+  public static final int METASTORE_PORT = 9083;
+  private static final String POSTGRES_SERVICE_NAME = "postgres_1";
+  private static final String MINIO_SERVICE_NAME = "minio_1";
+  private static final String METASTORE_SERVICE_NAME = "hive_metastore_1";
+
+  public HiveMetastoreS3PostgresCompose() {
+    super(Path.of(METASTORE_COMPOSE_PATH).toFile());
+    super.withExposedService(POSTGRES_SERVICE_NAME,
+        PostgreSQLContainer.POSTGRESQL_PORT,
+        Wait.forLogMessage(".*database system is ready to accept connections.*\\s", 2)
+            .withStartupTimeout(Duration.ofSeconds(60)))
+        .withExposedService(MINIO_SERVICE_NAME,
+            MinioContainer.MINIO_PORT,
+            Wait.forHttp(MinioContainer.HEALTH_ENDPOINT).withStartupTimeout(Duration.ofSeconds(60)))
+        .withExposedService(METASTORE_SERVICE_NAME,
+            METASTORE_PORT,
+            Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)))
+        .withLocalCompose(true);
+  }
+
+  @Override
+  public void start() {
+    long startTime = System.currentTimeMillis();
+    super.start();
+    LOGGER.info("PostgreSQL port: {}", getServicePort(POSTGRES_SERVICE_NAME, PostgreSQLContainer.POSTGRESQL_PORT));
+    LOGGER.info("Minio port: {}", getServicePort(MINIO_SERVICE_NAME, MinioContainer.MINIO_PORT));
+    LOGGER.info("Hive Metastore port: {}", getServicePort(METASTORE_SERVICE_NAME, METASTORE_PORT));
+    LOGGER.info("Hive Metastore docker-compose startup cost: {} ms", System.currentTimeMillis() - startTime);
+  }
+
+  public String s3Endpoint() {
+    return "http://localhost:" + getServicePort(MINIO_SERVICE_NAME, MinioContainer.MINIO_PORT);
+  }
+
+  public String thriftUri() {
+    return "thrift://localhost:" + getServicePort(METASTORE_SERVICE_NAME, METASTORE_PORT);
+  }
+
+  public JsonNode getComposeConfig(DataFileFormat fileFormat) {
+    String s3Endpoint = this.s3Endpoint();
+    LOGGER.info("Configuring S3 endpoint to {}", s3Endpoint);
+    return Jsons.jsonNode(ofEntries(
+        entry(ICEBERG_CATALOG_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(
+                entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Hive"),
+                entry(HIVE_THRIFT_URI_CONFIG_KEY, this.thriftUri()),
+                entry(DEFAULT_DATABASE_CONFIG_KEY, "test")))),
+        entry(ICEBERG_STORAGE_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(
+                entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"),
+                entry(S3_ACCESS_KEY_ID_CONFIG_KEY, DEFAULT_ACCESS_KEY),
+                entry(S3_SECRET_KEY_CONFIG_KEY, DEFAULT_SECRET_KEY),
+                entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3a://" + WAREHOUSE_BUCKET_NAME + "/hadoop"),
+                entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"),
+                entry(S3_ENDPOINT_CONFIG_KEY, s3Endpoint)))),
+        entry(ICEBERG_FORMAT_CONFIG_KEY,
+            Jsons.jsonNode(Map.of("format", fileFormat.getConfigValue())))));
+  }
+
+  public JsonNode getWrongConfig() {
+    return Jsons.jsonNode(ofEntries(
+        entry(ICEBERG_CATALOG_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(
+                entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Hive"),
+                entry(HIVE_THRIFT_URI_CONFIG_KEY, "wrong-host:1234"),
+                entry(DEFAULT_DATABASE_CONFIG_KEY, "default")))),
+        entry(ICEBERG_STORAGE_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"),
+                entry(S3_ACCESS_KEY_ID_CONFIG_KEY, DEFAULT_ACCESS_KEY),
+                entry(S3_SECRET_KEY_CONFIG_KEY, "wrong_secret_key"),
entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3a://warehouse/hadoop"), + entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"), + entry(S3_ENDPOINT_CONFIG_KEY, this.s3Endpoint())))), + entry(ICEBERG_FORMAT_CONFIG_KEY, Jsons.jsonNode(Map.of("format", "wrong-format"))))); + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/container/MinioContainer.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/container/MinioContainer.java new file mode 100644 index 0000000000000..c3606979da131 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/container/MinioContainer.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.container; + +import java.time.Duration; +import java.util.List; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; + +/** + * @author Leibniz on 2022/11/3. + */ +public class MinioContainer extends GenericContainer { + + public static final String DEFAULT_ACCESS_KEY = "DEFAULT_ACCESS_KEY"; + public static final String DEFAULT_SECRET_KEY = "DEFAULT_SECRET_KEY"; + + public static final int MINIO_PORT = 9000; + private static final String DEFAULT_IMAGE = "minio/minio"; + private static final String DEFAULT_TAG = "edge"; + + private static final String MINIO_ACCESS_KEY = "MINIO_ACCESS_KEY"; + private static final String MINIO_SECRET_KEY = "MINIO_SECRET_KEY"; + + private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; + public static final String HEALTH_ENDPOINT = "/minio/health/ready"; + + public MinioContainer() { + this(DEFAULT_IMAGE + ":" + DEFAULT_TAG, null, null); + } + + public MinioContainer(CredentialsProvider credentials) { + this(DEFAULT_IMAGE + ":" + DEFAULT_TAG, credentials, null); + } + + public MinioContainer(String image, CredentialsProvider credentials, Integer bindPort) { + super(image == null ? 
DEFAULT_IMAGE + ":" + DEFAULT_TAG : image); + addExposedPort(MINIO_PORT); + if (credentials != null) { + withEnv(MINIO_ACCESS_KEY, credentials.getAccessKey()); + withEnv(MINIO_SECRET_KEY, credentials.getSecretKey()); + } + withCommand("server", DEFAULT_STORAGE_DIRECTORY); + setWaitStrategy(new HttpWaitStrategy() + .forPort(MINIO_PORT) + .forPath(HEALTH_ENDPOINT) + .withStartupTimeout(Duration.ofMinutes(2))); + if (bindPort != null) { + setPortBindings(List.of(bindPort + ":" + MINIO_PORT)); + } + } + + public String getHostAddress() { + return getContainerIpAddress() + ":" + getMappedPort(MINIO_PORT); + } + + public int getPort() { + return getMappedPort(MINIO_PORT); + } + + public static class CredentialsProvider { + + private final String accessKey; + private final String secretKey; + + public CredentialsProvider(String accessKey, String secretKey) { + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + public String getAccessKey() { + return accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/BaseIcebergHadoopCatalogS3IntegrationTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/BaseIcebergHadoopCatalogS3IntegrationTest.java new file mode 100644 index 0000000000000..aace8e520ee69 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/BaseIcebergHadoopCatalogS3IntegrationTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.hadoop; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.DEFAULT_DATABASE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_FORMAT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ACCESS_KEY_ID_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_BUCKET_REGION_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ENDPOINT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_SECRET_KEY_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_WAREHOUSE_URI_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.ICEBERG_IMAGE_NAME; +import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.WAREHOUSE_BUCKET_NAME; +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_ACCESS_KEY; +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_SECRET_KEY; +import static java.util.Map.entry; +import static java.util.Map.ofEntries; + +import com.fasterxml.jackson.databind.JsonNode; +import 
io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil;
+import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat;
+import io.airbyte.integrations.destination.iceberg.container.MinioContainer;
+import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
+import io.airbyte.integrations.util.HostPortResolver;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Leibniz on 2022/11/3.
+ */
+public abstract class BaseIcebergHadoopCatalogS3IntegrationTest extends DestinationAcceptanceTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseIcebergHadoopCatalogS3IntegrationTest.class);
+
+  private MinioContainer s3Storage;
+
+  @Override
+  protected void setup(final TestDestinationEnv testEnv) {
+    s3Storage = IcebergIntegrationTestUtil.createAndStartMinioContainer(null);
+    IcebergIntegrationTestUtil.createS3WarehouseBucket(getConfig());
+  }
+
+  @Override
+  protected void tearDown(final TestDestinationEnv testEnv) {
+    IcebergIntegrationTestUtil.stopAndCloseContainer(s3Storage, "Minio");
+  }
+
+  @Override
+  protected String getImageName() {
+    return ICEBERG_IMAGE_NAME;
+  }
+
+  @Override
+  protected JsonNode getConfig() {
+    String s3Endpoint = "http://" + s3Storage.getHostAddress();
+    LOGGER.info("Configuring S3 endpoint to {}", s3Endpoint);
+    return Jsons.jsonNode(ofEntries(
+        entry(ICEBERG_CATALOG_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(
+                entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Hadoop"),
+                entry(DEFAULT_DATABASE_CONFIG_KEY, "default")))),
+        entry(ICEBERG_STORAGE_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"),
+                entry(S3_ACCESS_KEY_ID_CONFIG_KEY, DEFAULT_ACCESS_KEY),
+                entry(S3_SECRET_KEY_CONFIG_KEY, DEFAULT_SECRET_KEY),
+                entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3a://" + WAREHOUSE_BUCKET_NAME + "/hadoop"),
+                entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"),
+                entry(S3_ENDPOINT_CONFIG_KEY, s3Endpoint)))),
+        entry(ICEBERG_FORMAT_CONFIG_KEY, Jsons.jsonNode(Map.of("format", fileFormat().getConfigValue())))));
+  }
+
+  @Override
+  protected JsonNode getFailCheckConfig() {
+    String s3Endpoint = "http://%s:%s".formatted(HostPortResolver.resolveHost(s3Storage),
+        HostPortResolver.resolvePort(s3Storage));
+    return Jsons.jsonNode(ofEntries(
+        entry(ICEBERG_CATALOG_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(
+                entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Hadoop"),
+                entry(DEFAULT_DATABASE_CONFIG_KEY, "default")))),
+        entry(ICEBERG_STORAGE_CONFIG_KEY,
+            Jsons.jsonNode(ofEntries(entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"),
+                entry(S3_ACCESS_KEY_ID_CONFIG_KEY, DEFAULT_ACCESS_KEY),
+                entry(S3_SECRET_KEY_CONFIG_KEY, "wrong_secret_key"),
+                entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3a://warehouse/hadoop"),
+                entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"),
+                entry(S3_ENDPOINT_CONFIG_KEY, s3Endpoint)))),
+        entry(ICEBERG_FORMAT_CONFIG_KEY, Jsons.jsonNode(Map.of("format", fileFormat().getConfigValue())))));
+  }
+
+  @Override
+  protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
+                                           String streamName,
+                                           String namespace,
+                                           JsonNode streamSchema)
+      throws Exception {
+    return IcebergIntegrationTestUtil.retrieveRecords(getConfig(), namespace, streamName);
+  }
+
+  abstract DataFileFormat fileFormat();
+
+}
diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/IcebergHadoopCatalogS3AvroIntegrationTest.java
b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/IcebergHadoopCatalogS3AvroIntegrationTest.java new file mode 100644 index 0000000000000..acae735466360 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/IcebergHadoopCatalogS3AvroIntegrationTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.hadoop; + +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; + +/** + * @author Leibniz on 2022/11/3. + */ +public class IcebergHadoopCatalogS3AvroIntegrationTest extends BaseIcebergHadoopCatalogS3IntegrationTest { + + @Override + DataFileFormat fileFormat() { + return DataFileFormat.AVRO; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/IcebergHadoopCatalogS3ParquetIntegrationTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/IcebergHadoopCatalogS3ParquetIntegrationTest.java new file mode 100644 index 0000000000000..2e4ef0f2065ba --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hadoop/IcebergHadoopCatalogS3ParquetIntegrationTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.hadoop; + +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; + +/** + * @author Leibniz on 2022/11/3. + */ +public class IcebergHadoopCatalogS3ParquetIntegrationTest extends BaseIcebergHadoopCatalogS3IntegrationTest { + + @Override + DataFileFormat fileFormat() { + return DataFileFormat.PARQUET; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hive/IcebergHiveCatalogS3AvroIntegrationTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hive/IcebergHiveCatalogS3AvroIntegrationTest.java new file mode 100644 index 0000000000000..42164da0d973b --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hive/IcebergHiveCatalogS3AvroIntegrationTest.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.hive; + +import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.ICEBERG_IMAGE_NAME; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil; +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; +import io.airbyte.integrations.destination.iceberg.container.HiveMetastoreS3PostgresCompose; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.util.List; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Leibniz on 2022/11/3. 
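+ *
+ * <p>Identical to {@code IcebergHiveCatalogS3ParquetIntegrationTest} below, except that the
+ * shared docker-compose environment is configured with {@link DataFileFormat#AVRO} data files.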
+ */ +public class IcebergHiveCatalogS3AvroIntegrationTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergHiveCatalogS3AvroIntegrationTest.class); + + /** + * start-up of hive metastore server takes minutes (including pg table initializing) so put the + * docker-compose environment here as a static member, only start once + */ + private static HiveMetastoreS3PostgresCompose metastoreCompose; + + private static JsonNode config; + + @BeforeAll + public static void startCompose() { + metastoreCompose = new HiveMetastoreS3PostgresCompose(); + metastoreCompose.start(); + config = metastoreCompose.getComposeConfig(DataFileFormat.AVRO); + IcebergIntegrationTestUtil.createS3WarehouseBucket(config); + LOGGER.info("==> Started Hive Metastore docker compose containers..."); + + } + + @AfterAll + public static void stopCompose() { + IcebergIntegrationTestUtil.stopAndCloseContainer(metastoreCompose, "Hive Metastore"); + } + + @Override + protected void setup(final TestDestinationEnv testEnv) {} + + @Override + protected void tearDown(final TestDestinationEnv testEnv) {} + + @Override + protected String getImageName() { + return ICEBERG_IMAGE_NAME; + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected JsonNode getFailCheckConfig() { + return metastoreCompose.getWrongConfig(); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws Exception { + return IcebergIntegrationTestUtil.retrieveRecords(getConfig(), namespace, streamName); + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hive/IcebergHiveCatalogS3ParquetIntegrationTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hive/IcebergHiveCatalogS3ParquetIntegrationTest.java new file mode 100644 index 0000000000000..c70952394860b --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/hive/IcebergHiveCatalogS3ParquetIntegrationTest.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.hive; + +import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.ICEBERG_IMAGE_NAME; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil; +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; +import io.airbyte.integrations.destination.iceberg.container.HiveMetastoreS3PostgresCompose; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.util.List; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Leibniz on 2022/11/3. 
+ */ +public class IcebergHiveCatalogS3ParquetIntegrationTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergHiveCatalogS3ParquetIntegrationTest.class); + + /** + * start-up of hive metastore server takes minutes (including pg table initializing) so put the + * docker-compose environment here as a static member, only start once + */ + private static HiveMetastoreS3PostgresCompose metastoreCompose; + + private static JsonNode config; + + @BeforeAll + public static void startCompose() { + metastoreCompose = new HiveMetastoreS3PostgresCompose(); + metastoreCompose.start(); + config = metastoreCompose.getComposeConfig(DataFileFormat.PARQUET); + IcebergIntegrationTestUtil.createS3WarehouseBucket(config); + LOGGER.info("==> Started Hive Metastore docker compose containers..."); + + } + + @AfterAll + public static void stopCompose() { + IcebergIntegrationTestUtil.stopAndCloseContainer(metastoreCompose, "Hive Metastore"); + } + + @Override + protected void setup(final TestDestinationEnv testEnv) {} + + @Override + protected void tearDown(final TestDestinationEnv testEnv) {} + + @Override + protected String getImageName() { + return ICEBERG_IMAGE_NAME; + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected JsonNode getFailCheckConfig() { + return metastoreCompose.getWrongConfig(); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws Exception { + return IcebergIntegrationTestUtil.retrieveRecords(getConfig(), namespace, streamName); + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/BaseIcebergJdbcCatalogS3IntegrationTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/BaseIcebergJdbcCatalogS3IntegrationTest.java new file mode 100644 index 0000000000000..e79b576dba80c --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/BaseIcebergJdbcCatalogS3IntegrationTest.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.iceberg.jdbc; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.DEFAULT_DATABASE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_FORMAT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_TYPE_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_CATALOG_SCHEMA_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_PASSWORD_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_SSL_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_URL_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.JDBC_USERNAME_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ACCESS_KEY_ID_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_BUCKET_REGION_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ENDPOINT_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_SECRET_KEY_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_WAREHOUSE_URI_CONFIG_KEY; +import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.ICEBERG_IMAGE_NAME; +import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.WAREHOUSE_BUCKET_NAME; +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_ACCESS_KEY; +import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_SECRET_KEY; +import static java.util.Map.entry; +import static java.util.Map.ofEntries; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil; +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; +import io.airbyte.integrations.destination.iceberg.container.MinioContainer; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.util.HostPortResolver; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; + +/** + * @author Leibniz on 2022/11/3. 
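+ * Base acceptance test for the JDBC catalog + S3 storage combination: it starts a throwaway PostgreSQL
+ * container as the Iceberg JDBC catalog and a Minio container as the S3 warehouse, then derives the
+ * connector config from the live container addresses. Subclasses only choose the data file format.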
+ */ +public abstract class BaseIcebergJdbcCatalogS3IntegrationTest extends DestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseIcebergJdbcCatalogS3IntegrationTest.class); + private static final String PG_SCHEMA = "public"; + + private PostgreSQLContainer catalogDb; + private MinioContainer s3Storage; + + @Override + protected void setup(final TestDestinationEnv testEnv) { + catalogDb = new PostgreSQLContainer<>("postgres:13-alpine"); + catalogDb.start(); + LOGGER.info("==> Started PostgreSQL docker container..."); + + s3Storage = IcebergIntegrationTestUtil.createAndStartMinioContainer(null); + IcebergIntegrationTestUtil.createS3WarehouseBucket(getConfig()); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + IcebergIntegrationTestUtil.stopAndCloseContainer(s3Storage, "Minio"); + IcebergIntegrationTestUtil.stopAndCloseContainer(catalogDb, "PostgreSQL"); + } + + @Override + protected String getImageName() { + return ICEBERG_IMAGE_NAME; + } + + @Override + protected JsonNode getConfig() { + String jdbcUrl = catalogDb.getJdbcUrl(); + LOGGER.info("Postgresql jdbc url: {}", jdbcUrl); + String s3Endpoint = "http://" + s3Storage.getHostAddress(); + return Jsons.jsonNode(ofEntries( + entry(ICEBERG_CATALOG_CONFIG_KEY, + Jsons.jsonNode(ofEntries( + entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Jdbc"), + entry(DEFAULT_DATABASE_CONFIG_KEY, PG_SCHEMA), + entry(JDBC_URL_CONFIG_KEY, jdbcUrl), + entry(JDBC_USERNAME_CONFIG_KEY, catalogDb.getUsername()), + entry(JDBC_PASSWORD_CONFIG_KEY, catalogDb.getPassword()), + entry(JDBC_SSL_CONFIG_KEY, false), + entry(JDBC_CATALOG_SCHEMA_CONFIG_KEY, PG_SCHEMA)))), + entry(ICEBERG_STORAGE_CONFIG_KEY, + Jsons.jsonNode(ofEntries(entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"), + entry(S3_ACCESS_KEY_ID_CONFIG_KEY, DEFAULT_ACCESS_KEY), + entry(S3_SECRET_KEY_CONFIG_KEY, DEFAULT_SECRET_KEY), + entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3a://" + WAREHOUSE_BUCKET_NAME + "/jdbc"), + entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"), + entry(S3_ENDPOINT_CONFIG_KEY, s3Endpoint)))), + entry(ICEBERG_FORMAT_CONFIG_KEY, Jsons.jsonNode(Map.of("format", fileFormat().getConfigValue()))))); + } + + @Override + protected JsonNode getFailCheckConfig() { + String jdbcUrl = "jdbc:postgresql://%s:%d/%s".formatted(HostPortResolver.resolveHost(catalogDb), + HostPortResolver.resolvePort(catalogDb), + catalogDb.getDatabaseName()); + String s3Endpoint = "http://%s:%s".formatted(HostPortResolver.resolveHost(s3Storage), + HostPortResolver.resolvePort(s3Storage)); + return Jsons.jsonNode(ofEntries( + entry(ICEBERG_CATALOG_CONFIG_KEY, + Jsons.jsonNode(ofEntries( + entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Jdbc"), + entry(DEFAULT_DATABASE_CONFIG_KEY, PG_SCHEMA), + entry(JDBC_URL_CONFIG_KEY, jdbcUrl), + entry(JDBC_USERNAME_CONFIG_KEY, catalogDb.getUsername()), + entry(JDBC_PASSWORD_CONFIG_KEY, "wrong_password"), + entry(JDBC_SSL_CONFIG_KEY, false), + entry(JDBC_CATALOG_SCHEMA_CONFIG_KEY, PG_SCHEMA)))), + entry(ICEBERG_STORAGE_CONFIG_KEY, + Jsons.jsonNode(ofEntries(entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"), + entry(S3_ACCESS_KEY_ID_CONFIG_KEY, DEFAULT_ACCESS_KEY), + entry(S3_SECRET_KEY_CONFIG_KEY, "wrong_secret_key"), + entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3a://warehouse/jdbc"), + entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"), + entry(S3_ENDPOINT_CONFIG_KEY, s3Endpoint)))), + entry(ICEBERG_FORMAT_CONFIG_KEY, Jsons.jsonNode(Map.of("format", fileFormat().getConfigValue()))))); + } + + @Override + protected List retrieveRecords(TestDestinationEnv 
testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws Exception { + return IcebergIntegrationTestUtil.retrieveRecords(getConfig(), namespace, streamName); + } + + abstract DataFileFormat fileFormat(); + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/IcebergJdbcCatalogS3AvroIntegrationTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/IcebergJdbcCatalogS3AvroIntegrationTest.java new file mode 100644 index 0000000000000..c96cf224b704a --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/IcebergJdbcCatalogS3AvroIntegrationTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.jdbc; + +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; + +/** + * @author Leibniz on 2022/11/3. + */ +public class IcebergJdbcCatalogS3AvroIntegrationTest extends BaseIcebergJdbcCatalogS3IntegrationTest { + + @Override + DataFileFormat fileFormat() { + return DataFileFormat.AVRO; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/IcebergJdbcCatalogS3ParquetIntegrationTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/IcebergJdbcCatalogS3ParquetIntegrationTest.java new file mode 100644 index 0000000000000..4a5d382bcb713 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/java/io/airbyte/integrations/destination/iceberg/jdbc/IcebergJdbcCatalogS3ParquetIntegrationTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.jdbc; + +import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat; + +/** + * @author Leibniz on 2022/11/3. 
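+ * Parquet flavor of the JDBC catalog acceptance test; like the Avro sibling above, it only supplies
+ * the data file format to the shared base class.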
+ */ +public class IcebergJdbcCatalogS3ParquetIntegrationTest extends BaseIcebergJdbcCatalogS3IntegrationTest { + + @Override + DataFileFormat fileFormat() { + return DataFileFormat.PARQUET; + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/core-site.xml b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/core-site.xml new file mode 100644 index 0000000000000..90f13d57b1e30 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/core-site.xml @@ -0,0 +1,9 @@ + + + + + + fs.s3a.impl.disable.cache + true + + \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/hive-metastore-compose.yml b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/hive-metastore-compose.yml new file mode 100644 index 0000000000000..7f6e40e7ce84e --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/hive-metastore-compose.yml @@ -0,0 +1,27 @@ +version: "2" + +services: + postgres: + image: postgres:13-alpine + environment: + POSTGRES_DB: test + POSTGRES_USER: test + POSTGRES_PASSWORD: test + command: postgres -c fsync=off + + minio: + image: minio/minio:RELEASE.2022-10-29T06-21-33Z.fips + environment: + - MINIO_ROOT_USER=DEFAULT_ACCESS_KEY + - MINIO_ROOT_PASSWORD=DEFAULT_SECRET_KEY + command: server /data + + hive_metastore: + image: meneal/docker-hive:hadoop2.9.2-hive2.3.7 + volumes: + - ./metastore-site.xml:/opt/apache-hive-3.1.2-bin/conf/hivemetastore-site.xml + - ./metastore-log4j.properties:/opt/apache-hive-3.1.2-bin/conf/hive-log4j2.properties + entrypoint: sh -c "bin/schematool -initSchema -dbType postgres && bin/hive --service metastore" + depends_on: + - postgres + - minio \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/metastore-log4j.properties b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/metastore-log4j.properties new file mode 100644 index 0000000000000..c8e23faffb3f8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/metastore-log4j.properties @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
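+# Console-only Log4j2 profile for the Hive Metastore test container: all output goes to stderr,
+# and the noisy DataNucleus/ZooKeeper loggers are turned down below.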
+status=INFO +name=HiveLog4j2 +packages=org.apache.hadoop.hive.ql.log +# list of properties +property.hive.log.level=INFO +property.hive.root.logger=console +property.hive.log.dir=${sys:java.io.tmpdir}/${sys:user.name} +property.hive.log.file=hive.log +property.hive.perflogger.log.level=INFO +# list of all appenders +appenders=console +# console appender +appender.console.type=Console +appender.console.name=console +appender.console.target=SYSTEM_ERR +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ISO8601} %5p [%t] %c{2}: %m%n +# list of all loggers +loggers=NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, PerfLogger +logger.NIOServerCnxn.name=org.apache.zookeeper.server.NIOServerCnxn +logger.NIOServerCnxn.level=WARN +logger.ClientCnxnSocketNIO.name=org.apache.zookeeper.ClientCnxnSocketNIO +logger.ClientCnxnSocketNIO.level=WARN +logger.DataNucleus.name=DataNucleus +logger.DataNucleus.level=ERROR +logger.Datastore.name=Datastore +logger.Datastore.level=ERROR +logger.JPOX.name=JPOX +logger.JPOX.level=ERROR +logger.PerfLogger.name=org.apache.hadoop.hive.ql.log.PerfLogger +logger.PerfLogger.level=${sys:hive.perflogger.log.level} +# root logger +rootLogger.level=${sys:hive.log.level} +rootLogger.appenderRefs=root +rootLogger.appenderRef.root.ref=${sys:hive.root.logger} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/metastore-site.xml b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/metastore-site.xml new file mode 100644 index 0000000000000..eb9da19414608 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test-integration/resources/metastore-site.xml @@ -0,0 +1,108 @@ + + + hive.metastore.warehouse.dir + file:///no/default/location/defined/please/create/new/schema/with/location/explicitly/set/ + + + + fs.s3.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + + fs.s3n.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + + fs.s3a.endpoint + http://minio:9000 + + + + fs.s3a.access.key + DEFAULT_ACCESS_KEY + + + + fs.s3a.secret.key + DEFAULT_SECRET_KEY + + + + fs.s3a.path.style.access + true + + + + javax.jdo.option.ConnectionURL + jdbc:postgresql://postgres:5432/test + + + + javax.jdo.option.ConnectionDriverName + org.postgresql.Driver + + + + javax.jdo.option.ConnectionUserName + test + + + + javax.jdo.option.ConnectionPassword + test + + + + datanucleus.autoCreateSchema + false + + + + datanucleus.autoCreateTables + false + + + + datanucleus.autoCreateColumns + false + + + + datanucleus.autoCreateConstraints + false + + + + datanucleus.fixedDatastore + true + + + + datanucleus.autoStartMechanism + SchemaTable + + + + hive.metastore.schema.verification + true + + + + metastore.storage.schema.reader.impl + org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader + + + + hive.security.authorization.createtable.owner.grants + ALL + The set of privileges automatically granted to the owner whenever a table gets created. 
+ + + \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test/java/io/airbyte/integrations/destination/iceberg/EmptyIterator.java b/airbyte-integrations/connectors/destination-iceberg/src/test/java/io/airbyte/integrations/destination/iceberg/EmptyIterator.java new file mode 100644 index 0000000000000..cb9f117e0c31e --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test/java/io/airbyte/integrations/destination/iceberg/EmptyIterator.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg; + +import java.io.IOException; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +/** + * An empty {@link CloseableIterable} of Iceberg {@link Record}s, used by the unit tests to stub out table scans. + * + * @author Leibniz on 2022/11/1. + */ +class EmptyIterator implements CloseableIterable<Record> { + + @Override + public CloseableIterator<Record> iterator() { + return new CloseableIterator<Record>() { + + @Override + public void close() throws IOException {} + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Record next() { + return null; + } + + }; + } + + @Override + public void close() throws IOException { + + } + +} diff --git a/airbyte-integrations/connectors/destination-iceberg/src/test/java/io/airbyte/integrations/destination/iceberg/IcebergHiveCatalogConfigTest.java b/airbyte-integrations/connectors/destination-iceberg/src/test/java/io/airbyte/integrations/destination/iceberg/IcebergHiveCatalogConfigTest.java new file mode 100644 index 0000000000000..c8fa98c431f6e --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg/src/test/java/io/airbyte/integrations/destination/iceberg/IcebergHiveCatalogConfigTest.java @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.destination.iceberg; + +import static io.airbyte.integrations.destination.iceberg.IcebergConstants.FORMAT_TYPE_CONFIG_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.iceberg.config.catalog.HiveCatalogConfig; +import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfig; +import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfigFactory; +import io.airbyte.integrations.destination.iceberg.config.format.FormatConfig; +import io.airbyte.integrations.destination.iceberg.config.storage.S3Config; +import io.airbyte.integrations.destination.iceberg.config.storage.credential.S3AccessKeyCredentialConfig; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import java.io.IOException; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.IcebergGenerics.ScanBuilder; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.spark.SparkCatalog; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +@Slf4j +class IcebergHiveCatalogConfigTest { + + private static final String FAKE_WAREHOUSE_URI = "s3a://fake-bucket"; + private static final String FAKE_ENDPOINT = "fake-endpoint"; + private static final String FAKE_ENDPOINT_WITH_SCHEMA = "https://fake-endpoint"; + private static final String FAKE_ACCESS_KEY_ID = "fake-accessKeyId"; + private static final String FAKE_SECRET_ACCESS_KEY = "fake-secretAccessKey"; + private static final String FAKE_THRIFT_URI = "thrift://fake-thrift-uri"; + private static MockedStatic mockedIcebergGenerics; + + private AmazonS3 s3; + private HiveCatalogConfig config; + private Catalog catalog; + private IcebergCatalogConfigFactory factory; + + @BeforeAll + static void staticSetup() { + IcebergHiveCatalogConfigTest.mockedIcebergGenerics = mockStatic(IcebergGenerics.class); + } + + @AfterAll + static void staticStop() { + IcebergHiveCatalogConfigTest.mockedIcebergGenerics.close(); + } + + @BeforeEach + void setup() throws IOException { + s3 
= mock(AmazonS3.class); + final InitiateMultipartUploadResult uploadResult = mock(InitiateMultipartUploadResult.class); + final UploadPartResult uploadPartResult = mock(UploadPartResult.class); + when(s3.uploadPart(any(UploadPartRequest.class))).thenReturn(uploadPartResult); + when(s3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(uploadResult); + + TableScan tableScan = mock(TableScan.class); + when(tableScan.schema()).thenReturn(null); + Table tempTable = mock(Table.class); + when(tempTable.newScan()).thenReturn(tableScan); + ScanBuilder scanBuilder = mock(ScanBuilder.class); + when(scanBuilder.build()).thenReturn(new EmptyIterator()); + when(IcebergGenerics.read(tempTable)).thenReturn(scanBuilder); + + catalog = mock(Catalog.class); + when(catalog.createTable(any(), any())).thenReturn(tempTable); + when(catalog.dropTable(any())).thenReturn(true); + + config = new HiveCatalogConfig(FAKE_THRIFT_URI) { + + @Override + public Catalog genCatalog() { + return catalog; + } + + }; + config.setStorageConfig(S3Config.builder() + .warehouseUri(FAKE_WAREHOUSE_URI) + .bucketRegion("fake-region") + .endpoint(FAKE_ENDPOINT) + .endpointWithSchema(FAKE_ENDPOINT_WITH_SCHEMA) + .accessKeyId(FAKE_ACCESS_KEY_ID) + .secretKey(FAKE_SECRET_ACCESS_KEY) + .credentialConfig(new S3AccessKeyCredentialConfig(FAKE_ACCESS_KEY_ID, FAKE_SECRET_ACCESS_KEY)) + .s3Client(s3) + .build()); + config.setFormatConfig(new FormatConfig(Jsons.jsonNode(ImmutableMap.of(FORMAT_TYPE_CONFIG_KEY, "Parquet")))); + config.setDefaultOutputDatabase("default"); + + factory = new IcebergCatalogConfigFactory() { + + @Override + public IcebergCatalogConfig fromJsonNodeConfig(final @NotNull JsonNode jsonConfig) { + return config; + } + + }; + } + + /** + * Test that check will fail if IAM user does not have listObjects permission + */ + @Test + public void checksHiveCatalogWithoutS3ListObjectPermission() { + final IcebergDestination destinationFail = new IcebergDestination(factory); + doThrow(new AmazonS3Exception("Access Denied")).when(s3).listObjects(any(ListObjectsRequest.class)); + final AirbyteConnectionStatus status = destinationFail.check(null); + log.info("status={}", status); + assertEquals(Status.FAILED, status.getStatus(), "Connection check should have failed"); + assertTrue(status.getMessage().contains("Access Denied"), "Connection check returned wrong failure message"); + } + + @Test + public void checksTempTableAlreadyExists() { + final IcebergDestination destinationFail = new IcebergDestination(factory); + doThrow(new AlreadyExistsException("Table already exists: temp_1123412341234")).when(catalog) + .createTable(any(TableIdentifier.class), any(Schema.class)); + final AirbyteConnectionStatus status = destinationFail.check(null); + log.info("status={}", status); + assertEquals(Status.FAILED, status.getStatus(), "Connection check should have failed"); + assertTrue(status.getMessage().contains("Table already exists"), + "Connection check returned wrong failure message"); + } + + @Test + public void checksHiveThriftUri() throws IllegalAccessException { + final IcebergDestination destinationFail = new IcebergDestination(); + final AirbyteConnectionStatus status = destinationFail.check(Jsons.deserialize(""" + { + "catalog_config": { + "catalog_type": "Hive", + "hive_thrift_uri": "server:9083", + "database": "test" + }, + "storage_config": { + "storage_type": "S3", + "access_key_id": "xxxxxxxxxxx", + "secret_access_key": "yyyyyyyyyyyy", + "s3_warehouse_uri": "s3a://warehouse/hive", + "s3_bucket_region": 
"us-east-1", + "s3_endpoint": "your-own-minio-host:9000" + }, + "format_config": { + "format": "Parquet" + } + }""")); + log.info("status={}", status); + assertEquals(Status.FAILED, status.getStatus(), "Connection check should have failed"); + assertTrue(status.getMessage().contains("hive_thrift_uri must start with 'thrift://'"), + "Connection check returned wrong failure message"); + } + + /** + * Test that check will succeed when IAM user has all required permissions + */ + @Test + public void checksHiveCatalogWithS3Success() { + final IcebergDestination destinationSuccess = new IcebergDestination(factory); + final AirbyteConnectionStatus status = destinationSuccess.check(null); + assertEquals(Status.SUCCEEDED, status.getStatus(), "Connection check should have succeeded"); + } + + @Test + public void hiveCatalogSparkConfigTest() { + Map sparkConfig = config.sparkConfigMap(); + log.info("Spark Config for Hive-S3 catalog: {}", sparkConfig); + + // Catalog config + assertEquals("hive", sparkConfig.get("spark.sql.catalog.iceberg.type")); + assertEquals(FAKE_THRIFT_URI, sparkConfig.get("spark.sql.catalog.iceberg.uri")); + assertEquals(SparkCatalog.class.getName(), sparkConfig.get("spark.sql.catalog.iceberg")); + assertEquals(S3FileIO.class.getName(), sparkConfig.get("spark.sql.catalog.iceberg.io-impl")); + assertEquals(FAKE_WAREHOUSE_URI, sparkConfig.get("spark.sql.catalog.iceberg.warehouse")); + assertEquals(FAKE_ACCESS_KEY_ID, sparkConfig.get("spark.sql.catalog.iceberg.s3.access-key-id")); + assertEquals(FAKE_SECRET_ACCESS_KEY, sparkConfig.get("spark.sql.catalog.iceberg.s3.secret-access-key")); + assertEquals(FAKE_ENDPOINT_WITH_SCHEMA, sparkConfig.get("spark.sql.catalog.iceberg.s3.endpoint")); + assertEquals("false", sparkConfig.get("spark.sql.catalog.iceberg.s3.path-style-access")); + + // hadoop config + assertEquals(FAKE_ENDPOINT, sparkConfig.get("spark.hadoop.fs.s3a.endpoint")); + assertEquals(FAKE_ACCESS_KEY_ID, sparkConfig.get("spark.hadoop.fs.s3a.access.key")); + assertEquals(FAKE_SECRET_ACCESS_KEY, sparkConfig.get("spark.hadoop.fs.s3a.secret.key")); + assertEquals(S3AFileSystem.class.getName(), sparkConfig.get("spark.hadoop.fs.s3a.impl")); + assertEquals("false", sparkConfig.get("spark.hadoop.fs.s3a.connection.ssl.enabled")); + } + + @Test + public void s3ConfigForCatalogInitializeTest() { + Map properties = config.getStorageConfig().catalogInitializeProperties(); + log.info("S3 Config for HiveCatalog Initialize: {}", properties); + + assertEquals(S3FileIO.class.getName(), properties.get("io-impl")); + assertEquals(FAKE_ENDPOINT_WITH_SCHEMA, properties.get("s3.endpoint")); + assertEquals(FAKE_ACCESS_KEY_ID, properties.get("s3.access-key-id")); + assertEquals(FAKE_SECRET_ACCESS_KEY, properties.get("s3.secret-access-key")); + assertEquals("false", properties.get("s3.path-style-access")); + } + +} diff --git a/docs/integrations/destinations/iceberg.md b/docs/integrations/destinations/iceberg.md new file mode 100644 index 0000000000000..1083dbe6dd08e --- /dev/null +++ b/docs/integrations/destinations/iceberg.md @@ -0,0 +1,58 @@ +# Iceberg + +This page guides you through the process of setting up the Iceberg destination connector. + +## Sync overview + +### Output schema + +The incoming airbyte data is structured in keyspaces and tables and is partitioned and replicated across different nodes +in the cluster. This connector maps an incoming `stream` to an Iceberg `table` and a `namespace` to an +Iceberg `database`. 
Fields in the airbyte message become different columns in the Iceberg tables. Each table will +contain the following columns. + +* `_airbyte_ab_id`: A random generated uuid. +* `_airbyte_emitted_at`: a timestamp representing when the event was received from the data source. +* `_airbyte_data`: a json text representing the extracted data. + +### Features + +This section should contain a table with the following format: + +| Feature | Supported?(Yes/No) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | ✅ | | +| Incremental Sync | ✅ | | +| Replicate Incremental Deletes | ❌ | | +| SSH Tunnel Support | ❌ | | + +### Performance considerations + +Every ten thousand pieces of incoming airbyte data in a stream ————we call it a batch, would produce one data file( +Parquet/Avro) in an Iceberg table. This batch size can be configurabled by `Data file flushing batch size` +property. +As the quantity of Iceberg data files grows, it causes an unnecessary amount of metadata and less efficient queries from +file open costs. +Iceberg provides data file compaction action to improve this case, you can read more about +compaction [HERE](https://iceberg.apache.org/docs/latest/maintenance/#compact-data-files). +This connector also provides auto compact action when stream closes, by `Auto compact data files` property. Any you can +specify the target size of compacted Iceberg data file. + +## Getting started + +### Requirements + +* **Iceberg catalog** : Iceberg uses `catalog` to manage tables. this connector already supports: + * [HiveCatalog](https://iceberg.apache.org/docs/latest/hive/#global-hive-catalog) connects to a **Hive metastore** + to keep track of Iceberg tables. + * [HadoopCatalog](https://iceberg.apache.org/docs/latest/java-api-quickstart/#using-a-hadoop-catalog) doesn’t need + to connect to a Hive MetaStore, but can only be used with **HDFS or similar file systems** that support atomic + rename. For `HadoopCatalog`, this connector use **Storage Config** (S3 or HDFS) to manage Iceberg tables. + * [JdbcCatalog](https://iceberg.apache.org/docs/latest/jdbc/) uses a table in a relational database to manage + Iceberg tables through JDBC. So far, this connector supports **PostgreSQL** only. +* **Storage medium** means where Iceberg data files storages in. So far, this connector supports **S3/S3N/S3N** + object-storage only. + +### Setup guide + +######TODO: more info
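+
+A minimal, illustrative configuration for the Hive catalog with S3 storage, adapted from the JSON used in this
+connector's tests. The property names come from the connector spec; every host, key, and bucket below is a
+placeholder to replace with your own values:
+
+```json
+{
+  "catalog_config": {
+    "catalog_type": "Hive",
+    "hive_thrift_uri": "thrift://your-metastore-host:9083",
+    "database": "default"
+  },
+  "storage_config": {
+    "storage_type": "S3",
+    "access_key_id": "YOUR_ACCESS_KEY_ID",
+    "secret_access_key": "YOUR_SECRET_ACCESS_KEY",
+    "s3_warehouse_uri": "s3a://your-bucket/path/to/warehouse",
+    "s3_bucket_region": "us-east-1",
+    "s3_endpoint": "your-minio-host:9000"
+  },
+  "format_config": {
+    "format": "Parquet"
+  }
+}
+```
+
+Note that `hive_thrift_uri` must start with `thrift://`; the connection check rejects the config otherwise
+(see `checksHiveThriftUri` in `IcebergHiveCatalogConfigTest`).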