[Feature Request] A new IngestionEngine that can pull data from streaming sources. #16929
Labels: enhancement, Indexing
Is your feature request related to a problem? Please describe
Unlike the existing engines, which receive documents through the indexing APIs, we need a new engine that can pull messages directly from streaming sources.
Describe the solution you'd like
We introduce a new type of engine, the “IngestionEngine”, which contains a StreamPoller that pulls from the streaming source via the IngestionShardConsumer described in the API section. The IngestionEngine corresponds to a shard created by the IndicesService and binds to the mapped streaming partition.

The stream poller polls messages from the streaming source in batches and can process the messages within a batch in parallel. For each message, the ingestion engine parses it into an engine operation object (e.g. index) using the DocumentMapper. Each processed message is then stored into Lucene along with its offset information as two special fields: (1) a PointField for range queries during recovery, and (2) a StoredField for value lookup.
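As a rough illustration of the two offset fields, here is a minimal Java/Lucene sketch; the `_offset` field name and the `indexWithOffset` helper are assumptions for illustration, not the actual names in this proposal:

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.IndexWriter;
import java.io.IOException;

class OffsetAwareIndexer {
    private static final String OFFSET_FIELD = "_offset"; // hypothetical field name

    /** Stores the message offset twice: as a point for range queries
     *  during recovery, and as a stored field for value lookup. */
    static void indexWithOffset(IndexWriter writer, Document doc, long offset) throws IOException {
        doc.add(new LongPoint(OFFSET_FIELD, offset));
        doc.add(new StoredField(OFFSET_FIELD, offset));
        writer.addDocument(doc);
    }
}
```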
For instance, the figure below shows an example of the processed state of messages in a streaming source. The poller polls a batch of messages at offsets 1001 to 1006 and records the start offset of the batch, 1001 in this example. In the current state, the messages at offsets 1001, 1002, and 1004 are already processed, and their offsets are recorded in the Lucene index, as shown in the index data in the figure. The messages at 1003 and 1005 are still being processed, so their offset information is not in the Lucene index yet, and the message at offset 1006 has been polled but is waiting for processing.
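A minimal sketch of the poll-then-process loop described above; the `IngestionShardConsumer.readNext` signature, the `Message` type, and the batch size of 1000 are hypothetical stand-ins for the API in this proposal:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;

// Hypothetical consumer API: read up to maxBatchSize messages starting at startOffset.
interface IngestionShardConsumer {
    List<Message> readNext(long startOffset, int maxBatchSize);
}

record Message(long offset, byte[] payload) {}

class StreamPollerSketch {
    void pollOnce(IngestionShardConsumer consumer, ExecutorService workers, long batchStartOffset) {
        List<Message> batch = consumer.readNext(batchStartOffset, 1000);
        // Messages within a batch are processed in parallel; completion order is
        // not guaranteed, which is why each message's offset is persisted with it.
        for (Message m : batch) {
            workers.submit(() -> processMessage(m)); // DocumentMapper parse + Lucene write
        }
    }

    void processMessage(Message m) { /* see the indexing sketch above */ }
}
```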
Checkpoint and Recovery
One opportunity with streaming ingestion is to leverage the streaming system for durability and remove the need for a translog. The IngestionEngine therefore uses a no-op translog manager and instead stores the batch start offset in the commit data. For instance, if the IngestionEngine performs a flush in the state shown in the figure above, offset 1001 is stored in the commit data.
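A sketch of how a flush could record the batch start offset, assuming the hypothetical `batch_start_offset` key; `setLiveCommitData` is the standard Lucene API for attaching user data to the next commit:

```java
import org.apache.lucene.index.IndexWriter;
import java.io.IOException;
import java.util.Map;

class CommitWithOffset {
    private static final String BATCH_START_KEY = "batch_start_offset"; // hypothetical key

    /** On flush, record the batch start offset (1001 in the example above)
     *  in the Lucene commit user data instead of writing a translog. */
    static void flush(IndexWriter writer, long batchStartOffset) throws IOException {
        writer.setLiveCommitData(Map.of(BATCH_START_KEY, Long.toString(batchStartOffset)).entrySet());
        writer.commit();
    }
}
```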
During recovery, the IngestionEngine parses the batch start offset from the commit data and rewinds the poller’s pointer to it. In addition, the ingestion engine reads the offset field using a pluggable range query (e.g. LongPoint for Kafka offsets) to retrieve the offsets that have already been processed, so that the stream poller’s replay can skip those messages and achieve exactly-once message reads.
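The recovery side could then look roughly like the following sketch, assuming a recent Lucene (9.5+ for `storedFields()`) and the hypothetical `_offset` / `batch_start_offset` names from the sketches above:

```java
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.store.Directory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class RecoverySketch {
    /** Reads the batch start offset from the commit user data, then collects
     *  the offsets already present in the index so replay can skip them. */
    static Set<Long> alreadyProcessed(Directory dir) throws IOException {
        Set<Long> processed = new HashSet<>();
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            long batchStart = Long.parseLong(
                    reader.getIndexCommit().getUserData().get("batch_start_offset"));
            IndexSearcher searcher = new IndexSearcher(reader);
            ScoreDoc[] hits = searcher
                    .search(LongPoint.newRangeQuery("_offset", batchStart, Long.MAX_VALUE),
                            Integer.MAX_VALUE)
                    .scoreDocs;
            for (ScoreDoc hit : hits) {
                processed.add(reader.storedFields().document(hit.doc)
                        .getField("_offset").numericValue().longValue());
            }
        }
        return processed;
    }
}
```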
Since the IngestionEngine can recover from the commit data directly without the translog, each primary and replica shard can use its own commit data to recover itself without peer-to-peer contact, achieving a shared-nothing architecture among the servers.
More details can be found in this design doc.
Related component
Indexing
Describe alternatives you've considered
No response
Additional context
No response