Skip to content

Commit

Permalink
Generate tombstones when running MSQ's replace (#13706)
Browse files Browse the repository at this point in the history
*When running REPLACE queries, the segments which contain no data are dropped (marked as unused). This PR aims to generate tombstones in place of segments which contain no data to mark their deletion, as is the behavior with the native ingestion.

This will cause InsertCannotReplaceExistingSegmentFault to be removed since it was generated if the interval to be marked unused didn't fully overlap one of the existing segments to replace.
  • Loading branch information
LakshSingla authored Mar 1, 2023
1 parent 6cf754b commit ca68fd9
Show file tree
Hide file tree
Showing 13 changed files with 488 additions and 206 deletions.
1 change: 0 additions & 1 deletion docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,6 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. |
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| <a name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a> | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` |
| <a name="error_InsertCannotReplaceExistingSegment">`InsertCannotReplaceExistingSegment`</a> | A REPLACE query cannot proceed because an existing segment partially overlaps those bounds, and the portion within the bounds is not fully overshadowed by query results. <br /> <br />There are two ways to address this without modifying your query:<ul><li>Shrink the OVERLAP filter to match the query results.</li><li>Expand the OVERLAP filter to fully contain the existing segment.</li></ul>| `segmentId`: The existing segment <br />
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. (TIME_PARSE returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern.<br /><br />If your timestamps may genuinely be null, consider using COALESCE to provide a default value. One option is CURRENT_TIMESTAMP, which represents the start time of the job. |
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -62,12 +61,11 @@
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -103,7 +101,6 @@
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
Expand Down Expand Up @@ -1247,48 +1244,33 @@ private void postResultPartitionBoundariesForStage(
/**
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
* <p>
* If any existing segments cannot be dropped because their intervals are not wholly contained within the
* replacement parameter, throws a {@link MSQException} with {@link InsertCannotReplaceExistingSegmentFault}.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
final Set<DataSegment> segmentsToDrop;
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);

if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));

if (intervalsToDrop.isEmpty()) {
segmentsToDrop = null;
} else {
// Determine which segments to drop as part of the replace operation. This is safe because, in the case where we
// are doing a replace, the isReady method (which runs prior to the task starting) acquires an exclusive lock.
segmentsToDrop =
ImmutableSet.copyOf(
context.taskActionClient().submit(
new RetrieveUsedSegmentsAction(
task.getDataSource(),
null,
intervalsToDrop,
Segments.ONLY_VISIBLE
)
)
);

// Validate that there are no segments that partially overlap the intervals-to-drop. Otherwise, the replace
// may be incomplete.
for (final DataSegment segmentToDrop : segmentsToDrop) {
if (destination.getReplaceTimeChunks()
.stream()
.noneMatch(interval -> interval.contains(segmentToDrop.getInterval()))) {
throw new MSQException(new InsertCannotReplaceExistingSegmentFault(segmentToDrop.getId()));
}
if (!intervalsToDrop.isEmpty()) {
TombstoneHelper tombstoneHelper = new TombstoneHelper(context.taskActionClient());
try {
Set<DataSegment> tombstones = tombstoneHelper.computeTombstoneSegmentsForReplace(
intervalsToDrop,
destination.getReplaceTimeChunks(),
task.getDataSource(),
destination.getSegmentGranularity()
);
segmentsWithTombstones.addAll(tombstones);
}
catch (IllegalStateException e) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
}
}

if (segments.isEmpty()) {
if (segmentsWithTombstones.isEmpty()) {
// Nothing to publish, only drop. We already validated that the intervalsToDrop do not have any
// partially-overlapping segments, so it's safe to drop them as intervals instead of as specific segments.
for (final Interval interval : intervalsToDrop) {
Expand All @@ -1298,7 +1280,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
} else {
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)
SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Expand Down Expand Up @@ -2590,6 +2572,7 @@ static ClusterStatisticsMergeMode finalizeClusterStatisticsMergeMode(
return mergeMode;
}


/**
* Interface used by {@link #contactWorkersForStage}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
Expand Down Expand Up @@ -106,7 +105,6 @@ public class MSQIndexingModule implements DruidModule
InsertCannotAllocateSegmentFault.class,
InsertCannotBeEmptyFault.class,
InsertCannotOrderByDescendingFault.class,
InsertCannotReplaceExistingSegmentFault.class,
InsertLockPreemptedFault.class,
InsertTimeNullFault.class,
InsertTimeOutOfBoundsFault.class,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
Expand All @@ -40,7 +38,6 @@
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.junit.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -114,30 +111,6 @@ public void testInsertCannotOrderByDescendingFault()
.verifyResults();
}

@Test
public void testInsertCannotReplaceExistingSegmentFault()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG).build();

// Create a datasegment which lies partially outside the generated segment
DataSegment existingDataSegment = DataSegment.builder()
.interval(Intervals.of("2001-01-01T/2003-01-04T"))
.size(50)
.version("1").dataSource("foo1")
.build();
Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(isA(RetrieveUsedSegmentsAction.class));

testIngestQuery().setSql(
"replace into foo1 overwrite where __time >= TIMESTAMP '2000-01-01 00:00:00' and __time < TIMESTAMP '2002-01-03 00:00:00' select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(new InsertCannotReplaceExistingSegmentFault(existingDataSegment.getId()))
.verifyResults();
}

@Test
public void testInsertTimeOutOfBoundsFault()
{
Expand Down
Loading

0 comments on commit ca68fd9

Please sign in to comment.