Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: support IoTDB-style pattern #12085

Merged
merged 35 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9290e14
tmp
DanielWang2035 Feb 25, 2024
919d823
Merge branch 'apache:master' into pipe-pattern-v2
DanielWang2035 Feb 25, 2024
88c806e
tmp
DanielWang2035 Feb 26, 2024
810416d
refactor
DanielWang2035 Feb 26, 2024
1146bd2
fix uppercase
DanielWang2035 Feb 26, 2024
15badd3
fix
DanielWang2035 Feb 26, 2024
703d14d
Merge branch 'master' into pipe-pattern-v2
DanielWang2035 Feb 27, 2024
97332f1
add IT & fix a bug in prefix matching
DanielWang2035 Feb 27, 2024
ca7a215
add IT
DanielWang2035 Feb 28, 2024
d5e60bf
set default format to "iotdb" & compatibility with old version
DanielWang2035 Feb 28, 2024
458b254
check pattern cover db
DanielWang2035 Feb 28, 2024
e97c843
fix
DanielWang2035 Feb 29, 2024
12525f9
Merge branch 'master' into pipe-pattern-v2
DanielWang2035 Feb 29, 2024
3e2cf7c
add toString()
DanielWang2035 Feb 29, 2024
1bdc17d
refactor
DanielWang2035 Feb 29, 2024
19e96a0
Merge branch 'master' into pipe-pattern-v2
DanielWang2035 Feb 29, 2024
7c6a853
fix IT
DanielWang2035 Feb 29, 2024
9e410df
add UT in TsFileInsertionDataContainerTest
DanielWang2035 Feb 29, 2024
4b01642
update CachedSchemaPatternMatcherTest.java
DanielWang2035 Feb 29, 2024
63d171f
update to new definitions
DanielWang2035 Mar 1, 2024
6979af3
Merge branch 'master' into pipe-pattern-v2
DanielWang2035 Mar 1, 2024
1603532
move PipePattern to node-commons
DanielWang2035 Mar 11, 2024
c9b7b1d
Merge branch 'master' into pipe-pattern-v2
DanielWang2035 Mar 11, 2024
04f5a26
resolve
DanielWang2035 Mar 11, 2024
d44b476
fix IT
DanielWang2035 Mar 11, 2024
7fdafa6
fix
DanielWang2035 Mar 12, 2024
4104b6a
rename
SteveYurongSu Mar 12, 2024
d659c6b
Merge branch 'master' of https://github.com/apache/iotdb into pr/12085
SteveYurongSu Mar 12, 2024
03b5581
refactor
SteveYurongSu Mar 12, 2024
d950004
Update CachedSchemaPatternMatcher.java
SteveYurongSu Mar 12, 2024
d4d502e
Update IoTDBPipePattern.java
SteveYurongSu Mar 12, 2024
7f4587d
comment
DanielWang2035 Mar 12, 2024
9089399
fix CI
SteveYurongSu Mar 12, 2024
8c234f8
Merge branch 'pipe-pattern-v2' of https://github.com/DanielWang2035/i…
SteveYurongSu Mar 12, 2024
0b05d52
Merge branch 'master' of https://github.com/apache/iotdb into pr/12085
SteveYurongSu Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.junit.After;
import org.junit.Before;

abstract class AbstractPipeDualIT {
public abstract class AbstractPipeDualIT {
DanielWang2035 marked this conversation as resolved.
Show resolved Hide resolved

protected BaseEnv senderEnv;
protected BaseEnv receiverEnv;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testWithAllParameters(String realtimeMode) throws Exception {

extractorAttributes.put("extractor", "iotdb-extractor");
extractorAttributes.put("extractor.pattern", "root.db.d1");
extractorAttributes.put("extractor.pattern.format", "prefix");
extractorAttributes.put("extractor.history.enable", "true");
extractorAttributes.put("extractor.history.start-time", "2010-01-01T08:00:00+08:00");
extractorAttributes.put("extractor.history.end-time", "2010-01-02T08:00:00+08:00");
Expand Down Expand Up @@ -189,6 +190,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1");
extractorAttributes.put("extractor.pattern.format", "prefix");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -272,6 +274,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d2");
extractorAttributes.put("extractor.pattern.format", "prefix");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -315,6 +318,7 @@ public void testPipeAfterRegisterNewDataNode() throws Exception {
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1");
extractorAttributes.put("extractor.pattern.format", "prefix");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -368,6 +372,7 @@ public void testPipeAfterRegisterNewDataNode() throws Exception {
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d2");
extractorAttributes.put("extractor.pattern.format", "prefix");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ public void testExtractorPatternMatch() throws Exception {
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", null);
extractorAttributes.put("extractor.pattern.format", "prefix");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -482,6 +483,7 @@ public void testMatchingMultipleDatabases() throws Exception {
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db1");
extractorAttributes.put("extractor.pattern.format", "prefix");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -580,6 +582,7 @@ public void testHistoryAndRealtime() throws Exception {
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

extractorAttributes.put("extractor.pattern", "root.db.d2");
extractorAttributes.put("extractor.pattern.format", "prefix");
extractorAttributes.put("extractor.history.enable", "false");
extractorAttributes.put("extractor.realtime.enable", "true");
TSStatus status =
Expand Down Expand Up @@ -663,6 +666,7 @@ public void testHistoryStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws E
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1");
extractorAttributes.put("extractor.pattern.format", "prefix");
extractorAttributes.put("extractor.history.enable", "true");
// 1970-01-01T08:00:02+08:00
extractorAttributes.put("extractor.history.start-time", "2000");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ private void testInsertNullValueTemplate(

if (withParsing) {
extractorAttributes.put("extractor.pattern", "root.sg.d1");
extractorAttributes.put("extractor.pattern.format", "prefix");
}

TSStatus status =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.pattern;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
import org.apache.iotdb.pipe.it.AbstractPipeDualIT;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
public class PipePatternFormatIT extends AbstractPipeDualIT {
@Test
public void testPrefixPattern() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

String receiverIp = receiverDataNode.getIp();
int receiverPort = receiverDataNode.getPort();

try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
"insert into root.db.d2(time, s) values (1, 1)",
"insert into root.db2.d1(time, s) values (1, 1)"))) {
return;
}

Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1.s");
extractorAttributes.put("extractor.pattern.format", "prefix");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,1.0,");
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s,root.db.d1.s1,", expectedResSet);
}
}

@Test
public void testIotdbPattern() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

String receiverIp = receiverDataNode.getIp();
int receiverPort = receiverDataNode.getPort();

try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
"insert into root.db.d2(time, s) values (1, 1)",
"insert into root.db2.d1(time, s) values (1, 1)"))) {
return;
}

Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.**.d1.s*");
extractorAttributes.put("extractor.pattern.format", "iotdb");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,1.0,1.0,");
TestUtils.assertDataOnEnv(
receiverEnv,
"select * from root.**",
"Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
expectedResSet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void open(Configuration parameters) throws Exception {
+ "'extractor' = 'iotdb-extractor',\n"
+ "'extractor.history.enable' = 'false',\n"
+ "'extractor.pattern' = '%s',\n"
+ "'extractor.pattern.format' = 'prefix',\n"
+ ") WITH CONNECTOR (\n"
+ "'connector' = 'websocket-connector',\n"
+ "'connector.websocket.port' = '%d',\n"
Expand All @@ -121,6 +122,7 @@ public void open(Configuration parameters) throws Exception {
+ "WITH EXTRACTOR (\n"
+ "'extractor' = 'iotdb-extractor',\n"
+ "'extractor.pattern' = '%s',\n"
+ "'extractor.pattern.format' = 'prefix',\n"
+ ") WITH CONNECTOR (\n"
+ "'connector' = 'websocket-connector',\n"
+ "'connector.websocket.port' = '%d',\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
Expand Down Expand Up @@ -67,6 +68,16 @@ public CreatePipeProcedureV2() {
public CreatePipeProcedureV2(TCreatePipeReq createPipeRequest) throws PipeException {
super();
this.createPipeRequest = createPipeRequest;
// For newly created pipes, we add an internal "source.version" attribute to identify them.
// Pipes without this attribute will be treated as legacy version, and their default pattern
// format will be "prefix".
// Pipes with this attribute will be treated as new version, and their default pattern format
// will be "iotdb".
this.createPipeRequest
.getExtractorAttributes()
.put(
PipeExtractorConstant.SOURCE_VERSION_KEY,
DanielWang2035 marked this conversation as resolved.
Show resolved Hide resolved
PipeExtractorConstant.SOURCE_VERSION_V2_VALUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.pattern.PipePattern;
import org.apache.iotdb.db.pipe.pattern.PipePatternFormat;
import org.apache.iotdb.db.pipe.progress.committer.PipeEventCommitManager;
import org.apache.iotdb.pipe.api.event.Event;

Expand All @@ -50,7 +51,7 @@ public abstract class EnrichedEvent implements Event {
public static final long NO_COMMIT_ID = -1;
protected long commitId = NO_COMMIT_ID;

protected final String pattern;
protected final PipePattern pipePattern;

protected final long startTime;
protected final long endTime;
Expand All @@ -61,14 +62,18 @@ public abstract class EnrichedEvent implements Event {
protected boolean shouldReportOnCommit = false;

protected EnrichedEvent(
String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) {
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pipePattern,
long startTime,
long endTime) {
referenceCount = new AtomicInteger(0);
this.pipeName = pipeName;
this.pipeTaskMeta = pipeTaskMeta;
this.pattern = pattern;
this.pipePattern = pipePattern != null ? pipePattern : new PipePattern(null);
this.startTime = startTime;
this.endTime = endTime;
isPatternParsed = getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
isPatternParsed = this.pipePattern.isRoot();
isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
}

Expand Down Expand Up @@ -180,12 +185,18 @@ public final PipeTaskMeta getPipeTaskMeta() {
}

/**
* Get the pattern of this event.
* Get the pattern string of this event.
*
* @return the pattern
*/
public final String getPattern() {
return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
return pipePattern.getPattern() == null
? pipePattern.getFormat().getDefaultPattern()
: pipePattern.getPattern();
}

public final PipePatternFormat getPatternFormat() {
return pipePattern.getFormat();
}

public final long getStartTime() {
Expand Down Expand Up @@ -221,7 +232,11 @@ public boolean shouldParseTime() {
}

public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime);
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime);

public void reportException(PipeRuntimeException pipeRuntimeException) {
if (pipeTaskMeta != null) {
Expand Down Expand Up @@ -268,7 +283,7 @@ public String toString() {
+ "', commitId="
+ commitId
+ ", pattern='"
+ pattern
+ pipePattern
+ "', startTime="
+ startTime
+ ", endTime="
Expand All @@ -293,7 +308,7 @@ public String coreReportMessage() {
+ "', commitId="
+ commitId
+ ", pattern='"
+ pattern
+ pipePattern
+ "', startTime="
+ startTime
+ ", endTime="
Expand Down
Loading