Skip to content

Commit

Permalink
System indices auto-create for searchable-snapshots (#66940)
Browse files Browse the repository at this point in the history
Backport of #66276.

Part of #61656. Use the system indices auto-creation infrastructure
for the searchable snapshots plugin.
  • Loading branch information
pugnascotia authored Jan 6, 2021
1 parent 81871b9 commit 065dc33
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -24,180 +22,36 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.DATA_TIERS_PREFERENCE;

public class BlobStoreCacheService {

private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class);

public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(4);

private final ClusterService clusterService;
private final ThreadPool threadPool;
private final Client client;
private final String index;

public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) {
public BlobStoreCacheService(ThreadPool threadPool, Client client, String index) {
this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.index = index;
}

private void createIndexIfNecessary(ActionListener<String> listener) {
if (clusterService.state().routingTable().hasIndex(index)) {
listener.onResponse(index);
return;
}
try {
client.admin()
.indices()
.prepareCreate(index)
.setSettings(indexSettings())
.addMapping(SINGLE_MAPPING_NAME, mappings())
.execute(new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
assert createIndexResponse.index().equals(index);
listener.onResponse(createIndexResponse.index());
}

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException
|| ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
listener.onResponse(index);
} else {
listener.onFailure(e);
}
}
});
} catch (Exception e) {
listener.onFailure(e);
}
}

private static Settings indexSettings() {
return Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(IndexMetadata.SETTING_PRIORITY, "900")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)
.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, DATA_TIERS_PREFERENCE)
.build();
}

private static XContentBuilder mappings() throws IOException {
final XContentBuilder builder = jsonBuilder();
{
builder.startObject();
{
builder.startObject(SINGLE_MAPPING_NAME);
builder.field("dynamic", "strict");
{
builder.startObject("_meta");
builder.field("version", Version.CURRENT);
builder.endObject();
}
{
builder.startObject("properties");
{
builder.startObject("type");
builder.field("type", "keyword");
builder.endObject();
}
{
builder.startObject("creation_time");
builder.field("type", "date");
builder.field("format", "epoch_millis");
builder.endObject();
}
{
builder.startObject("version");
builder.field("type", "integer");
builder.endObject();
}
{
builder.startObject("repository");
builder.field("type", "keyword");
builder.endObject();
}
{
builder.startObject("blob");
builder.field("type", "object");
{
builder.startObject("properties");
{
builder.startObject("name");
builder.field("type", "keyword");
builder.endObject();
builder.startObject("path");
builder.field("type", "keyword");
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
{
builder.startObject("data");
builder.field("type", "object");
{
builder.startObject("properties");
{
builder.startObject("content");
builder.field("type", "binary");
builder.endObject();
}
{
builder.startObject("length");
builder.field("type", "long");
builder.endObject();
}
{
builder.startObject("from");
builder.field("type", "long");
builder.endObject();
}
{
builder.startObject("to");
builder.field("type", "long");
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
return builder;
}

public CachedBlob get(String repository, String name, String path, long offset) {
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SYSTEM_READ + ']') == false : "must not block ["
+ Thread.currentThread().getName()
Expand Down Expand Up @@ -267,52 +121,37 @@ private static boolean isExpectedCacheGetException(Exception e) {
}

public void putAsync(String repository, String name, String path, long offset, BytesReference content, ActionListener<Void> listener) {
createIndexIfNecessary(new ActionListener<String>() {
@Override
public void onResponse(String s) {
final IndexRequest request;
try {
final CachedBlob cachedBlob = new CachedBlob(
Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()),
Version.CURRENT,
repository,
name,
path,
content,
offset
);
request = new IndexRequest(index).id(cachedBlob.generatedId());
try (XContentBuilder builder = jsonBuilder()) {
request.source(cachedBlob.toXContent(builder, ToXContent.EMPTY_PARAMS));
}
try {
final CachedBlob cachedBlob = new CachedBlob(
Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()),
Version.CURRENT,
repository,
name,
path,
content,
offset
);
final IndexRequest request = new IndexRequest(index).id(cachedBlob.generatedId());
try (XContentBuilder builder = jsonBuilder()) {
request.source(cachedBlob.toXContent(builder, ToXContent.EMPTY_PARAMS));
}

client.index(request, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id());
listener.onResponse(null);
}
client.index(request, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id());
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e);
listener.onFailure(e);
}
});
} catch (Exception e) {
logger.warn(
new ParameterizedMessage("cache fill failure: [{}]", CachedBlob.generateId(repository, name, path, offset)),
e
);
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e);
listener.onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
logger.error(() -> new ParameterizedMessage("failed to create blob cache system index [{}]", index), e);
listener.onFailure(e);
}
});
});
} catch (Exception e) {
logger.warn(new ParameterizedMessage("cache fill failure: [{}]", CachedBlob.generateId(repository, name, path, offset)), e);
listener.onFailure(e);
}
}
}
Loading

0 comments on commit 065dc33

Please sign in to comment.