Proposal: Node Level Ordered Concurrency
- Nodes provide a concurrency parameter, which starts multiple goroutines.
- Each goroutine in a node is completely independent, so the results of node execution are unordered.
In effect, the current parallelism just starts multiple replicas of the node.
- Node parallelism is unordered, which is not suitable for most application scenarios, leading to low usage.
- Scenarios that require order (most cases) cannot use parallelism, and the overall throughput of the rules depends on the slowest node. Mismatched processing speeds can easily lead to back pressure and data loss.
- Especially in Source nodes, the external system connection and decoding are in the same node. Decoding is much slower than data subscription, leading to low overall consumption throughput, subsequent node starvation, inability to increase rule throughput, and inability to increase CPU utilization.
- For example, in MQTT, slow consumption can lead to a message backlog on the broker (observed with some users).
- Some nodes have multiple responsibilities whose computational resource consumption varies greatly, typically IO nodes. Such a node cannot flexibly allocate parallel computing power among its responsibilities, and the metrics of the individual responsibilities cannot be observed.
- Source node, connection and decoding.
- Sink node, transformation/encoding and connection sending.
Cloud stream processing engines usually allow setting the parallelism. However, they do not guarantee global order either; they only guarantee data order within a keyed partition. eKuiper (EK) currently has no distributed mechanism or partition mechanism.
Unlike cloud scenarios, edge computing scenarios have a strong demand to improve processing capability (throughput) while preserving order.
- In vehicles, there is usually a single data source, and basically all processing is ordered.
- In industrial scenarios, deployment at the gateway is common, and data from different logical partitions is processed internally; for example, data from different field stations, distinguished by group id. In this case, only data within the same logical partition needs to be ordered. Partition support is a future consideration.
In edge scenarios, it is necessary to implement parallel computing while ensuring order.
- Source
- Can be parallel, just ensure data order (single exit parallel)
- Error handling (decoding errors, etc.), irrecoverable, can be discarded (record metrics)
- Sink
- Can be parallel, just ensure data order (single exit parallel)
- Error handling, recoverable errors go through the retry mechanism, irrecoverable errors can be discarded
- Stateless Operator
- Can be parallel, just ensure data order (single exit parallel)
- Error handling, errors as data
- Stateful Operator
- Cannot be parallel
- Future consideration of Partition By/Group By parallel processing: partitioning within the operator based on the key, sequential processing within each partition, and also sequential processing when merging
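The Partition By idea in the last bullet can be sketched as follows. This is only an illustration of key-based routing, not eKuiper code: the `event` type, `partitionIndex`, and `processPartitioned` are hypothetical names, and the ordered merge step mentioned above is left out. Every event with the same key is hashed to the same worker, so order is preserved within each key while different keys run in parallel.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// event is a hypothetical record; Key stands in for a partition key such as a group id.
type event struct {
	Key string
	Seq int
}

// partitionIndex maps a key to one of n partitions using an FNV-1a hash,
// so the same key always lands in the same partition.
func partitionIndex(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// processPartitioned routes events to n workers by key. Each worker handles
// its events sequentially, so per-key order is kept. It returns, per key,
// the sequence numbers in the order they were processed.
func processPartitioned(events []event, n int) map[string][]int {
	chans := make([]chan event, n)
	results := make([]map[string][]int, n) // one map per worker, no sharing
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan event, 8)
		results[i] = map[string][]int{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for e := range chans[i] {
				results[i][e.Key] = append(results[i][e.Key], e.Seq)
			}
		}(i)
	}
	for _, e := range events {
		chans[partitionIndex(e.Key, n)] <- e
	}
	for _, c := range chans {
		close(c)
	}
	wg.Wait()

	// Keys are disjoint across workers, so a plain copy is enough here.
	merged := map[string][]int{}
	for _, m := range results {
		for k, v := range m {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	var events []event
	for i := 0; i < 6; i++ {
		key := "station-a"
		if i%2 == 1 {
			key = "station-b"
		}
		events = append(events, event{Key: key, Seq: i})
	}
	out := processPartitioned(events, 4)
	fmt.Println(out["station-a"]) // [0 2 4]
	fmt.Println(out["station-b"]) // [1 3 5]
}
```

Because a key never moves between workers, a slow partition only delays its own keys; the trade-off is that a single hot key still cannot be parallelized.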
Observations and Conclusions
- Stateful operators may become bottlenecks, but in actual applications, this part of state calculation is usually very small and has little overall impact.
- The current main CPU-intensive calculations are concentrated in the source decoding part, the sink encoding compression part, the sink data template conversion part, the AI inference part, most of which can support ordered parallelism
Parallelism occurs inside the node, and the node itself guarantees order.
Ordered parallelism is technically feasible and has been applied in the Lianyou project.
Given that the current main bottleneck is in the Source and Sink parts, the next version will prioritize the reconstruction of these two parts. The main changes after the reconstruction are Planner and Runtime, and users do not need to change existing rules.
Ordered concurrency can be implemented with Go's channel mechanism. Create multiple workers, each with its own input and output channels. The distributor receives data and sends it to the worker channels in a fixed order. The Syncer then reads the workers' output channels in the same order and sends the results downstream.
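A minimal sketch of this distributor/worker/Syncer pattern is shown below. It assumes round-robin distribution, and the function name `orderedParallel` and its signature are made up for illustration; it is not the eKuiper implementation. The key point is that the Syncer reads worker outputs in exactly the order the distributor wrote to worker inputs, so a slow worker delays the output but never reorders it.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// orderedParallel applies fn to each item from in using `workers` goroutines
// and emits the results in the original input order.
func orderedParallel[T, R any](in <-chan T, workers int, fn func(T) R) <-chan R {
	inputs := make([]chan T, workers)
	outputs := make([]chan R, workers)
	for i := 0; i < workers; i++ {
		inputs[i] = make(chan T, 1)
		outputs[i] = make(chan R, 1)
		// Worker: processes its own channel sequentially.
		go func(src <-chan T, dst chan<- R) {
			defer close(dst)
			for v := range src {
				dst <- fn(v)
			}
		}(inputs[i], outputs[i])
	}

	// Distributor: item k goes to worker k % workers.
	go func() {
		i := 0
		for v := range in {
			inputs[i] <- v
			i = (i + 1) % workers
		}
		for _, c := range inputs {
			close(c)
		}
	}()

	// Syncer: reads worker outputs in the same round-robin order.
	// The first closed channel it meets marks the end of the stream.
	out := make(chan R)
	go func() {
		defer close(out)
		for i := 0; ; i = (i + 1) % workers {
			v, ok := <-outputs[i]
			if !ok {
				return
			}
			out <- v
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < 10; i++ {
			in <- i
		}
	}()

	// Simulate decoding work with uneven latency.
	slowDouble := func(v int) int {
		time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
		return v * 2
	}

	var results []int
	for r := range orderedParallel(in, 4, slowDouble) {
		results = append(results, r)
	}
	fmt.Println(results) // [0 2 4 6 8 10 12 14 16 18]
}
```

Round-robin keeps the bookkeeping trivial but means throughput is bounded by the slowest in-flight item; larger channel buffers would let faster workers run further ahead at the cost of memory.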
Users still configure parallelism and other configurations at the Stream end. In the actual execution plan, the source node is split as needed and the parallelism is set.
- Split: Connect -> RateLimit/Batch(No need to add after Decode) -> Decode -> Preprocess -> Partition
- Connect: Single node, ensure order. Transparent transmission generally does not affect performance. If multiple consumers are needed, use shared subscription.
- RateLimit: Single node, stateful, very small calculation.
- Decode: Ordered parallel
- Preprocess: Ordered parallel
- Partition: TODO, ordered parallel
- Built-in source transformation
- Only deal with connect, implement Connector only
- Connection metrics: connect status (0/1), last connect time, connect exception
- User extension
- The all-in-one approach is still supported (the context method is kept but deprecated); if you use the decode method of the context, migrating to the new interface is recommended
- Those who need to fully customize connection + decoding can still use the all-in-one approach and must handle parallelism themselves
- Everyone else is recommended to use the SourceConnector interface + Decoder interface
- Planner update
- DataSourcePlan can be converted into multiple Nodes; depending on the interface type of the Source (all-in-one or connector), it is converted into different Nodes
- Extract the logical-to-physical plan conversion into its own step so that Graph can apply the same conversion
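The planner behaviour described in the list above could look roughly like the sketch below. All names here (`connectorSource`, `allInOneSource`, `physicalNode`, `planSource`) and their method sets are assumptions made for illustration; they are not eKuiper's actual interfaces or planner API. The sketch only shows the decision: a connector-style source is split into a single-instance connect and rate-limit stage followed by ordered-parallel decode and preprocess stages, while an all-in-one source stays a single node.

```go
package main

import "fmt"

// Hypothetical stand-ins for the two source styles described above.
type connectorSource interface{ Connect() error }
type allInOneSource interface{ Open() error }

// physicalNode is a hypothetical description of one node in the physical plan.
type physicalNode struct {
	Name        string
	Parallelism int
}

// planSource converts one logical source into physical nodes based on
// which interface the source implements.
func planSource(src any, concurrency int) ([]physicalNode, error) {
	switch src.(type) {
	case connectorSource:
		return []physicalNode{
			{"connect", 1},   // single node, keeps order
			{"ratelimit", 1}, // single node, stateful
			{"decode", concurrency},
			{"preprocess", concurrency},
		}, nil
	case allInOneSource:
		// The extension handles connection, decoding and parallelism itself.
		return []physicalNode{{"source", 1}}, nil
	default:
		return nil, fmt.Errorf("unsupported source type %T", src)
	}
}

// Two toy sources used only to exercise planSource.
type mqttConnector struct{}

func (mqttConnector) Connect() error { return nil }

type legacySource struct{}

func (legacySource) Open() error { return nil }

func main() {
	for _, src := range []any{mqttConnector{}, legacySource{}} {
		nodes, err := planSource(src, 4)
		if err != nil {
			fmt.Println("plan error:", err)
			continue
		}
		fmt.Printf("%T -> %v\n", src, nodes)
	}
}
```

Keeping the decision in one function is what would let a Graph rule reuse the same conversion, since it depends only on the source type and the configured concurrency.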
TBD
- If there is a global order requirement, use a single rule to implement
- Implement ordered parallel: this proposal
- If there is no global order requirement, use multiple rules
- Manually partition the stream (MQTT topic, memory topic etc.), each stream adds its own rules: more cumbersome
- TBD: eKuiper automatic partition capability
- Completely unordered (stateless): a single rule with concurrency (planned for deprecation) or random partition
- TBD: random partition, the source needs to support partitioning, such as shared subscription
- Manual random partition: Use multiple rules, each rule's source is a shared subscription (Source must support)