Skip to content

Commit

Permalink
Merge pull request #17111: [BEAM-14101] [CdapIO] Add ReceiverBuilder …
Browse files Browse the repository at this point in the history
…for SparkReceiverIO
  • Loading branch information
aromanenko-dev authored Jul 15, 2022
2 parents 2b42751 + 87267d5 commit b0225ca
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 0 deletions.
1 change: 1 addition & 0 deletions sdks/java/io/sparkreceiver/OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# See the OWNERS docs at https://s.apache.org/beam-owners
44 changes: 44 additions & 0 deletions sdks/java/io/sparkreceiver/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Gradle plugins applied to this module: the stock Java plugin plus the Beam module plugin.
plugins {
id 'java'
id 'org.apache.beam.module'
}

// Configure the Java build for this module: Javadoc export is switched off and the
// automatic module name is set explicitly.
// NOTE(review): applyJavaNature and the two helper calls below are presumably contributed by
// the 'org.apache.beam.module' plugin — confirm against the plugin sources.
applyJavaNature(
exportJavadoc: false,
automaticModuleName: 'org.apache.beam.sdk.io.sparkreceiver',
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

// Human-readable name and summary of the module.
description = "Apache Beam :: SDKs :: Java :: IO :: Spark Receiver"
ext.summary = """Apache Beam SDK provides a simple, Java-based
interface for streaming integration with CDAP plugins."""

dependencies {
implementation library.java.commons_lang3
implementation library.java.spark_streaming
implementation library.java.spark_core
implementation library.java.vendored_guava_26_0_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
// The Scala library is only needed at compile time and is pinned to a literal version here.
// NOTE(review): presumably 2.11.12 has to match the Scala version of the Spark artifacts
// above — confirm before bumping either.
compileOnly "org.scala-lang:scala-library:2.11.12"
// Tests depend on the "testRuntimeMigration" configuration of the CDAP IO module.
testImplementation project(path: ":sdks:java:io:cdap", configuration: "testRuntimeMigration")
testImplementation library.java.junit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.sparkreceiver;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.commons.lang3.ClassUtils;
import org.apache.spark.streaming.receiver.Receiver;

/**
 * Class for building an instance for {@link Receiver} that uses Apache Beam mechanisms instead of
 * Spark environment.
 *
 * <p>The receiver is created reflectively: {@link #build()} picks one of the constructors declared
 * by the receiver class based on the arguments supplied through {@link #withConstructorArgs}.
 *
 * @param <X> element type of the {@link Receiver}
 * @param <T> concrete {@link Receiver} class to instantiate
 */
public class ReceiverBuilder<X, T extends Receiver<X>> implements Serializable {

  private final Class<T> sparkReceiverClass;
  private Object[] constructorArgs;

  /**
   * Creates a builder for the given receiver class with an empty constructor argument list.
   *
   * @param sparkReceiverClass class of the receiver to instantiate
   */
  public ReceiverBuilder(Class<T> sparkReceiverClass) {
    this.sparkReceiverClass = sparkReceiverClass;
    this.constructorArgs = new Object[0];
  }

  /**
   * Method for specifying constructor arguments for corresponding {@link #sparkReceiverClass}.
   *
   * @param args arguments to pass to the receiver constructor, in declaration order
   * @return this builder
   * @throws IllegalArgumentException if the argument array itself is {@code null}
   */
  public ReceiverBuilder<X, T> withConstructorArgs(Object... args) {
    checkArgument(args != null, "Constructor args array must be not null!");
    // Defensive copy: later changes to the caller's array must not alter this builder.
    this.constructorArgs = args.clone();
    return this;
  }

  /**
   * Instantiates the receiver class by reflectively invoking one of its declared constructors with
   * the configured arguments.
   *
   * <p>Only constructors with as many parameters as there are arguments are considered. A
   * constructor whose parameter types (primitives are compared via their wrapper classes) are
   * exactly the runtime classes of the arguments is preferred. If there is none, a constructor
   * whose parameter types merely accept the arguments (for example an interface or superclass
   * parameter) is used, provided exactly one such constructor exists.
   *
   * @return a new instance of the receiver class
   * @throws IllegalArgumentException if a {@code null} argument is encountered while matching
   * @throws IllegalStateException if no declared constructor fits the arguments, or if several
   *     constructors accept them and none matches exactly
   * @throws InvocationTargetException if the selected constructor throws
   * @throws InstantiationException if the receiver class cannot be instantiated
   * @throws IllegalAccessException if the selected constructor cannot be accessed
   */
  public T build()
      throws InvocationTargetException, InstantiationException, IllegalAccessException {

    Constructor<?> exactMatch = null;
    Constructor<?> compatibleMatch = null;
    int compatibleCount = 0;

    for (Constructor<?> candidate : sparkReceiverClass.getDeclaredConstructors()) {
      Class<?>[] paramTypes = candidate.getParameterTypes();
      if (paramTypes.length != constructorArgs.length) {
        continue;
      }

      boolean exact = true;
      boolean compatible = true;
      for (int i = 0; i < constructorArgs.length && compatible; i++) {
        Object arg = constructorArgs[i];

        checkArgument(arg != null, "All args must be not null!");

        Class<?> paramType = paramTypes[i];
        if (paramType.isPrimitive()) {
          // Arguments arrive boxed, so compare against the wrapper of a primitive parameter.
          paramType = ClassUtils.primitiveToWrapper(paramType);
        }
        if (!paramType.equals(arg.getClass())) {
          exact = false;
          compatible = paramType.isInstance(arg);
        }
      }

      if (!compatible) {
        continue;
      }
      if (exact) {
        // Constructor signatures are unique, so there can be only one exact match.
        exactMatch = candidate;
        break;
      }
      compatibleCount++;
      if (compatibleMatch == null) {
        compatibleMatch = candidate;
      }
    }

    Constructor<?> currentConstructor = exactMatch != null ? exactMatch : compatibleMatch;

    checkStateNotNull(currentConstructor, "Can not find appropriate constructor!");

    if (exactMatch == null && compatibleCount > 1) {
      // The order of getDeclaredConstructors() is unspecified; refuse to guess.
      throw new IllegalStateException(
          "Ambiguous constructor arguments: "
              + compatibleCount
              + " constructors of "
              + sparkReceiverClass.getName()
              + " accept them and none matches exactly!");
    }

    currentConstructor.setAccessible(true);
    return sparkReceiverClass.cast(currentConstructor.newInstance(constructorArgs));
  }

  /** Returns the receiver class this builder instantiates. */
  public Class<T> getSparkReceiverClass() {
    return sparkReceiverClass;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.sparkreceiver;

import java.nio.ByteBuffer;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.receiver.BlockGenerator;
import org.apache.spark.streaming.receiver.BlockGeneratorListener;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.streaming.receiver.ReceiverSupervisor;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.mutable.ArrayBuffer;

/**
 * Wrapper class for {@link ReceiverSupervisor} that doesn't use Spark Environment.
 *
 * <p>Every {@code push*} call is handed to the function supplied at construction time, packed into
 * an {@code Object[]} that holds the call's arguments in declaration order.
 */
@SuppressWarnings("return.type.incompatible")
public class WrappedSupervisor extends ReceiverSupervisor {

  private final SerializableFunction<Object[], Void> storeFn;

  /**
   * @param receiver receiver passed on to the {@link ReceiverSupervisor} constructor
   * @param conf Spark configuration passed on to the {@link ReceiverSupervisor} constructor
   * @param storeFn function invoked with the arguments of every {@code push*} call
   */
  public WrappedSupervisor(
      Receiver<?> receiver, SparkConf conf, SerializableFunction<Object[], Void> storeFn) {
    super(receiver, conf);
    this.storeFn = storeFn;
  }

  /** Passes the given values, as one array, to {@link #storeFn}; its result is ignored. */
  private void forwardToStoreFn(Object... payload) {
    storeFn.apply(payload);
  }

  // ---- data path: everything is forwarded to storeFn ----

  /** Forwards the single object as a one-element array. */
  @Override
  public void pushSingle(Object o) {
    // Explicit array so that the element is always wrapped, whatever its runtime type.
    forwardToStoreFn(new Object[] {o});
  }

  /** Forwards the buffer together with both options as a three-element array. */
  @Override
  public void pushBytes(
      ByteBuffer byteBuffer, Option<Object> option, Option<StreamBlockId> option1) {
    forwardToStoreFn(byteBuffer, option, option1);
  }

  /** Forwards the iterator together with both options as a three-element array. */
  @Override
  public void pushIterator(
      Iterator<?> iterator, Option<Object> option, Option<StreamBlockId> option1) {
    forwardToStoreFn(iterator, option, option1);
  }

  /** Forwards the array buffer together with both options as a three-element array. */
  @Override
  public void pushArrayBuffer(
      ArrayBuffer<?> arrayBuffer, Option<Object> option, Option<StreamBlockId> option1) {
    forwardToStoreFn(arrayBuffer, option, option1);
  }

  // ---- supervisor hooks ----

  /** Always returns {@code null}; no block generator is created by this supervisor. */
  @Override
  public BlockGenerator createBlockGenerator(BlockGeneratorListener blockGeneratorListener) {
    return null;
  }

  /** Intentionally a no-op: reported errors are ignored. */
  @Override
  public void reportError(String s, Throwable throwable) {
    // nothing to do
  }

  /** Always reports a successful receiver start. */
  @Override
  public boolean onReceiverStart() {
    return true;
  }

  /** Returns {@link Integer#MAX_VALUE} as the rate limit. */
  @Override
  public long getCurrentRateLimit() {
    return Integer.MAX_VALUE;
  }

  /** Delegates to the superclass implementation. */
  @Override
  public boolean isReceiverStopped() {
    return super.isReceiverStopped();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Transforms for reading and writing from streaming CDAP plugins. */
@Experimental(Kind.SOURCE_SINK)
package org.apache.beam.sdk.io.sparkreceiver;

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.sparkreceiver;

import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.streaming.receiver.ReceiverSupervisor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Test class for {@link ReceiverBuilder}. */
@RunWith(JUnit4.class)
public class ReceiverBuilderTest {

  private static final Logger LOG = LoggerFactory.getLogger(ReceiverBuilderTest.class);

  public static final String TEST_MESSAGE = "testMessage";

  /** Minimal receiver whose lifecycle callbacks only log. */
  private static class CustomReceiver extends Receiver<String> {

    public CustomReceiver(StorageLevel storageLevel) {
      super(storageLevel);
    }

    @Override
    public void onStart() {
      LOG.info("Receiver onStart()");
    }

    @Override
    public void onStop() {
      LOG.info("Receiver onStop()");
    }
  }

  /**
   * If this test passed, then object for Custom {@link
   * org.apache.spark.streaming.receiver.Receiver} was created successfully, and the corresponding
   * {@link ReceiverSupervisor} was wrapped into {@link WrappedSupervisor}.
   *
   * <p>Exceptions are deliberately not caught here: a failure to build the receiver or to store a
   * record must fail the test instead of being logged and ignored.
   *
   * @throws Exception if the receiver cannot be built or used
   */
  @Test
  public void testCreatingCustomSparkReceiver() throws Exception {
    AtomicBoolean customStoreConsumerWasUsed = new AtomicBoolean(false);
    ReceiverBuilder<String, CustomReceiver> receiverBuilder =
        new ReceiverBuilder<>(CustomReceiver.class);
    Receiver<String> receiver =
        receiverBuilder.withConstructorArgs(StorageLevel.DISK_ONLY()).build();
    // The instance is not kept: the test relies on receiver.supervisor() returning it afterwards.
    new WrappedSupervisor(
        receiver,
        new SparkConf(),
        args -> {
          customStoreConsumerWasUsed.set(true);
          return null;
        });

    receiver.onStart();
    assertTrue(receiver.supervisor() instanceof WrappedSupervisor);

    receiver.store(TEST_MESSAGE);
    assertTrue(customStoreConsumerWasUsed.get());
  }
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ include(":sdks:java:io:expansion-service")
include(":sdks:java:io:file-based-io-tests")
include(":sdks:java:io:bigquery-io-perf-tests")
include(":sdks:java:io:cdap")
include(":sdks:java:io:sparkreceiver")
include(":sdks:java:io:google-cloud-platform")
include(":sdks:java:io:google-cloud-platform:expansion-service")
include(":sdks:java:io:hadoop-common")
Expand Down

0 comments on commit b0225ca

Please sign in to comment.