diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java index 33de036e1..755c3a901 100644 --- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java +++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java @@ -111,9 +111,6 @@ public static Map propertiesAt(JsonNode node, JsonPointer pointe throw new WrongTypeException(pointer, "not a key-value list"); } Map.Entry field = fields.next(); - if (!field.getValue().isTextual()) { - throw new WrongTypeException(pointer, "not a key-value pair at " + field); - } properties.put(field.getKey(), field.getValue().asText()); } return properties; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java index 14a769c04..0d167c11b 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java @@ -55,6 +55,7 @@ import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter; import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter; import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes; +import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes; import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec; import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec; import org.apache.flink.statefun.sdk.EgressType; @@ -139,7 +140,7 @@ private void configureIngress(Binder binder, Iterable ingres JsonIngressSpec ingressSpec = new JsonIngressSpec<>(type, id, ingress); binder.bindIngress(ingressSpec); - if (type.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) { + if (isAutoRoutableIngress(type)) { binder.bindIngressRouter(id, new AutoRoutableProtobufRouter()); } } @@ -170,6 +171,11 @@ private static IngressIdentifier ingressId(JsonNode ingress) { return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name()); } + private static boolean isAutoRoutableIngress(IngressType ingressType) { + return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE) + || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE); + } + // ---------------------------------------------------------------------------------------------------------- // Egresses // ---------------------------------------------------------------------------------------------------------- diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java index 4e0836933..eb37fe831 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java @@ -24,7 +24,6 @@ import org.apache.flink.statefun.flink.io.generated.AutoRoutable; import org.apache.flink.statefun.flink.io.generated.RoutingConfig; import org.apache.flink.statefun.flink.io.generated.TargetFunctionType; -import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.io.Router; @@ -32,8 +31,7 @@ * A {@link Router} that recognizes messages of type {@link AutoRoutable}. * *

For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the - * configured target addresses as a Protobuf {@link Any} message. This should only be attached to - * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}. + * configured target addresses as a Protobuf {@link Any} message. */ public final class AutoRoutableProtobufRouter implements Router { diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java index c3cbd0a11..fa813fd49 100644 --- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisFlinkIOModule.java @@ -19,6 +19,8 @@ import com.google.auto.service.AutoService; import java.util.Map; +import org.apache.flink.statefun.flink.io.kinesis.polyglot.GenericKinesisSinkProvider; +import org.apache.flink.statefun.flink.io.kinesis.polyglot.RoutableProtobufKinesisSourceProvider; import org.apache.flink.statefun.flink.io.spi.FlinkIoModule; import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes; @@ -29,5 +31,10 @@ public final class KinesisFlinkIOModule implements FlinkIoModule { public void configure(Map globalConfiguration, Binder binder) { binder.bindSourceProvider(KinesisIOTypes.UNIVERSAL_INGRESS_TYPE, new KinesisSourceProvider()); binder.bindSinkProvider(KinesisIOTypes.UNIVERSAL_EGRESS_TYPE, new KinesisSinkProvider()); + binder.bindSourceProvider( + PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE, + new RoutableProtobufKinesisSourceProvider()); + binder.bindSinkProvider( + PolyglotKinesisIOTypes.GENERIC_KINESIS_EGRESS_TYPE, new GenericKinesisSinkProvider()); } } diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java index c1156b62e..c969a0dff 100644 --- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; -final class KinesisSinkProvider implements SinkProvider { +public final class KinesisSinkProvider implements SinkProvider { @Override public SinkFunction forSpec(EgressSpec spec) { diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java index 92665eb54..93e322ed9 100644 --- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java @@ -20,10 +20,8 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Properties; -import org.apache.flink.statefun.flink.io.common.ReflectionUtil; import org.apache.flink.statefun.flink.io.spi.SourceProvider; import org.apache.flink.statefun.sdk.io.IngressSpec; -import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer; import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec; import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -32,7 +30,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; -final class KinesisSourceProvider implements SourceProvider { +public final class KinesisSourceProvider implements SourceProvider { @Override public SourceFunction forSpec(IngressSpec spec) { @@ -56,9 +54,7 @@ private static KinesisIngressSpec asKinesisSpec(IngressSpec spec) { private static KinesisDeserializationSchema deserializationSchemaFromSpec( KinesisIngressSpec spec) { - KinesisIngressDeserializer ingressDeserializer = - ReflectionUtil.instantiate(spec.deserializerClass()); - return new KinesisDeserializationSchemaDelegate<>(ingressDeserializer); + return new KinesisDeserializationSchemaDelegate<>(spec.deserializer()); } private static Properties propertiesFromSpec(KinesisIngressSpec spec) { diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java new file mode 100644 index 000000000..47c26e489 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/AwsAuthSpecJsonParser.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.io.kinesis.polyglot; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.json.Selectors; +import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials; +import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion; + +final class AwsAuthSpecJsonParser { + + private AwsAuthSpecJsonParser() {} + + private static final JsonPointer AWS_REGION_POINTER = JsonPointer.compile("/awsRegion"); + private static final JsonPointer AWS_CREDENTIALS_POINTER = JsonPointer.compile("/awsCredentials"); + + private static final class Region { + private static final String DEFAULT_TYPE = "default"; + private static final String SPECIFIED_ID_TYPE = "specific"; + private static final String CUSTOM_ENDPOINT_TYPE = "custom-endpoint"; + + private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type"); + private static final JsonPointer ID_POINTER = JsonPointer.compile("/id"); + private static final JsonPointer ENDPOINT_POINTER = JsonPointer.compile("/endpoint"); + } + + private static final class Credentials { + private static final String DEFAULT_TYPE = "default"; + private static final String BASIC_TYPE = "basic"; + private static final String PROFILE_TYPE = "profile"; + + private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type"); + private static final JsonPointer ACCESS_KEY_ID_POINTER = JsonPointer.compile("/accessKeyId"); + private static final JsonPointer SECRET_ACCESS_KEY_POINTER = + JsonPointer.compile("/secretAccessKey"); + private static final JsonPointer PROFILE_NAME_POINTER = JsonPointer.compile("/profileName"); + private static final JsonPointer PROFILE_PATH_POINTER = JsonPointer.compile("/profilePath"); + } + + static Optional optionalAwsRegion(JsonNode specNode) { + final JsonNode awsRegionSpecNode = specNode.at(AWS_REGION_POINTER); + if (awsRegionSpecNode.isMissingNode()) { + return Optional.empty(); + } + + final String type = Selectors.textAt(awsRegionSpecNode, Region.TYPE_POINTER); + switch (type) { + case Region.DEFAULT_TYPE: + return Optional.of(AwsRegion.fromDefaultProviderChain()); + case Region.SPECIFIED_ID_TYPE: + return Optional.of(AwsRegion.ofId(Selectors.textAt(awsRegionSpecNode, Region.ID_POINTER))); + case Region.CUSTOM_ENDPOINT_TYPE: + return Optional.of( + AwsRegion.ofCustomEndpoint( + Selectors.textAt(awsRegionSpecNode, Region.ENDPOINT_POINTER), + Selectors.textAt(awsRegionSpecNode, Region.ID_POINTER))); + default: + final List validValues = + Arrays.asList( + Region.DEFAULT_TYPE, Region.SPECIFIED_ID_TYPE, Region.CUSTOM_ENDPOINT_TYPE); + throw new IllegalArgumentException( + "Invalid AWS region type: " + + type + + "; valid values are [" + + String.join(", ", validValues) + + "]"); + } + } + + static Optional optionalAwsCredentials(JsonNode specNode) { + final JsonNode awsCredentialsSpecNode = specNode.at(AWS_CREDENTIALS_POINTER); + if (awsCredentialsSpecNode.isMissingNode()) { + return Optional.empty(); + } + + final String type = Selectors.textAt(awsCredentialsSpecNode, Credentials.TYPE_POINTER); + switch (type) { + case Credentials.DEFAULT_TYPE: + return Optional.of(AwsCredentials.fromDefaultProviderChain()); + case Credentials.BASIC_TYPE: + return Optional.of( + AwsCredentials.basic( + Selectors.textAt(awsCredentialsSpecNode, Credentials.ACCESS_KEY_ID_POINTER), + Selectors.textAt(awsCredentialsSpecNode, Credentials.SECRET_ACCESS_KEY_POINTER))); + case Credentials.PROFILE_TYPE: + final Optional path = + Selectors.optionalTextAt(awsCredentialsSpecNode, Credentials.PROFILE_PATH_POINTER); + if (path.isPresent()) { + return Optional.of( + AwsCredentials.profile( + Selectors.textAt(awsCredentialsSpecNode, Credentials.PROFILE_NAME_POINTER), + path.get())); + } else { + return Optional.of( + AwsCredentials.profile( + Selectors.textAt(awsCredentialsSpecNode, Credentials.PROFILE_NAME_POINTER))); + } + default: + final List validValues = + Arrays.asList( + Credentials.DEFAULT_TYPE, Credentials.BASIC_TYPE, Credentials.PROFILE_TYPE); + throw new IllegalArgumentException( + "Invalid AWS credential type: " + + type + + "; valid values are [" + + String.join(", ", validValues) + + "]"); + } + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java new file mode 100644 index 000000000..84e2bd8fc --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.io.kinesis.polyglot; + +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.flink.statefun.flink.io.generated.KinesisEgressRecord; +import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord; +import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer; + +public final class GenericKinesisEgressSerializer implements KinesisEgressSerializer { + + private static final long serialVersionUID = 1L; + + @Override + public EgressRecord serialize(Any value) { + final KinesisEgressRecord kinesisEgressRecord = asKinesisEgressRecord(value); + return EgressRecord.newBuilder() + .withData(kinesisEgressRecord.getValueBytes().toByteArray()) + .withStream(kinesisEgressRecord.getStream()) + .withPartitionKey(kinesisEgressRecord.getPartitionKey()) + .withExplicitHashKey(kinesisEgressRecord.getExplicitHashKey()) + .build(); + } + + private static KinesisEgressRecord asKinesisEgressRecord(Any message) { + if (!message.is(KinesisEgressRecord.class)) { + throw new IllegalStateException( + "The generic Kinesis egress expects only messages of type " + + KinesisEgressRecord.class.getName()); + } + try { + return message.unpack(KinesisEgressRecord.class); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException( + "Unable to unpack message as a " + KinesisEgressRecord.class.getName(), e); + } + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.java new file mode 100644 index 000000000..943e8de50 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.io.kinesis.polyglot; + +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsCredentials; +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsRegion; +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisEgressSpecJsonParser.optionalMaxOutstandingRecords; + +import com.google.protobuf.Any; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.io.kinesis.KinesisSinkProvider; +import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec; +import org.apache.flink.statefun.flink.io.spi.SinkProvider; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.io.EgressSpec; +import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder; +import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +public final class GenericKinesisSinkProvider implements SinkProvider { + + private final KinesisSinkProvider delegateProvider = new KinesisSinkProvider(); + + @Override + public SinkFunction forSpec(EgressSpec spec) { + final KinesisEgressSpec kinesisEgressSpec = asKinesisEgressSpec(spec); + return delegateProvider.forSpec(kinesisEgressSpec); + } + + private static KinesisEgressSpec asKinesisEgressSpec(EgressSpec spec) { + if (!(spec instanceof JsonEgressSpec)) { + throw new IllegalArgumentException("Wrong type " + spec.type()); + } + JsonEgressSpec casted = (JsonEgressSpec) spec; + + EgressIdentifier id = casted.id(); + validateConsumedType(id); + + JsonNode specJson = casted.specJson(); + + KinesisEgressBuilder kinesisEgressBuilder = KinesisEgressBuilder.forIdentifier(id); + + optionalAwsRegion(specJson).ifPresent(kinesisEgressBuilder::withAwsRegion); + optionalAwsCredentials(specJson).ifPresent(kinesisEgressBuilder::withAwsCredentials); + optionalMaxOutstandingRecords(specJson) + .ifPresent(kinesisEgressBuilder::withMaxOutstandingRecords); + KinesisIngressSpecJsonParser.clientConfigProperties(specJson) + .entrySet() + .forEach( + entry -> + kinesisEgressBuilder.withClientConfigurationProperty( + entry.getKey(), entry.getValue())); + + kinesisEgressBuilder.withSerializer(serializerClass()); + + return kinesisEgressBuilder.build(); + } + + private static void validateConsumedType(EgressIdentifier id) { + Class consumedType = id.consumedType(); + if (Any.class != consumedType) { + throw new IllegalArgumentException( + "Generic Kinesis egress is only able to consume messages types of " + + Any.class.getName() + + " but " + + consumedType.getName() + + " is provided."); + } + } + + @SuppressWarnings("unchecked") + private static Class serializerClass() { + // this cast is safe, because we've already validated that the consumed type is Any. + return (Class) GenericKinesisEgressSerializer.class; + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisEgressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisEgressSpecJsonParser.java new file mode 100644 index 000000000..594623c20 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisEgressSpecJsonParser.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.io.kinesis.polyglot; + +import java.util.Map; +import java.util.OptionalInt; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.json.Selectors; + +final class KinesisEgressSpecJsonParser { + + private KinesisEgressSpecJsonParser() {} + + private static final JsonPointer MAX_OUTSTANDING_RECORDS_POINTER = + JsonPointer.compile("/maxOutstandingRecords"); + private static final JsonPointer CLIENT_CONFIG_PROPS_POINTER = + JsonPointer.compile("/clientConfigProperties"); + + static OptionalInt optionalMaxOutstandingRecords(JsonNode ingressSpecNode) { + return Selectors.optionalIntegerAt(ingressSpecNode, MAX_OUTSTANDING_RECORDS_POINTER); + } + + static Map clientConfigProperties(JsonNode ingressSpecNode) { + return Selectors.propertiesAt(ingressSpecNode, CLIENT_CONFIG_PROPS_POINTER); + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java new file mode 100644 index 000000000..823cdfb35 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/KinesisIngressSpecJsonParser.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.io.kinesis.polyglot; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.json.NamespaceNamePair; +import org.apache.flink.statefun.flink.common.json.Selectors; +import org.apache.flink.statefun.flink.io.generated.RoutingConfig; +import org.apache.flink.statefun.flink.io.generated.TargetFunctionType; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition; + +final class KinesisIngressSpecJsonParser { + + private KinesisIngressSpecJsonParser() {} + + private static final JsonPointer STREAMS_POINTER = JsonPointer.compile("/streams"); + private static final JsonPointer STARTUP_POSITION_POINTER = + JsonPointer.compile("/startupPosition"); + private static final JsonPointer CLIENT_CONFIG_PROPS_POINTER = + JsonPointer.compile("/clientConfigProperties"); + + private static final class Streams { + private static final JsonPointer NAME_POINTER = JsonPointer.compile("/stream"); + private static final JsonPointer TYPE_URL_POINTER = JsonPointer.compile("/typeUrl"); + private static final JsonPointer TARGETS_POINTER = JsonPointer.compile("/targets"); + } + + private static final class StartupPosition { + private static final String EARLIEST_TYPE = "earliest"; + private static final String LATEST_TYPE = "latest"; + private static final String DATE_TYPE = "date"; + + private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z"; + private static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern(DATE_PATTERN); + + private static final JsonPointer TYPE_POINTER = JsonPointer.compile("/type"); + private static final JsonPointer DATE_POINTER = JsonPointer.compile("/date"); + } + + static Optional optionalStartupPosition(JsonNode ingressSpecNode) { + final JsonNode startupPositionSpecNode = ingressSpecNode.at(STARTUP_POSITION_POINTER); + if (startupPositionSpecNode.isMissingNode()) { + return Optional.empty(); + } + + final String type = Selectors.textAt(startupPositionSpecNode, StartupPosition.TYPE_POINTER); + switch (type) { + case StartupPosition.EARLIEST_TYPE: + return Optional.of(KinesisIngressStartupPosition.fromEarliest()); + case StartupPosition.LATEST_TYPE: + return Optional.of(KinesisIngressStartupPosition.fromLatest()); + case StartupPosition.DATE_TYPE: + return Optional.of( + KinesisIngressStartupPosition.fromDate(startupDate(startupPositionSpecNode))); + default: + final List validValues = + Arrays.asList( + StartupPosition.EARLIEST_TYPE, + StartupPosition.LATEST_TYPE, + StartupPosition.DATE_TYPE); + throw new IllegalArgumentException( + "Invalid startup position type: " + + type + + "; valid values are [" + + String.join(", ", validValues) + + "]"); + } + } + + static Map clientConfigProperties(JsonNode ingressSpecNode) { + return Selectors.propertiesAt(ingressSpecNode, CLIENT_CONFIG_PROPS_POINTER); + } + + static Map routableStreams(JsonNode ingressSpecNode) { + Map routableStreams = new HashMap<>(); + for (JsonNode routableStreamNode : Selectors.listAt(ingressSpecNode, STREAMS_POINTER)) { + final String streamName = Selectors.textAt(routableStreamNode, Streams.NAME_POINTER); + final String typeUrl = Selectors.textAt(routableStreamNode, Streams.TYPE_URL_POINTER); + final List targets = parseRoutableTargetFunctionTypes(routableStreamNode); + + routableStreams.put( + streamName, + RoutingConfig.newBuilder() + .setTypeUrl(typeUrl) + .addAllTargetFunctionTypes(targets) + .build()); + } + return routableStreams; + } + + private static List parseRoutableTargetFunctionTypes( + JsonNode routableStreamNode) { + final List targets = new ArrayList<>(); + for (String namespaceAndName : + Selectors.textListAt(routableStreamNode, Streams.TARGETS_POINTER)) { + NamespaceNamePair namespaceNamePair = NamespaceNamePair.from(namespaceAndName); + targets.add( + TargetFunctionType.newBuilder() + .setNamespace(namespaceNamePair.namespace()) + .setType(namespaceNamePair.name()) + .build()); + } + return targets; + } + + private static ZonedDateTime startupDate(JsonNode startupPositionSpecNode) { + final String dateStr = Selectors.textAt(startupPositionSpecNode, StartupPosition.DATE_POINTER); + try { + return ZonedDateTime.parse(dateStr, StartupPosition.DATE_FORMATTER); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException( + "Unable to parse date string for startup position: " + + dateStr + + "; the date should conform to the pattern " + + StartupPosition.DATE_PATTERN, + e); + } + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java new file mode 100644 index 000000000..6b098f3b6 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisIngressDeserializer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.io.kinesis.polyglot; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import java.util.Map; +import org.apache.flink.statefun.flink.io.generated.AutoRoutable; +import org.apache.flink.statefun.flink.io.generated.RoutingConfig; +import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer; + +public final class RoutableProtobufKinesisIngressDeserializer + implements KinesisIngressDeserializer { + + private static final long serialVersionUID = 1L; + + private final Map routingConfigs; + + RoutableProtobufKinesisIngressDeserializer(Map routingConfigs) { + if (routingConfigs == null || routingConfigs.isEmpty()) { + throw new IllegalArgumentException( + "Routing config for routable Kafka ingress cannot be empty."); + } + this.routingConfigs = routingConfigs; + } + + @Override + public Message deserialize(IngressRecord ingressRecord) { + final String stream = ingressRecord.getStream(); + + final RoutingConfig routingConfig = routingConfigs.get(stream); + if (routingConfig == null) { + throw new IllegalStateException( + "Consumed a record from stream [" + stream + "], but no routing config was specified."); + } + + return AutoRoutable.newBuilder() + .setConfig(routingConfig) + .setId(ingressRecord.getPartitionKey()) + .setPayloadBytes(ByteString.copyFrom(ingressRecord.getData())) + .build(); + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java new file mode 100644 index 000000000..99776a318 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.io.kinesis.polyglot; + +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsCredentials; +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsRegion; +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.clientConfigProperties; +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.optionalStartupPosition; +import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisIngressSpecJsonParser.routableStreams; + +import com.google.protobuf.Message; +import java.util.ArrayList; +import java.util.Map; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.io.generated.RoutingConfig; +import org.apache.flink.statefun.flink.io.kinesis.KinesisSourceProvider; +import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec; +import org.apache.flink.statefun.flink.io.spi.SourceProvider; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; +import org.apache.flink.statefun.sdk.io.IngressSpec; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilderApiExtension; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer; +import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +public final class RoutableProtobufKinesisSourceProvider implements SourceProvider { + + private final KinesisSourceProvider delegateProvider = new KinesisSourceProvider(); + + @Override + public SourceFunction forSpec(IngressSpec spec) { + final KinesisIngressSpec kinesisIngressSpec = asKinesisIngressSpec(spec); + return delegateProvider.forSpec(kinesisIngressSpec); + } + + private static KinesisIngressSpec asKinesisIngressSpec(IngressSpec spec) { + if (!(spec instanceof JsonIngressSpec)) { + throw new IllegalArgumentException("Wrong type " + spec.type()); + } + JsonIngressSpec casted = (JsonIngressSpec) spec; + + IngressIdentifier id = casted.id(); + Class producedType = casted.id().producedType(); + if (!Message.class.isAssignableFrom(producedType)) { + throw new IllegalArgumentException( + "ProtocolBuffer based Kinesis ingress is only able to produce types that derive from " + + Message.class.getName() + + " but " + + producedType.getName() + + " is provided."); + } + + JsonNode specJson = casted.specJson(); + + KinesisIngressBuilder kinesisIngressBuilder = KinesisIngressBuilder.forIdentifier(id); + + optionalAwsRegion(specJson).ifPresent(kinesisIngressBuilder::withAwsRegion); + optionalAwsCredentials(specJson).ifPresent(kinesisIngressBuilder::withAwsCredentials); + optionalStartupPosition(specJson).ifPresent(kinesisIngressBuilder::withStartupPosition); + clientConfigProperties(specJson) + .entrySet() + .forEach( + entry -> + kinesisIngressBuilder.withClientConfigurationProperty( + entry.getKey(), entry.getValue())); + + Map routableStreams = routableStreams(specJson); + KinesisIngressBuilderApiExtension.withDeserializer( + kinesisIngressBuilder, deserializer(routableStreams)); + kinesisIngressBuilder.withStreams(new ArrayList<>(routableStreams.keySet())); + + return kinesisIngressBuilder.build(); + } + + @SuppressWarnings("unchecked") + private static KinesisIngressDeserializer deserializer( + Map routingConfig) { + // this cast is safe since we've already checked that T is a Message + return (KinesisIngressDeserializer) + new RoutableProtobufKinesisIngressDeserializer(routingConfig); + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java new file mode 100644 index 000000000..bfa7ef90a --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilderApiExtension.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.sdk.kinesis.ingress; + +public final class KinesisIngressBuilderApiExtension { + + private KinesisIngressBuilderApiExtension() {} + + public static void withDeserializer( + KinesisIngressBuilder kinesisIngressBuilder, KinesisIngressDeserializer deserializer) { + kinesisIngressBuilder.withDeserializer(deserializer); + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/GenericKinesisSinkProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/GenericKinesisSinkProviderTest.java new file mode 100644 index 000000000..2a6b19b7b --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/GenericKinesisSinkProviderTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.io.kinesis; + +import static org.apache.flink.statefun.flink.io.testutils.YamlUtils.loadAsJsonFromClassResource; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.protobuf.Any; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.io.kinesis.polyglot.GenericKinesisSinkProvider; +import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.junit.Test; + +public class GenericKinesisSinkProviderTest { + + @Test + public void exampleUsage() { + JsonNode egressDefinition = + loadAsJsonFromClassResource(getClass().getClassLoader(), "generic-kinesis-egress.yaml"); + JsonEgressSpec spec = + new JsonEgressSpec<>( + PolyglotKinesisIOTypes.GENERIC_KINESIS_EGRESS_TYPE, + new EgressIdentifier<>("foo", "bar", Any.class), + egressDefinition); + + GenericKinesisSinkProvider provider = new GenericKinesisSinkProvider(); + SinkFunction source = provider.forSpec(spec); + + assertThat(source, instanceOf(FlinkKinesisProducer.class)); + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java new file mode 100644 index 000000000..81e6ff136 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kinesis/RoutableProtobufKinesisSourceProviderTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.io.kinesis; + +import static org.apache.flink.statefun.flink.io.testutils.YamlUtils.loadAsJsonFromClassResource; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.protobuf.Message; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.io.kinesis.polyglot.RoutableProtobufKinesisSourceProvider; +import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.junit.Test; + +public class RoutableProtobufKinesisSourceProviderTest { + + @Test + public void exampleUsage() { + JsonNode ingressDefinition = + loadAsJsonFromClassResource( + getClass().getClassLoader(), "routable-protobuf-kinesis-ingress.yaml"); + JsonIngressSpec spec = + new JsonIngressSpec<>( + PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE, + new IngressIdentifier<>(Message.class, "foo", "bar"), + ingressDefinition); + + RoutableProtobufKinesisSourceProvider provider = new RoutableProtobufKinesisSourceProvider(); + SourceFunction source = provider.forSpec(spec); + + assertThat(source, instanceOf(FlinkKinesisConsumer.class)); + } +} diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kinesis-egress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kinesis-egress.yaml new file mode 100644 index 000000000..902822d1e --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kinesis-egress.yaml @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +egress: + meta: + type: statefun.kinesis.io/generic-egress + id: com.mycomp.foo/bar + spec: + awsRegion: + type: custom-endpoint + endpoint: https://localhost:4567 + id: us-west-1 + awsCredentials: + type: profile + profileName: john-doe + profilePath: /path/to/profile/config + maxOutstandingRecords: 9999 + clientConfigProperties: + - ThreadingModel: POOLED + - ThreadPoolSize: 10 diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml new file mode 100644 index 000000000..95616ee65 --- /dev/null +++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kinesis-ingress.yaml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ingress: + meta: + type: statefun.kinesis.io/routable-protobuf-ingress + id: com.mycomp.foo/bar + spec: + awsRegion: + type: specific + id: us-west-2 + awsCredentials: + type: basic + accessKeyId: my_access_key_id + secretAccessKey: my_secret_access_key + startupPosition: + type: earliest + streams: + - stream: stream-1 + typeUrl: com.googleapis/com.mycomp.foo.MessageA + targets: + - com.mycomp.foo/function-1 + - com.mycomp.foo/function-2 + - stream: topic-2 + typeUrl: com.googleapis/com.mycomp.foo.MessageB + targets: + - com.mycomp.foo/function-2 + clientConfigProperties: + - SocketTimeout: 9999 + - MaxConnections: 15 diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java new file mode 100644 index 000000000..252e59dad --- /dev/null +++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.io.kinesis; + +import org.apache.flink.statefun.sdk.EgressType; +import org.apache.flink.statefun.sdk.IngressType; + +public final class PolyglotKinesisIOTypes { + + private PolyglotKinesisIOTypes() {} + + public static final IngressType ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE = + new IngressType("statefun.kinesis.io", "routable-protobuf-ingress"); + + public static final EgressType GENERIC_KINESIS_EGRESS_TYPE = + new EgressType("statefun.kinesis.io", "generic-egress"); +} diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java index 76ca1937e..58688407d 100644 --- a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java +++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonEgressSpec.java @@ -19,12 +19,16 @@ package org.apache.flink.statefun.flink.io.spi; import java.util.Objects; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.statefun.sdk.EgressType; import org.apache.flink.statefun.sdk.io.EgressIdentifier; import org.apache.flink.statefun.sdk.io.EgressSpec; public final class JsonEgressSpec implements EgressSpec { + + private static final JsonPointer SPEC_POINTER = JsonPointer.compile("/egress/spec"); + private final JsonNode json; private final EgressIdentifier id; private final EgressType type; @@ -48,4 +52,8 @@ public EgressIdentifier id() { public JsonNode json() { return json; } + + public JsonNode specJson() { + return json.requiredAt(SPEC_POINTER); + } } diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonIngressSpec.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonIngressSpec.java index f46191e69..43cbcdf7c 100644 --- a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonIngressSpec.java +++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/spi/JsonIngressSpec.java @@ -18,12 +18,16 @@ package org.apache.flink.statefun.flink.io.spi; import java.util.Objects; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.statefun.sdk.IngressType; import org.apache.flink.statefun.sdk.io.IngressIdentifier; import org.apache.flink.statefun.sdk.io.IngressSpec; public final class JsonIngressSpec implements IngressSpec { + + private static final JsonPointer SPEC_POINTER = JsonPointer.compile("/ingress/spec"); + private final JsonNode json; private final IngressIdentifier id; private final IngressType type; @@ -47,4 +51,8 @@ public IngressIdentifier id() { public JsonNode json() { return json; } + + public JsonNode specJson() { + return json.requiredAt(SPEC_POINTER); + } } diff --git a/statefun-flink/statefun-flink-io/src/main/protobuf/kinesis-egress.proto b/statefun-flink/statefun-flink-io/src/main/protobuf/kinesis-egress.proto new file mode 100644 index 000000000..68c92c0d1 --- /dev/null +++ b/statefun-flink/statefun-flink-io/src/main/protobuf/kinesis-egress.proto @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package org.apache.flink.statefun.flink.io; +option java_package = "org.apache.flink.statefun.flink.io.generated"; +option java_multiple_files = true; + +message KinesisEgressRecord { + string partition_key = 1; + bytes value_bytes = 2; + string stream = 3; + string explicit_hash_key = 4; +} diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java index d4329591e..cb16d5c29 100644 --- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java +++ b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java @@ -156,7 +156,7 @@ private Properties resolveKafkaProperties() { return resultProps; } - private static T instantiateDeserializer( + private static > T instantiateDeserializer( Class deserializerClass) { try { Constructor defaultConstructor = deserializerClass.getDeclaredConstructor(); diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java index 2d71ae6cb..a25d23aa7 100644 --- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java +++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java @@ -17,10 +17,13 @@ */ package org.apache.flink.statefun.sdk.kinesis.ingress; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Properties; +import org.apache.flink.statefun.sdk.annotations.ForRuntime; import org.apache.flink.statefun.sdk.io.IngressIdentifier; import org.apache.flink.statefun.sdk.io.IngressSpec; import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials; @@ -36,7 +39,7 @@ public final class KinesisIngressBuilder { private final IngressIdentifier id; private final List streams = new ArrayList<>(); - private Class> deserializerClass; + private KinesisIngressDeserializer deserializer; private KinesisIngressStartupPosition startupPosition = KinesisIngressStartupPosition.fromLatest(); private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain(); @@ -74,7 +77,8 @@ public KinesisIngressBuilder withStreams(List streams) { */ public KinesisIngressBuilder withDeserializer( Class> deserializerClass) { - this.deserializerClass = Objects.requireNonNull(deserializerClass); + Objects.requireNonNull(deserializerClass); + this.deserializer = instantiateDeserializer(deserializerClass); return this; } @@ -159,10 +163,42 @@ public KinesisIngressSpec build() { return new KinesisIngressSpec<>( id, streams, - deserializerClass, + deserializer, startupPosition, awsRegion, awsCredentials, clientConfigurationProperties); } + + // ======================================================================================== + // Methods for runtime usage + // ======================================================================================== + + @ForRuntime + KinesisIngressBuilder withDeserializer(KinesisIngressDeserializer deserializer) { + this.deserializer = Objects.requireNonNull(deserializer); + return this; + } + + // ======================================================================================== + // Utility methods + // ======================================================================================== + + private static > T instantiateDeserializer( + Class deserializerClass) { + try { + Constructor defaultConstructor = deserializerClass.getDeclaredConstructor(); + defaultConstructor.setAccessible(true); + return defaultConstructor.newInstance(); + } catch (NoSuchMethodException e) { + throw new IllegalStateException( + "Unable to create an instance of deserializer " + + deserializerClass.getName() + + "; has no default constructor", + e); + } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new IllegalStateException( + "Unable to create an instance of deserializer " + deserializerClass.getName(), e); + } + } } diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java index 9bc4a9ff3..a6da2c57c 100644 --- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java +++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java @@ -30,7 +30,7 @@ public final class KinesisIngressSpec implements IngressSpec { private final IngressIdentifier ingressIdentifier; private final List streams; - private final Class> deserializerClass; + private final KinesisIngressDeserializer deserializer; private final KinesisIngressStartupPosition startupPosition; private final AwsRegion awsRegion; private final AwsCredentials awsCredentials; @@ -39,13 +39,13 @@ public final class KinesisIngressSpec implements IngressSpec { KinesisIngressSpec( IngressIdentifier ingressIdentifier, List streams, - Class> deserializerClass, + KinesisIngressDeserializer deserializer, KinesisIngressStartupPosition startupPosition, AwsRegion awsRegion, AwsCredentials awsCredentials, Properties clientConfigurationProperties) { this.ingressIdentifier = Objects.requireNonNull(ingressIdentifier, "ingress identifier"); - this.deserializerClass = Objects.requireNonNull(deserializerClass, "deserializer class"); + this.deserializer = Objects.requireNonNull(deserializer, "deserializer"); this.startupPosition = Objects.requireNonNull(startupPosition, "startup position"); this.awsRegion = Objects.requireNonNull(awsRegion, "AWS region configuration"); this.awsCredentials = Objects.requireNonNull(awsCredentials, "AWS credentials configuration"); @@ -72,8 +72,8 @@ public List streams() { return streams; } - public Class> deserializerClass() { - return deserializerClass; + public KinesisIngressDeserializer deserializer() { + return deserializer; } public KinesisIngressStartupPosition startupPosition() { diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java index bd19b4858..0bc4cb24b 100644 --- a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java +++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java @@ -17,8 +17,8 @@ */ package org.apache.flink.statefun.sdk.kinesis; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -49,7 +49,7 @@ public void exampleUsage() { assertThat(kinesisIngressSpec.streams(), is(Collections.singletonList(STREAM_NAME))); assertTrue(kinesisIngressSpec.awsRegion().isDefault()); assertTrue(kinesisIngressSpec.awsCredentials().isDefault()); - assertEquals(TestDeserializer.class, kinesisIngressSpec.deserializerClass()); + assertThat(kinesisIngressSpec.deserializer(), instanceOf(TestDeserializer.class)); assertTrue(kinesisIngressSpec.startupPosition().isLatest()); assertTrue(kinesisIngressSpec.clientConfigurationProperties().isEmpty()); }