Skip to content

Commit ba46048

Browse files
committed
[FLINK-16124] [kinesis] Implement runtime GenericKinesisSinkProvider
1 parent a8d4b59 commit ba46048

File tree

7 files changed

+276
-1
lines changed

7 files changed

+276
-1
lines changed

statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
2727
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
2828

29-
final class KinesisSinkProvider implements SinkProvider {
29+
public final class KinesisSinkProvider implements SinkProvider {
3030

3131
@Override
3232
public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.statefun.flink.io.kinesis.polyglot;
20+
21+
import com.google.protobuf.Any;
22+
import com.google.protobuf.InvalidProtocolBufferException;
23+
import org.apache.flink.statefun.flink.io.generated.KinesisEgressRecord;
24+
import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
25+
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
26+
27+
public final class GenericKinesisEgressSerializer implements KinesisEgressSerializer<Any> {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
@Override
32+
public EgressRecord serialize(Any value) {
33+
final KinesisEgressRecord kinesisEgressRecord = asKinesisEgressRecord(value);
34+
return EgressRecord.newBuilder()
35+
.withData(kinesisEgressRecord.getValueBytes().toByteArray())
36+
.withStream(kinesisEgressRecord.getStream())
37+
.withPartitionKey(kinesisEgressRecord.getPartitionKey())
38+
.withExplicitHashKey(kinesisEgressRecord.getExplicitHashKey())
39+
.build();
40+
}
41+
42+
private static KinesisEgressRecord asKinesisEgressRecord(Any message) {
43+
if (!message.is(KinesisEgressRecord.class)) {
44+
throw new IllegalStateException(
45+
"The generic Kinesis egress expects only messages of type "
46+
+ KinesisEgressRecord.class.getName());
47+
}
48+
try {
49+
return message.unpack(KinesisEgressRecord.class);
50+
} catch (InvalidProtocolBufferException e) {
51+
throw new RuntimeException(
52+
"Unable to unpack message as a " + KinesisEgressRecord.class.getName(), e);
53+
}
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.statefun.flink.io.kinesis.polyglot;
19+
20+
import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsCredentials;
21+
import static org.apache.flink.statefun.flink.io.kinesis.polyglot.AwsAuthSpecJsonParser.optionalAwsRegion;
22+
import static org.apache.flink.statefun.flink.io.kinesis.polyglot.KinesisEgressSpecJsonParser.optionalMaxOutstandingRecords;
23+
24+
import com.google.protobuf.Any;
25+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
26+
import org.apache.flink.statefun.flink.io.kinesis.KinesisSinkProvider;
27+
import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
28+
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
29+
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
30+
import org.apache.flink.statefun.sdk.io.EgressSpec;
31+
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
32+
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
33+
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
34+
35+
public final class GenericKinesisSinkProvider implements SinkProvider {
36+
37+
private final KinesisSinkProvider delegateProvider = new KinesisSinkProvider();
38+
39+
@Override
40+
public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
41+
final KinesisEgressSpec<T> kinesisEgressSpec = asKinesisEgressSpec(spec);
42+
return delegateProvider.forSpec(kinesisEgressSpec);
43+
}
44+
45+
private static <T> KinesisEgressSpec<T> asKinesisEgressSpec(EgressSpec<T> spec) {
46+
if (!(spec instanceof JsonEgressSpec)) {
47+
throw new IllegalArgumentException("Wrong type " + spec.type());
48+
}
49+
JsonEgressSpec<T> casted = (JsonEgressSpec<T>) spec;
50+
51+
EgressIdentifier<T> id = casted.id();
52+
validateConsumedType(id);
53+
54+
JsonNode specJson = casted.specJson();
55+
56+
KinesisEgressBuilder<T> kinesisEgressBuilder = KinesisEgressBuilder.forIdentifier(id);
57+
58+
optionalAwsRegion(specJson).ifPresent(kinesisEgressBuilder::withAwsRegion);
59+
optionalAwsCredentials(specJson).ifPresent(kinesisEgressBuilder::withAwsCredentials);
60+
optionalMaxOutstandingRecords(specJson)
61+
.ifPresent(kinesisEgressBuilder::withMaxOutstandingRecords);
62+
KinesisIngressSpecJsonParser.clientConfigProperties(specJson)
63+
.entrySet()
64+
.forEach(
65+
entry ->
66+
kinesisEgressBuilder.withClientConfigurationProperty(
67+
entry.getKey(), entry.getValue()));
68+
69+
kinesisEgressBuilder.withSerializer(serializerClass());
70+
71+
return kinesisEgressBuilder.build();
72+
}
73+
74+
private static void validateConsumedType(EgressIdentifier<?> id) {
75+
Class<?> consumedType = id.consumedType();
76+
if (Any.class != consumedType) {
77+
throw new IllegalArgumentException(
78+
"Generic Kinesis egress is only able to consume messages types of "
79+
+ Any.class.getName()
80+
+ " but "
81+
+ consumedType.getName()
82+
+ " is provided.");
83+
}
84+
}
85+
86+
@SuppressWarnings("unchecked")
87+
private static <T> Class<T> serializerClass() {
88+
// this cast is safe, because we've already validated that the consumed type is Any.
89+
return (Class<T>) GenericKinesisEgressSerializer.class;
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.statefun.flink.io.kinesis.polyglot;
20+
21+
import java.util.Map;
22+
import java.util.OptionalInt;
23+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
25+
import org.apache.flink.statefun.flink.common.json.Selectors;
26+
27+
final class KinesisEgressSpecJsonParser {
28+
29+
private KinesisEgressSpecJsonParser() {}
30+
31+
private static final JsonPointer MAX_OUTSTANDING_RECORDS_POINTER =
32+
JsonPointer.compile("/maxOutstandingRecords");
33+
private static final JsonPointer CLIENT_CONFIG_PROPS_POINTER =
34+
JsonPointer.compile("/clientConfigProperties");
35+
36+
static OptionalInt optionalMaxOutstandingRecords(JsonNode ingressSpecNode) {
37+
return Selectors.optionalIntegerAt(ingressSpecNode, MAX_OUTSTANDING_RECORDS_POINTER);
38+
}
39+
40+
static Map<String, String> clientConfigProperties(JsonNode ingressSpecNode) {
41+
return Selectors.propertiesAt(ingressSpecNode, CLIENT_CONFIG_PROPS_POINTER);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.statefun.flink.io.kinesis;
19+
20+
import static org.apache.flink.statefun.flink.io.testutils.YamlUtils.loadAsJsonFromClassResource;
21+
import static org.hamcrest.CoreMatchers.instanceOf;
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
24+
import com.google.protobuf.Any;
25+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
26+
import org.apache.flink.statefun.flink.io.kinesis.polyglot.GenericKinesisSinkProvider;
27+
import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
28+
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
29+
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
30+
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
31+
import org.junit.Test;
32+
33+
public class GenericKinesisSinkProviderTest {
34+
35+
@Test
36+
public void exampleUsage() {
37+
JsonNode egressDefinition =
38+
loadAsJsonFromClassResource(getClass().getClassLoader(), "generic-kinesis-egress.yaml");
39+
JsonEgressSpec<?> spec =
40+
new JsonEgressSpec<>(
41+
PolyglotKinesisIOTypes.GENERIC_KINESIS_EGRESS_TYPE,
42+
new EgressIdentifier<>("foo", "bar", Any.class),
43+
egressDefinition);
44+
45+
GenericKinesisSinkProvider provider = new GenericKinesisSinkProvider();
46+
SinkFunction<?> source = provider.forSpec(spec);
47+
48+
assertThat(source, instanceOf(FlinkKinesisProducer.class));
49+
}
50+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
egress:
17+
meta:
18+
type: statefun.kinesis.io/generic-egress
19+
id: com.mycomp.foo/bar
20+
spec:
21+
awsRegion:
22+
type: custom-endpoint
23+
endpoint: https://localhost:4567
24+
id: us-west-1
25+
awsCredentials:
26+
type: profile
27+
profileName: john-doe
28+
profilePath: /path/to/profile/config
29+
maxOutstandingRecords: 9999
30+
clientConfigProperties:
31+
- ThreadingModel: POOLED
32+
- ThreadPoolSize: 10

statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kinesis/PolyglotKinesisIOTypes.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.statefun.flink.io.kinesis;
2020

21+
import org.apache.flink.statefun.sdk.EgressType;
2122
import org.apache.flink.statefun.sdk.IngressType;
2223

2324
public final class PolyglotKinesisIOTypes {
@@ -26,4 +27,7 @@ private PolyglotKinesisIOTypes() {}
2627

2728
public static final IngressType ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE =
2829
new IngressType("statefun.kinesis.io", "routable-protobuf-ingress");
30+
31+
public static final EgressType GENERIC_KINESIS_EGRESS_TYPE =
32+
new EgressType("statefun.kinesis.io", "generic-egress");
2933
}

0 commit comments

Comments
 (0)