Generate tombstones when running MSQ's replace #13706

Merged: 16 commits, Mar 1, 2023
docs/multi-stage-query/reference.md (1 change: 0 additions & 1 deletion)
@@ -673,7 +673,6 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. |
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| <a name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a> | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` |
| <a name="error_InsertCannotReplaceExistingSegment">`InsertCannotReplaceExistingSegment`</a> | A REPLACE query cannot proceed because an existing segment partially overlaps those bounds, and the portion within the bounds is not fully overshadowed by query results. <br /> <br />There are two ways to address this without modifying your query:<ul><li>Shrink the OVERLAP filter to match the query results.</li><li>Expand the OVERLAP filter to fully contain the existing segment.</li></ul>| `segmentId`: The existing segment <br />
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. (TIME_PARSE returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern.<br /><br />If your timestamps may genuinely be null, consider using COALESCE to provide a default value. One option is CURRENT_TIMESTAMP, which represents the start time of the job. |
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp |
@@ -24,7 +24,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
@@ -62,12 +61,11 @@
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
-import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -103,7 +101,6 @@
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
-import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
@@ -1247,48 +1244,33 @@ private void postResultPartitionBoundariesForStage(
/**
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
- * <p>
- * If any existing segments cannot be dropped because their intervals are not wholly contained within the
- * replacement parameter, throws a {@link MSQException} with {@link InsertCannotReplaceExistingSegmentFault}.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
-    final Set<DataSegment> segmentsToDrop;
+    Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);

if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));

-      if (intervalsToDrop.isEmpty()) {
-        segmentsToDrop = null;
-      } else {
-        // Determine which segments to drop as part of the replace operation. This is safe because, in the case where we
-        // are doing a replace, the isReady method (which runs prior to the task starting) acquires an exclusive lock.
-        segmentsToDrop =
-            ImmutableSet.copyOf(
-                context.taskActionClient().submit(
-                    new RetrieveUsedSegmentsAction(
-                        task.getDataSource(),
-                        null,
-                        intervalsToDrop,
-                        Segments.ONLY_VISIBLE
-                    )
-                )
-            );
-
-        // Validate that there are no segments that partially overlap the intervals-to-drop. Otherwise, the replace
-        // may be incomplete.
-        for (final DataSegment segmentToDrop : segmentsToDrop) {
-          if (destination.getReplaceTimeChunks()
-                         .stream()
-                         .noneMatch(interval -> interval.contains(segmentToDrop.getInterval()))) {
-            throw new MSQException(new InsertCannotReplaceExistingSegmentFault(segmentToDrop.getId()));
-          }
+      if (!intervalsToDrop.isEmpty()) {
> **Reviewer (Member):** I was trying to compare this code to the one present in native ingestion, and I couldn't understand the reason for not using the TombstoneHelper class to compute the tombstone intervals and segments. Is there a specific reason the two code paths can't be made common?
>
> **Author:** There are some minute differences between the arguments TombstoneHelper expects and how MSQ generates the segments. TombstoneHelper uses the DataSchema and its granularitySpec to compute the empty segments, whereas here we already have the empty intervals whose corresponding segments we know should be empty, so I wasn't able to reconcile the code paths cleanly. One way would have been to create a dummy data schema corresponding to the empty intervals. Also, the pushedSegments argument in the helper is of no use, since in REPLACE we already know the empty intervals, so we would also have to dummy that out with something that would never overlap. Given these, I decided to drop the usage of TombstoneHelper there.

+        TombstoneHelper tombstoneHelper = new TombstoneHelper(context.taskActionClient());
+        try {
+          Set<DataSegment> tombstones = tombstoneHelper.computeTombstonesForReplace(
+              intervalsToDrop,
+              destination.getReplaceTimeChunks(),
+              task.getDataSource(),
+              destination.getSegmentGranularity()
+          );
+          segmentsWithTombstones.addAll(tombstones);
+        }
+        catch (IllegalStateException e) {
+          throw new MSQException(e, InsertLockPreemptedFault.instance());
+        }
}

-      if (segments.isEmpty()) {
+      if (segmentsWithTombstones.isEmpty()) {
// Nothing to publish, only drop. We already validated that the intervalsToDrop do not have any
// partially-overlapping segments, so it's safe to drop them as intervals instead of as specific segments.
for (final Interval interval : intervalsToDrop) {
@@ -1298,7 +1280,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
} else {
performSegmentPublish(
context.taskActionClient(),
-            SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)
+            SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
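To make the TombstoneHelper discussion above concrete, here is a minimal sketch of the tombstone-interval idea, under the stated assumption that it mirrors what `computeTombstonesForReplace` does conceptually (the real helper also takes a `TaskActionClient`, so it presumably consults existing used segments before materializing tombstone `DataSegment`s):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.Interval;

// Illustrative sketch only, not Druid's actual TombstoneHelper: bucket each
// empty replace interval by segment granularity; every bucket that lies fully
// inside the replace range becomes one tombstone interval.
final class TombstoneIntervalSketch
{
  static List<Interval> tombstoneIntervals(
      final List<Interval> intervalsToDrop,    // replace range minus intervals that received new data
      final List<Interval> replaceTimeChunks,  // the OVERWRITE WHERE range of the REPLACE query
      final Granularity segmentGranularity
  )
  {
    final List<Interval> tombstones = new ArrayList<>();
    for (final Interval toDrop : intervalsToDrop) {
      for (final Interval bucket : segmentGranularity.getIterable(toDrop)) {
        if (replaceTimeChunks.stream().anyMatch(chunk -> chunk.contains(bucket))) {
          tombstones.add(bucket);
        }
      }
    }
    return tombstones;
  }
}
```

Read this way, the `2001-01-01/P1M` tombstone expected by the replace tests below falls out naturally: in those tests the replace range evidently spans two months while only one of them receives new data.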
@@ -2586,6 +2568,8 @@ static ClusterStatisticsMergeMode finalizeClusterStatisticsMergeMode(
return mergeMode;
}



/**
* Interface used by {@link #contactWorkersForStage}.
*/
@@ -45,7 +45,6 @@
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
-import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
@@ -106,7 +105,6 @@ public class MSQIndexingModule implements DruidModule
InsertCannotAllocateSegmentFault.class,
InsertCannotBeEmptyFault.class,
InsertCannotOrderByDescendingFault.class,
-      InsertCannotReplaceExistingSegmentFault.class,
InsertLockPreemptedFault.class,
InsertTimeNullFault.class,
InsertTimeOutOfBoundsFault.class,
This file was deleted.

@@ -21,14 +21,12 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
-import org.apache.druid.msq.indexing.error.InsertCannotReplaceExistingSegmentFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
@@ -40,7 +38,6 @@
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.junit.Test;
import org.mockito.Mockito;
@@ -114,30 +111,6 @@ public void testInsertCannotOrderByDescendingFault()
.verifyResults();
}

-  @Test
-  public void testInsertCannotReplaceExistingSegmentFault()
-  {
> **Reviewer (Contributor):** We should have a test case which tests tombstone segments. This would give us more confidence in the PR.
>
> **Author:** I moved the same test case to the replace tests and ensured that it passes. I changed the granularity of the query a bit so as not to blow up the number of tombstone segments generated.

-    RowSignature rowSignature = RowSignature.builder()
-                                            .add("__time", ColumnType.LONG)
-                                            .add("dim1", ColumnType.STRING)
-                                            .add("cnt", ColumnType.LONG).build();
-
-    // Create a DataSegment which lies partially outside the generated segment
-    DataSegment existingDataSegment = DataSegment.builder()
-                                                 .interval(Intervals.of("2001-01-01T/2003-01-04T"))
-                                                 .size(50)
-                                                 .version("1").dataSource("foo1")
-                                                 .build();
-    Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(isA(RetrieveUsedSegmentsAction.class));
-
-    testIngestQuery().setSql(
-        "replace into foo1 overwrite where __time >= TIMESTAMP '2000-01-01 00:00:00' and __time < TIMESTAMP '2002-01-03 00:00:00' select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1")
-                     .setExpectedDataSource("foo1")
-                     .setExpectedRowSignature(rowSignature)
-                     .setExpectedMSQFault(new InsertCannotReplaceExistingSegmentFault(existingDataSegment.getId()))
-                     .verifyResults();
-  }
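For reference, here is a plausible shape for the relocated test the author describes, sketched from harness calls visible elsewhere in this diff (`setExpectedTombstoneIntervals` is used in the replace tests below). The method name, the MONTH granularity, and the expected tombstone interval are illustrative guesses, not the merged code:

```java
// Hypothetical sketch: the same partially-overlapping REPLACE now succeeds
// and is expected to produce tombstones, rather than failing with
// InsertCannotReplaceExistingSegmentFault. Coarser MONTH partitioning keeps
// the number of generated tombstone segments small.
@Test
public void testReplaceOverPartialOverlapGeneratesTombstones()
{
  testIngestQuery()
      .setSql(
          "REPLACE INTO foo1 OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' "
          + "AND __time < TIMESTAMP '2002-01-03 00:00:00' "
          + "SELECT __time, dim1, COUNT(*) AS cnt FROM foo WHERE dim1 IS NOT NULL "
          + "GROUP BY 1, 2 PARTITIONED BY MONTH CLUSTERED BY dim1"
      )
      .setExpectedDataSource("foo1")
      .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2001-01-01/P1M")))
      .verifyResults();
}
```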

@Test
public void testInsertTimeOutOfBoundsFault()
{
@@ -413,12 +413,7 @@ public void testReplaceWhereClauseLargerThanData()
new Object[]{946771200000L, 2.0f}
)
)
-                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
-                         "foo",
-                         Intervals.of("2000-01-01T/P1M"),
-                         "test",
-                         0
-                     )))
.setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2001-01-01/2001-02-01")))
> **Reviewer (Contributor):** How is this a tombstone, since we are generating data for this interval?
>
> **Author:** Updated the test cases, and moved this line along with the destination segments.

.verifyResults();
}

@@ -524,6 +519,7 @@ public void testReplaceTimeChunksLargerThanData()
new Object[]{946771200000L, 2.0f}
)
)
.setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2001-01-01/P1M")))
.verifyResults();
}

@@ -26,7 +26,6 @@
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -57,9 +56,6 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
assertFaultSerde(new InsertCannotOrderByDescendingFault("the column"));
-    assertFaultSerde(
-        new InsertCannotReplaceExistingSegmentFault(SegmentId.of("the datasource", Intervals.ETERNITY, "v1", 1))
-    );
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
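The `assertFaultSerde` helper itself is not shown in this hunk; a minimal sketch of what such a round-trip assertion presumably looks like (hypothetical, the test class's real helper may differ):

```java
// Hypothetical sketch: serialize the fault to JSON and back, then check the
// round-tripped instance equals the original. Assumes the fault classes
// registered by MSQIndexingModule are known to the ObjectMapper.
private void assertFaultSerdeSketch(final ObjectMapper objectMapper, final MSQFault fault) throws IOException
{
  final String json = objectMapper.writeValueAsString(fault);
  final MSQFault roundTripped = objectMapper.readValue(json, MSQFault.class);
  Assert.assertEquals(fault, roundTripped);
}
```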