Skip to content

Commit

Permalink
Derived Source POC
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Dec 6, 2024
1 parent 98dbc4a commit 3e4f0b2
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 2 deletions.
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,8 @@ protected void closeInternal() {
recoverySettings,
remoteStoreSettings,
seedRemote,
discoveryNodes
discoveryNodes,
indexFieldData
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.util.Accountable;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -59,6 +60,7 @@
*
* @opensearch.internal
*/
@PublicApi(since = "1.0.0")
public class IndexFieldDataService extends AbstractIndexComponent implements Closeable {
public static final String FIELDDATA_CACHE_VALUE_NODE = "node";
public static final String FIELDDATA_CACHE_KEY = "index.fielddata.cache";
Expand Down
139 changes: 139 additions & 0 deletions server/src/main/java/org/opensearch/index/get/ShardGetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexableFieldType;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.util.BytesRef;
import org.opensearch.OpenSearchException;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
Expand All @@ -51,36 +55,46 @@
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.TranslogLeafReader;
import org.opensearch.index.fielddata.LeafNumericFieldData;
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
import org.opensearch.index.fieldvisitor.CustomFieldsVisitor;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MetadataFieldMapper;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.RoutingFieldMapper;
import org.opensearch.index.mapper.SourceFieldMapper;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
Expand Down Expand Up @@ -415,6 +429,18 @@ private GetResult innerGetLoadFromStoredFields(
}
}

try {
Map<String, Object> sourceAsMap = buildUsingDocValues(docIdAndVersion.docId, docIdAndVersion.reader, mapperService, indexShard);
sourceAsMap = unflatten(sourceAsMap);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
builder.map(sourceAsMap);
source = BytesReference.bytes(builder);
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}


return new GetResult(
shardId.getIndexName(),
id,
Expand All @@ -428,11 +454,124 @@ private GetResult innerGetLoadFromStoredFields(
);
}

private static Map<String, Object> unflatten(Map<String, Object> flattened) {
Map<String, Object> unflattened = new HashMap<>();
for (String key : flattened.keySet()) {
doUnflatten(flattened, unflattened, key, flattened.get(key));
}
return unflattened;
}

private static Map<String, Object> doUnflatten(
Map<String, Object> flattened,
Map<String, Object> unflattened,
String key,
Object value) {

String[] parts = key.split("\\.");
for (int i = 0; i < parts.length; i++) {
String part = parts[i];
Object current = flattened.get(part);
if (i == (parts.length - 1)) {
unflattened.put(part, value);
} else if (current == null) {
if ((current = unflattened.get(part)) == null) {
current = new HashMap<>();
}
unflattened.put(part, current);
unflattened = (Map<String, Object>) current;
} else if (current instanceof Map) {
unflattened.put(part, current);
unflattened = (Map<String, Object>) current;
}
}
return unflattened;
}


private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) {
if (fields == null || fields.length == 0) {
return fetchSourceContext.fetchSource() ? new FieldsVisitor(true) : null;
}

return new CustomFieldsVisitor(Sets.newHashSet(fields), fetchSourceContext.fetchSource());
}

private static Map<String, Object> buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService, IndexShard indexShard) throws IOException {
Map<String, Object> docValues = new HashMap<>();
for (Mapper mapper: mapperService.documentMapper().mappers()) {
if (mapper instanceof MetadataFieldMapper) {
continue;
}
mapper.name();
if (mapper instanceof FieldMapper) {
FieldMapper fieldMapper = (FieldMapper) mapper;
if (fieldMapper.fieldType().hasDocValues()) {
String fieldName = fieldMapper.name();
FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(fieldName);
DocValueFormat format = fieldMapper.fieldType().docValueFormat(null, null);
if (fieldInfo != null) {
switch (fieldInfo.getDocValuesType()) {
case SORTED_SET:
SortedSetDocValues dv = reader.getSortedSetDocValues(fieldName);
if (dv.advanceExact(docId)) {
BytesRef[] values = new BytesRef[dv.docValueCount()];
for (int i = 0; i < dv.docValueCount(); i++) {
values[i] = dv.lookupOrd(dv.nextOrd());
}
if (values.length > 1) {
docValues.put(fieldName, Arrays.stream(values).map(format::format).collect(Collectors.toList()));
} else {
docValues.put(fieldName, format.format(values[0]));
}
}
break;
case SORTED_NUMERIC:
SortedNumericDocValues sndv = reader.getSortedNumericDocValues(fieldName);
if (fieldMapper instanceof NumberFieldMapper) {
NumberFieldMapper.NumberType numberType = ((NumberFieldMapper) fieldMapper).getType();
switch (numberType) {
case HALF_FLOAT:
case FLOAT:
case DOUBLE:
SortedNumericDoubleValues doubleValues = ((LeafNumericFieldData) indexShard.indexFieldDataService().getForField(fieldMapper.fieldType(), "", () -> null)
.load(reader.getContext())).getDoubleValues();
if (doubleValues.advanceExact(docId)) {
int size = doubleValues.docValueCount();
double[] vals = new double[size];
for (int i = 0; i < size; i++) {
vals[i] = doubleValues.nextValue();
}
if (size > 1) {
docValues.put(fieldName, vals);
} else {
docValues.put(fieldName, vals[0]);
}
}
break;
case INTEGER:
case LONG:
case UNSIGNED_LONG:
if (sndv.advanceExact(docId)) {
int size = sndv.docValueCount();
long[] vals = new long[size];
for (int i = 0; i < size; i++) {
vals[i] = sndv.nextValue();
}
if (size > 1) {
docValues.put(fieldName, vals);
} else {
docValues.put(fieldName, vals[0]);
}
}
}
}
break;
}
}
}
}
}
return docValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1817,4 +1817,8 @@ protected void parseCreateField(ParseContext context) throws IOException {
public ParametrizedFieldMapper.Builder getMergeBuilder() {
return new Builder(simpleName(), type, ignoreMalformedByDefault, coerceByDefault).init(this);
}

public NumberType getType() {
return type;
}
}
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.opensearch.index.engine.Segment;
import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.fielddata.IndexFieldDataService;
import org.opensearch.index.fielddata.ShardFieldData;
import org.opensearch.index.flush.FlushStats;
import org.opensearch.index.get.GetStats;
Expand Down Expand Up @@ -361,6 +362,7 @@ Runnable getGlobalCheckpointSyncer() {
*/
private final ShardMigrationState shardMigrationState;
private DiscoveryNodes discoveryNodes;
private final IndexFieldDataService indexFieldDataService;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -391,7 +393,8 @@ public IndexShard(
final RecoverySettings recoverySettings,
final RemoteStoreSettings remoteStoreSettings,
boolean seedRemote,
final DiscoveryNodes discoveryNodes
final DiscoveryNodes discoveryNodes,
final IndexFieldDataService indexFieldDataService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -493,6 +496,11 @@ public boolean shouldCache(Query query) {
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
this.discoveryNodes = discoveryNodes;
this.indexFieldDataService = indexFieldDataService;
}

public IndexFieldDataService indexFieldDataService() {
return indexFieldDataService;
}

public ThreadPool getThreadPool() {
Expand Down
Loading

0 comments on commit 3e4f0b2

Please sign in to comment.