Skip to content

Commit

Permalink
Pipe: Avoid constructing unnecessary pipe subtasks if db and pattern …
Browse files Browse the repository at this point in the history
…dismatch (apache#13873)
  • Loading branch information
SteveYurongSu authored Oct 23, 2024
1 parent 0c7a947 commit 86469d3
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ protected void createPipeTask(
throws IllegalPathException {
if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
final PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
StorageEngine.getInstance()
.getAllDataRegionIds()
.contains(new DataRegionId(consensusGroupId))
&& DataRegionListeningFilter.shouldDataRegionBeListened(extractorParameters);
StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
&& DataRegionListeningFilter.shouldDataRegionBeListened(
extractorParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
SchemaEngine.getInstance()
.getAllSchemaRegionIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ public Map<Integer, PipeTask> build() throws IllegalPathException {

if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
final PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
dataRegionIds.contains(new DataRegionId(consensusGroupId))
&& DataRegionListeningFilter.shouldDataRegionBeListened(extractorParameters);
dataRegionIds.contains(dataRegionId)
&& DataRegionListeningFilter.shouldDataRegionBeListened(
extractorParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
&& !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.iotdb.db.pipe.extractor.dataregion;

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

Expand Down Expand Up @@ -57,12 +61,35 @@ public class DataRegionListeningFilter {
}
}

public static boolean shouldDataRegionBeListened(PipeParameters parameters)
throws IllegalPathException {
public static boolean shouldDataRegionBeListened(
PipeParameters parameters, DataRegionId dataRegionId) throws IllegalPathException {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
parseInsertionDeletionListeningOptionPair(parameters);
return insertionDeletionListeningOptionPair.getLeft()
|| insertionDeletionListeningOptionPair.getRight();
final boolean hasSpecificListeningOption =
insertionDeletionListeningOptionPair.getLeft()
|| insertionDeletionListeningOptionPair.getRight();
if (!hasSpecificListeningOption) {
return false;
}

final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(dataRegionId);
if (dataRegion == null) {
return true;
}

final String databaseRawName = dataRegion.getDatabaseName();
final String databaseTreeModel =
databaseRawName.startsWith("root.") ? databaseRawName : "root." + databaseRawName;
final String databaseTableModel =
databaseRawName.startsWith("root.") ? databaseRawName.substring(5) : databaseRawName;

final TreePattern treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
final TablePattern tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters);

return treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDb(databaseTreeModel)
|| tablePattern.isTableModelDataAllowedToBeCaptured()
&& tablePattern.matchesDatabase(databaseTableModel);
}

public static Pair<Boolean, Boolean> parseInsertionDeletionListeningOptionPair(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public boolean coversDevice(final IDeviceID device) {
}
}

@Override
public boolean mayOverlapWithDb(final String db) {
try {
return patternPartialPath.overlapWith(new PartialPath(db + ".**"));
} catch (final IllegalPathException e) {
return false;
}
}

@Override
public boolean mayOverlapWithDevice(final IDeviceID device) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ public boolean coversDevice(final IDeviceID device) {
return pattern.length() <= deviceStr.length() && deviceStr.startsWith(pattern);
}

@Override
public boolean mayOverlapWithDb(final String db) {
return
// for example, pattern is root.a.b and db is root.a.b.c
(pattern.length() <= db.length() && db.startsWith(pattern))
// for example, pattern is root.a.b.c and db is root.a.b
|| (pattern.length() > db.length() && pattern.startsWith(db));
}

@Override
public boolean mayOverlapWithDevice(final IDeviceID device) {
final String deviceStr = device.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ public static TreePattern parsePipePatternFromSourceParameters(
/** Check if a device's all measurements are covered by this pattern. */
public abstract boolean coversDevice(final IDeviceID device);

/**
* Check if a database may have some measurements matched by the pattern.
*
* @return {@code true} if the pattern may overlap with the database, {@code false} otherwise.
*/
public abstract boolean mayOverlapWithDb(final String db);

/**
* Check if a device may have some measurements matched by the pattern.
*
Expand Down

0 comments on commit 86469d3

Please sign in to comment.