Sink that indexes documents into Elasticsearch.
This Elasticsearch sink only supports indexing JSON documents. It consumes data from an input destination and then indexes it into Elasticsearch. The input data can be a plain JSON string or a java.util.Map that represents the JSON. It also accepts the data as the Elasticsearch-provided XContentBuilder. However, this is a rare case, since the middleware is unlikely to keep records as XContentBuilder; this form is provided mainly for direct invocation of the consumer.
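For example, a plain JSON string payload (using the same illustrative document that appears later in this document) might look like:

```json
{"foo":"bar"}
```

The same document could equally arrive as a java.util.Map with a single foo entry.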
The Elasticsearch sink has the following options:

- elasticsearch.consumer.async - Indicates whether the indexing operation is async or not. By default, indexing is done synchronously. (Boolean, default: false)
- elasticsearch.consumer.batch-size - Number of items to index for each request. For values greater than 1, the bulk indexing API is used. (Integer, default: 1)
- elasticsearch.consumer.group-timeout - Timeout in milliseconds after which a message group is flushed when bulk indexing is active. The default of -1 means no automatic flush of idle message groups occurs. (Long, default: -1)
- elasticsearch.consumer.id - The id of the document to index. If set, the INDEX_ID header value overrides this property on a per-message basis. (Expression, default: <none>)
- elasticsearch.consumer.index - Name of the index. If set, the INDEX_NAME header value overrides this property on a per-message basis. (String, default: <none>)
- elasticsearch.consumer.routing - Indicates the shard to route to. If not provided, Elasticsearch defaults to a hash of the document id. (String, default: <none>)
- elasticsearch.consumer.timeout-seconds - Timeout for the shard to be available. If not set, it defaults to the 1 minute set by the Elasticsearch client. (Long, default: 0)
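As a sketch of how these options combine, bulk indexing could be enabled by starting the app with properties like the following (the index name and numeric values here are illustrative assumptions, not recommendations):

```
--elasticsearch.consumer.index=testing
--elasticsearch.consumer.batch-size=100
--elasticsearch.consumer.group-timeout=5000
```

With these settings, documents are collected into groups of 100 and submitted through the bulk indexing API, and a partially filled group is flushed after 5000 milliseconds rather than waiting indefinitely.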
- From the elasticsearch-sink folder: ./mvnw clean package
- cd apps
- cd into the proper binder-generated app (Kafka or RabbitMQ)
- ./mvnw clean package
- Make sure that you have Elasticsearch running. For example, you can run it as a Docker container using the following command:
  docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
- Start the middleware (Kafka or RabbitMQ) if it is not already running.
- Start the sink:
  java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
- Send some JSON data to the middleware destination, for example: {"foo":"bar"}
- Verify that the data is indexed:
  curl localhost:9200/testing/_search
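The batch-size option described above switches the sink to the bulk indexing API. As a rough sketch of what a bulk request carries, the body is newline-delimited JSON in which each document is preceded by an action line naming its target index (the index name testing matches the run command above; the documents themselves are illustrative):

```shell
# Build a two-document bulk request body (newline-delimited JSON):
# each document line is preceded by an action line naming its index.
cat > bulk.ndjson <<'EOF'
{"index":{"_index":"testing"}}
{"foo":"bar"}
{"index":{"_index":"testing"}}
{"foo":"baz"}
EOF

# With Elasticsearch running, the body could be submitted directly:
# curl -H 'Content-Type: application/x-ndjson' -XPOST localhost:9200/_bulk --data-binary @bulk.ndjson
wc -l < bulk.ndjson
```
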