This repository has been archived by the owner on Mar 17, 2023. It is now read-only.
generated from streamnative/pulsar-io-template
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathpulsar-io-kafka.yaml
127 lines (108 loc) · 5.68 KB
/
pulsar-io-kafka.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
configs:
  # `kafka` section is for configuring Kafka related settings
  kafka:
    # The Kafka topic that the source connector reads from. (* Required)
    topic: input-topic

    # The duration in milliseconds to poll the records from a Kafka consumer.
    # The default value is 1000 ms.
    #
    # poll_duration_ms: 1000

    # The properties to configure a Kafka consumer. (* Required)
    # See Kafka documentation (http://kafka.apache.org/documentation/#consumerconfigs) for more details
    #
    consumer:
      bootstrap.servers: 127.0.0.1:9092
      group.id: test-group
      auto.offset.reset: earliest
      key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      value.deserializer: org.apache.kafka.connect.json.JsonDeserializer

    # Schema related settings
    #
    # NOTE(review): `schema` is nested under `kafka` as a sibling of `consumer`, based on the
    # "in the above section" wording below - confirm against the connector's config class.
    schema:
      # The service url for Confluent schema registry. If you have configured the Kafka consumer using
      # KafkaAvroDeserializer in the above section, please specify the schema registry service url.
      #
      # schema_registry:

      #
      # The following settings are used for configuring *schema* for JSON data.
      #
      # The schema provider type to tell the source connector how to fetch the schema information for JSON data.
      # The available providers are: [ PULSAR, CONFIG ]
      #   - PULSAR: Source connector fetches the schema information of the output Pulsar topic. It requires that
      #             the output Pulsar topic exists with a schema prior to running the source connector.
      #   - CONFIG: Source connector fetches the schema information from `key_schema` and `value_schema` settings.
      #             The schema definition is a json string written in AVRO specification.
      # json_schema_provider: PULSAR
      json_schema_provider: CONFIG

      # The key/value schema definition. The schema definition is a json string written in AVRO specification. These
      # settings are only used when `json_schema_provider` is configured to be `CONFIG`.
      #
      # key_schema:
      value_schema: '{"type":"record","name":"User","namespace":"io.streamnative.connectors.kafka.example","fields":[{"name":"name","type":["null","string"],"default":null},{"name":"age","type":"int"},{"name":"address","type":["null","string"],"default":null}]}'

      # The key/value converter used for converting json data into the target schema. If it is omitted, the json data
      # will be converted to AVRO data by default.
      #
      # key_converter:
      # value_converter:

  # `pulsar` section is for configuring Pulsar related settings
  pulsar:
    # The output Pulsar topic. If it is not set, Kafka source will automatically create the topic
    # if `create_topic_if_missing` is set to true.
    #
    topic: output-topic

    # The Pulsar web service url. This is used by Kafka source to create Pulsar topic.
    #
    pulsar_web_service_url: "http://localhost:8080"

    # Whether Kafka source allows the Kafka input topic and the Pulsar output topic to have a different
    # number of partitions. If it is disallowed, Kafka source refuses to start when it detects that the
    # Pulsar output topic has a different number of partitions.
    allow_different_num_partitions: true

    # Flag to tell Kafka source to update the number of partitions if it detects Kafka input topic and
    # Pulsar output topic have different number of partitions. Currently it only supports increasing
    # the number of partitions in Pulsar topic. It doesn't support decreasing.
    #
    update_partitions_if_inconsistent: true

    # If the Pulsar topic is missing, create the Pulsar topic and make sure the number of partitions
    # is the same as the number of partitions of the Kafka topic.
    create_topic_if_missing: true

    # Flag to tell Kafka source to copy kafka schema or not. If it is set to false, Kafka source will
    # only transfer raw bytes; if it is set to true, Kafka source will detect the schema by interpreting
    # the deserializer configured in Kafka section and converting the data into Pulsar schema.
    #
    # The conversion rules:
    #   - ByteArrayDeserializer -> Schema.BYTES
    #   - ByteBufferDeserializer -> Schema.BYTES
    #   - BytesDeserializer -> Schema.BYTES
    #   - StringDeserializer -> Schema.STRING
    #   - DoubleDeserializer -> Schema.DOUBLE
    #   - FloatDeserializer -> Schema.FLOAT
    #   - IntegerDeserializer -> Schema.INT32
    #   - LongDeserializer -> Schema.INT64
    #   - ShortDeserializer -> Schema.INT16
    #   - JsonDeserializer -> Schema.JSON / Schema.AVRO (if an AVRO converter is specified)
    #   - KafkaAvroDeserializer -> Schema.AVRO
    #   - Unknown Deserializers -> Schema.BYTES
    copy_kafka_schema: true

    # The properties to configure Pulsar client
    client:
      serviceUrl: "pulsar://127.0.0.1:6650"

    # The properties to configure Pulsar producer
    # (no properties are set here; the empty value is parsed as null)
    producer: