Skip to content

Commit

Permalink
A Transform Service that uses Docker Compose (apache#26023)
Browse files Browse the repository at this point in the history
* A Transform Service that uses Docker Compose

* Adds support for the Python expansion service

* Fix spotless and remove unused config file

* Fix spotless

* Add licenses

* Fix spotless and adds code to copy licenses to Docker containers

* Fix checkstyle and artifact request forwarding

* Adding unit tests for the controller

* Adds support for specifying credentials via a volume

* Rebasing to fix test failures

* Use correct dependencies for Schema-aware transforms

* Addresses reviewer comments

* Addressing reviewer comments
  • Loading branch information
chamikaramj authored and cushon committed May 24, 2024
1 parent 24a816b commit 569c7ba
Show file tree
Hide file tree
Showing 33 changed files with 2,046 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* Allow passing service name for google-cloud-profiler (Python) ([#26280](https://github.com/apache/beam/issues/26280)).
* Dead letter queue support added to RunInference in Python ([#24209](https://github.com/apache/beam/issues/24209)).
* Support added for defining pre/postprocessing operations on the RunInference transform ([#26308](https://github.com/apache/beam/issues/26308))
* Adds a Docker Compose based transform service that can be used to discover and use portable Beam transforms ([#26023](https://github.com/apache/beam/pull/26023)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest reque
return service.expand(request);
}

/**
 * Delegates a SchemaTransform discovery request to {@code service.discoverSchemaTransform} and
 * returns the response it produces without modification.
 *
 * @param request the discovery request to forward
 * @return the response returned by {@code service}
 */
@Override
public ExpansionApi.DiscoverSchemaTransformResponse discover(
ExpansionApi.DiscoverSchemaTransformRequest request) {
return service.discoverSchemaTransform(request);
}

@Override
public void close() throws Exception {
channel.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@
/**
 * A high-level client for a cross-language expansion service.
 *
 * <p>The interface extends {@link AutoCloseable}, so implementations can be used in a
 * try-with-resources statement.
 */
public interface ExpansionServiceClient extends AutoCloseable {
/**
 * Sends an expansion request and returns the corresponding response.
 *
 * @param request the expansion request
 * @return the expansion response
 */
ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request);

/**
 * Sends a SchemaTransform discovery request and returns the corresponding response.
 *
 * @param request the discovery request
 * @return the discovery response
 */
ExpansionApi.DiscoverSchemaTransformResponse discover(
ExpansionApi.DiscoverSchemaTransformRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest reque
return response;
}

/**
 * Discovery is not implemented by this client: the request is ignored and {@code null} is
 * returned.
 */
@Override
public ExpansionApi.DiscoverSchemaTransformResponse discover(
ExpansionApi.DiscoverSchemaTransformRequest request) {
// NOTE(review): callers receive null here; presumably this client is a test double whose
// discover() is never invoked - confirm before relying on the return value.
return null;
}

@Override
public void close() throws Exception {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2447,6 +2447,12 @@ public ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest reque
return response;
}

/**
 * Discovery is not implemented by this client: the request is ignored and {@code null} is
 * returned.
 */
@Override
public ExpansionApi.DiscoverSchemaTransformResponse discover(
ExpansionApi.DiscoverSchemaTransformRequest request) {
// NOTE(review): callers receive null here; presumably this client is a test double whose
// discover() is never invoked - confirm before relying on the return value.
return null;
}

@Override
public void close() throws Exception {
// do nothing
Expand Down
49 changes: 49 additions & 0 deletions sdks/java/expansion-service/container/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

FROM eclipse-temurin:11
LABEL Author="Apache Beam <dev@beam.apache.org>"

# Target platform of the image being built; used below to pick the matching
# boot binary.
ARG TARGETOS
ARG TARGETARCH

# Must be declared for the license clean-up step at the end of this file to see
# a value passed with `--build-arg pull_licenses=...`. Without this declaration
# the variable is always empty inside RUN and the clean-up can never trigger.
ARG pull_licenses

WORKDIR /opt/apache/beam

# Copy dependencies generated by the Gradle build.
COPY target/beam-sdks-java-io-expansion-service.jar jars/
COPY target/beam-sdks-java-io-google-cloud-platform-expansion-service.jar jars/
COPY target/beam-sdks-java-extensions-schemaio-expansion-service.jar jars/

# Copy licenses
COPY target/LICENSE /opt/apache/beam/
COPY target/NOTICE /opt/apache/beam/

# Copy the boot program
COPY target/launcher/${TARGETOS}_${TARGETARCH}/boot /opt/apache/beam/

# Copy the config file
COPY target/expansion_service_config.yml ./

# Add golang licenses.
COPY target/go-licenses/* /opt/apache/beam/third_party_licenses/golang/
# Remove the license directory again when the build was told not to pull licenses.
RUN if [ "${pull_licenses}" = "false" ] ; then \
      rm -rf /opt/apache/beam/third_party_licenses ; \
    fi

# The config file and jar directory are relative to WORKDIR. The remaining
# required boot flags (-id, -port) are expected to be appended by whoever runs
# the container.
ENTRYPOINT ["/opt/apache/beam/boot", "-config_file", "./expansion_service_config.yml", "-dependencies_dir", "jars/"]
96 changes: 96 additions & 0 deletions sdks/java/expansion-service/container/boot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Boot code for the Java SDK expansion service.
// Contract:
// https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto
// https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_artifact_api.proto
package main

import (
// "context"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)

// Command-line flags accepted by the launcher. main() aborts when any of them
// is left at its zero value, so all four are effectively required:
//   - id: local identifier, only used in the start-up log message.
//   - port: port for the expansion service.
//   - dependencies_dir: directory whose entries are placed on the Java
//     classpath (for loading SchemaTransforms).
//   - config_file: expansion service config YAML file. The config file contains:
//       - Allow-list
//       - Per-transform dependencies config.
var (
id = flag.String("id", "", "Local identifier (required)")
port = flag.Int("port", 0, "Port for the expansion service (required)")
dependencies_dir = flag.String("dependencies_dir", "", "A directory containing the set of jar files to load transforms from (required)")
config_file = flag.String("config_file", "", "Expansion service config YAML file. (required)")
)

// entrypoint is the fully qualified Java class name that main() passes to the
// java command right after the JVM options, i.e. the main class to run.
const entrypoint = "org.apache.beam.sdk.expansion.service.ExpansionService"

// main validates the required flags, builds a Java classpath out of every
// entry of the dependencies directory and then runs the expansion service
// JVM. The launcher terminates (via log.Fatalf) as soon as the JVM exits.
func main() {
	flag.Parse()

	if *id == "" {
		log.Fatalf("The flag 'id' was not specified")
	}
	if *port == 0 {
		log.Fatalf("The flag 'port' was not specified")
	}
	if *dependencies_dir == "" {
		log.Fatalf("The flag 'dependencies_dir' was not specified")
	}
	if *config_file == "" {
		log.Fatalf("The flag 'config_file' was not specified")
	}

	log.Printf("Starting the Java expansion service container %v.", *id)

	// Resolve the dependencies directory to an absolute path. A relative
	// directory is resolved against the working directory; an absolute one is
	// used as is instead of being appended to the working directory.
	depsDir := *dependencies_dir
	if !filepath.IsAbs(depsDir) {
		cwd, err := os.Getwd()
		if err != nil {
			log.Fatalf("Failed to determine the working directory: %v", err)
		}
		depsDir = filepath.Join(cwd, depsDir)
	}

	// Determine all files from the dependencies_dir to be used for the CLASSPATH.
	// Fail early with a clear message instead of starting the JVM with an
	// empty classpath when the directory cannot be read.
	files, err := ioutil.ReadDir(depsDir)
	if err != nil {
		log.Fatalf("Failed to list the dependencies directory %v: %v", depsDir, err)
	}
	cp := make([]string, 0, len(files))
	for _, file := range files {
		cp = append(cp, filepath.Join(depsDir, file.Name()))
	}

	args := []string{
		// Setting max RAM percentage to a high value since we are running a single JVM within the container.
		"-XX:MaxRAMPercentage=80.0",
		// Keep following JVM options in sync with other Java containers released with Beam.
		"-XX:+UseParallelGC",
		"-XX:+AlwaysActAsServerClassMachine",
		"-XX:-OmitStackTraceInFastThrow",
		"-cp", strings.Join(cp, string(filepath.ListSeparator)),
	}

	// Main class followed by its arguments: the port and the config file.
	// config_file is guaranteed to be non-empty by the validation above.
	args = append(args, entrypoint)
	args = append(args, strconv.Itoa(*port))
	args = append(args, fmt.Sprintf("--expansionServiceConfigFile=%s", *config_file))

	log.Printf("Executing: java %v", strings.Join(args, " "))
	log.Fatalf("Java exited: %v", execx.Execute("java", args...))
}
100 changes: 100 additions & 0 deletions sdks/java/expansion-service/container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Build script for the Java expansion service container image: it compiles the
// Go launcher and stages the expansion service jars for the Docker build.
plugins {
id 'org.apache.beam.module'
id 'com.github.jk1.dependency-license-report' version '1.16'
}

applyDockerNature()
applyGoNature()

// The projects below provide the jars that are bundled into the image, so they
// are evaluated before this one.
evaluationDependsOn(":sdks:java:extensions:schemaio-expansion-service")
evaluationDependsOn(":sdks:java:io:expansion-service")
evaluationDependsOn(":sdks:java:io:google-cloud-platform:expansion-service")

description = "Apache Beam :: SDKs :: Java :: Expansion-service :: Container"

configurations {
// Jars that get copied into the Docker build context.
dockerDependency
// NOTE(review): nothing in this script adds to or reads from this configuration - confirm it is still needed.
expansionServiceLauncher
}

dependencies {
// The "shadow" configuration of each expansion service project is used here.
dockerDependency project(path: ":sdks:java:extensions:schemaio-expansion-service", configuration: "shadow")
dockerDependency project(path: ":sdks:java:io:expansion-service", configuration: "shadow")
dockerDependency project(path: ":sdks:java:io:google-cloud-platform:expansion-service", configuration: "shadow")
}

goBuild {
goTargets = '*.go' // only build the immediate directory.
// Single-quoted, so Groovy does not interpolate ${GOOS}/${GOARCH} itself.
// NOTE(review): presumably the Go build tooling substitutes them per target platform - confirm.
outputLocation = './build/target/launcher/${GOOS}_${GOARCH}/boot'
}

// Copies the jars from the dockerDependency configuration into build/target,
// renaming each one to a fixed, version-less file name.
task copyDockerfileDependencies(type: Copy) {
from configurations.dockerDependency
rename 'beam-sdks-java-extensions-schemaio-expansion-service-.*.jar', 'beam-sdks-java-extensions-schemaio-expansion-service.jar'
rename 'beam-sdks-java-io-expansion-service-.*.jar', 'beam-sdks-java-io-expansion-service.jar'
rename 'beam-sdks-java-io-google-cloud-platform-expansion-service-.*.jar', 'beam-sdks-java-io-google-cloud-platform-expansion-service.jar'
// Duplicate entries are copied again instead of failing the task.
setDuplicatesStrategy(DuplicatesStrategy.INCLUDE)
into "build/target"
}

// Copies the expansion service config file next to the jars in build/target.
task copyConfigFile(type: Copy) {
from "expansion_service_config.yml"
into "build/target"
}

// Docker image definition. The repository root and the tag come from the
// "docker-repository-root" and "docker-tag" root project properties when they
// are set, and from the project defaults otherwise.
docker {
name containerImageName(
name: project.docker_image_default_repo_prefix + "java_expansion_service",
root: project.rootProject.hasProperty(["docker-repository-root"]) ?
project.rootProject["docker-repository-root"] :
project.docker_image_default_repo_root,
tag: project.rootProject.hasProperty(["docker-tag"]) ?
project.rootProject["docker-tag"] : project.sdk_version)
// tags used by dockerTag task
tags containerImageTags()
files "./build"
// buildx is enabled whenever the requested platforms differ from the native architecture alone.
buildx project.containerPlatforms() != [project.nativeArchitecture()]
platform(*project.containerPlatforms())
}

// Everything under build/target must be in place before the Docker build
// context is prepared.
dockerPrepare.dependsOn goBuild
dockerPrepare.dependsOn copyConfigFile
dockerPrepare.dependsOn copyDockerfileDependencies

// Populates build/target/go-licenses. With the "docker-pull-licenses" root
// property set, the generated Go licenses are copied there; otherwise only a
// placeholder file is created.
if (project.rootProject.hasProperty(["docker-pull-licenses"])) {
def copyGolangLicenses = tasks.register("copyGolangLicenses", Copy) {
// NOTE(review): the licenses are taken from the ':release:go-licenses:py' project - confirm this is the intended source for this container.
from "${project(':release:go-licenses:py').buildDir}/output"
into "build/target/go-licenses"
dependsOn ':release:go-licenses:py:createLicenses'
}
dockerPrepare.dependsOn copyGolangLicenses
} else {
def skipPullLicenses = tasks.register("skipPullLicenses", Exec) {
executable "sh"
// Touch a dummy file to ensure the directory exists.
args "-c", "mkdir -p build/target/go-licenses && touch build/target/go-licenses/skip"
}
dockerPrepare.dependsOn skipPullLicenses
}

// Aggregate task that only depends on dockerPush.
task pushAll {
dependsOn dockerPush
}
32 changes: 32 additions & 0 deletions sdks/java/expansion-service/container/expansion_service_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Transform identifiers listed for the expansion service.
# NOTE(review): presumably only these transforms may be expanded - confirm against the service.
allowlist:
- "beam:transform:org.apache.beam:kafka_read_with_metadata:v1"
- "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
- "beam:transform:org.apache.beam:kafka_write:v1"
- "beam:transform:org.apache.beam:schemaio_jdbc_read:v1"
- "beam:transform:org.apache.beam:schemaio_jdbc_write:v1"
- "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
# Per-transform dependencies: each transform identifier maps to a list of jar
# paths. The paths are relative (jars/...).
# NOTE(review): presumably resolved against the service's working directory - confirm.
dependencies:
"beam:transform:org.apache.beam:kafka_read_with_metadata:v1":
- path: "jars/beam-sdks-java-io-expansion-service.jar"
"beam:transform:org.apache.beam:kafka_read_without_metadata:v1":
- path: "jars/beam-sdks-java-io-expansion-service.jar"
"beam:transform:org.apache.beam:kafka_write:v1":
- path: "jars/beam-sdks-java-io-expansion-service.jar"
"beam:transform:org.apache.beam:schemaio_jdbc_read:v1":
- path: "jars/beam-sdks-java-extensions-schemaio-expansion-service.jar"
"beam:transform:org.apache.beam:schemaio_jdbc_write:v1":
- path: "jars/beam-sdks-java-extensions-schemaio-expansion-service.jar"
"beam:schematransform:org.apache.beam:bigquery_storage_write:v1":
- path: "jars/beam-sdks-java-io-google-cloud-platform-expansion-service.jar"
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.expansion.service;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;

// TODO(https://github.com/apache/beam/issues/26527): generalize to support other types of
// dependencies
/**
 * A single dependency entry, described by one {@code path} value.
 *
 * <p>Instances are AutoValue value objects and can be created by Jackson from an object that has
 * a {@code "path"} property.
 */
@AutoValue
public abstract class Dependency {
/** Returns the path this dependency was created with. */
abstract String getPath();

/**
 * Creates a {@link Dependency} for the given path. Annotated as the Jackson creator, with
 * {@code path} bound to the {@code "path"} property.
 *
 * @param path the value of the {@code "path"} property
 * @return a new {@link Dependency} holding {@code path}
 */
@JsonCreator
static Dependency create(@JsonProperty("path") String path) {
return new AutoValue_Dependency(path);
}
}
Loading

0 comments on commit 569c7ba

Please sign in to comment.