Store null columns in the segments (#12279)
* Store null columns in the segments

* fix test

* remove NullNumericColumn and unused dependency

* fix compile failure

* use guava instead of apache commons

* split new tests

* unused imports

* address comments
jihoonson authored Mar 23, 2022
1 parent d7308e9 commit b6eeef3
Showing 81 changed files with 2,461 additions and 533 deletions.
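In short, indexing tasks now write a column for every declared dimension even when all of its values are null, and a per-task context flag can turn that behavior off. The sketch below only illustrates the wiring visible in this diff (the Tasks.STORE_EMPTY_COLUMNS_KEY context key, the TaskConfig default, and IndexMergerV9Factory.create); the helper class name is made up for the example.

import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexMergerV9Factory;

class StoreEmptyColumnsWiringSketch
{
  // Mirrors the resolution order in TaskToolboxFactory.build() below:
  // a per-task context value wins, otherwise the cluster-wide TaskConfig default applies.
  static IndexMergerV9 mergerFor(Task task, TaskConfig config, IndexMergerV9Factory factory)
  {
    final boolean storeEmptyColumns =
        task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns());
    return factory.create(storeEmptyColumns);
  }
}

The new Kafka and Kinesis tests opt out per task with task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false) and then check whether the published segments list the never-populated dimension.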
@@ -74,6 +74,7 @@
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
@@ -148,6 +149,7 @@
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
@@ -169,6 +171,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -434,6 +437,96 @@ public void testRunAfterDataInserted() throws Exception
);
}

@Test(timeout = 60_000L)
public void testIngestNullColumnAfterDataInserted() throws Exception
{
// Insert data
insertData();

final DimensionsSpec dimensionsSpec = new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("unknownDim"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
);
final KafkaIndexTask task = createTask(
null,
NEW_DATA_SCHEMA.withDimensionsSpec(dimensionsSpec),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
INPUT_FORMAT
)
);
final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());

final Collection<DataSegment> segments = publishedSegments();
for (DataSegment segment : segments) {
for (int i = 0; i < dimensionsSpec.getDimensions().size(); i++) {
Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i), segment.getDimensions().get(i));
}
}
}

@Test(timeout = 60_000L)
public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws Exception
{
// Insert data
insertData();

final KafkaIndexTask task = createTask(
null,
NEW_DATA_SCHEMA.withDimensionsSpec(
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("unknownDim")
)
)
),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
INPUT_FORMAT
)
);
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());

final Collection<DataSegment> segments = publishedSegments();
for (DataSegment segment : segments) {
Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
}
}

@Test(timeout = 60_000L)
public void testRunAfterDataInsertedWithLegacyParser() throws Exception
{
@@ -2899,7 +2992,8 @@ private void makeToolboxFactory() throws IOException
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
@@ -3009,7 +3103,7 @@ public void close()
MapCache.create(1024),
new CacheConfig(),
new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
testUtils.getIndexMergerV9Factory(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
@@ -41,6 +41,10 @@
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
@@ -61,6 +65,7 @@
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -113,6 +118,7 @@
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
@@ -130,6 +136,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -392,6 +399,132 @@ public void testRunAfterDataInserted() throws Exception
);
}

@Test(timeout = 120_000L)
public void testIngestNullColumnAfterDataInserted() throws Exception
{
recordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();

EasyMock.expect(recordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();

recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();

EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5)).once();

recordSupplier.close();
EasyMock.expectLastCall().once();

replayAll();

final DimensionsSpec dimensionsSpec = new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("unknownDim")
)
);
final KinesisIndexTask task = createTask(
null,
NEW_DATA_SCHEMA.withDimensionsSpec(dimensionsSpec),
new KinesisIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "4")),
true,
null,
null,
INPUT_FORMAT,
"awsEndpoint",
null,
null,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());

verifyAll();

final Collection<DataSegment> segments = publishedSegments();
for (DataSegment segment : segments) {
for (int i = 0; i < dimensionsSpec.getDimensions().size(); i++) {
Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i), segment.getDimensions().get(i));
}
}
}

@Test(timeout = 120_000L)
public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws Exception
{
recordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();

EasyMock.expect(recordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();

recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();

EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5)).once();

recordSupplier.close();
EasyMock.expectLastCall().once();

replayAll();

final KinesisIndexTask task = createTask(
null,
NEW_DATA_SCHEMA.withDimensionsSpec(
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("unknownDim")
)
)
),
new KinesisIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "4")),
true,
null,
null,
INPUT_FORMAT,
"awsEndpoint",
null,
null,
null,
null,
false
)
);
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());

verifyAll();

final Collection<DataSegment> segments = publishedSegments();
for (DataSegment segment : segments) {
Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
}
}

@Test(timeout = 120_000L)
public void testRunAfterDataInsertedWithLegacyParser() throws Exception
{
@@ -2902,7 +3035,8 @@ private void makeToolboxFactory() throws IOException
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
@@ -3013,7 +3147,7 @@ public void close()
MapCache.create(1024),
new CacheConfig(),
new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
testUtils.getIndexMergerV9Factory(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
@@ -90,7 +90,7 @@ public class HadoopDruidIndexerConfig
static final Joiner TAB_JOINER = Joiner.on("\t");
public static final ObjectMapper JSON_MAPPER;
public static final IndexIO INDEX_IO;
static final IndexMerger INDEX_MERGER_V9;
static final IndexMerger INDEX_MERGER_V9; // storeEmptyColumns is off for this indexMerger
static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
static final DataSegmentPusher DATA_SEGMENT_PUSHER;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
@@ -38,6 +38,7 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
@@ -46,7 +47,7 @@
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.JoinableFactory;
@@ -89,7 +90,7 @@ public class TaskToolboxFactory
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final IndexMergerV9 indexMergerV9;
private final IndexMergerV9Factory indexMergerV9Factory;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
private final LookupNodeService lookupNodeService;
@@ -130,7 +131,7 @@ public TaskToolboxFactory(
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats,
IndexMergerV9 indexMergerV9,
IndexMergerV9Factory indexMergerV9Factory,
DruidNodeAnnouncer druidNodeAnnouncer,
@RemoteChatHandler DruidNode druidNode,
LookupNodeService lookupNodeService,
@@ -168,7 +169,7 @@ public TaskToolboxFactory(
this.cache = cache;
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
this.indexMergerV9 = indexMergerV9;
this.indexMergerV9Factory = indexMergerV9Factory;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
@@ -211,7 +212,7 @@ public TaskToolbox build(Task task)
cache,
cacheConfig,
cachePopulatorStats,
indexMergerV9,
indexMergerV9Factory.create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns())),
druidNodeAnnouncer,
druidNode,
lookupNodeService,
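For reference, here is a rough sketch of what an IndexMergerV9Factory along these lines could look like. The diff above only shows the factory being injected and its create(boolean) method being called, so the constructor arguments and the extra IndexMergerV9 constructor parameter below are assumptions, not the committed implementation.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;

public class IndexMergerV9Factory
{
  private final ObjectMapper mapper;
  private final IndexIO indexIO;
  private final SegmentWriteOutMediumFactory defaultSegmentWriteOutMediumFactory;

  public IndexMergerV9Factory(
      ObjectMapper mapper,
      IndexIO indexIO,
      SegmentWriteOutMediumFactory defaultSegmentWriteOutMediumFactory
  )
  {
    this.mapper = mapper;
    this.indexIO = indexIO;
    this.defaultSegmentWriteOutMediumFactory = defaultSegmentWriteOutMediumFactory;
  }

  public IndexMergerV9 create(boolean storeEmptyColumns)
  {
    // Assumes IndexMergerV9 gained a storeEmptyColumns constructor argument in this change.
    return new IndexMergerV9(mapper, indexIO, defaultSegmentWriteOutMediumFactory, storeEmptyColumns);
  }
}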
(Diffs for the remaining changed files are not shown here.)
