Skip to content

Commit

Permalink
Merge #274
Browse files Browse the repository at this point in the history
274: Add support for documents as NDJSON string r=alallema a=juchom

# Pull Request

## What does this PR do?
Add missing NDJSON methods

This PR adds the missing NDJSON methods so that #182 can be closed.



Co-authored-by: Julien Chomarat <j.chomarat@linoa.com>
  • Loading branch information
bors[bot] and juchom authored May 31, 2022
2 parents da0912f + bb86d76 commit c8aacc3
Show file tree
Hide file tree
Showing 6 changed files with 471 additions and 1 deletion.
46 changes: 46 additions & 0 deletions src/Meilisearch/Extensions/StringExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,51 @@ internal static IEnumerable<string> GetCsvChunks(this string csvString, int chun
}
}
}

/// <summary>
/// Splits a NDJSON string into chunks holding at most <paramref name="chunkSize"/> documents each.
/// </summary>
/// <remarks>
/// Arguments are validated as soon as the method is called, not when the result is first enumerated.
/// Blank or whitespace-only lines are not documents: they are skipped and do not count toward the chunk size.
/// Every line of a chunk, including the last one, is terminated with <see cref="Environment.NewLine"/>.
/// </remarks>
/// <param name="ndjsonString">The NDJSON string to split.</param>
/// <param name="chunkSize">Maximum number of documents (non-blank lines) per chunk.</param>
/// <returns>A lazily evaluated sequence of NDJSON strings.</returns>
/// <exception cref="ArgumentNullException">Thrown if ndjsonString is null, empty or only white space.</exception>
/// <exception cref="ArgumentException">Thrown if chunkSize is lower than 1.</exception>
internal static IEnumerable<string> GetNdjsonChunks(this string ndjsonString, int chunkSize)
{
    if (string.IsNullOrWhiteSpace(ndjsonString))
    {
        throw new ArgumentNullException(nameof(ndjsonString));
    }

    if (chunkSize < 1)
    {
        throw new ArgumentException("chunkSize value must be greater than 0", nameof(chunkSize));
    }

    // The iterator lives in a local function so that the guards above run eagerly;
    // code inside an iterator body only executes once enumeration starts.
    return Iterate();

    IEnumerable<string> Iterate()
    {
        using (var reader = new StringReader(ndjsonString))
        {
            var chunk = new StringBuilder();
            var documentCount = 0;
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                // An empty line carries no document; counting it would produce
                // undersized chunks, or a chunk made of nothing but line breaks.
                if (string.IsNullOrWhiteSpace(line))
                {
                    continue;
                }

                chunk.AppendLine(line);
                ++documentCount;

                if (documentCount == chunkSize)
                {
                    // The chunk is full: hand it out and start a new one.
                    yield return chunk.ToString();
                    chunk.Clear();
                    documentCount = 0;
                }
            }

            // Whatever is left after the last line forms a final, smaller chunk.
            if (documentCount > 0)
            {
                yield return chunk.ToString();
            }
        }
    }
}
}
}
88 changes: 87 additions & 1 deletion src/Meilisearch/Index.Documents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,29 @@ public async Task<TaskInfo> AddDocumentsCsvAsync(string documents, string primar
.ConfigureAwait(false);
}

/// <summary>
/// Sends a NDJSON payload to the documents route of this index with a POST request.
/// </summary>
/// <param name="documents">Documents to add, serialized as a NDJSON string.</param>
/// <param name="primaryKey">Optional primary key; when set it is passed as a query string parameter.</param>
/// <param name="cancellationToken">The cancellation token for this call.</param>
/// <returns>Returns the task info deserialized from the response body.</returns>
public async Task<TaskInfo> AddDocumentsNdjsonAsync(string documents, string primaryKey = default,
    CancellationToken cancellationToken = default)
{
    var endpoint = $"indexes/{Uid}/documents";

    // The query string is only appended when a primary key was actually supplied.
    var requestUri = primaryKey == default
        ? endpoint
        : $"{endpoint}?{new { primaryKey = primaryKey }.ToQueryString()}";

    var payload = new StringContent(documents, Encoding.UTF8, ContentType.Ndjson);
    var response = await _http.PostAsync(requestUri, payload, cancellationToken).ConfigureAwait(false);
    var taskInfo = await response.Content
        .ReadFromJsonAsync<TaskInfo>(cancellationToken: cancellationToken)
        .ConfigureAwait(false);

    return taskInfo;
}

/// <summary>
/// Adds documents in batches with size specified with <paramref name="batchSize"/>.
/// </summary>
Expand All @@ -107,7 +130,7 @@ public async Task<IEnumerable<TaskInfo>> AddDocumentsInBatchesAsync<T>(IEnumerab
/// <summary>
/// Adds documents from CSV string in batches with size specified with <paramref name="batchSize"/>.
/// </summary>
/// <param name="documents">Documents to add.</param>
/// <param name="documents">Documents to add as CSV string.</param>
/// <param name="batchSize">Size of documents batches while adding them.</param>
/// <param name="primaryKey">Primary key for the documents.</param>
/// <param name="cancellationToken">The cancellation token for this call.</param>
Expand All @@ -124,6 +147,26 @@ public async Task<IEnumerable<TaskInfo>> AddDocumentsCsvInBatchesAsync(string do
return tasks;
}

/// <summary>
/// Splits a NDJSON string into chunks of <paramref name="batchSize"/> lines and adds them one after another.
/// </summary>
/// <param name="documents">Documents to add, serialized as a NDJSON string.</param>
/// <param name="batchSize">Number of NDJSON lines sent per request.</param>
/// <param name="primaryKey">Primary key for the documents.</param>
/// <param name="cancellationToken">The cancellation token for this call.</param>
/// <returns>Returns one task info per batch, in the order the batches were sent.</returns>
public async Task<IEnumerable<TaskInfo>> AddDocumentsNdjsonInBatchesAsync(string documents,
    int batchSize = 1000, string primaryKey = default, CancellationToken cancellationToken = default)
{
    var results = new List<TaskInfo>();

    // Batches are awaited sequentially: each request completes before the next one starts.
    foreach (var batch in documents.GetNdjsonChunks(batchSize))
    {
        var taskInfo = await AddDocumentsNdjsonAsync(batch, primaryKey, cancellationToken)
            .ConfigureAwait(false);
        results.Add(taskInfo);
    }

    return results;
}

/// <summary>
/// Update documents.
/// </summary>
Expand Down Expand Up @@ -195,6 +238,29 @@ public async Task<TaskInfo> UpdateDocumentsCsvAsync(string documents, string pri
.ConfigureAwait(false);
}

/// <summary>
/// Sends a NDJSON payload to the documents route of this index with a PUT request.
/// </summary>
/// <param name="documents">Documents to update, serialized as a NDJSON string.</param>
/// <param name="primaryKey">Optional primary key; when set it is passed as a query string parameter.</param>
/// <param name="cancellationToken">The cancellation token for this call.</param>
/// <returns>Returns the task info deserialized from the response body.</returns>
public async Task<TaskInfo> UpdateDocumentsNdjsonAsync(string documents, string primaryKey = default,
    CancellationToken cancellationToken = default)
{
    var endpoint = $"indexes/{Uid}/documents";

    // The query string is only appended when a primary key was actually supplied.
    var requestUri = primaryKey == default
        ? endpoint
        : $"{endpoint}?{new { primaryKey = primaryKey }.ToQueryString()}";

    var payload = new StringContent(documents, Encoding.UTF8, ContentType.Ndjson);
    var response = await _http.PutAsync(requestUri, payload, cancellationToken).ConfigureAwait(false);
    var taskInfo = await response.Content
        .ReadFromJsonAsync<TaskInfo>(cancellationToken: cancellationToken)
        .ConfigureAwait(false);

    return taskInfo;
}

/// <summary>
/// Updates documents in batches with size specified with <paramref name="batchSize"/>.
/// </summary>
Expand Down Expand Up @@ -236,6 +302,26 @@ public async Task<IEnumerable<TaskInfo>> UpdateDocumentsCsvInBatchesAsync(string
return tasks;
}

/// <summary>
/// Splits a NDJSON string into chunks of <paramref name="batchSize"/> lines and updates them one after another.
/// </summary>
/// <param name="documents">Documents to update, serialized as a NDJSON string.</param>
/// <param name="batchSize">Number of NDJSON lines sent per request.</param>
/// <param name="primaryKey">Primary key for the documents.</param>
/// <param name="cancellationToken">The cancellation token for this call.</param>
/// <returns>Returns one task info per batch, in the order the batches were sent.</returns>
public async Task<IEnumerable<TaskInfo>> UpdateDocumentsNdjsonInBatchesAsync(string documents,
    int batchSize = 1000, string primaryKey = default, CancellationToken cancellationToken = default)
{
    var results = new List<TaskInfo>();

    // Batches are awaited sequentially: each request completes before the next one starts.
    foreach (var batch in documents.GetNdjsonChunks(batchSize))
    {
        var taskInfo = await UpdateDocumentsNdjsonAsync(batch, primaryKey, cancellationToken)
            .ConfigureAwait(false);
        results.Add(taskInfo);
    }

    return results;
}

/// <summary>
/// Get document by its ID.
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions tests/Meilisearch.Tests/Datasets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal static class Datasets
private static readonly string BasePath = Path.Combine(Directory.GetCurrentDirectory(), "Datasets");
public static readonly string SmallMoviesJson = Path.Combine(BasePath, "small_movies.json");
public static readonly string SongsCsv = Path.Combine(BasePath, "songs.csv");
public static readonly string SongsNdjson = Path.Combine(BasePath, "songs.ndjson");
}

public class DatasetSmallMovie
Expand Down
Loading

0 comments on commit c8aacc3

Please sign in to comment.