Skip to content

Commit

Permalink
[#3362] feat(flink-connector): Add the code skeleton for flink-connec…
Browse files Browse the repository at this point in the history
…tor (#2635)

<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
   Examples:
     - "[#123] feat(operator): support xxx"
     - "[#233] fix: check null before access result in xxx"
     - "[MINOR] refactor: fix typo in variable name"
     - "[MINOR] docs: fix typo in README"
     - "[#255] test: fix flaky test NameOfTheTest"
   Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. If the PR is unfinished, please mark this PR as draft.
-->

### What changes were proposed in this pull request?

- Add `GravitinoCatalogStore` so that catalogs can be registered through Gravitino. This PR
adds support for creating the Hive catalog.

### Why are the changes needed?

- Fix #3362 

### Does this PR introduce _any_ user-facing change?

- Yes, it adds Flink support to Gravitino.

### How was this patch tested?

- add UTs
  • Loading branch information
coolderli authored May 24, 2024
1 parent 505a447 commit 2113efd
Show file tree
Hide file tree
Showing 26 changed files with 1,707 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backend-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
- name: Backend Integration Test
id: integrationTest
run: |
./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs
./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs -PskipFlinkITs
- name: Upload integrate tests reports
uses: actions/upload-artifact@v3
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jobs:
- server/**
- server-common/**
- spark-connector/**
- flink-connector/**
- trino-connector/**
- web/**
- docs/open-api/**
Expand Down
108 changes: 108 additions & 0 deletions .github/workflows/flink-integration-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
name: Flink Integration Test

# Controls when the workflow will run
on:
  # Triggers the workflow on push or pull request events targeting the "main" and "branch-*" branches
  push:
    branches: [ "main", "branch-*" ]
  pull_request:
    branches: [ "main", "branch-*" ]

# One run per workflow and pull request (or ref); a newer run cancels the one in progress.
concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

jobs:
  # Path filter job: exposes `source_changes`, which gates the test job below.
  changes:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: dorny/paths-filter@v2
        id: filter
        with:
          filters: |
            source_changes:
              - .github/**
              - api/**
              - bin/**
              - catalogs/**
              - clients/client-java/**
              - clients/client-java-runtime/**
              - clients/filesystem-hadoop3/**
              - clients/filesystem-hadoop3-runtime/**
              - common/**
              - conf/**
              - core/**
              - dev/**
              - gradle/**
              - meta/**
              - server/**
              - server-common/**
              - flink-connector/**
              - docs/open-api/**
              - build.gradle.kts
              - gradle.properties
              - gradlew
              - settings.gradle.kts
    outputs:
      source_changes: ${{ steps.filter.outputs.source_changes }}

  # Integration test for AMD64 architecture
  test-amd64-arch:
    needs: changes
    # Skipped entirely when none of the filtered paths changed.
    if: needs.changes.outputs.source_changes == 'true'
    runs-on: ubuntu-latest
    timeout-minutes: 30
    strategy:
      matrix:
        architecture: [linux/amd64]
        java-version: [ 8, 11, 17 ]
    env:
      PLATFORM: ${{ matrix.architecture }}
    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-java@v3
        with:
          java-version: ${{ matrix.java-version }}
          distribution: 'temurin'

      - name: Set up QEMU
        uses: docker/setup-qemu-action@v2

      - name: Check required command
        run: |
          dev/ci/check_commands.sh

      - name: Package Gravitino
        run: |
          ./gradlew build -x test -PjdkVersion=${{ matrix.java-version }}
          ./gradlew compileDistribution -x test -PjdkVersion=${{ matrix.java-version }}

      # Only runs for pull requests carrying the 'debug action' label.
      - name: Setup debug Github Action
        if: ${{ contains(github.event.pull_request.labels.*.name, 'debug action') }}
        uses: csexton/debugger-action@master

      - name: Free up disk space
        run: |
          dev/ci/util_free_space.sh

      # Runs the connector integration tests twice: testMode=embedded, then testMode=deploy.
      - name: Flink Integration Test
        id: integrationTest
        run: |
          ./gradlew --rerun-tasks -PskipTests -PtestMode=embedded -PjdkVersion=${{ matrix.java-version }} :flink-connector:test --tests "com.datastrato.gravitino.flink.connector.integration.test.**"
          ./gradlew --rerun-tasks -PskipTests -PtestMode=deploy -PjdkVersion=${{ matrix.java-version }} :flink-connector:test --tests "com.datastrato.gravitino.flink.connector.integration.test.**"

      # Reports are uploaded when the test step failed or the PR carries the 'upload log' label.
      - name: Upload integrate tests reports
        uses: actions/upload-artifact@v3
        if: ${{ (failure() && steps.integrationTest.outcome == 'failure') || contains(github.event.pull_request.labels.*.name, 'upload log') }}
        with:
          name: flink-connector-integrate-test-reports-${{ matrix.java-version }}
          path: |
            build/reports
            flink-connector/build/flink-connector-integration-test.log
            flink-connector/build/*.tar
            distribution/package/logs/gravitino-server.out
            distribution/package/logs/gravitino-server.log
            catalogs/**/*.log
            catalogs/**/*.tar
5 changes: 3 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ tasks {
subprojects.forEach() {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark-connector") && it.name != "trino-connector" &&
it.name != "integration-test" && it.name != "bundled-catalog"
it.name != "integration-test" && it.name != "bundled-catalog" && it.name != "flink-connector"
) {
from(it.configurations.runtimeClasspath)
into("distribution/package/libs")
Expand All @@ -634,7 +634,8 @@ tasks {
!it.name.startsWith("spark-connector") &&
it.name != "trino-connector" &&
it.name != "integration-test" &&
it.name != "bundled-catalog"
it.name != "bundled-catalog" &&
it.name != "flink-connector"
) {
dependsOn("${it.name}:build")
from("${it.name}/build/libs")
Expand Down
131 changes: 131 additions & 0 deletions flink-connector/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
// Plugins applied to the flink-connector module.
plugins {
  id("maven-publish")
  id("java")
  id("idea")
}

repositories {
  mavenCentral()
}

// Flink version comes from the version catalog.
val flinkVersion: String = libs.versions.flink.get()

// Scala version: the "scalaVersion" project property when it is a String,
// otherwise the "defaultScalaVersion" extra property. Used below as the
// suffix of Scala-specific Flink artifact names.
val scalaVersion: String =
  (project.properties["scalaVersion"] as? String) ?: extra["defaultScalaVersion"].toString()

// Dependencies of the flink-connector module: Gravitino projects, shared
// libraries, Flink/Hive artifacts, and the test-only classpath.
dependencies {
  // Gravitino modules the connector is built against.
  implementation(project(":api"))
  implementation(project(":common"))
  implementation(project(":core"))
  implementation(project(":clients:client-java"))
  implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))

  implementation(libs.bundles.log4j)
  implementation(libs.commons.lang3)
  implementation(libs.guava)
  implementation(libs.httpclient5)
  implementation(libs.jackson.databind)
  implementation(libs.jackson.annotations)
  implementation(libs.jackson.datatype.jdk8)
  implementation(libs.jackson.datatype.jsr310)

  // Flink artifacts; the Hive connector artifact name carries the Scala version suffix.
  implementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
  implementation("org.apache.flink:flink-table-common:$flinkVersion")
  implementation("org.apache.flink:flink-table-api-java:$flinkVersion")

  // hive-exec is pulled with the "core" classifier and a list of transitive excludes.
  implementation(libs.hive2.exec) {
    artifact {
      classifier = "core"
    }
    exclude("com.fasterxml.jackson.core")
    exclude("com.google.code.findbugs", "jsr305")
    exclude("com.google.protobuf")
    exclude("org.apache.avro")
    exclude("org.apache.calcite")
    exclude("org.apache.calcite.avatica")
    exclude("org.apache.curator")
    exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
    exclude("org.apache.logging.log4j")
    exclude("org.apache.zookeeper")
    exclude("org.eclipse.jetty.aggregate", "jetty-all")
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
    exclude("org.openjdk.jol")
    exclude("org.pentaho")
    exclude("org.slf4j")
  }

  testAnnotationProcessor(libs.lombok)

  testCompileOnly(libs.lombok)
  testImplementation(project(":integration-test-common", "testArtifacts"))
  testImplementation(project(":server"))
  testImplementation(project(":server-common"))
  testImplementation(libs.junit.jupiter.api)
  testImplementation(libs.junit.jupiter.params)
  testImplementation(libs.mockito.core)
  testImplementation(libs.sqlite.jdbc)
  testImplementation(libs.testcontainers)
  testImplementation(libs.testcontainers.junit.jupiter)
  testImplementation(libs.testcontainers.mysql)

  // Hadoop artifacts are added without any of their transitive dependencies.
  testImplementation(libs.hadoop2.common) {
    exclude("*")
  }
  testImplementation(libs.hadoop2.mapreduce.client.core) {
    exclude("*")
  }
  testImplementation(libs.hive2.common) {
    exclude("org.eclipse.jetty.aggregate", "jetty-all")
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
  }
  testImplementation(libs.hive2.metastore) {
    exclude("co.cask.tephra")
    exclude("com.github.joshelser")
    // The misspelled duplicate exclude ("com.google.code.findbugs", "sr305") was dropped;
    // the jsr305 exclude below is the intended one.
    exclude("com.google.code.findbugs", "jsr305")
    exclude("com.tdunning", "json")
    exclude("com.zaxxer", "HikariCP")
    // NOTE(review): "io.dropwizard.metricss" looks like a misspelling of
    // "io.dropwizard.metrics"; if so this exclude presumably matches nothing.
    // Left unchanged because correcting it alters dependency resolution - confirm intent.
    exclude("io.dropwizard.metricss")
    exclude("javax.transaction", "transaction-api")
    exclude("org.apache.avro")
    exclude("org.apache.curator")
    exclude("org.apache.hbase")
    exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
    exclude("org.apache.logging.log4j")
    exclude("org.apache.parquet", "parquet-hadoop-bundle")
    exclude("org.apache.zookeeper")
    exclude("org.eclipse.jetty.aggregate", "jetty-all")
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
    exclude("org.slf4j")
  }
  testImplementation("org.apache.flink:flink-table-api-bridge-base:$flinkVersion") {
    exclude("commons-cli", "commons-cli")
    exclude("commons-io", "commons-io")
    exclude("com.google.code.findbugs", "jsr305")
  }
  testImplementation("org.apache.flink:flink-table-planner_$scalaVersion:$flinkVersion")

  testRuntimeOnly(libs.junit.jupiter.engine)
}

// Test task wiring, driven by the -PskipTests / -PskipITs / -PskipFlinkITs project properties.
tasks.test {
  // -PskipTests: restrict the run to classes under an "integration" path.
  if (project.hasProperty("skipTests")) {
    include("**/integration/**")
  }

  val integrationTestsDisabled =
    project.hasProperty("skipITs") || project.hasProperty("skipFlinkITs")

  if (integrationTestsDisabled) {
    // Either flag removes everything under an "integration" path from the run.
    exclude("**/integration/**")
  } else {
    // Integration tests need the module jar to be built first.
    dependsOn(tasks.jar)

    // "initIntegrationTest" is read from the project's extra properties; it is
    // defined outside this file and invoked with this Test task.
    val setUpIntegrationTest = project.extra.get("initIntegrationTest") as (Test) -> Unit
    setUpIntegrationTest(this)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.flink.connector;

import java.util.Map;
import org.apache.flink.configuration.Configuration;

/**
 * Translates catalog properties between their Flink and Gravitino representations.
 *
 * <p>Both directions have pass-through default implementations; implementors override only the
 * direction that needs real conversion.
 */
public interface PropertiesConverter {

  // NOTE(review): not referenced by the default methods in this interface; presumably a key
  // prefix for Flink-specific properties - confirm against implementations.
  String FLINK_PROPERTY_PREFIX = "flink.bypass.";

  /**
   * Builds the Gravitino catalog properties from the Flink configuration.
   *
   * <p>The default implementation returns the configuration's entries as a map, unmodified.
   *
   * @param flinkConf The configuration provided by Flink.
   * @return properties for the Gravitino connector.
   */
  default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
    final Map<String, String> gravitinoProperties = flinkConf.toMap();
    return gravitinoProperties;
  }

  /**
   * Builds the Flink connector properties from Gravitino properties.
   *
   * <p>The default implementation hands back the very same map instance it was given.
   *
   * @param gravitinoProperties The properties provided by Gravitino.
   * @return properties for the Flink connector.
   */
  default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
    final Map<String, String> flinkProperties = gravitinoProperties;
    return flinkProperties;
  }
}
Loading

0 comments on commit 2113efd

Please sign in to comment.