Skip to content

Commit

Permalink
feat(clients): add chunkedBatch helper for Kotlin and Scala (#3206)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fluf22 authored Jun 18, 2024
1 parent 21490f8 commit fc1752b
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,49 @@ public suspend fun SearchClient.searchForFacets(
).results.map { it as SearchForFacetValuesResponse }
}

/**
* Helper: Chunks the given `objects` list in subset of 1000 elements max to make it fit in `batch` requests.
*
* @param indexName The index in which to perform the request.
* @param records The list of objects to index.
* @param serializer The serializer to use for the objects.
* @param action The action to perform on the objects. Default is `Action.AddObject`.
* @param waitForTask If true, wait for the task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
*/
public suspend fun <T> SearchClient.chunkedBatch(
indexName: String,
records: List<T>,
serializer: KSerializer<T>,
action: Action = Action.AddObject,
waitForTask: Boolean,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
val tasks = mutableListOf<BatchResponse>()
records.chunked(batchSize).forEach { chunk ->
val requests = chunk.map {
BatchRequest(
action = action,
body = options.json.encodeToJsonElement(serializer, it).jsonObject
)
}
val batch = batch(
indexName = indexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions,
)
tasks.add(batch)
}
if (waitForTask) {
tasks.forEach { waitTask(indexName, it.taskID) }
}
return tasks
}

/**
* Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
* Replace all objects in an index without any downtime.
Expand All @@ -284,22 +327,19 @@ public suspend fun SearchClient.searchForFacets(
*
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
*
* @param serializer [KSerializer] of type [T] for serialization.
* @param indexName The index in which to perform the request.
* @param records The list of records to replace.
* @return intermediate operations (index name to task ID).
* @param serializer [KSerializer] of type [T] for serialization.
* @param batchSize The size of the batch. Default is 1000.
* @return responses from the three-step operations: copy, batch, move.
*/
public suspend fun <T> SearchClient.replaceAllObjects(
indexName: String,
serializer: KSerializer<T>,
records: List<T>,
serializer: KSerializer<T>,
batchSize: Int = 1000,
requestOptions: RequestOptions?,
): List<Long> {
if (records.isEmpty()) return emptyList()

val requests = records.map { record ->
val body = options.json.encodeToJsonElement(serializer, record).jsonObject
BatchRequest(action = Action.AddObject, body = body)
}
): ReplaceAllObjectsResponse {
val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"

var copy = operationIndex(
Expand All @@ -312,12 +352,16 @@ public suspend fun <T> SearchClient.replaceAllObjects(
requestOptions = requestOptions,
)

val batch = batch(
val batchResponses = this.chunkedBatch(
indexName = tmpIndexName,
batchWriteParams = BatchWriteParams(requests),
records = records,
serializer = serializer,
action = Action.AddObject,
waitForTask = true,
batchSize = batchSize,
requestOptions = requestOptions,
)
waitTask(indexName = tmpIndexName, taskID = batch.taskID)

waitTask(indexName = tmpIndexName, taskID = copy.taskID)

copy = operationIndex(
Expand All @@ -338,7 +382,7 @@ public suspend fun <T> SearchClient.replaceAllObjects(
)
waitTask(indexName = tmpIndexName, taskID = move.taskID)

return listOf(copy.taskID, batch.taskID, move.taskID)
return ReplaceAllObjectsResponse(copy, batchResponses, move)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,57 @@ package object extension {
Future.successful(true)
}

/** Helper: Chunks the given `objects` list in subset of 1000 elements max to make it fit in `batch` requests.
*
* @param indexName
* The index in which to perform the request.
* @param records
* The list of records to replace.
* @param action
* The action to perform on the records.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
* A future containing the response of the batch operations.
*/
def chunkedBatch(
indexName: String,
records: Seq[Any],
action: Action = Action.AddObject,
waitForTasks: Boolean,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
var futures = Seq.empty[Future[BatchResponse]]
records.grouped(batchSize).foreach { chunk =>
val requests = chunk.map { record =>
BatchRequest(action = action, body = record)
}
val future = client.batch(
indexName = indexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions
)
futures = futures :+ future
}

val responses = Future.sequence(futures)

if (waitForTasks) {
responses.foreach { tasks =>
tasks.foreach { task =>
client.waitTask(indexName, task.taskID, requestOptions = requestOptions)
}
}
}

responses
}

/** Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
* Replace all objects in an index without any downtime. Internally, this method copies the existing index
* settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the
Expand All @@ -205,16 +256,19 @@ package object extension {
* The index in which to perform the request.
* @param records
* The list of records to replace.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
* intermediate operations (task IDs).
* A future containing the response of the three-step operations: copy, batch and move.
*/
def replaceAllObjects(
indexName: String,
records: Seq[Any],
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[Long]] = {
if (records.isEmpty) return Future.successful(Seq.empty)

)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
val requests = records.map { record =>
BatchRequest(action = Action.AddObject, body = record)
}
Expand All @@ -231,12 +285,15 @@ package object extension {
requestOptions = requestOptions
)

batch <- client.batch(
batchResponses <- chunkedBatch(
indexName = tmpIndexName,
batchWriteParams = BatchWriteParams(requests),
records = records,
action = Action.AddObject,
waitForTasks = true,
batchSize = batchSize,
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = batch.taskID, requestOptions = requestOptions)

_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

copy <- client.operationIndex(
Expand All @@ -250,13 +307,17 @@ package object extension {
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

replace <- client.operationIndex(
move <- client.operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = replace.taskID, requestOptions = requestOptions)
} yield Seq(copy.taskID, batch.taskID, replace.taskID)
_ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
} yield ReplaceAllObjectsResponse(
copyOperationResponse = copy,
batchResponses = batchResponses,
moveOperationResponse = move
)
}
}
}

1 comment on commit fc1752b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.