diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 8de7507f5205..de95b569232e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -674,7 +674,6 @@ The following table describes error codes you may encounter in the `multiStageQu
| `InsertCannotAllocateSegment` | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /><br />- Attempting to mix different granularities in the same intervals of the same datasource.<br />- Prior ingestions that used non-extendable shard specs. | `dataSource`<br /><br />`interval`: The interval for the attempted new segment allocation. |
| `InsertCannotBeEmpty` | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| `InsertCannotOrderByDescending` | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` |
-| `InsertCannotReplaceExistingSegment` | A REPLACE query cannot proceed because an existing segment partially overlaps those bounds, and the portion within the bounds is not fully overshadowed by query results.<br /><br />There are two ways to address this without modifying your query:<br />- Shrink the OVERLAP filter to match the query results.<br />- Expand the OVERLAP filter to fully contain the existing segment. | `segmentId`: The existing segment |
| `InsertLockPreempted` | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| `InsertTimeNull` | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. (`TIME_PARSE` returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern.<br /><br />If your timestamps may genuinely be null, consider using `COALESCE` to provide a default value. One option is `CURRENT_TIMESTAMP`, which represents the start time of the job. |
| `InsertTimeOutOfBounds` | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /><br />To avoid this error, verify that the interval you specified is valid. | `interval`: The time chunk interval corresponding to the out-of-bounds timestamp |
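As a concrete illustration of the `InsertTimeNull` remedy described in the table above, a minimal Druid SQL sketch might look like the following; the datasource and column names (`events_clean`, `events_raw`, `ts`, `page`) are illustrative placeholders, not part of this change:

```sql
-- Hypothetical INSERT that guards against null timestamps.
INSERT INTO "events_clean"
SELECT
  -- TIME_PARSE returns NULL for unparseable values; COALESCE falls back to the
  -- job start time (CURRENT_TIMESTAMP) so the row does not trigger InsertTimeNull.
  COALESCE(TIME_PARSE("ts"), CURRENT_TIMESTAMP) AS __time,
  "page",
  COUNT(*) AS cnt
FROM "events_raw"
GROUP BY 1, 2
PARTITIONED BY DAY
```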
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index e190edae9e67..180323caab02 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -24,7 +24,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
@@ -62,12 +61,11 @@
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
-import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -103,7 +101,6 @@
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
-import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
@@ -1247,48 +1244,33 @@ private void postResultPartitionBoundariesForStage(
/**
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
- *
- * If any existing segments cannot be dropped because their intervals are not wholly contained within the
- * replacement parameter, throws a {@link MSQException} with {@link InsertCannotReplaceExistingSegmentFault}.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
- final Set<DataSegment> segmentsToDrop;
+ final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
- if (intervalsToDrop.isEmpty()) {
- segmentsToDrop = null;
- } else {
- // Determine which segments to drop as part of the replace operation. This is safe because, in the case where we
- // are doing a replace, the isReady method (which runs prior to the task starting) acquires an exclusive lock.
- segmentsToDrop =
- ImmutableSet.copyOf(
- context.taskActionClient().submit(
- new RetrieveUsedSegmentsAction(
- task.getDataSource(),
- null,
- intervalsToDrop,
- Segments.ONLY_VISIBLE
- )
- )
- );
-
- // Validate that there are no segments that partially overlap the intervals-to-drop. Otherwise, the replace
- // may be incomplete.
- for (final DataSegment segmentToDrop : segmentsToDrop) {
- if (destination.getReplaceTimeChunks()
- .stream()
- .noneMatch(interval -> interval.contains(segmentToDrop.getInterval()))) {
- throw new MSQException(new InsertCannotReplaceExistingSegmentFault(segmentToDrop.getId()));
- }
+ if (!intervalsToDrop.isEmpty()) {
+ TombstoneHelper tombstoneHelper = new TombstoneHelper(context.taskActionClient());
+ try {
+ Set<DataSegment> tombstones = tombstoneHelper.computeTombstoneSegmentsForReplace(
+ intervalsToDrop,
+ destination.getReplaceTimeChunks(),
+ task.getDataSource(),
+ destination.getSegmentGranularity()
+ );
+ segmentsWithTombstones.addAll(tombstones);
+ }
+ catch (IllegalStateException e) {
+ throw new MSQException(e, InsertLockPreemptedFault.instance());
}
}
- if (segments.isEmpty()) {
+ if (segmentsWithTombstones.isEmpty()) {
// Nothing to publish, only drop. We already validated that the intervalsToDrop do not have any
// partially-overlapping segments, so it's safe to drop them as intervals instead of as specific segments.
for (final Interval interval : intervalsToDrop) {
@@ -1298,7 +1280,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
} else {
performSegmentPublish(
context.taskActionClient(),
- SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)
+ SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
@@ -2590,6 +2572,7 @@ static ClusterStatisticsMergeMode finalizeClusterStatisticsMergeMode(
return mergeMode;
}
+
/**
* Interface used by {@link #contactWorkersForStage}.
*/
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index 73e87b5e6657..dccd42bad94f 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -45,7 +45,6 @@
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
-import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
@@ -106,7 +105,6 @@ public class MSQIndexingModule implements DruidModule
InsertCannotAllocateSegmentFault.class,
InsertCannotBeEmptyFault.class,
InsertCannotOrderByDescendingFault.class,
- InsertCannotReplaceExistingSegmentFault.class,
InsertLockPreemptedFault.class,
InsertTimeNullFault.class,
InsertTimeOutOfBoundsFault.class,
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotReplaceExistingSegmentFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotReplaceExistingSegmentFault.java
deleted file mode 100644
index ed1d14fcc005..000000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotReplaceExistingSegmentFault.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.indexing.error;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.timeline.SegmentId;
-
-import java.util.Objects;
-
-public class InsertCannotReplaceExistingSegmentFault extends BaseMSQFault
-{
- static final String CODE = "InsertCannotReplaceExistingSegment";
-
- private final String segmentId;
-
- public InsertCannotReplaceExistingSegmentFault(@JsonProperty("segmentId") String segmentId)
- {
- super(
- CODE,
- "Cannot replace existing segment [%s] because it is not within the "
- + "bounds specified by replaceExistingTimeChunks",
- segmentId
- );
- this.segmentId = segmentId;
- }
-
- public InsertCannotReplaceExistingSegmentFault(final SegmentId segmentId)
- {
- this(segmentId.toString());
- }
-
- @JsonProperty
- public String getSegmentId()
- {
- return segmentId;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- InsertCannotReplaceExistingSegmentFault that = (InsertCannotReplaceExistingSegmentFault) o;
- return Objects.equals(segmentId, that.segmentId);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(super.hashCode(), segmentId);
- }
-}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index f7028f57aad2..4a9c34bc79aa 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -21,14 +21,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
-import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
@@ -40,7 +38,6 @@
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.junit.Test;
import org.mockito.Mockito;
@@ -114,30 +111,6 @@ public void testInsertCannotOrderByDescendingFault()
.verifyResults();
}
- @Test
- public void testInsertCannotReplaceExistingSegmentFault()
- {
- RowSignature rowSignature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
- .add("dim1", ColumnType.STRING)
- .add("cnt", ColumnType.LONG).build();
-
- // Create a datasegment which lies partially outside the generated segment
- DataSegment existingDataSegment = DataSegment.builder()
- .interval(Intervals.of("2001-01-01T/2003-01-04T"))
- .size(50)
- .version("1").dataSource("foo1")
- .build();
- Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(isA(RetrieveUsedSegmentsAction.class));
-
- testIngestQuery().setSql(
- "replace into foo1 overwrite where __time >= TIMESTAMP '2000-01-01 00:00:00' and __time < TIMESTAMP '2002-01-03 00:00:00' select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1")
- .setExpectedDataSource("foo1")
- .setExpectedRowSignature(rowSignature)
- .setExpectedMSQFault(new InsertCannotReplaceExistingSegmentFault(existingDataSegment.getId()))
- .verifyResults();
- }
-
@Test
public void testInsertTimeOutOfBoundsFault()
{
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 56ccb0560d0a..02c559ee257a 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -21,13 +21,17 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
+import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.SqlPlanningException;
+import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.hamcrest.CoreMatchers;
@@ -35,6 +39,8 @@
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
import javax.annotation.Nonnull;
import java.io.File;
@@ -499,6 +505,7 @@ public void testReplaceWhereClauseLargerThanData()
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Collections.singletonList(Intervals.of("2000-01-01T/2002-01-01T")))
+ .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2001-01-01T/2001-02-01T")))
.setExpectedSegment(ImmutableSet.of(SegmentId.of(
"foo",
Intervals.of("2000-01-01T/P1M"),
@@ -511,12 +518,6 @@ public void testReplaceWhereClauseLargerThanData()
new Object[]{946771200000L, 2.0f}
)
)
- .setExpectedSegment(ImmutableSet.of(SegmentId.of(
- "foo",
- Intervals.of("2000-01-01T/P1M"),
- "test",
- 0
- )))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
@@ -625,6 +626,7 @@ public void testReplaceTimeChunksLargerThanData()
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Collections.singletonList(Intervals.of("2000-01-01T/2002-01-01T")))
+ .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2001-01-01T/2001-02-01T")))
.setExpectedSegment(ImmutableSet.of(SegmentId.of(
"foo",
Intervals.of("2000-01-01T/P1M"),
@@ -694,6 +696,67 @@ public void testReplaceSegmentsInsertIntoNewTable()
.verifyResults();
}
+ @Test
+ public void testReplaceTombstonesOverPartiallyOverlappingSegments()
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG).build();
+
+ // Create a DataSegment that lies partially outside the generated segment
+ DataSegment existingDataSegment = DataSegment.builder()
+ .interval(Intervals.of("2001-01-01T/2003-01-04T"))
+ .size(50)
+ .version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo1")
+ .build();
+
+ Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class));
+
+ List