Skip to content

Commit

Permalink
reformat code for changed test assertness commit
Browse files Browse the repository at this point in the history
  • Loading branch information
akashorabek committed Jul 14, 2022
1 parent 5d446c3 commit 134216d
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
/** Streaming sources for Spark {@link Receiver}. */
public class SparkReceiverIO {

private static final Logger LOG =
LoggerFactory.getLogger(SparkReceiverIO.class);
private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIO.class);

public static <V> Read<V> read() {
return new AutoValue_SparkReceiverIO_Read.Builder<V>().build();
Expand Down Expand Up @@ -126,7 +125,8 @@ public PCollection<V> expand(PBegin input) {
sparkReceiverRead.getSparkReceiverBuilder();
checkStateNotNull(sparkReceiverBuilder, "withSparkReceiverBuilder() is required");
if (!HasOffset.class.isAssignableFrom(sparkReceiverBuilder.getSparkReceiverClass())) {
LOG.info("{} started reading", ReadFromSparkReceiverWithoutOffsetDoFn.class.getSimpleName());
LOG.info(
"{} started reading", ReadFromSparkReceiverWithoutOffsetDoFn.class.getSimpleName());
return input
.apply(Impulse.create())
.apply(ParDo.of(new ReadFromSparkReceiverWithoutOffsetDoFn<>(sparkReceiverRead)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.util.List;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
Expand All @@ -33,8 +34,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.List;

/** Test class for {@link SparkReceiverIO}. */
@RunWith(JUnit4.class)
public class SparkReceiverIOTest {
Expand Down Expand Up @@ -142,7 +141,6 @@ public void testReadFromCustomReceiverWithoutOffset() {
.withSparkConsumer(new CustomSparkConsumer<>())
.withSparkReceiverBuilder(receiverBuilder);


WithoutOffsetTestOutputDoFn testDoFn = new WithoutOffsetTestOutputDoFn();
List<String> storedRecords = CustomReceiverWithoutOffset.getStoredRecords();
List<String> outputRecords = WithoutOffsetTestOutputDoFn.getRecords();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@
*/
package org.apache.beam.sdk.io.sparkreceiver;

import org.apache.beam.sdk.transforms.DoFn;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

public class WithOffsetTestOutputDoFn extends DoFn<String, String> {
private final static List<String> records = new ArrayList<>();
private static final List<String> records = new ArrayList<>();

@ProcessElement
public void processElement(@Element String input, DoFn.OutputReceiver<String> output) {
records.add(input);
output.output(input);
}
@ProcessElement
public void processElement(@Element String input, DoFn.OutputReceiver<String> output) {
records.add(input);
output.output(input);
}

public static List<String> getRecords() {
return records;
}
public static List<String> getRecords() {
return records;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.beam.sdk.transforms.DoFn;

public class WithoutOffsetTestOutputDoFn extends DoFn<String, String> {
private final static List<String> records = new ArrayList<>();
private static final List<String> records = new ArrayList<>();

@ProcessElement
public void processElement(@Element String input, OutputReceiver<String> output) {
Expand Down

0 comments on commit 134216d

Please sign in to comment.