Commit 5873add

Add vector data upload implementation to RemoteIndexBuildStrategy (opensearch-project#2550)

Signed-off-by: Jay Deng <jayd0104@gmail.com>
jed326 authored Feb 26, 2025
1 parent d5b2982 commit 5873add
Showing 16 changed files with 1,344 additions and 101 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
* [Remote Vector Index Build] Introduce Remote Native Index Build feature flag, settings, and initial skeleton [#2525](https://github.com/opensearch-project/k-NN/pull/2525)
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
### Enhancements
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
### Bug Fixes
21 changes: 19 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
@@ -12,7 +12,6 @@
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Booleans;
@@ -29,6 +28,7 @@
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.monitor.os.OsProbe;
import org.opensearch.transport.client.Client;

import java.security.InvalidParameterException;
import java.util.Arrays;
@@ -99,6 +99,7 @@ public class KNNSettings {
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD = "index.knn.remote_index_build.size_threshold";

/**
* Default setting values
@@ -129,6 +130,8 @@ public class KNNSettings {
// 10% of the JVM heap
public static final Integer KNN_DEFAULT_QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES = 60;
public static final boolean KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_VALUE = false;
// TODO: Tune this default value based on benchmarking
public static final ByteSizeValue KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_DEFAULT_VALUE = new ByteSizeValue(50, ByteSizeUnit.MB);

/**
* Settings Definition
@@ -397,6 +400,15 @@ public class KNNSettings {
*/
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);

/**
* Index-level setting specifying the size threshold above which remote vector index builds will be used.
*/
public static final Setting<ByteSizeValue> KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING = Setting.byteSizeSetting(
KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD,
KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_DEFAULT_VALUE,
Dynamic,
IndexScope
);
/**
* Dynamic settings
*/
@@ -584,6 +596,10 @@ private Setting<?> getSetting(String key) {
return KNN_REMOTE_VECTOR_REPO_SETTING;
}

if (KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD.equals(key)) {
return KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

@@ -611,7 +627,8 @@ public List<Setting<?>> getSettings() {
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
KNN_REMOTE_VECTOR_REPO_SETTING
KNN_REMOTE_VECTOR_REPO_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
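For orientation (an editor's sketch, not part of this commit): an index-scoped ByteSizeValue setting like KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING is typically resolved through IndexSettings#getValue. The helper class and method names below are hypothetical, the import paths assume a recent OpenSearch core, and the use of >= for the comparison is an assumption rather than something this diff confirms.

import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.KNNSettings;

// Hypothetical helper, not from this commit: resolves the per-index threshold
// and compares it against an estimated vector blob size.
public class ThresholdLookupSketch {
    static boolean exceedsThreshold(IndexSettings indexSettings, long vectorBlobLength) {
        ByteSizeValue threshold = indexSettings.getValue(KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING);
        // Assumption: sizes at or above the threshold qualify for a remote build
        return vectorBlobLength >= threshold.getBytes();
    }
}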
src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategyFactory.java
@@ -7,14 +7,18 @@

import org.apache.lucene.index.FieldInfo;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.RepositoriesService;

import java.io.IOException;
import java.util.function.Supplier;

import static org.opensearch.knn.common.FieldInfoExtractor.extractKNNEngine;
import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;

/**
* Creates the {@link NativeIndexBuildStrategy}
@@ -34,11 +38,18 @@ public NativeIndexBuildStrategyFactory(Supplier<RepositoriesService> repositorie
}

/**
* Creates or returns the desired {@link NativeIndexBuildStrategy} implementation. Intended to be used by {@link NativeIndexWriter}
* @param fieldInfo
* @return
* @param fieldInfo Field related attributes/info
* @param totalLiveDocs Number of documents with the vector field. This value comes from {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#flush}
* and {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#mergeOneField}
* @param knnVectorValues An instance of {@link KNNVectorValues} used to evaluate the size threshold KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD
* @return The {@link NativeIndexBuildStrategy} to be used. Intended to be used by {@link NativeIndexWriter}
* @throws IOException if initializing the vector values for size estimation fails
*/
public NativeIndexBuildStrategy getBuildStrategy(final FieldInfo fieldInfo) {
public NativeIndexBuildStrategy getBuildStrategy(
final FieldInfo fieldInfo,
final int totalLiveDocs,
final KNNVectorValues<?> knnVectorValues
) throws IOException {
final KNNEngine knnEngine = extractKNNEngine(fieldInfo);
boolean isTemplate = fieldInfo.attributes().containsKey(MODEL_ID);
boolean iterative = !isTemplate && KNNEngine.FAISS == knnEngine;
@@ -47,11 +58,15 @@ public NativeIndexBuildStrategy getBuildStrategy(final FieldInfo fieldInfo) {
? MemOptimizedNativeIndexBuildStrategy.getInstance()
: DefaultIndexBuildStrategy.getInstance();

if (repositoriesServiceSupplier != null
initializeVectorValues(knnVectorValues);
long vectorBlobLength = ((long) knnVectorValues.bytesPerVector()) * totalLiveDocs;

if (KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()
&& repositoriesServiceSupplier != null
&& indexSettings != null
&& knnEngine.supportsRemoteIndexBuild()
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings)) {
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy);
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings, vectorBlobLength)) {
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy, indexSettings);
} else {
return strategy;
}
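To make the new sizing logic concrete, a back-of-the-envelope example (all numbers invented for illustration): vectorBlobLength is bytesPerVector * totalLiveDocs, so a 128-dimension FLOAT field crosses the default 50 MB threshold at about 102,400 live documents.

// Standalone arithmetic sketch; values are illustrative only.
public class VectorBlobSizeExample {
    public static void main(String[] args) {
        int dimension = 128;
        int bytesPerVector = dimension * Float.BYTES;                  // 512 bytes per FLOAT vector
        int totalLiveDocs = 200_000;
        long vectorBlobLength = (long) bytesPerVector * totalLiveDocs; // 102,400,000 bytes (~97.7 MB)
        long defaultThreshold = 50L * 1024 * 1024;                     // 50 MB default threshold
        System.out.println(vectorBlobLength >= defaultThreshold);     // true -> eligible for remote build
    }
}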
src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java
@@ -58,7 +58,7 @@ public class NativeIndexWriter {

private final SegmentWriteState state;
private final FieldInfo fieldInfo;
private final NativeIndexBuildStrategy indexBuilder;
private final NativeIndexBuildStrategyFactory indexBuilderFactory;
@Nullable
private final QuantizationState quantizationState;

@@ -148,6 +148,11 @@ private void buildAndWriteIndex(final Supplier<KNNVectorValues<?>> knnVectorValu
knnVectorValuesSupplier,
totalLiveDocs
);
NativeIndexBuildStrategy indexBuilder = indexBuilderFactory.getBuildStrategy(
fieldInfo,
totalLiveDocs,
knnVectorValuesSupplier.get()
);
indexBuilder.buildAndWriteIndex(nativeIndexParams);
CodecUtil.writeFooter(output);
}
@@ -316,6 +321,6 @@ private static NativeIndexWriter createWriter(
@Nullable final QuantizationState quantizationState,
NativeIndexBuildStrategyFactory nativeIndexBuildStrategyFactory
) {
return new NativeIndexWriter(state, fieldInfo, nativeIndexBuildStrategyFactory.getBuildStrategy(fieldInfo), quantizationState);
return new NativeIndexWriter(state, fieldInfo, nativeIndexBuildStrategyFactory, quantizationState);
}
}
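One nuance behind threading a Supplier<KNNVectorValues<?>> through these call sites (editor's gloss, not the author's wording): KNNVectorValues is consumed as a forward-only stream, so sizing the data and then writing it require separate instances. A minimal, self-contained sketch of that pattern, using a plain Iterator as a stand-in for KNNVectorValues:

import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Illustrative only: shows why a Supplier is passed around instead of a single
// vector-values instance. Iterator<float[]> stands in for KNNVectorValues.
public class SupplierRationaleExample {
    public static void main(String[] args) {
        Supplier<Iterator<float[]>> vectorsSupplier =
            () -> List.of(new float[] { 1f, 2f }, new float[] { 3f, 4f }).iterator();

        // Pass 1: inspect the first vector to size the blob (stand-in for bytesPerVector)
        int bytesPerVector = vectorsSupplier.get().next().length * Float.BYTES;

        // Pass 2: a fresh instance for the actual write; the sizing pass is already spent
        Iterator<float[]> writePass = vectorsSupplier.get();
        int totalLiveDocs = 0;
        while (writePass.hasNext()) {
            writePass.next();
            totalLiveDocs++;
        }
        System.out.println("bytesPerVector=" + bytesPerVector + ", totalLiveDocs=" + totalLiveDocs);
    }
}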
src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/DefaultVectorRepositoryAccessor.java
@@ -0,0 +1,213 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.nativeindex.remote;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy.DOC_ID_FILE_EXTENSION;
import static org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy.VECTORS_PATH;
import static org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy.VECTOR_BLOB_FILE_EXTENSION;
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;

@Log4j2
@AllArgsConstructor
public class DefaultVectorRepositoryAccessor implements VectorRepositoryAccessor {
private final BlobStoreRepository repository;
private final IndexSettings indexSettings;

/**
* If the repository implements {@link AsyncMultiStreamBlobContainer}, then parallel uploads will be used. Parallel uploads are backed by a {@link WriteContext}, for which we have a custom
* {@link org.opensearch.common.blobstore.stream.write.StreamContextSupplier} implementation.
*
* @see DefaultVectorRepositoryAccessor#getStreamContext
* @see DefaultVectorRepositoryAccessor#getTransferPartStreamSupplier
*
* @param blobName Base name of the blobs we are writing, excluding file extensions
* @param totalLiveDocs Number of documents we are processing. This is used to compute the size of the blob we are writing
* @param vectorDataType Data type of the vector (FLOAT, BYTE, BINARY)
* @param knnVectorValuesSupplier Supplier for {@link KNNVectorValues}
* @throws IOException if reading the vector values or writing either blob fails
* @throws InterruptedException if interrupted while waiting on the parallel vector upload
*/
@Override
public void writeToRepository(
String blobName,
int totalLiveDocs,
VectorDataType vectorDataType,
Supplier<KNNVectorValues<?>> knnVectorValuesSupplier
) throws IOException, InterruptedException {
assert repository != null;
// Get the blob container based on blobName and the repo base path. This is where the blobs will be written to.
BlobPath path = repository.basePath().add(indexSettings.getUUID() + VECTORS_PATH);
BlobContainer blobContainer = repository.blobStore().blobContainer(path);

KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();
initializeVectorValues(knnVectorValues);
long vectorBlobLength = (long) knnVectorValues.bytesPerVector() * totalLiveDocs;

if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
// First initiate vectors upload
log.debug("Repository {} Supports Parallel Blob Upload", repository);
// WriteContext is the main entry point into asyncBlobUpload. It stores all of our upload configurations, analogous to
// BuildIndexParams
WriteContext writeContext = new WriteContext.Builder().fileName(blobName + VECTOR_BLOB_FILE_EXTENSION)
.streamContextSupplier((partSize) -> getStreamContext(partSize, vectorBlobLength, knnVectorValuesSupplier, vectorDataType))
.fileSize(vectorBlobLength)
.failIfAlreadyExists(true)
.writePriority(WritePriority.NORMAL)
// TODO: Checksum implementations -- It is difficult to calculate a checksum on the knnVectorValues as
// there is no underlying file from which to compute it. We could still compute a checksum by
// iterating through the values once, but that would be an expensive operation.
.uploadFinalizer((bool) -> {})
.doRemoteDataIntegrityCheck(false)
.expectedChecksum(null)
.build();

AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
writeContext,
new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
log.debug(
"Parallel vector upload succeeded for blob {} with size {}",
blobName + VECTOR_BLOB_FILE_EXTENSION,
vectorBlobLength
);
}

@Override
public void onFailure(Exception e) {
log.error(
"Parallel vector upload failed for blob {} with size {}",
blobName + VECTOR_BLOB_FILE_EXTENSION,
vectorBlobLength,
e
);
exception.set(e);
}
}, latch)
);

// Then upload doc id blob before waiting on vector uploads
// TODO: We wrap with a BufferedInputStream to support retries. We can tune this buffer size to optimize performance.
// Note: We do not use the parallel upload API here as the doc id blob will be much smaller than the vector blob
writeDocIds(knnVectorValuesSupplier.get(), vectorBlobLength, totalLiveDocs, blobName, blobContainer);
latch.await();
if (exception.get() != null) {
throw new IOException(exception.get());
}
} else {
log.debug("Repository {} Does Not Support Parallel Blob Upload", repository);
// Write Vectors
InputStream vectorStream = new BufferedInputStream(new VectorValuesInputStream(knnVectorValuesSupplier.get(), vectorDataType));
log.debug("Writing {} bytes for {} docs to {}", vectorBlobLength, totalLiveDocs, blobName + VECTOR_BLOB_FILE_EXTENSION);
blobContainer.writeBlob(blobName + VECTOR_BLOB_FILE_EXTENSION, vectorStream, vectorBlobLength, true);
// Then write doc ids
writeDocIds(knnVectorValuesSupplier.get(), vectorBlobLength, totalLiveDocs, blobName, blobContainer);
}
}

/**
* Helper method for uploading doc ids to the repository; re-used by both the parallel and sequential upload paths
* @param knnVectorValues {@link KNNVectorValues} from which the doc ids are streamed
* @param vectorBlobLength Size in bytes of the corresponding vector blob
* @param totalLiveDocs Number of doc ids to write
* @param blobName Base name of the blob to write, excluding the file extension
* @param blobContainer {@link BlobContainer} the doc id blob is written to
* @throws IOException if the doc id blob write fails
*/
private void writeDocIds(
KNNVectorValues<?> knnVectorValues,
long vectorBlobLength,
long totalLiveDocs,
String blobName,
BlobContainer blobContainer
) throws IOException {
InputStream docStream = new BufferedInputStream(new DocIdInputStream(knnVectorValues));
log.debug(
"Writing {} bytes for {} doc ids to {}",
totalLiveDocs * Integer.BYTES,
totalLiveDocs,
blobName + DOC_ID_FILE_EXTENSION
);
blobContainer.writeBlob(blobName + DOC_ID_FILE_EXTENSION, docStream, totalLiveDocs * Integer.BYTES, true);
}

/**
* Returns a {@link org.opensearch.common.StreamContext}. Intended to be invoked as a {@link org.opensearch.common.blobstore.stream.write.StreamContextSupplier},
* which takes the partSize determined by the repository implementation and calculates the number of parts as well as handles the last part of the stream.
*
* @see DefaultVectorRepositoryAccessor#getTransferPartStreamSupplier
*
* @param partSize Size of each InputStream to be uploaded in parallel. Provided by repository implementation
* @param vectorBlobLength Total size of the vectors across all InputStreams
* @param knnVectorValuesSupplier Supplier for {@link KNNVectorValues}
* @param vectorDataType Data type of the vector (FLOAT, BYTE, BINARY)
* @return a {@link org.opensearch.common.StreamContext} with a function that will create {@link InputStream}s of size {@code partSize}
*/
private StreamContext getStreamContext(
long partSize,
long vectorBlobLength,
Supplier<KNNVectorValues<?>> knnVectorValuesSupplier,
VectorDataType vectorDataType
) {
long lastPartSize = (vectorBlobLength % partSize) != 0 ? vectorBlobLength % partSize : partSize;
int numberOfParts = (int) ((vectorBlobLength % partSize) == 0 ? vectorBlobLength / partSize : (vectorBlobLength / partSize) + 1);
return new StreamContext(
getTransferPartStreamSupplier(knnVectorValuesSupplier, vectorDataType),
partSize,
lastPartSize,
numberOfParts
);
}

/**
* This method handles creating {@link VectorValuesInputStream}s based on the part number, the requested size of the stream part, and the position that the stream starts at within the underlying {@link KNNVectorValues}
*
* @param knnVectorValuesSupplier Supplier for {@link KNNVectorValues}
* @param vectorDataType Data type of the vector (FLOAT, BYTE, BINARY)
* @return a function the repository implementation will use to create {@link VectorValuesInputStream}s of specific sizes and start positions.
*/
private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException> getTransferPartStreamSupplier(
Supplier<KNNVectorValues<?>> knnVectorValuesSupplier,
VectorDataType vectorDataType
) {
return ((partNo, size, position) -> {
log.info("Creating InputStream for partNo: {}, size: {}, position: {}", partNo, size, position);
VectorValuesInputStream vectorValuesInputStream = new VectorValuesInputStream(
knnVectorValuesSupplier.get(),
vectorDataType,
position,
size
);
return new InputStreamContainer(vectorValuesInputStream, size, position);
});
}
}
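As a sanity check on the part arithmetic in getStreamContext, a small standalone example with invented numbers: a 10 MB blob split into 3 MB parts yields four parts, with a 1 MB final part.

// Illustrative re-run of the lastPartSize / numberOfParts arithmetic above.
public class StreamPartMathExample {
    public static void main(String[] args) {
        long vectorBlobLength = 10L * 1024 * 1024;  // 10 MB blob
        long partSize = 3L * 1024 * 1024;           // 3 MB parts, as chosen by the repository
        long lastPartSize = (vectorBlobLength % partSize) != 0 ? vectorBlobLength % partSize : partSize;
        int numberOfParts = (int) ((vectorBlobLength % partSize) == 0
            ? vectorBlobLength / partSize
            : (vectorBlobLength / partSize) + 1);
        // Prints: 4 parts, last part 1048576 bytes
        System.out.println(numberOfParts + " parts, last part " + lastPartSize + " bytes");
    }
}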