Skip to content

Commit

Permalink
fix(clients): safer replaceAllObjects + metis compliant (#3164)
Browse files Browse the repository at this point in the history
  • Loading branch information
shortcuts authored Jun 12, 2024
1 parent 3f692ae commit de40907
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ private static async Task<T> RetryUntil<T>(Func<Task<T>> func, Func<T, bool> val
/// Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
/// Replace all objects in an index without any downtime. Internally, this method copies the existing index settings, synonyms and query rules and indexes all passed objects.
/// Finally, the temporary one replaces the existing index. (Synchronous version)
/// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
Expand All @@ -424,6 +425,7 @@ public ReplaceAllObjectsResponse ReplaceAllObjects<T>(string indexName, IEnumera
/// Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
/// Replace all objects in an index without any downtime. Internally, this method copies the existing index settings, synonyms and query rules and indexes all passed objects.
/// Finally, the temporary one replaces the existing index.
/// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
Expand All @@ -441,20 +443,24 @@ public async Task<ReplaceAllObjectsResponse> ReplaceAllObjectsAsync<T>(string in
var rnd = new Random();
var tmpIndexName = $"{indexName}_tmp_{rnd.Next(100)}";

// Copy settings, synonyms and query rules into the temporary index
var copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);

await WaitForTaskAsync(indexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

// Add objects to the temporary index
var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, batchSize,
options, cancellationToken).ConfigureAwait(false);

// Move the temporary index to the main one
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

var moveResponse = await OperationIndexAsync(tmpIndexName,
new OperationIndexParams(OperationType.Move, indexName), options, cancellationToken)
.ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion clients/algoliasearch-client-go/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ linters:
- wsl
- varnamelen
- nlreturn
- goerr113
- err113
- gochecknoglobals
- exhaustruct
- exhaustive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ public suspend fun SearchClient.searchForFacets(
* Internally, this method copies the existing index settings, synonyms and query rules and indexes all
* passed objects. Finally, the temporary one replaces the existing index.
*
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
*
* @param serializer [KSerializer] of type [T] for serialization.
* @param records The list of records to replace.
* @return intermediate operations (index name to task ID).
Expand All @@ -298,37 +300,44 @@ public suspend fun <T> SearchClient.replaceAllObjects(
val body = options.json.encodeToJsonElement(serializer, record).jsonObject
BatchRequest(action = Action.AddObject, body = body)
}
val destinationIndex = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"
val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"

// 1. Copy index resources
val copy = operationIndex(
var copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = destinationIndex,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitTask(indexName = indexName, taskID = copy.taskID)

// 2. Save new objects
val batch = batch(
indexName = destinationIndex,
indexName = tmpIndexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions,
)
waitTask(indexName = destinationIndex, taskID = batch.taskID)
waitTask(indexName = tmpIndexName, taskID = batch.taskID)
waitTask(indexName = tmpIndexName, taskID = copy.taskID)

copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitTask(indexName = tmpIndexName, taskID = copy.taskID)

// 3. Move temporary index to source index
val move = operationIndex(
indexName = destinationIndex,
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions,
)
waitTask(indexName = destinationIndex, taskID = move.taskID)
waitTask(indexName = tmpIndexName, taskID = move.taskID)

// 4. Return the list of operations
return listOf(copy.taskID, batch.taskID, move.taskID)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ package object extension {
* settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the
* existing index.
*
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation
* details.
*
* @param indexName
* The index in which to perform the request.
* @param records
Expand All @@ -215,31 +218,44 @@ package object extension {
val requests = records.map { record =>
BatchRequest(action = Action.AddObject, body = record)
}
val destinationIndex = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"

for {
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = destinationIndex,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = destinationIndex, taskID = copy.taskID, requestOptions = requestOptions)

batch <- client.batch(
indexName = destinationIndex,
indexName = tmpIndexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions
) // 3. update the copy
_ <- client.waitTask(indexName = destinationIndex, taskID = batch.taskID, requestOptions = requestOptions)
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = batch.taskID, requestOptions = requestOptions)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

replace <- client.operationIndex(
indexName = destinationIndex,
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = indexName, taskID = replace.taskID, requestOptions = requestOptions)
_ <- client.waitTask(indexName = tmpIndexName, taskID = replace.taskID, requestOptions = requestOptions)
} yield Seq(copy.taskID, batch.taskID, replace.taskID)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ public extension SearchClient {
}

/// Replace all objects in an index
///
/// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
/// - parameter objects: The new objects
/// - parameter indexName: The name of the index where to replace the objects
/// - parameter requestOptions: The request options
Expand All @@ -470,8 +472,7 @@ public extension SearchClient {
) async throws -> ReplaceAllObjectsResponse {
let tmpIndexName = try "\(indexName)_tmp_\(randomString())"

// Copy all index resources from production index
let copyOperationResponse = try await operationIndex(
var copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
Expand All @@ -481,18 +482,26 @@ public extension SearchClient {
requestOptions: requestOptions
)

try await self.waitForTask(with: copyOperationResponse.taskID, in: indexName)

// Send records to the tmp index (batched)
let batchResponses = try await self.chunkedBatch(
indexName: tmpIndexName,
objects: objects,
waitForTasks: true,
batchSize: batchSize,
requestOptions: requestOptions
)
try await self.waitForTask(with: copyOperationResponse.taskID, in: tmpIndexName)

copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.rules, .settings, .synonyms]
),
requestOptions: requestOptions
)
try await self.waitForTask(with: copyOperationResponse.taskID, in: tmpIndexName)

// Move the temporary index to replace the main one
let moveOperationResponse = try await self.operationIndex(
indexName: tmpIndexName,
operationIndexParams: OperationIndexParams(
Expand All @@ -501,7 +510,6 @@ public extension SearchClient {
),
requestOptions: requestOptions
)

try await self.waitForTask(with: moveOperationResponse.taskID, in: tmpIndexName)

return ReplaceAllObjectsResponse(
Expand Down
2 changes: 1 addition & 1 deletion playground/python/app/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def main():

try:
resp = await client.replace_all_objects(
index_name="test_replace_all_objects",
index_name="newoneeverytime",
objects=[{"name": f"John Doe{i}", "objectID": f"fff2bd4d-bb17-4e21-a0c4-0a8ea5e363f2{i}" } for i in range(33)],
batch_size=10
)
Expand Down
25 changes: 18 additions & 7 deletions templates/go/search_helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -516,32 +516,43 @@ func (c *APIClient) ChunkedBatch(indexName string, objects []map[string]any, act
}

// ReplaceAllObjects replaces all objects (records) in the given `indexName` with the given `objects`. A temporary index is created during this process in order to backup your data.
// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
func (c *APIClient) ReplaceAllObjects(indexName string, objects []map[string]any, batchSize *int) (*ReplaceAllObjectsResponse, error) {
tmpIndex := fmt.Sprintf("%s_tmp_%d", indexName, time.Now().UnixNano())
tmpIndexName := fmt.Sprintf("%s_tmp_%d", indexName, time.Now().UnixNano())
copyResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATIONTYPE_COPY, tmpIndex, WithOperationIndexParamsScope([]ScopeType{SCOPETYPE_RULES, SCOPETYPE_SETTINGS, SCOPETYPE_SYNONYMS}))))
copyResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATIONTYPE_COPY, tmpIndexName, WithOperationIndexParamsScope([]ScopeType{SCOPETYPE_RULES, SCOPETYPE_SETTINGS, SCOPETYPE_SYNONYMS}))))
if err != nil {
return nil, err
}

_, err = c.WaitForTask(indexName, copyResp.TaskID, nil, nil, nil)
waitForTask := true

batchResp, err := c.ChunkedBatch(tmpIndexName, objects, nil, &waitForTask, batchSize)
if err != nil {
return nil, err
}

waitForTask := true
_, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, nil, nil, nil)
if err != nil {
return nil, err
}

copyResp, err = c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATIONTYPE_COPY, tmpIndexName, WithOperationIndexParamsScope([]ScopeType{SCOPETYPE_RULES, SCOPETYPE_SETTINGS, SCOPETYPE_SYNONYMS}))))
if err != nil {
return nil, err
}

batchResp, err := c.ChunkedBatch(tmpIndex, objects, nil, &waitForTask, batchSize)
_, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, nil, nil, nil)
if err != nil {
return nil, err
}

moveResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(tmpIndex, NewOperationIndexParams(OPERATIONTYPE_MOVE, indexName)))
moveResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(tmpIndexName, NewOperationIndexParams(OPERATIONTYPE_MOVE, indexName)))
if err != nil {
return nil, err
}

_, err = c.WaitForTask(indexName, moveResp.TaskID, nil, nil, nil)
_, err = c.WaitForTask(tmpIndexName, moveResp.TaskID, nil, nil, nil)
if err != nil {
return nil, err
}
Expand Down
18 changes: 16 additions & 2 deletions templates/java/api_helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ return responses;
/**
* Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are
* untouched. Replace all records in an index without any downtime.
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
*
* @param indexName The `indexName` to replace `objects` in.
* @param objects The array of `objects` to store in the given Algolia `indexName`.
Expand Down Expand Up @@ -608,18 +609,31 @@ UpdatedAtResponse copyOperationResponse = operationIndex(
.addScope(ScopeType.SYNONYMS),
requestOptions
);
waitForTask(indexName, copyOperationResponse.getTaskID(), requestOptions);
// Save new objects
List<BatchResponse> batchResponses = chunkedBatch(tmpIndexName, objects, Action.ADD_OBJECT, true, batchSize, requestOptions);
waitForTask(tmpIndexName, copyOperationResponse.getTaskID(), requestOptions);
copyOperationResponse = operationIndex(
indexName,
new OperationIndexParams()
.setOperation(OperationType.COPY)
.setDestination(tmpIndexName)
.addScope(ScopeType.SETTINGS)
.addScope(ScopeType.RULES)
.addScope(ScopeType.SYNONYMS),
requestOptions
);
waitForTask(tmpIndexName, copyOperationResponse.getTaskID(), requestOptions);
// Move temporary index to source index
UpdatedAtResponse moveOperationResponse = operationIndex(
tmpIndexName,
new OperationIndexParams().setOperation(OperationType.MOVE).setDestination(indexName),
requestOptions
);
waitForTask(indexName, moveOperationResponse.getTaskID(), requestOptions);
waitForTask(tmpIndexName, moveOperationResponse.getTaskID(), requestOptions);
return new ReplaceAllObjectsResponse()
.setCopyOperationResponse(copyOperationResponse)
Expand Down
Loading

1 comment on commit de40907

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.