From 6a1c1aa938cc49731db7e953b34bd37269a53a31 Mon Sep 17 00:00:00 2001 From: ijihang Date: Fri, 25 Jun 2021 14:44:00 +0800 Subject: [PATCH 1/6] Fix the inconsistency between distributed version and stand-alone version --- .../query/EmptyIntervalException.java | 28 +++ .../query/executor/RawDataQueryExecutor.java | 11 +- .../iotdb/db/utils/TimeValuePairUtils.java | 232 ++++++++++++++++++ 3 files changed, 269 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java new file mode 100644 index 000000000000..ebc60874bcf4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.exception.query; + +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +public class EmptyIntervalException extends Exception { + + public EmptyIntervalException(Filter filter) { + super(String.format("The interval of the filter %s is empty.", filter)); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index ac13c1eb1f9a..ada73437c410 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.EmptyIntervalException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; @@ -35,6 +36,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; +import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -64,8 +66,8 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context) if (dataSet != null) { return dataSet; } - List readersOfSelectedSeries = initManagedSeriesReader(context); try { + List readersOfSelectedSeries = initManagedSeriesReader(context); return new RawQueryDataSetWithoutValueFilter( context.getQueryId(), queryPlan.getDeduplicatedPaths(), @@ -112,7 +114,10 @@ protected List initManagedSeriesReader(QueryContext context QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); - + TimeValuePairUtils.Intervals intervals = TimeValuePairUtils.extractTimeInterval(timeFilter); + if (intervals.isEmpty()) { + throw new EmptyIntervalException(timeFilter); + } ManagedSeriesReader reader = new SeriesRawDataBatchReader( path, @@ -126,6 +131,8 @@ protected List initManagedSeriesReader(QueryContext context queryPlan.isAscending()); readersOfSelectedSeries.add(reader); } + } catch (EmptyIntervalException e) { + throw new StorageEngineException(e.getMessage()); } finally { StorageEngine.getInstance().mergeUnLock(list); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java index ba560a6c6565..1df6be093b7b 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java @@ -22,6 +22,12 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; +import org.apache.iotdb.tsfile.read.filter.operator.NotFilter; +import org.apache.iotdb.tsfile.read.filter.operator.OrFilter; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsBinary; @@ -31,6 +37,8 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsInt; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong; +import java.util.ArrayList; + public class TimeValuePairUtils { private TimeValuePairUtils() {} @@ -107,4 +115,228 @@ public static TimeValuePair getEmptyTimeValuePair(TSDataType dataType) { throw new UnsupportedOperationException("Unrecognized datatype: " + dataType); } } + + public static Intervals extractTimeInterval(Filter filter) { + if (filter == null) { + return Intervals.ALL_INTERVAL; + } + // and, or, not, value, time, group by + // eq, neq, gt, gteq, lt, lteq, in + if (filter instanceof AndFilter) { + AndFilter andFilter = ((AndFilter) filter); + Intervals leftIntervals = extractTimeInterval(andFilter.getLeft()); + Intervals rightIntervals = extractTimeInterval(andFilter.getRight()); + return leftIntervals.intersection(rightIntervals); + } else if (filter instanceof OrFilter) { + OrFilter orFilter = ((OrFilter) filter); + Intervals leftIntervals = extractTimeInterval(orFilter.getLeft()); + Intervals rightIntervals = extractTimeInterval(orFilter.getRight()); + return leftIntervals.union(rightIntervals); + } else if (filter instanceof NotFilter) { + NotFilter notFilter = ((NotFilter) filter); + return extractTimeInterval(notFilter.getFilter()).not(); + } else if (filter instanceof TimeFilter.TimeGt) { + TimeFilter.TimeGt timeGt = ((TimeFilter.TimeGt) filter); + return new Intervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE); + } else if (filter instanceof TimeFilter.TimeGtEq) { + TimeFilter.TimeGtEq timeGtEq = ((TimeFilter.TimeGtEq) filter); + return new Intervals(((long) timeGtEq.getValue()), Long.MAX_VALUE); + } else if (filter instanceof TimeFilter.TimeEq) { + TimeFilter.TimeEq timeEq = ((TimeFilter.TimeEq) filter); + return new Intervals(((long) timeEq.getValue()), ((long) timeEq.getValue())); + } else if (filter instanceof TimeFilter.TimeNotEq) { + TimeFilter.TimeNotEq timeNotEq = ((TimeFilter.TimeNotEq) filter); + Intervals intervals = new Intervals(); + intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1); + intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE); + return intervals; + } else if (filter instanceof TimeFilter.TimeLt) { + TimeFilter.TimeLt timeLt = ((TimeFilter.TimeLt) filter); + return new Intervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1); + } else if (filter instanceof TimeFilter.TimeLtEq) { + TimeFilter.TimeLtEq timeLtEq = ((TimeFilter.TimeLtEq) filter); + return new Intervals(Long.MIN_VALUE, (long) timeLtEq.getValue()); + } else if (filter instanceof TimeFilter.TimeIn) { + TimeFilter.TimeIn timeIn = ((TimeFilter.TimeIn) filter); + Intervals intervals = new Intervals(); + for (Object value : timeIn.getValues()) { + long time = ((long) value); + intervals.addInterval(time, time); + } + return intervals; + } else if (filter instanceof GroupByFilter) { + GroupByFilter groupByFilter = ((GroupByFilter) filter); + return new Intervals(groupByFilter.getStartTime(), groupByFilter.getEndTime() + 1); + } + // value filter + return Intervals.ALL_INTERVAL; + } + + /** All intervals are closed. */ + public static class Intervals extends ArrayList { + + static final Intervals ALL_INTERVAL = new Intervals(Long.MIN_VALUE, Long.MAX_VALUE); + + public Intervals() { + super(); + } + + Intervals(long lowerBound, long upperBound) { + super(); + addInterval(lowerBound, upperBound); + } + + public int getIntervalSize() { + return size() / 2; + } + + public long getLowerBound(int index) { + return get(index * 2); + } + + public long getUpperBound(int index) { + return get(index * 2 + 1); + } + + void setLowerBound(int index, long lb) { + set(index * 2, lb); + } + + void setUpperBound(int index, long ub) { + set(index * 2 + 1, ub); + } + + public void addInterval(long lowerBound, long upperBound) { + add(lowerBound); + add(upperBound); + } + + Intervals intersection(Intervals that) { + Intervals result = new Intervals(); + int thisSize = this.getIntervalSize(); + int thatSize = that.getIntervalSize(); + for (int i = 0; i < thisSize; i++) { + for (int j = 0; j < thatSize; j++) { + long thisLB = this.getLowerBound(i); + long thisUB = this.getUpperBound(i); + long thatLB = that.getLowerBound(i); + long thatUB = that.getUpperBound(i); + if (thisUB >= thatLB) { + if (thisUB <= thatUB) { + result.addInterval(Math.max(thisLB, thatLB), thisUB); + } else if (thisLB <= thatUB) { + result.addInterval(Math.max(thisLB, thatLB), thatUB); + } + } + } + } + return result; + } + + /** + * The union is implemented by merge, so the two intervals must be ordered. + * + * @param that + * @return + */ + Intervals union(Intervals that) { + if (this.isEmpty()) { + return that; + } else if (that.isEmpty()) { + return this; + } + Intervals result = new Intervals(); + + int thisSize = this.getIntervalSize(); + int thatSize = that.getIntervalSize(); + int thisIndex = 0; + int thatIndex = 0; + // merge the heads of the two intervals + while (thisIndex < thisSize && thatIndex < thatSize) { + long thisLB = this.getLowerBound(thisIndex); + long thisUB = this.getUpperBound(thisIndex); + long thatLB = that.getLowerBound(thatIndex); + long thatUB = that.getUpperBound(thatIndex); + if (thisLB <= thatLB) { + result.mergeLast(thisLB, thisUB); + thisIndex++; + } else { + result.mergeLast(thatLB, thatUB); + thatIndex++; + } + } + // merge the remaining intervals + Intervals remainingIntervals = thisIndex < thisSize ? this : that; + int remainingIndex = thisIndex < thisSize ? thisIndex : thatIndex; + mergeRemainingIntervals(remainingIndex, remainingIntervals, result); + + return result; + } + + private void mergeRemainingIntervals( + int remainingIndex, Intervals remainingIntervals, Intervals result) { + for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) { + long lb = remainingIntervals.getLowerBound(i); + long ub = remainingIntervals.getUpperBound(i); + result.mergeLast(lb, ub); + } + } + + /** + * Merge an interval of [lowerBound, upperBound] with the last interval if they can be merged, + * or just add it as the last interval if its lowerBound is larger than the upperBound of the + * last interval. If the upperBound of the new interval is less than the lowerBound of the last + * interval, nothing will be done. + * + * @param lowerBound + * @param upperBound + */ + private void mergeLast(long lowerBound, long upperBound) { + if (getIntervalSize() == 0) { + addInterval(lowerBound, upperBound); + return; + } + int lastIndex = getIntervalSize() - 1; + long lastLB = getLowerBound(lastIndex); + long lastUB = getUpperBound(lastIndex); + if (lowerBound > lastUB + 1) { + // e.g., last [3, 5], new [7, 10], just add the new interval + addInterval(lowerBound, upperBound); + return; + } + if (upperBound < lastLB - 1) { + // e.g., last [7, 10], new [3, 5], do nothing + return; + } + // merge the new interval into the last one + setLowerBound(lastIndex, Math.min(lastLB, lowerBound)); + setUpperBound(lastIndex, Math.max(lastUB, upperBound)); + } + + public Intervals not() { + if (isEmpty()) { + return ALL_INTERVAL; + } + Intervals result = new Intervals(); + long firstLB = getLowerBound(0); + if (firstLB != Long.MIN_VALUE) { + result.addInterval(Long.MIN_VALUE, firstLB - 1); + } + + int intervalSize = getIntervalSize(); + for (int i = 0; i < intervalSize - 1; i++) { + long currentUB = getUpperBound(i); + long nextLB = getLowerBound(i + 1); + if (currentUB + 1 <= nextLB - 1) { + result.addInterval(currentUB + 1, nextLB - 1); + } + } + + long lastUB = getUpperBound(result.getIntervalSize() - 1); + if (lastUB != Long.MAX_VALUE) { + result.addInterval(lastUB + 1, Long.MAX_VALUE); + } + return result; + } + } } From 21f0f71abeec43cb4ec7fb496e19f272b8f3d81e Mon Sep 17 00:00:00 2001 From: ijihang Date: Mon, 28 Jun 2021 16:10:06 +0800 Subject: [PATCH 2/6] cluster result set to null --- .../apache/iotdb/db/query/executor/QueryRouter.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 29254e72c339..6c5310255e3c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.query.dataset.groupby.GroupByWithoutValueFilterDataSet; import org.apache.iotdb.db.query.executor.fill.IFill; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.ExpressionType; @@ -47,6 +48,8 @@ import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; @@ -97,6 +100,14 @@ public QueryDataSet rawDataQuery(RawDataQueryPlan queryPlan, QueryContext contex throw new QueryProcessException(e); } return rawDataQueryExecutor.executeWithValueFilter(context); + } else if (optimizedExpression != null + && optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { + Filter timeFilter = ((GlobalTimeExpression) queryPlan.getExpression()).getFilter(); + TimeValuePairUtils.Intervals intervals = TimeValuePairUtils.extractTimeInterval(timeFilter); + if (intervals.isEmpty()) { + logger.warn("The interval of the filter {} is empty.", timeFilter); + return new EmptyDataSet(); + } } // Currently, we only group the vector partial paths for raw query without value filter From f7adc6fbd368555490f85b9926e868ebf67bbdd9 Mon Sep 17 00:00:00 2001 From: ijihang Date: Mon, 28 Jun 2021 16:31:41 +0800 Subject: [PATCH 3/6] cluster result set to null --- .../query/EmptyIntervalException.java | 28 ------------------- .../query/executor/RawDataQueryExecutor.java | 11 +------- 2 files changed, 1 insertion(+), 38 deletions(-) delete mode 100644 server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java deleted file mode 100644 index ebc60874bcf4..000000000000 --- a/server/src/main/java/org/apache/iotdb/db/exception/query/EmptyIntervalException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.exception.query; - -import org.apache.iotdb.tsfile.read.filter.basic.Filter; - -public class EmptyIntervalException extends Exception { - - public EmptyIntervalException(Filter filter) { - super(String.format("The interval of the filter %s is empty.", filter)); - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index ada73437c410..e2c0b4477a2d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.exception.query.EmptyIntervalException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; @@ -36,7 +35,6 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; -import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -66,8 +64,8 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context) if (dataSet != null) { return dataSet; } + List readersOfSelectedSeries = initManagedSeriesReader(context); try { - List readersOfSelectedSeries = initManagedSeriesReader(context); return new RawQueryDataSetWithoutValueFilter( context.getQueryId(), queryPlan.getDeduplicatedPaths(), @@ -113,11 +111,6 @@ protected List initManagedSeriesReader(QueryContext context QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); - timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); - TimeValuePairUtils.Intervals intervals = TimeValuePairUtils.extractTimeInterval(timeFilter); - if (intervals.isEmpty()) { - throw new EmptyIntervalException(timeFilter); - } ManagedSeriesReader reader = new SeriesRawDataBatchReader( path, @@ -131,8 +124,6 @@ protected List initManagedSeriesReader(QueryContext context queryPlan.isAscending()); readersOfSelectedSeries.add(reader); } - } catch (EmptyIntervalException e) { - throw new StorageEngineException(e.getMessage()); } finally { StorageEngine.getInstance().mergeUnLock(list); } From 15837556051250a9f92ee41e22e19522a6e6d3ae Mon Sep 17 00:00:00 2001 From: ijihang Date: Mon, 28 Jun 2021 16:34:10 +0800 Subject: [PATCH 4/6] cluster result set to null --- .../org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index e2c0b4477a2d..adab42f56578 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -111,6 +111,7 @@ protected List initManagedSeriesReader(QueryContext context QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); + timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); ManagedSeriesReader reader = new SeriesRawDataBatchReader( path, From c2d13692a105da7d567fb21f45a130d82a550832 Mon Sep 17 00:00:00 2001 From: ijihang Date: Mon, 28 Jun 2021 19:46:06 +0800 Subject: [PATCH 5/6] mvn spotless:apply and add test --- .../query/executor/RawDataQueryExecutor.java | 1 + .../integration/IoTDBSequenceDataQueryIT.java | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index adab42f56578..ac13c1eb1f9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -112,6 +112,7 @@ protected List initManagedSeriesReader(QueryContext context QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + ManagedSeriesReader reader = new SeriesRawDataBatchReader( path, diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java index 2c69a5293505..2fc49778875e 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java @@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; +import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.junit.AfterClass; @@ -316,4 +317,43 @@ public void readWithValueFilterTest() QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID); } + + @Test + public void readIncorrectTimeFilterTest() + throws IllegalPathException, QueryProcessException, StorageEngineException, IOException { + + QueryRouter queryRouter = new QueryRouter(); + List pathList = new ArrayList<>(); + List dataTypes = new ArrayList<>(); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + dataTypes.add(TSDataType.INT32); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + dataTypes.add(TSDataType.INT32); + + TimeFilter.TimeGt gtRight = TimeFilter.gt(10L); + TimeFilter.TimeLt ltLeft = TimeFilter.lt(5L); + AndFilter andFilter = new AndFilter(ltLeft, gtRight); + + GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(andFilter); + TEST_QUERY_JOB_ID = + QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + + RawDataQueryPlan queryPlan = new RawDataQueryPlan(); + queryPlan.setDeduplicatedDataTypes(dataTypes); + queryPlan.setDeduplicatedPathsAndUpdate(pathList); + queryPlan.setExpression(globalTimeExpression); + QueryDataSet queryDataSet = queryRouter.rawDataQuery(queryPlan, TEST_QUERY_CONTEXT); + + int cnt = 0; + while (queryDataSet.hasNext()) { + queryDataSet.next(); + cnt++; + } + assertEquals(0, cnt); + + QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID); + } } From 681e4c4f09bde4bf31afdd1f27ec8341ac30fde2 Mon Sep 17 00:00:00 2001 From: ijihang Date: Tue, 29 Jun 2021 19:46:48 +0800 Subject: [PATCH 6/6] Code integration --- .../query/fill/ClusterPreviousFill.java | 2 +- .../server/member/MetaGroupMember.java | 5 +- .../iotdb/cluster/utils/PartitionUtils.java | 237 ------------------ .../server/member/MetaGroupMemberTest.java | 6 +- 4 files changed, 7 insertions(+), 243 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java index 33274e3b5549..b72bebea6f52 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java @@ -32,12 +32,12 @@ import org.apache.iotdb.cluster.server.handlers.caller.PreviousFillHandler; import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMember; -import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.fill.PreviousFill; +import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 0224c29ca50f..cb18b79d09fb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -79,7 +79,6 @@ import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.cluster.utils.PartitionUtils; -import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -91,6 +90,8 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.TestOnly; +import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -1422,7 +1423,7 @@ public TSStatus processNonPartitionedMetaPlan(PhysicalPlan plan) { */ public List routeFilter(Filter filter, PartialPath path) throws StorageEngineException, EmptyIntervalException { - Intervals intervals = PartitionUtils.extractTimeInterval(filter); + Intervals intervals = TimeValuePairUtils.extractTimeInterval(filter); if (intervals.isEmpty()) { throw new EmptyIntervalException(filter); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java index 1dd6189b1f8f..1dce2aea22eb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java @@ -41,21 +41,8 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; import org.apache.iotdb.service.rpc.thrift.TSStatus; -import org.apache.iotdb.tsfile.read.filter.GroupByFilter; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeIn; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLt; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLtEq; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeNotEq; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; -import org.apache.iotdb.tsfile.read.filter.operator.NotFilter; -import org.apache.iotdb.tsfile.read.filter.operator.OrFilter; import org.apache.iotdb.tsfile.utils.Murmur128Hash; -import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -154,230 +141,6 @@ public static void reordering(InsertTabletPlan plan, TSStatus[] status, TSStatus } } - public static Intervals extractTimeInterval(Filter filter) { - if (filter == null) { - return Intervals.ALL_INTERVAL; - } - // and, or, not, value, time, group by - // eq, neq, gt, gteq, lt, lteq, in - if (filter instanceof AndFilter) { - AndFilter andFilter = ((AndFilter) filter); - Intervals leftIntervals = extractTimeInterval(andFilter.getLeft()); - Intervals rightIntervals = extractTimeInterval(andFilter.getRight()); - return leftIntervals.intersection(rightIntervals); - } else if (filter instanceof OrFilter) { - OrFilter orFilter = ((OrFilter) filter); - Intervals leftIntervals = extractTimeInterval(orFilter.getLeft()); - Intervals rightIntervals = extractTimeInterval(orFilter.getRight()); - return leftIntervals.union(rightIntervals); - } else if (filter instanceof NotFilter) { - NotFilter notFilter = ((NotFilter) filter); - return extractTimeInterval(notFilter.getFilter()).not(); - } else if (filter instanceof TimeGt) { - TimeGt timeGt = ((TimeGt) filter); - return new Intervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE); - } else if (filter instanceof TimeGtEq) { - TimeGtEq timeGtEq = ((TimeGtEq) filter); - return new Intervals(((long) timeGtEq.getValue()), Long.MAX_VALUE); - } else if (filter instanceof TimeEq) { - TimeEq timeEq = ((TimeEq) filter); - return new Intervals(((long) timeEq.getValue()), ((long) timeEq.getValue())); - } else if (filter instanceof TimeNotEq) { - TimeNotEq timeNotEq = ((TimeNotEq) filter); - Intervals intervals = new Intervals(); - intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1); - intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE); - return intervals; - } else if (filter instanceof TimeLt) { - TimeLt timeLt = ((TimeLt) filter); - return new Intervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1); - } else if (filter instanceof TimeLtEq) { - TimeLtEq timeLtEq = ((TimeLtEq) filter); - return new Intervals(Long.MIN_VALUE, (long) timeLtEq.getValue()); - } else if (filter instanceof TimeIn) { - TimeIn timeIn = ((TimeIn) filter); - Intervals intervals = new Intervals(); - for (Object value : timeIn.getValues()) { - long time = ((long) value); - intervals.addInterval(time, time); - } - return intervals; - } else if (filter instanceof GroupByFilter) { - GroupByFilter groupByFilter = ((GroupByFilter) filter); - return new Intervals(groupByFilter.getStartTime(), groupByFilter.getEndTime() + 1); - } - // value filter - return Intervals.ALL_INTERVAL; - } - - /** All intervals are closed. */ - public static class Intervals extends ArrayList { - - static final Intervals ALL_INTERVAL = new Intervals(Long.MIN_VALUE, Long.MAX_VALUE); - - public Intervals() { - super(); - } - - Intervals(long lowerBound, long upperBound) { - super(); - addInterval(lowerBound, upperBound); - } - - public int getIntervalSize() { - return size() / 2; - } - - public long getLowerBound(int index) { - return get(index * 2); - } - - public long getUpperBound(int index) { - return get(index * 2 + 1); - } - - void setLowerBound(int index, long lb) { - set(index * 2, lb); - } - - void setUpperBound(int index, long ub) { - set(index * 2 + 1, ub); - } - - public void addInterval(long lowerBound, long upperBound) { - add(lowerBound); - add(upperBound); - } - - Intervals intersection(Intervals that) { - Intervals result = new Intervals(); - int thisSize = this.getIntervalSize(); - int thatSize = that.getIntervalSize(); - for (int i = 0; i < thisSize; i++) { - for (int j = 0; j < thatSize; j++) { - long thisLB = this.getLowerBound(i); - long thisUB = this.getUpperBound(i); - long thatLB = that.getLowerBound(i); - long thatUB = that.getUpperBound(i); - if (thisUB >= thatLB) { - if (thisUB <= thatUB) { - result.addInterval(Math.max(thisLB, thatLB), thisUB); - } else if (thisLB <= thatUB) { - result.addInterval(Math.max(thisLB, thatLB), thatUB); - } - } - } - } - return result; - } - - /** - * The union is implemented by merge, so the two intervals must be ordered. - * - * @param that - * @return - */ - Intervals union(Intervals that) { - if (this.isEmpty()) { - return that; - } else if (that.isEmpty()) { - return this; - } - Intervals result = new Intervals(); - - int thisSize = this.getIntervalSize(); - int thatSize = that.getIntervalSize(); - int thisIndex = 0; - int thatIndex = 0; - // merge the heads of the two intervals - while (thisIndex < thisSize && thatIndex < thatSize) { - long thisLB = this.getLowerBound(thisIndex); - long thisUB = this.getUpperBound(thisIndex); - long thatLB = that.getLowerBound(thatIndex); - long thatUB = that.getUpperBound(thatIndex); - if (thisLB <= thatLB) { - result.mergeLast(thisLB, thisUB); - thisIndex++; - } else { - result.mergeLast(thatLB, thatUB); - thatIndex++; - } - } - // merge the remaining intervals - Intervals remainingIntervals = thisIndex < thisSize ? this : that; - int remainingIndex = thisIndex < thisSize ? thisIndex : thatIndex; - mergeRemainingIntervals(remainingIndex, remainingIntervals, result); - - return result; - } - - private void mergeRemainingIntervals( - int remainingIndex, Intervals remainingIntervals, Intervals result) { - for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) { - long lb = remainingIntervals.getLowerBound(i); - long ub = remainingIntervals.getUpperBound(i); - result.mergeLast(lb, ub); - } - } - - /** - * Merge an interval of [lowerBound, upperBound] with the last interval if they can be merged, - * or just add it as the last interval if its lowerBound is larger than the upperBound of the - * last interval. If the upperBound of the new interval is less than the lowerBound of the last - * interval, nothing will be done. - * - * @param lowerBound - * @param upperBound - */ - private void mergeLast(long lowerBound, long upperBound) { - if (getIntervalSize() == 0) { - addInterval(lowerBound, upperBound); - return; - } - int lastIndex = getIntervalSize() - 1; - long lastLB = getLowerBound(lastIndex); - long lastUB = getUpperBound(lastIndex); - if (lowerBound > lastUB + 1) { - // e.g., last [3, 5], new [7, 10], just add the new interval - addInterval(lowerBound, upperBound); - return; - } - if (upperBound < lastLB - 1) { - // e.g., last [7, 10], new [3, 5], do nothing - return; - } - // merge the new interval into the last one - setLowerBound(lastIndex, Math.min(lastLB, lowerBound)); - setUpperBound(lastIndex, Math.max(lastUB, upperBound)); - } - - public Intervals not() { - if (isEmpty()) { - return ALL_INTERVAL; - } - Intervals result = new Intervals(); - long firstLB = getLowerBound(0); - if (firstLB != Long.MIN_VALUE) { - result.addInterval(Long.MIN_VALUE, firstLB - 1); - } - - int intervalSize = getIntervalSize(); - for (int i = 0; i < intervalSize - 1; i++) { - long currentUB = getUpperBound(i); - long nextLB = getLowerBound(i + 1); - if (currentUB + 1 <= nextLB - 1) { - result.addInterval(currentUB + 1, nextLB - 1); - } - } - - long lastUB = getUpperBound(result.getIntervalSize() - 1); - if (lastUB != Long.MAX_VALUE) { - result.addInterval(lastUB + 1, Long.MAX_VALUE); - } - return result; - } - } - /** * Calculate the headers of the groups that possibly store the data of a timeseries over the given * time range. diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 1e94fe745b3f..1e732ccfed26 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -67,7 +67,6 @@ import org.apache.iotdb.cluster.server.service.MetaAsyncService; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.cluster.utils.Constants; -import org.apache.iotdb.cluster.utils.PartitionUtils; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.authorizer.IAuthorizer; @@ -93,6 +92,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -1334,7 +1334,7 @@ public void testRouteIntervalsDisablePartition() StorageEngine.setEnablePartition(false); testMetaMember.setCharacter(LEADER); testMetaMember.setLeader(testMetaMember.getThisNode()); - PartitionUtils.Intervals intervals = new PartitionUtils.Intervals(); + TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals(); intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE); List partitionGroups = @@ -1350,7 +1350,7 @@ public void testRouteIntervalsEnablePartition() StorageEngine.setEnablePartition(true); testMetaMember.setCharacter(LEADER); testMetaMember.setLeader(testMetaMember.getThisNode()); - PartitionUtils.Intervals intervals = new PartitionUtils.Intervals(); + TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals(); intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE); List partitionGroups =