Skip to content

Commit

Permalink
#34 Adding annotation processor for exposing referenced stream names;
Browse files Browse the repository at this point in the history
The file META-INF/decodable/stream-names.properties will be generated based on the InputStreams and OutputStreams annotations.
  • Loading branch information
gunnarmorling authored Jul 18, 2023
1 parent a3297d7 commit 317e281
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
*/
package co.decodable.examples.cpdemo;

import static co.decodable.examples.cpdemo.DataStreamJob.PURCHASE_ORDERS_PROCESSED_STREAM;
import static co.decodable.examples.cpdemo.DataStreamJob.PURCHASE_ORDERS_STREAM;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
Expand All @@ -19,21 +22,28 @@

import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;

@SourceStreams(PURCHASE_ORDERS_STREAM)
@SinkStreams(PURCHASE_ORDERS_PROCESSED_STREAM)
public class DataStreamJob {

static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";
static final String PURCHASE_ORDERS_STREAM = "purchase-orders";

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DecodableStreamSource<String> source =
DecodableStreamSource.<String>builder()
.withStreamName("purchase-orders")
.withStreamName(PURCHASE_ORDERS_STREAM)
.withDeserializationSchema(new SimpleStringSchema())
.build();

DecodableStreamSink<String> sink =
DecodableStreamSink.<String>builder()
.withStreamName("purchase-orders-processed")
.withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
.withSerializationSchema(new SimpleStringSchema())
.build();

Expand Down
1 change: 1 addition & 0 deletions sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
testImplementation 'org.testcontainers:junit-jupiter:1.18.3'
testImplementation 'org.slf4j:slf4j-simple:1.7.36'
testImplementation 'org.assertj:assertj-core:3.24.2'
testImplementation 'com.google.testing.compile:compile-testing:0.21.0'
}

tasks.named('jar') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk.pipeline.internal.metadata;

import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
import javax.annotation.processing.SupportedAnnotationTypes;
import javax.lang.model.SourceVersion;
import javax.lang.model.element.Element;
import javax.lang.model.element.TypeElement;
import javax.tools.Diagnostic.Kind;
import javax.tools.FileObject;
import javax.tools.StandardLocation;

/**
* An annotation processor for generating the file {@code
* "META-INF/decodable/stream-names.properties"}, allowing the Decodable platform to display the
* streams connected to a custom pipeline.
*/
@SupportedAnnotationTypes({
"co.decodable.sdk.pipeline.metadata.SourceStreams",
"co.decodable.sdk.pipeline.metadata.SinkStreams"
})
public class MetadataProcessor extends AbstractProcessor {

private static final String STREAM_NAMES_FILE = "META-INF/decodable/stream-names.properties";

private final Set<String> allSourceStreams;
private final Set<String> allSinkStreams;

public MetadataProcessor() {
allSourceStreams = new HashSet<>();
allSinkStreams = new HashSet<>();
}

@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
for (TypeElement annotation : annotations) {
Set<? extends Element> annotatedElements = roundEnv.getElementsAnnotatedWith(annotation);
for (Element annotated : annotatedElements) {
SourceStreams sourceStreams = annotated.getAnnotation(SourceStreams.class);
allSourceStreams.addAll(Arrays.asList(sourceStreams.value()));

SinkStreams sinkStreams = annotated.getAnnotation(SinkStreams.class);
allSinkStreams.addAll(Arrays.asList(sinkStreams.value()));
}
}

if (roundEnv.processingOver()) {
try {
FileObject streamNamesFile =
processingEnv
.getFiler()
.createResource(StandardLocation.CLASS_OUTPUT, "", STREAM_NAMES_FILE);

try (PrintWriter out = new PrintWriter(streamNamesFile.openWriter())) {
out.println(
"source-streams=" + allSourceStreams.stream().collect(Collectors.joining(",")));
out.println("sink-streams=" + allSinkStreams.stream().collect(Collectors.joining(",")));
}
} catch (IOException e) {
processingEnv
.getMessager()
.printMessage(
Kind.ERROR, "Couldn't generate stream-names.properties file: " + e.getMessage());
}
}

return true;
}

@Override
public SourceVersion getSupportedSourceVersion() {
return SourceVersion.latest();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk.pipeline.metadata;

import co.decodable.sdk.pipeline.util.Incubating;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Denotes the sink streams accessed by a custom pipeline. Must be specified on the job class in
* order for the Decodable platform to display the connected streams.
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
@Incubating
public @interface SinkStreams {

/** One or more sink stream name. */
String[] value();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk.pipeline.metadata;

import co.decodable.sdk.pipeline.util.Incubating;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Denotes the source streams accessed by a custom pipeline. Must be specified on the job class in
* order for the Decodable platform to display the connected streams.
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
@Incubating
public @interface SourceStreams {

/** One or more source stream name. */
String[] value();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

/** Annotations for linking custom pipelines to managed Decodable streams. */
package co.decodable.sdk.pipeline.metadata;
23 changes: 22 additions & 1 deletion sdk/src/main/java/co/decodable/sdk/pipeline/package-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

// spotless:off
/**
* An SDK for implementing Apache Flink jobs and running them on <a
* href="https://www.decodable.co">Decodable</a>.
Expand All @@ -19,5 +19,26 @@
*
* <p>{@snippet class="co.decodable.sdk.pipeline.snippets.PurchaseOrderProcessingJob"
* region="custom-pipeline"}
*
* <h2>Stream Metadata</h2>
*
* While not required, it is a good practice for custom pipeline authors to provide metadata about
* the source and sink streams accessed by their pipelines. That way, the referenced pipelines can
* be displayed in the Decodable user interface. In order to do so, add a file named
* <i>META-INF/decodable/stream-names.properties</i> to your Flink job JAR. Within that file,
* specify the name(s) of all source and sink streams as comma-separated lists, using the property
* keys "source-streams" and "sink-streams":
*
* <p>
* {@snippet :
source-streams=my_source_stream_1,my_source_stream_2
sink-streams=my_sink_stream_1,my_sink_stream_2
}
* Instead of manually creating this file, it is recommended to generate it automatically, using an
* annotation processor which ships with this SDK. To do so, specify the stream names using the
* {@link co.decodable.sdk.pipeline.metadata.SourceStreams} and
* {@link co.decodable.sdk.pipeline.metadata.SinkStreams} annotations on the job class, as shown in
* the example listing above.
*/
//spotless:on
package co.decodable.sdk.pipeline;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
co.decodable.sdk.pipeline.internal.metadata.MetadataProcessor
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk.pipeline.internal.config.metadata;

import static com.google.testing.compile.CompilationSubject.assertThat;

import co.decodable.sdk.pipeline.internal.metadata.MetadataProcessor;
import com.google.common.io.CharSource;
import com.google.testing.compile.Compilation;
import com.google.testing.compile.Compiler;
import com.google.testing.compile.JavaFileObjects;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import javax.tools.StandardLocation;
import org.junit.jupiter.api.Test;

public class MetadataProcessorTest {

@Test
public void shouldGenerateStreamNamesFile() throws MalformedURLException {
URL jobFile =
new File(
"./src/test/java/co/decodable/sdk/pipeline/snippets/PurchaseOrderProcessingJob.java")
.toURI()
.toURL();

Compilation compilation =
Compiler.javac()
.withProcessors(new MetadataProcessor())
.compile(JavaFileObjects.forResource(jobFile));

assertThat(compilation).succeeded();
assertThat(compilation)
.generatedFile(StandardLocation.CLASS_OUTPUT, "META-INF/decodable/stream-names.properties")
.hasContents(
CharSource.wrap(
"source-streams=purchase-orders\nsink-streams=purchase-orders-processed\n")
.asByteSource(Charset.forName("UTF-8")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,40 @@
*/
package co.decodable.sdk.pipeline.snippets;

import static co.decodable.sdk.pipeline.snippets.PurchaseOrderProcessingJob.PURCHASE_ORDERS_PROCESSED_STREAM;
import static co.decodable.sdk.pipeline.snippets.PurchaseOrderProcessingJob.PURCHASE_ORDERS_STREAM;

import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.PurchaseOrder;
import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// spotless:off
@SourceStreams(PURCHASE_ORDERS_STREAM) // @start region="custom-pipeline"
@SinkStreams(PURCHASE_ORDERS_PROCESSED_STREAM)
public class PurchaseOrderProcessingJob {

// spotless:off
public static void main(String[] args) throws Exception { // @start region="custom-pipeline"
static final String PURCHASE_ORDERS_STREAM = "purchase-orders";
static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// @highlight region regex=".*"
DecodableStreamSource<PurchaseOrder> source = DecodableStreamSource.<PurchaseOrder>builder()
.withStreamName("purchase-orders")
.withStreamName(PURCHASE_ORDERS_STREAM)
.withDeserializationSchema(new JsonDeserializationSchema<>(PurchaseOrder.class))
.build();

DecodableStreamSink<PurchaseOrder> sink = DecodableStreamSink.<PurchaseOrder>builder()
.withStreamName("purchase-orders-processed")
.withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
.withSerializationSchema(new JsonSerializationSchema<>())
.build();
// @end
Expand Down

0 comments on commit 317e281

Please sign in to comment.