From 5bb5d30e00823ff9caba21793d4c69728619b21f Mon Sep 17 00:00:00 2001 From: Nicolaus Weidner Date: Mon, 18 Dec 2023 10:21:45 +0100 Subject: [PATCH 1/5] #45 chore: Clarify that @SourceStreams or @SinkStreams is required to make streams accessible --- .../java/co/decodable/sdk/pipeline/DecodableStreamSink.java | 4 +++- .../co/decodable/sdk/pipeline/DecodableStreamSource.java | 4 +++- .../co/decodable/sdk/pipeline/metadata/SinkStreams.java | 6 +++--- .../co/decodable/sdk/pipeline/metadata/SourceStreams.java | 6 +++--- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSink.java b/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSink.java index a70c936..afb3f5a 100644 --- a/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSink.java +++ b/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSink.java @@ -15,7 +15,9 @@ /** * A {@link StatefulSink} which allows to write to a Decodable stream from within a Flink job. + * href="https://docs.decodable.co/docs/streams">Decodable stream from within a Flink job. The + * stream must be referenced in {@link co.decodable.sdk.pipeline.metadata.SinkStreams} to be + * accessible. * * @param The data type of this stream */ diff --git a/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSource.java b/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSource.java index 5c771fd..3178466 100644 --- a/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSource.java +++ b/sdk/src/main/java/co/decodable/sdk/pipeline/DecodableStreamSource.java @@ -14,7 +14,9 @@ /** * A {@link Source} which allows to retrieve the contents of a Decodable stream from within a Flink job. + * href="https://docs.decodable.co/docs/streams">Decodable stream from within a Flink job. The + * stream must be referenced in {@link co.decodable.sdk.pipeline.metadata.SourceStreams} to be + * accessible. * * @param The data type of this stream */ diff --git a/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SinkStreams.java b/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SinkStreams.java index cde0464..2647760 100644 --- a/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SinkStreams.java +++ b/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SinkStreams.java @@ -14,14 +14,14 @@ import java.lang.annotation.Target; /** - * Denotes the sink streams accessed by a custom pipeline. Must be specified on the job class in - * order for the Decodable platform to display the connected streams. + * Denotes the sink streams accessed by a custom pipeline. Only streams referenced in {@link + * SourceStreams} or {@link SinkStreams} on the job class will be accessible to the pipeline. */ @Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) @Incubating public @interface SinkStreams { - /** One or more sink stream name. */ + /** Names of the sink streams. */ String[] value(); } diff --git a/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SourceStreams.java b/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SourceStreams.java index df6a3fa..85c97bd 100644 --- a/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SourceStreams.java +++ b/sdk/src/main/java/co/decodable/sdk/pipeline/metadata/SourceStreams.java @@ -14,14 +14,14 @@ import java.lang.annotation.Target; /** - * Denotes the source streams accessed by a custom pipeline. Must be specified on the job class in - * order for the Decodable platform to display the connected streams. + * Denotes the source streams accessed by a custom pipeline. Only streams referenced in {@link + * SourceStreams} or {@link SinkStreams} on the job class will be accessible to the pipeline. */ @Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) @Incubating public @interface SourceStreams { - /** One or more source stream name. */ + /** Names of the source streams. */ String[] value(); } From 4bb3da71577601d3ae105d19d34da03c87962969 Mon Sep 17 00:00:00 2001 From: Nicolaus Weidner Date: Mon, 18 Dec 2023 16:40:39 +0100 Subject: [PATCH 2/5] #45 chore: Fix NPE if only source streams or only sink streams are declared --- .../sdk/pipeline/internal/metadata/MetadataProcessor.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java b/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java index bd7a241..cfed08e 100644 --- a/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java +++ b/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java @@ -52,10 +52,14 @@ public boolean process(Set annotations, RoundEnvironment Set annotatedElements = roundEnv.getElementsAnnotatedWith(annotation); for (Element annotated : annotatedElements) { SourceStreams sourceStreams = annotated.getAnnotation(SourceStreams.class); - allSourceStreams.addAll(Arrays.asList(sourceStreams.value())); + if (sourceStreams != null && sourceStreams.value() != null) { + allSourceStreams.addAll(Arrays.asList(sourceStreams.value())); + } SinkStreams sinkStreams = annotated.getAnnotation(SinkStreams.class); - allSinkStreams.addAll(Arrays.asList(sinkStreams.value())); + if (sinkStreams != null && sinkStreams.value() != null) { + allSinkStreams.addAll(Arrays.asList(sinkStreams.value())); + } } } From fccca52fa61b4729bb8e52dd6c815a0f7e297a50 Mon Sep 17 00:00:00 2001 From: Nicolaus Weidner Date: Mon, 18 Dec 2023 19:03:36 +0100 Subject: [PATCH 3/5] #45 chore: Add warning if neither source nor sink streams are declared --- .../internal/metadata/MetadataProcessor.java | 39 +++++++++---- .../metadata/MetadataProcessorTest.java | 57 ++++++++++++++++++- 2 files changed, 85 insertions(+), 11 deletions(-) diff --git a/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java b/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java index cfed08e..a028c00 100644 --- a/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java +++ b/sdk/src/main/java/co/decodable/sdk/pipeline/internal/metadata/MetadataProcessor.java @@ -11,9 +11,9 @@ import co.decodable.sdk.pipeline.metadata.SourceStreams; import java.io.IOException; import java.io.PrintWriter; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; import javax.annotation.processing.AbstractProcessor; import javax.annotation.processing.RoundEnvironment; @@ -28,15 +28,18 @@ /** * An annotation processor for generating the file {@code * "META-INF/decodable/stream-names.properties"}, allowing the Decodable platform to display the - * streams connected to a custom pipeline. + * streams connected to a custom pipeline. Note: We handle the supported annotation types inside the + * processor in order to print a warning when no source or sink streams were declared. */ -@SupportedAnnotationTypes({ - "co.decodable.sdk.pipeline.metadata.SourceStreams", - "co.decodable.sdk.pipeline.metadata.SinkStreams" -}) +@SupportedAnnotationTypes({"*"}) public class MetadataProcessor extends AbstractProcessor { + private static final Logger LOGGER = Logger.getLogger(MetadataProcessor.class.getName()); private static final String STREAM_NAMES_FILE = "META-INF/decodable/stream-names.properties"; + private final Set supportedAnnotationClassNames = + Set.of(SourceStreams.class, SinkStreams.class).stream() + .map(Class::getName) + .collect(Collectors.toSet()); private final Set allSourceStreams; private final Set allSinkStreams; @@ -48,7 +51,14 @@ public MetadataProcessor() { @Override public boolean process(Set annotations, RoundEnvironment roundEnv) { - for (TypeElement annotation : annotations) { + var filteredAnnotations = + annotations.stream() + .filter( + annotation -> + supportedAnnotationClassNames.contains( + annotation.getQualifiedName().toString())) + .collect(Collectors.toSet()); + for (TypeElement annotation : filteredAnnotations) { Set annotatedElements = roundEnv.getElementsAnnotatedWith(annotation); for (Element annotated : annotatedElements) { SourceStreams sourceStreams = annotated.getAnnotation(SourceStreams.class); @@ -64,6 +74,13 @@ public boolean process(Set annotations, RoundEnvironment } if (roundEnv.processingOver()) { + if (allSourceStreams.isEmpty() && allSinkStreams.isEmpty()) { + LOGGER.log( + Level.WARNING, + "Neither source nor sink streams were declared. No streams will be available to this pipeline. If this " + + "is unintentional, please use the @SourceStreams and @SinkStreams annotations to declare source " + + "and/or sink streams."); + } try { FileObject streamNamesFile = processingEnv @@ -83,7 +100,9 @@ public boolean process(Set annotations, RoundEnvironment } } - return true; + // Since we pass all annotation types to this processor, we return false to make sure other + // processors will still process their target annotations + return false; } @Override diff --git a/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java b/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java index 079573c..60e5303 100644 --- a/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java +++ b/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java @@ -8,6 +8,7 @@ package co.decodable.sdk.pipeline.internal.config.metadata; import static com.google.testing.compile.CompilationSubject.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import co.decodable.sdk.pipeline.internal.metadata.MetadataProcessor; import com.google.common.io.CharSource; @@ -18,11 +19,19 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import javax.tools.StandardLocation; import org.junit.jupiter.api.Test; public class MetadataProcessorTest { + private static final String OUTPUT_PATH = "META-INF/decodable/stream-names.properties"; + @Test public void shouldGenerateStreamNamesFile() throws MalformedURLException { URL jobFile = @@ -38,10 +47,56 @@ public void shouldGenerateStreamNamesFile() throws MalformedURLException { assertThat(compilation).succeeded(); assertThat(compilation) - .generatedFile(StandardLocation.CLASS_OUTPUT, "META-INF/decodable/stream-names.properties") + .generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH) .hasContents( CharSource.wrap( "source-streams=purchase-orders\nsink-streams=purchase-orders-processed\n") .asByteSource(Charset.forName("UTF-8"))); } + + @Test + public void shouldWarnIfNoStreamsAreDeclared() throws MalformedURLException { + var logger = Logger.getLogger(MetadataProcessor.class.getName()); + var handler = new TestHandler(); + logger.setUseParentHandlers(false); + logger.addHandler(handler); + var fileWithoutAnnotations = + new File( + "./src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java") + .toURI() + .toURL(); + + var compilation = + Compiler.javac() + .withProcessors(new MetadataProcessor()) + .compile(JavaFileObjects.forResource(fileWithoutAnnotations)); + + assertThat(compilation).succeeded(); + assertThat(handler.getRecords()).hasSize(1); + var record = handler.getRecords().get(0); + assertThat(record.getLevel()).isEqualTo(Level.WARNING); + assertThat(record.getMessage()).contains("Neither source nor sink streams were declared"); + } + + private static class TestHandler extends Handler { + + private final List records = new ArrayList<>(); + + @Override + public void publish(LogRecord record) { + records.add(record); + } + + @Override + public void flush() { + records.clear(); + } + + @Override + public void close() throws SecurityException {} + + public List getRecords() { + return records; + } + } } From c78f3d46ddaac13ca81c7f22c6fe89e77a0a170d Mon Sep 17 00:00:00 2001 From: Nicolaus Weidner Date: Mon, 18 Dec 2023 21:28:09 +0100 Subject: [PATCH 4/5] #45 chore: Add tests for jobs specifying only sources or only sinks --- .../metadata/MetadataProcessorTest.java | 48 ++++++++++++++++++- .../pipeline/snippets/DummySinksOnlyJob.java | 16 +++++++ .../snippets/DummySourcesOnlyJob.java | 16 +++++++ 3 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySinksOnlyJob.java create mode 100644 sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySourcesOnlyJob.java diff --git a/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java b/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java index 60e5303..026a7f8 100644 --- a/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java +++ b/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java @@ -16,15 +16,17 @@ import com.google.testing.compile.Compiler; import com.google.testing.compile.JavaFileObjects; import java.io.File; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; +import java.util.regex.Pattern; import javax.tools.StandardLocation; import org.junit.jupiter.api.Test; @@ -51,7 +53,49 @@ public void shouldGenerateStreamNamesFile() throws MalformedURLException { .hasContents( CharSource.wrap( "source-streams=purchase-orders\nsink-streams=purchase-orders-processed\n") - .asByteSource(Charset.forName("UTF-8"))); + .asByteSource(StandardCharsets.UTF_8)); + } + + @Test + public void onlySourcesPresent() throws IOException { + URL jobFile = + new File("./src/test/java/co/decodable/sdk/pipeline/snippets/DummySourcesOnlyJob.java") + .toURI() + .toURL(); + + Compilation compilation = + Compiler.javac() + .withProcessors(new MetadataProcessor()) + .compile(JavaFileObjects.forResource(jobFile)); + + assertThat(compilation).succeeded(); + var file = compilation.generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH).get(); + var fileContents = file.getCharContent(false).toString(); + assertThat(fileContents) + .endsWith("\nsink-streams=\n") + .hasLineCount(2) + .containsPattern(Pattern.compile("^source-streams=source[1-2],source[1-2]\\n")); + } + + @Test + public void onlySinksPresent() throws IOException { + URL jobFile = + new File("./src/test/java/co/decodable/sdk/pipeline/snippets/DummySinksOnlyJob.java") + .toURI() + .toURL(); + + Compilation compilation = + Compiler.javac() + .withProcessors(new MetadataProcessor()) + .compile(JavaFileObjects.forResource(jobFile)); + + assertThat(compilation).succeeded(); + var file = compilation.generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH).get(); + var fileContents = file.getCharContent(false).toString(); + assertThat(fileContents) + .startsWith("source-streams=\n") + .hasLineCount(2) + .containsPattern(Pattern.compile("\\nsink-streams=sink[1-2],sink[1-2]$")); } @Test diff --git a/sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySinksOnlyJob.java b/sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySinksOnlyJob.java new file mode 100644 index 0000000..f277b19 --- /dev/null +++ b/sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySinksOnlyJob.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright Decodable, Inc. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package co.decodable.sdk.pipeline.snippets; + +import co.decodable.sdk.pipeline.metadata.SinkStreams; + +@SinkStreams({DummySinksOnlyJob.SINK1, DummySinksOnlyJob.SINK2}) +public class DummySinksOnlyJob { + public static final String SINK1 = "sink1"; + public static final String SINK2 = "sink2"; +} diff --git a/sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySourcesOnlyJob.java b/sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySourcesOnlyJob.java new file mode 100644 index 0000000..f24cbc6 --- /dev/null +++ b/sdk/src/test/java/co/decodable/sdk/pipeline/snippets/DummySourcesOnlyJob.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright Decodable, Inc. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package co.decodable.sdk.pipeline.snippets; + +import co.decodable.sdk.pipeline.metadata.SourceStreams; + +@SourceStreams({DummySourcesOnlyJob.SOURCE1, DummySourcesOnlyJob.SOURCE2}) +public class DummySourcesOnlyJob { + public static final String SOURCE1 = "source1"; + public static final String SOURCE2 = "source2"; +} From 2dd5cb21befc7f5cb03227e8be5766792de579fc Mon Sep 17 00:00:00 2001 From: Nicolaus Weidner Date: Mon, 18 Dec 2023 21:29:35 +0100 Subject: [PATCH 5/5] #45 chore: Extract method --- .../metadata/MetadataProcessorTest.java | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java b/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java index 026a7f8..4ed1e75 100644 --- a/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java +++ b/sdk/src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java @@ -42,10 +42,7 @@ public void shouldGenerateStreamNamesFile() throws MalformedURLException { .toURI() .toURL(); - Compilation compilation = - Compiler.javac() - .withProcessors(new MetadataProcessor()) - .compile(JavaFileObjects.forResource(jobFile)); + Compilation compilation = compile(jobFile); assertThat(compilation).succeeded(); assertThat(compilation) @@ -63,10 +60,7 @@ public void onlySourcesPresent() throws IOException { .toURI() .toURL(); - Compilation compilation = - Compiler.javac() - .withProcessors(new MetadataProcessor()) - .compile(JavaFileObjects.forResource(jobFile)); + Compilation compilation = compile(jobFile); assertThat(compilation).succeeded(); var file = compilation.generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH).get(); @@ -84,10 +78,7 @@ public void onlySinksPresent() throws IOException { .toURI() .toURL(); - Compilation compilation = - Compiler.javac() - .withProcessors(new MetadataProcessor()) - .compile(JavaFileObjects.forResource(jobFile)); + Compilation compilation = compile(jobFile); assertThat(compilation).succeeded(); var file = compilation.generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH).get(); @@ -110,10 +101,7 @@ public void shouldWarnIfNoStreamsAreDeclared() throws MalformedURLException { .toURI() .toURL(); - var compilation = - Compiler.javac() - .withProcessors(new MetadataProcessor()) - .compile(JavaFileObjects.forResource(fileWithoutAnnotations)); + var compilation = compile(fileWithoutAnnotations); assertThat(compilation).succeeded(); assertThat(handler.getRecords()).hasSize(1); @@ -122,6 +110,12 @@ var record = handler.getRecords().get(0); assertThat(record.getMessage()).contains("Neither source nor sink streams were declared"); } + private static Compilation compile(URL fileWithoutAnnotations) { + return Compiler.javac() + .withProcessors(new MetadataProcessor()) + .compile(JavaFileObjects.forResource(fileWithoutAnnotations)); + } + private static class TestHandler extends Handler { private final List records = new ArrayList<>();