Skip to content

Commit

Permalink
[RRIO] Create Caller and SetupTeardown interfaces (#28905)
Browse files Browse the repository at this point in the history
* Create test Caller and SetupTeardown interfaces

* Update Javadoc

* Defer Call transform to future PR

* Rename package to requestresponseio

* Add username to TODO
  • Loading branch information
damondouglas authored Oct 11, 2023
1 parent 6bab4b5 commit 104c10b
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 3 deletions.
7 changes: 4 additions & 3 deletions sdks/java/io/rrio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ description = "Apache Beam :: SDKS :: Java :: IO :: RequestResponseIO (RRIO)"
ext.summary = "Support to read from and write to Web APIs"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.joda_time
implementation library.java.vendored_guava_32_1_2_jre
// TODO(damondouglas): revert to implementation after project is more fully developed
permitUnusedDeclared project(path: ":sdks:java:core", configuration: "shadow")
permitUnusedDeclared library.java.joda_time
permitUnusedDeclared library.java.vendored_guava_32_1_2_jre

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponseio;

import java.io.Serializable;

/** {@link Caller} interfaces user custom code intended for API calls. */
public interface Caller<RequestT, ResponseT> extends Serializable {

/** Calls a Web API with the {@link RequestT} and returns a {@link ResponseT}. */
ResponseT call(RequestT request) throws UserCodeExecutionException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponseio;

import java.io.Serializable;

/**
* Provided by user and called within {@link org.apache.beam.sdk.transforms.DoFn.Setup} and @{link
* org.apache.beam.sdk.transforms.DoFn.Teardown} lifecycle methods of {@link Call}'s {@link
* org.apache.beam.sdk.transforms.DoFn}.
*/
public interface SetupTeardown extends Serializable {

/** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s setup lifecycle method. */
void setup() throws UserCodeExecutionException;

/** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s teardown lifecycle method. */
void teardown() throws UserCodeExecutionException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponseio;

/** Base {@link Exception} for signaling errors in user custom code. */
public class UserCodeExecutionException extends Exception {
public UserCodeExecutionException(String message) {
super(message);
}

public UserCodeExecutionException(String message, Throwable cause) {
super(message, cause);
}

public UserCodeExecutionException(Throwable cause) {
super(cause);
}

public UserCodeExecutionException(
String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponseio;

/**
* Extends {@link UserCodeQuotaException} to allow the user custom code to specifically signal a
* Quota or API overuse related error.
*/
public class UserCodeQuotaException extends UserCodeExecutionException {

public UserCodeQuotaException(String message) {
super(message);
}

public UserCodeQuotaException(String message, Throwable cause) {
super(message, cause);
}

public UserCodeQuotaException(Throwable cause) {
super(cause);
}

public UserCodeQuotaException(
String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponseio;

/** An extension of {@link UserCodeQuotaException} to specifically signal a user code timeout. */
public class UserCodeTimeoutException extends UserCodeExecutionException {

public UserCodeTimeoutException(String message) {
super(message);
}

public UserCodeTimeoutException(String message, Throwable cause) {
super(message, cause);
}

public UserCodeTimeoutException(Throwable cause) {
super(cause);
}

public UserCodeTimeoutException(
String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Package provides Beam I/O transform support for safely reading from and writing to Web APIs. */
package org.apache.beam.io.requestresponseio;
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.rrio;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import org.apache.beam.io.requestresponseio.Caller;
import org.apache.beam.io.requestresponseio.UserCodeExecutionException;
import org.apache.beam.io.requestresponseio.UserCodeQuotaException;
import org.apache.beam.io.requestresponseio.UserCodeTimeoutException;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link Caller}. */
@RunWith(JUnit4.class)
public class CallerTest {

@Rule public TestPipeline pipeline = TestPipeline.create();

@Test
public void canSerializeImplementingClasses() {
SerializableUtils.serializeToByteArray(new CallerImpl());
}

@Test
public void canSerializeWhenUsedInDoFn() {
pipeline
.apply(Create.of(Instant.now()))
.apply(ParDo.of(new CallerUsingDoFn<>(new CallerImpl())))
.setCoder(StringUtf8Coder.of());

pipeline.run();
}

@Test
public void canSignalQuotaException() {
pipeline
.apply(Create.of(1))
.apply(ParDo.of(new CallerUsingDoFn<>(new CallerThrowsQuotaException())))
.setCoder(VarIntCoder.of());

PipelineExecutionException executionException =
assertThrows(PipelineExecutionException.class, pipeline::run);
assertEquals(UserCodeQuotaException.class, executionException.getCause().getClass());
}

@Test
public void canSignalTimeoutException() {
pipeline
.apply(Create.of(1))
.apply(ParDo.of(new CallerUsingDoFn<>(new CallerThrowsTimeoutException())))
.setCoder(VarIntCoder.of());

PipelineExecutionException executionException =
assertThrows(PipelineExecutionException.class, pipeline::run);
assertEquals(UserCodeTimeoutException.class, executionException.getCause().getClass());
}

private static class CallerUsingDoFn<RequestT, ResponseT> extends DoFn<RequestT, ResponseT> {
private final Caller<RequestT, ResponseT> caller;

private CallerUsingDoFn(Caller<RequestT, ResponseT> caller) {
this.caller = caller;
}

@ProcessElement
public void process(@Element RequestT request, OutputReceiver<ResponseT> receiver)
throws UserCodeExecutionException {
RequestT safeRequest = checkStateNotNull(request);
ResponseT response = caller.call(safeRequest);
receiver.output(response);
}
}

private static class CallerImpl implements Caller<Instant, String> {

@Override
public String call(Instant request) throws UserCodeExecutionException {
return request.toString();
}
}

private static class CallerThrowsQuotaException implements Caller<Integer, Integer> {

@Override
public Integer call(Integer request) throws UserCodeExecutionException {
throw new UserCodeQuotaException("quota");
}
}

private static class CallerThrowsTimeoutException implements Caller<Integer, Integer> {

@Override
public Integer call(Integer request) throws UserCodeExecutionException {
throw new UserCodeTimeoutException("timeout");
}
}
}
Loading

0 comments on commit 104c10b

Please sign in to comment.