Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update @SourceStreams and @SinkStreams docs to clarify that they are required #46

Merged
merged 5 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

/**
* A {@link StatefulSink} which allows to write to a <a
* href="https://docs.decodable.co/docs/streams">Decodable stream</a> from within a Flink job.
* href="https://docs.decodable.co/docs/streams">Decodable stream</a> from within a Flink job. The
* stream must be referenced in {@link co.decodable.sdk.pipeline.metadata.SinkStreams} to be
* accessible.
*
* @param <T> The data type of this stream
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

/**
* A {@link Source} which allows to retrieve the contents of a <a
* href="https://docs.decodable.co/docs/streams">Decodable stream</a> from within a Flink job.
* href="https://docs.decodable.co/docs/streams">Decodable stream</a> from within a Flink job. The
* stream must be referenced in {@link co.decodable.sdk.pipeline.metadata.SourceStreams} to be
* accessible.
*
* @param <T> The data type of this stream
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import co.decodable.sdk.pipeline.metadata.SourceStreams;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
Expand All @@ -28,15 +28,18 @@
/**
* An annotation processor for generating the file {@code
* "META-INF/decodable/stream-names.properties"}, allowing the Decodable platform to display the
* streams connected to a custom pipeline.
* streams connected to a custom pipeline. Note: We handle the supported annotation types inside the
* processor in order to print a warning when no source or sink streams were declared.
*/
@SupportedAnnotationTypes({
"co.decodable.sdk.pipeline.metadata.SourceStreams",
"co.decodable.sdk.pipeline.metadata.SinkStreams"
})
@SupportedAnnotationTypes({"*"})
public class MetadataProcessor extends AbstractProcessor {

private static final Logger LOGGER = Logger.getLogger(MetadataProcessor.class.getName());
private static final String STREAM_NAMES_FILE = "META-INF/decodable/stream-names.properties";
private final Set<String> supportedAnnotationClassNames =
Set.of(SourceStreams.class, SinkStreams.class).stream()
.map(Class::getName)
.collect(Collectors.toSet());

private final Set<String> allSourceStreams;
private final Set<String> allSinkStreams;
Expand All @@ -48,18 +51,36 @@ public MetadataProcessor() {

@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
for (TypeElement annotation : annotations) {
var filteredAnnotations =
annotations.stream()
.filter(
annotation ->
supportedAnnotationClassNames.contains(
annotation.getQualifiedName().toString()))
.collect(Collectors.toSet());
for (TypeElement annotation : filteredAnnotations) {
Set<? extends Element> annotatedElements = roundEnv.getElementsAnnotatedWith(annotation);
for (Element annotated : annotatedElements) {
SourceStreams sourceStreams = annotated.getAnnotation(SourceStreams.class);
allSourceStreams.addAll(Arrays.asList(sourceStreams.value()));
if (sourceStreams != null && sourceStreams.value() != null) {
allSourceStreams.addAll(Arrays.asList(sourceStreams.value()));
}

SinkStreams sinkStreams = annotated.getAnnotation(SinkStreams.class);
allSinkStreams.addAll(Arrays.asList(sinkStreams.value()));
if (sinkStreams != null && sinkStreams.value() != null) {
allSinkStreams.addAll(Arrays.asList(sinkStreams.value()));
}
}
}

if (roundEnv.processingOver()) {
if (allSourceStreams.isEmpty() && allSinkStreams.isEmpty()) {
LOGGER.log(
Level.WARNING,
"Neither source nor sink streams were declared. No streams will be available to this pipeline. If this "
+ "is unintentional, please use the @SourceStreams and @SinkStreams annotations to declare source "
+ "and/or sink streams.");
}
try {
FileObject streamNamesFile =
processingEnv
Expand All @@ -79,7 +100,9 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
}
}

return true;
// Since we pass all annotation types to this processor, we return false to make sure other
// processors will still process their target annotations
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
import java.lang.annotation.Target;

/**
* Denotes the sink streams accessed by a custom pipeline. Must be specified on the job class in
* order for the Decodable platform to display the connected streams.
* Denotes the sink streams accessed by a custom pipeline. Only streams referenced in {@link
* SourceStreams} or {@link SinkStreams} on the job class will be accessible to the pipeline.
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
@Incubating
public @interface SinkStreams {

/** One or more sink stream name. */
/** Names of the sink streams. */
String[] value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
import java.lang.annotation.Target;

/**
* Denotes the source streams accessed by a custom pipeline. Must be specified on the job class in
* order for the Decodable platform to display the connected streams.
* Denotes the source streams accessed by a custom pipeline. Only streams referenced in {@link
* SourceStreams} or {@link SinkStreams} on the job class will be accessible to the pipeline.
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
@Incubating
public @interface SourceStreams {

/** One or more source stream name. */
/** Names of the source streams. */
String[] value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,32 @@
package co.decodable.sdk.pipeline.internal.config.metadata;

import static com.google.testing.compile.CompilationSubject.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

import co.decodable.sdk.pipeline.internal.metadata.MetadataProcessor;
import com.google.common.io.CharSource;
import com.google.testing.compile.Compilation;
import com.google.testing.compile.Compiler;
import com.google.testing.compile.JavaFileObjects;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.tools.StandardLocation;
import org.junit.jupiter.api.Test;

public class MetadataProcessorTest {

private static final String OUTPUT_PATH = "META-INF/decodable/stream-names.properties";

@Test
public void shouldGenerateStreamNamesFile() throws MalformedURLException {
URL jobFile =
Expand All @@ -31,17 +42,99 @@ public void shouldGenerateStreamNamesFile() throws MalformedURLException {
.toURI()
.toURL();

Compilation compilation =
Compiler.javac()
.withProcessors(new MetadataProcessor())
.compile(JavaFileObjects.forResource(jobFile));
Compilation compilation = compile(jobFile);

assertThat(compilation).succeeded();
assertThat(compilation)
.generatedFile(StandardLocation.CLASS_OUTPUT, "META-INF/decodable/stream-names.properties")
.generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH)
.hasContents(
CharSource.wrap(
"source-streams=purchase-orders\nsink-streams=purchase-orders-processed\n")
.asByteSource(Charset.forName("UTF-8")));
.asByteSource(StandardCharsets.UTF_8));
}

@Test
public void onlySourcesPresent() throws IOException {
URL jobFile =
new File("./src/test/java/co/decodable/sdk/pipeline/snippets/DummySourcesOnlyJob.java")
.toURI()
.toURL();

Compilation compilation = compile(jobFile);

assertThat(compilation).succeeded();
var file = compilation.generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH).get();
var fileContents = file.getCharContent(false).toString();
assertThat(fileContents)
.endsWith("\nsink-streams=\n")
.hasLineCount(2)
.containsPattern(Pattern.compile("^source-streams=source[1-2],source[1-2]\\n"));
}

@Test
public void onlySinksPresent() throws IOException {
URL jobFile =
new File("./src/test/java/co/decodable/sdk/pipeline/snippets/DummySinksOnlyJob.java")
.toURI()
.toURL();

Compilation compilation = compile(jobFile);

assertThat(compilation).succeeded();
var file = compilation.generatedFile(StandardLocation.CLASS_OUTPUT, OUTPUT_PATH).get();
var fileContents = file.getCharContent(false).toString();
assertThat(fileContents)
.startsWith("source-streams=\n")
.hasLineCount(2)
.containsPattern(Pattern.compile("\\nsink-streams=sink[1-2],sink[1-2]$"));
}

@Test
public void shouldWarnIfNoStreamsAreDeclared() throws MalformedURLException {
var logger = Logger.getLogger(MetadataProcessor.class.getName());
var handler = new TestHandler();
logger.setUseParentHandlers(false);
logger.addHandler(handler);
var fileWithoutAnnotations =
new File(
"./src/test/java/co/decodable/sdk/pipeline/internal/config/metadata/MetadataProcessorTest.java")
.toURI()
.toURL();

var compilation = compile(fileWithoutAnnotations);

assertThat(compilation).succeeded();
assertThat(handler.getRecords()).hasSize(1);
var record = handler.getRecords().get(0);
assertThat(record.getLevel()).isEqualTo(Level.WARNING);
assertThat(record.getMessage()).contains("Neither source nor sink streams were declared");
}

private static Compilation compile(URL fileWithoutAnnotations) {
return Compiler.javac()
.withProcessors(new MetadataProcessor())
.compile(JavaFileObjects.forResource(fileWithoutAnnotations));
}

private static class TestHandler extends Handler {

private final List<LogRecord> records = new ArrayList<>();

@Override
public void publish(LogRecord record) {
records.add(record);
}

@Override
public void flush() {
records.clear();
}

@Override
public void close() throws SecurityException {}

public List<LogRecord> getRecords() {
return records;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk.pipeline.snippets;

import co.decodable.sdk.pipeline.metadata.SinkStreams;

@SinkStreams({DummySinksOnlyJob.SINK1, DummySinksOnlyJob.SINK2})
public class DummySinksOnlyJob {
public static final String SINK1 = "sink1";
public static final String SINK2 = "sink2";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk.pipeline.snippets;

import co.decodable.sdk.pipeline.metadata.SourceStreams;

@SourceStreams({DummySourcesOnlyJob.SOURCE1, DummySourcesOnlyJob.SOURCE2})
public class DummySourcesOnlyJob {
public static final String SOURCE1 = "source1";
public static final String SOURCE2 = "source2";
}