diff --git a/README.md b/README.md
index 99ee224..4ceaa2e 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Pulsar AWS S3 sink
 
-Pulsar AWS S3 sink
+The Pulsar AWS S3 sink receives JSON messages that share one schema from Pulsar topics and saves them to AWS S3 in Parquet format.
 
 ## Operations
 ### Deployment
@@ -11,8 +11,17 @@ GET `admin/v2/functions/connectors` displays the nar is loaded successfully as
 {"name":"aws-s3","description":"AWS S3 sink","sinkClass":"com.kesque.pulsar.sink.s3.AWSS3Sink"}
 ```
 
+The `EFFECTIVELY_ONCE` processing guarantee is required because it implements [cumulative acknowledgement](https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java#L129) of messages.
+
+Create a sink by uploading a nar file.
+```
+$ bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-s3-1.0.nar --inputs aws-s3-input-topic --name aws-s3-test --sink-config-file ./connectors/pulsar-s3-io.yaml --processing-guarantees EFFECTIVELY_ONCE --subs-position Earliest --subs-name auniquename
+"Created successfully"
 ```
-$ bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-s3-1.0.nar --inputs aws-s3-input-topic --name aws-s3-test --sink-config-file ./connectors/pulsar-s3-io.yaml --processing-guarantees EFFECTIVELY_ONCE --subs-position Earliest
+
+Create a sink from a preloaded nar file.
+```
+$ bin/pulsar-admin sinks create --sink-type aws-s3 --inputs aws-s3-input-topic --name aws-s3-test --sink-config-file ./connectors/pulsar-s3-io.yaml --processing-guarantees EFFECTIVELY_ONCE --subs-position Earliest --subs-name auniquename
 "Created successfully"
 
 $ bin/pulsar-admin sinks list
@@ -24,6 +33,9 @@ $ bin/pulsar-admin sinks delete --name aws-s3-test
 "Deleted successfully"
 ```
 
+### Topic schema registry
+A schema must be enforced on the input topics. The sink hits a fatal error when writing Parquet output if it receives messages with different schemas.
+
 ## Build
 The command to build a nar file.
 ```
diff --git a/s3/src/main/resources/META-INF/services/pulsar-io.yaml b/s3/src/main/resources/META-INF/services/pulsar-io.yaml
index 869b431..f1e1124 100644
--- a/s3/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/s3/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: aws-s3
 description: AWS S3 sink
 sinkClass: com.kesque.pulsar.sink.s3.AWSS3Sink
+sinkConfigClass: com.kesque.pulsar.sink.s3.AWSS3Config
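
Note on the "Topic schema registry" hunk in README.md: a Parquet file carries a single schema, so a batch of JSON messages that differ in field names or value types cannot be written as one file. The sketch below illustrates that kind of check in Python rather than the connector's Java. It is not taken from the connector, and the names `json_shape` and `first_schema_mismatch` are hypothetical.

```python
import json


def json_shape(message: str) -> dict:
    """Map each top-level field of a JSON object to the name of its value type."""
    record = json.loads(message)
    if not isinstance(record, dict):
        raise ValueError("expected a JSON object")
    return {key: type(value).__name__ for key, value in record.items()}


def first_schema_mismatch(messages):
    """Return the index of the first message whose shape differs from the
    first message in the batch, or None if all shapes agree."""
    expected = None
    for index, message in enumerate(messages):
        shape = json_shape(message)
        if expected is None:
            expected = shape  # the first message defines the batch schema
        elif shape != expected:
            return index
    return None


# Hypothetical batch: the third message changes the type of "id".
batch = [
    '{"id": 1, "name": "a"}',
    '{"id": 2, "name": "b"}',
    '{"id": "3", "name": "c"}',
]
print(first_schema_mismatch(batch))  # prints 2
```

Enforcing a schema on the input topic, as the README asks, moves this check ahead of the sink so that mismatched messages never reach the Parquet writer.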