Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Jan 10, 2022
1 parent efdd833 commit f97d889
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -34,25 +32,19 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

Expand All @@ -75,11 +67,6 @@
*/
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocWriteRequest<IndexRequest>, CompositeIndicesRequest {

public static final XContentParserConfiguration TS_EXTRACT_CONFIG = XContentParserConfiguration.EMPTY.withFiltering(
Set.of("@timestamp"),
null
);

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IndexRequest.class);

/**
Expand Down Expand Up @@ -736,55 +723,7 @@ public boolean isRequireAlias() {

@Override
public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
if (opType() != DocWriteRequest.OpType.CREATE) {
return ia.getWriteIndex();
}

if (ia.getType() != IndexAbstraction.Type.DATA_STREAM) {
return ia.getWriteIndex();
}

DataStream dataStream = metadata.dataStreams().get(ia.getName());
if (dataStream.isTimeSeries(metadata::index) == false) {
return ia.getWriteIndex();
}

try (XContentParser parser = contentType.xContent().createParser(TS_EXTRACT_CONFIG, source().streamInput())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
Instant timestamp;
switch (parser.nextToken()) {
case VALUE_STRING:
// TODO: deal with nanos too here.
// (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level, so we can use it
// here)
timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
break;
case VALUE_NUMBER:
timestamp = Instant.ofEpochMilli(parser.longValue());
break;
default:
throw new ParsingException(
parser.getTokenLocation(),
String.format(
Locale.ROOT,
"Failed to parse object: expecting token of type [%s] or [%s] but found [%s]",
XContentParser.Token.VALUE_STRING,
XContentParser.Token.VALUE_NUMBER,
parser.currentToken()
)
);
}

Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata);
if (result == null) {
String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);
throw new IllegalArgumentException("no index available for a document with an @timestamp of [" + timestampAsString + "]");
}
return result;
} catch (IOException e) {
throw new IllegalArgumentException("Error extracting timestamp: " + e.getMessage(), e);
}
return ia.getWriteIndex(this, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,30 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* An index abstraction is a reference to one or more concrete indices.
* An index abstraction has a unique name and encapsulates all the {@link IndexMetadata} instances it is pointing to.
Expand Down Expand Up @@ -51,6 +65,10 @@ public interface IndexAbstraction {
@Nullable
Index getWriteIndex();

default Index getWriteIndex(IndexRequest request, Metadata metadata) {
return getWriteIndex();
}

/**
* @return the data stream to which this index belongs or <code>null</code> if this is not a concrete index or
* if it is a concrete index that does not belong to a data stream.
Expand Down Expand Up @@ -384,6 +402,11 @@ public int hashCode() {

class DataStream implements IndexAbstraction {

public static final XContentParserConfiguration TS_EXTRACT_CONFIG = XContentParserConfiguration.EMPTY.withFiltering(
Set.of("@timestamp"),
null
);

private final org.elasticsearch.cluster.metadata.DataStream dataStream;
private final List<String> referencedByDataStreamAliases;

Expand Down Expand Up @@ -411,6 +434,76 @@ public Index getWriteIndex() {
return dataStream.getWriteIndex();
}

@Override
public Index getWriteIndex(IndexRequest request, Metadata metadata) {
if (request.opType() != DocWriteRequest.OpType.CREATE) {
return getWriteIndex();
}

if (getType() != IndexAbstraction.Type.DATA_STREAM) {
return getWriteIndex();
}

if (dataStream.isTimeSeries(metadata::index) == false) {
return getWriteIndex();
}

Instant timestamp;
XContent xContent = request.getContentType().xContent();
try (XContentParser parser = xContent.createParser(TS_EXTRACT_CONFIG, request.source().streamInput())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
switch (parser.nextToken()) {
case VALUE_STRING:
// TODO: deal with nanos too here.
// (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level,
// so we can use it here)
timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
break;
case VALUE_NUMBER:
timestamp = Instant.ofEpochMilli(parser.longValue());
break;
default:
throw new ParsingException(
parser.getTokenLocation(),
String.format(
Locale.ROOT,
"Failed to parse object: expecting token of type [%s] or [%s] but found [%s]",
XContentParser.Token.VALUE_STRING,
XContentParser.Token.VALUE_NUMBER,
parser.currentToken()
)
);
}
} catch (IOException e) {
throw new IllegalArgumentException("Error extracting timestamp: " + e.getMessage(), e);
}
Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata);
if (result == null) {
String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);
String writeableIndicesString = dataStream.getIndices()
.stream()
.map(metadata::index)
.map(IndexMetadata::getSettings)
.map(
settings -> "["
+ settings.get(IndexSettings.TIME_SERIES_START_TIME.getKey())
+ ","
+ settings.get(IndexSettings.TIME_SERIES_END_TIME.getKey())
+ "]"
)
.collect(Collectors.joining());
throw new IllegalArgumentException(
"the document timestamp ["
+ timestampAsString
+ "] is outside of ranges of currently writable indices ["
+ writeableIndicesString
+ "]"
);
}
return result;
}

@Override
public DataStream getParentDataStream() {
// a data stream cannot have a parent data stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,14 @@ public void testGetConcreteWriteIndex() {
);
assertThat(
e.getMessage(),
equalTo("no index available for a document with an @timestamp of [$time]".replace("$time", formatInstant(end2)))
equalTo(
"the document timestamp [$time] is outside of ranges of currently writable indices [[$start1,$end1][$start2,$end2]]"
.replace("$time", formatInstant(end2))
.replace("$start1", formatInstant(start1))
.replace("$end1", formatInstant(end1))
.replace("$start2", formatInstant(start2))
.replace("$end2", formatInstant(end2))
)
);
}
}
Expand Down

0 comments on commit f97d889

Please sign in to comment.