Skip to content

Commit

Permalink
fix injection failure of StorageLocationSelectorStrategy objects (apa…
Browse files Browse the repository at this point in the history
…che#10363)

* fix to allow customer storage location selector strategy

* add test cases to check instance of selector strategy

* update doc

* code format

* resolve code review comments

* inject StorageLocation

* fix CI

* fix mismatched license item reported by CI

* change property path from druid.segmentCache.locationSelectorStrategy.type to druid.segmentCache.locationSelector.strategy

* using a helper method to bind to correct property path
  • Loading branch information
FrankChen021 authored and JulianJaffePinterest committed Jan 22, 2021
1 parent 375e6af commit eb57222
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 69 deletions.
4 changes: 2 additions & 2 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ These Historical configurations can be defined in the `historical/runtime.proper
|Property|Description|Default|
|--------|-----------|-------|
|`druid.segmentCache.locations`|Segments assigned to a Historical process are first stored on the local file system (in a disk cache) and then served by the Historical process. These locations define where that local cache resides. This value cannot be NULL or EMPTY. Here is an example `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": "10k", "freeSpacePercent": 1.0}]`. "freeSpacePercent" is optional, if provided then enforces that much of free disk partition space while storing segments. But, it depends on File.getTotalSpace() and File.getFreeSpace() methods, so enable if only if they work for your File System.| none |
|`druid.segmentCache.locationSelectorStrategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for segment distribution. Possible values are `leastBytesUsed`, `roundRobin`, `random`, or `mostAvailableSize`. |leastBytesUsed|
|`druid.segmentCache.locationSelector.strategy`|The strategy used to select a location from the configured `druid.segmentCache.locations` for segment distribution. Possible values are `leastBytesUsed`, `roundRobin`, `random`, or `mostAvailableSize`. |leastBytesUsed|
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a process is no longer serving a segment.|true|
|`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
Expand All @@ -1421,7 +1421,7 @@ These Historical configurations can be defined in the `historical/runtime.proper

In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise.

In `druid.segmentCache.locationSelectorStrategy`, one of leastBytesUsed, roundRobin, random, or mostAvailableSize could be specified to represent the strategy to distribute segments across multiple segment cache locations.
In `druid.segmentCache.locationSelector.strategy`, one of `leastBytesUsed`, `roundRobin`, `random`, or `mostAvailableSize` could be specified to represent the strategy to distribute segments across multiple segment cache locations.

|Strategy|Description|
|--------|-----------|
Expand Down
11 changes: 1 addition & 10 deletions licenses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ libraries:
- com.fasterxml.jackson.jaxrs: jackson-jaxrs-json-provider
- com.fasterxml.jackson.jaxrs: jackson-jaxrs-smile-provider
- com.fasterxml.jackson.module: jackson-module-jaxb-annotations
- com.fasterxml.jackson.module: jackson-module-guice
notice: |
# Jackson JSON processor
Expand Down Expand Up @@ -4126,16 +4127,6 @@ libraries:

---

name: "Jackson Module: Guice"
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 2.6.7
libraries:
- com.fasterxml.jackson.module: jackson-module-guice

---

name: Google APIs Client Library For Java
license_category: binary
module: java-core
Expand Down
4 changes: 4 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@
<groupId>io.timeandspace</groupId>
<artifactId>cron-scheduler</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-guice</artifactId>
</dependency>

<!-- Tests -->
<dependency>
Expand Down
23 changes: 23 additions & 0 deletions server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.segment.loading.StorageLocationSelectorStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;

import javax.annotation.Nullable;
import java.util.List;

/**
*/
Expand All @@ -52,6 +55,7 @@ public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
bindLocationSelectorStrategy(binder);

binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null));
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
Expand Down Expand Up @@ -117,4 +121,23 @@ public Boolean isSegmentCacheConfigured(SegmentLoaderConfig segmentLoaderConfig)
{
return !segmentLoaderConfig.getLocations().isEmpty();
}

/**
* provide a list of StorageLocation
* so that it can be injected into objects such as implementations of {@link StorageLocationSelectorStrategy}
*/
@Provides
@LazySingleton
public List<StorageLocation> provideStorageLocation(SegmentLoaderConfig config)
{
return config.toStorageLocations();
}

/**
* a helper method for both storage module and independent unit test cases
*/
public static void bindLocationSelectorStrategy(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.segmentCache.locationSelector", StorageLocationSelectorStrategy.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.Ordering;

import java.util.Comparator;
Expand All @@ -34,9 +36,10 @@ public class LeastBytesUsedStorageLocationSelectorStrategy implements StorageLoc
private static final Ordering<StorageLocation> ORDERING = Ordering.from(Comparator
.comparingLong(StorageLocation::currSizeBytes));

private List<StorageLocation> storageLocations;
private final List<StorageLocation> storageLocations;

public LeastBytesUsedStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
@JsonCreator
public LeastBytesUsedStorageLocationSelectorStrategy(@JacksonInject final List<StorageLocation> storageLocations)
{
this.storageLocations = storageLocations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.Ordering;

import java.util.Comparator;
Expand All @@ -35,9 +37,10 @@ public class MostAvailableSizeStorageLocationSelectorStrategy implements Storage
.comparingLong(StorageLocation::availableSizeBytes)
.reversed());

private List<StorageLocation> storageLocations;
private final List<StorageLocation> storageLocations;

public MostAvailableSizeStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
@JsonCreator
public MostAvailableSizeStorageLocationSelectorStrategy(@JacksonInject final List<StorageLocation> storageLocations)
{
this.storageLocations = storageLocations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -33,7 +36,8 @@ public class RandomStorageLocationSelectorStrategy implements StorageLocationSel

private final List<StorageLocation> storageLocations;

public RandomStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
@JsonCreator
public RandomStorageLocationSelectorStrategy(@JacksonInject final List<StorageLocation> storageLocations)
{
this.storageLocations = storageLocations;
}
Expand All @@ -45,5 +49,4 @@ public Iterator<StorageLocation> getLocations()
Collections.shuffle(copyLocation);
return copyLocation.iterator();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
Expand All @@ -32,19 +35,20 @@
*/
public class RoundRobinStorageLocationSelectorStrategy implements StorageLocationSelectorStrategy
{

private final List<StorageLocation> storageLocations;
private final AtomicInteger startIndex = new AtomicInteger(0);

public RoundRobinStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
@JsonCreator
public RoundRobinStorageLocationSelectorStrategy(@JacksonInject List<StorageLocation> storageLocations)
{
this.storageLocations = storageLocations;
}

@Override
public Iterator<StorageLocation> getLocations()
{
return new Iterator<StorageLocation>() {
return new Iterator<StorageLocation>()
{

private final int numStorageLocations = storageLocations.size();
private int remainingIterations = numStorageLocations;
Expand Down Expand Up @@ -73,5 +77,4 @@ public StorageLocation next()
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.utils.JvmUtils;

import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
*
Expand Down Expand Up @@ -55,9 +55,6 @@ public class SegmentLoaderConfig
@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;

@JsonProperty("locationSelectorStrategy")
private StorageLocationSelectorStrategy locationSelectorStrategy;

@JsonProperty
private File infoDir = null;

Expand Down Expand Up @@ -101,15 +98,6 @@ public int getNumBootstrapThreads()
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
}

public StorageLocationSelectorStrategy getStorageLocationSelectorStrategy(List<StorageLocation> storageLocations)
{
if (locationSelectorStrategy == null) {
// default strategy if no strategy is specified in the config
locationSelectorStrategy = new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations);
}
return locationSelectorStrategy;
}

public File getInfoDir()
{
if (infoDir == null) {
Expand Down Expand Up @@ -140,11 +128,19 @@ public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
return retVal;
}

@VisibleForTesting
SegmentLoaderConfig withStorageLocationSelectorStrategy(StorageLocationSelectorStrategy strategy)
/**
* Convert StorageLocationConfig objects to StorageLocation objects
*
* Note: {@link #getLocations} is called instead of variable access because some testcases overrides this method
*/
public List<StorageLocation> toStorageLocations()
{
this.locationSelectorStrategy = strategy;
return this;
return this.getLocations()
.stream()
.map(locationConfig -> new StorageLocation(locationConfig.getPath(),
locationConfig.getMaxSize(),
locationConfig.getFreeSpacePercent()))
.collect(Collectors.toList());
}

@Override
Expand All @@ -154,7 +150,6 @@ public String toString()
"locations=" + locations +
", deleteOnRemove=" + deleteOnRemove +
", dropSegmentDelayMillis=" + dropSegmentDelayMillis +
", locationSelectorStrategy=" + locationSelectorStrategy +
", infoDir=" + infoDir +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -82,24 +82,48 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
@Inject
public SegmentLoaderLocalCacheManager(
IndexIO indexIO,
List<StorageLocation> locations,
SegmentLoaderConfig config,
@Nonnull StorageLocationSelectorStrategy strategy,
@Json ObjectMapper mapper
)
{
this.indexIO = indexIO;
this.config = config;
this.jsonMapper = mapper;
this.locations = new ArrayList<>();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(
new StorageLocation(
locationConfig.getPath(),
locationConfig.getMaxSize(),
locationConfig.getFreeSpacePercent()
)
);
}
this.strategy = config.getStorageLocationSelectorStrategy(locations);
this.locations = locations;
this.strategy = strategy;
log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
}

@VisibleForTesting
SegmentLoaderLocalCacheManager(
IndexIO indexIO,
SegmentLoaderConfig config,
@Nonnull StorageLocationSelectorStrategy strategy,
@Json ObjectMapper mapper
)
{
this(indexIO, config.toStorageLocations(), config, strategy, mapper);
}

/**
* creates instance with default storage location selector strategy
*
* This ctor is mainly for test cases, including test cases in other modules
*/
public SegmentLoaderLocalCacheManager(
IndexIO indexIO,
SegmentLoaderConfig config,
@Json ObjectMapper mapper
)
{
this.indexIO = indexIO;
this.config = config;
this.jsonMapper = mapper;
this.locations = config.toStorageLocations();
this.strategy = new LeastBytesUsedStorageLocationSelectorStrategy(locations);
log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* https://github.com/apache/druid/pull/8038#discussion_r325520829 of PR https://github
* .com/apache/druid/pull/8038 for more details.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl =
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl =
LeastBytesUsedStorageLocationSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "leastBytesUsed", value = LeastBytesUsedStorageLocationSelectorStrategy.class),
Expand Down
Loading

0 comments on commit eb57222

Please sign in to comment.