Add client-side async API for replication (IAsyncReplicator, ReplicationClient) #1182
…lient-side async API, related to #XXXX)
Thanks for the PR! I have some changes that need to be made, and a larger one about a refactoring that is up for discussion. Please share your thoughts on the refactoring before embarking on it!
| /// <returns> | ||
| /// A <see cref="SessionToken"/> if updates are available; otherwise, <c>null</c>. | ||
| /// </returns> | ||
| public async Task<SessionToken?> CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default) | 
Please add #nullable enable to the top of this file to resolve these warnings, and then if that exposes other nullability issues in the file, please fix them.
| /// </summary> | ||
| /// <param name="currentVersion">Current version of the index.</param> | ||
| /// <param name="cancellationToken">Cancellation token.</param> | ||
| Task<SessionToken?> CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default); | 
Add #nullable enable to this file to resolve these warnings.
| if (asyncReplicator is null) | ||
| throw new InvalidOperationException("AsyncReplicator not initialized."); | ||
| 
               | 
          ||
| SessionToken? session = null; | 
Please add #nullable enable to this file to resolve these warnings, and fix any nullability issues that might arise as a result.
@paulirwin @NightOwl888
Can you please suggest a better way to adjust the whole file after adding #nullable enable? Once I add it and start resolving the fields, the warning count doubles or triples after every build, i.e. from 32 -> 84 -> 153 warnings.
#nullable enable can be turned off again using #nullable restore. While it is preferable to solve all of the nullable issues rather than ignore them, this at least lets us put them off until they can be considered separately from ongoing work. I would highly recommend that the new code we are adding is fenced between #nullable enable and #nullable restore blocks, which will save us the work of having to do it all later.
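To illustrate the fencing suggested above, here is a minimal sketch. It assumes the project default is nullable-disabled, and `LegacyClient` and its members are hypothetical stand-ins, not the actual Lucene.NET code.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real SessionToken type.
public sealed class SessionToken { }

public class LegacyClient
{
    // Existing code: outside the fence, the project-level nullable setting applies
    // (assumed disabled here), so this field produces no nullability warnings.
    private string lastVersion;

    public void Remember(string version) => lastVersion = version;

#nullable enable
    // New code: nullable annotations and warnings are active only inside this fence.
    public Task<SessionToken?> CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default)
    {
        SessionToken? session = null; // the '?' annotation is legal here without a warning
        return Task.FromResult(session);
    }
#nullable restore

    // Back to the project-level setting from here on.
    public string GetLastVersion() => lastVersion;
}
```

With this layout, only the members between the two directives need to be made nullable-clean now; the rest of the file can be handled in a separate change.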
| /// <param name="handler"></param> | ||
| /// <param name="factory"></param> | ||
| /// <exception cref="ArgumentNullException"></exception> | ||
| public ReplicationClient(IAsyncReplicator asyncReplicator, IReplicationHandler handler, ISourceDirectoryFactory factory) | 
So, this is a brittle design. It means that you have to know which constructor was used in order to know which methods to call and not get an exception at runtime. I think we should break this type apart.
Let me know your thoughts on this:
1. Refactor out a common ReplicationClientBase abstract class with any common methods/fields
2. Make this type inherit from the base class. Leave // LUCENENET: comments where methods were refactored into the base class, to aid future porting efforts. Move the async methods into item 3 below.
3. Introduce a new AsyncReplicationClient that inherits from ReplicationClientBase, and only has the async methods. This way you can know that it will have an async replicator field.

Thoughts?
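For discussion purposes, the split proposed above could look roughly like the sketch below. The interfaces are simplified stand-ins, and `CheckNow`, `CheckNowAsync`, and `Close` are hypothetical members chosen only to keep the example short; they are not the real ReplicationClient API.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Simplified stand-ins for the real Lucene.NET types (members reduced for illustration).
public sealed class SessionToken { }
public interface IReplicator
{
    SessionToken? CheckForUpdate(string currentVersion);
}
public interface IAsyncReplicator
{
    Task<SessionToken?> CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default);
}

// Item 1: shared state and helpers live in the abstract base class.
public abstract class ReplicationClientBase
{
    private volatile bool closed;

    protected void EnsureOpen()
    {
        if (closed) throw new ObjectDisposedException(GetType().Name);
    }

    public void Close() => closed = true;
}

// Item 2: the existing client keeps only the synchronous members.
public class ReplicationClient : ReplicationClientBase
{
    private readonly IReplicator replicator;

    public ReplicationClient(IReplicator replicator)
        => this.replicator = replicator ?? throw new ArgumentNullException(nameof(replicator));

    // LUCENENET: EnsureOpen() was refactored into ReplicationClientBase
    public SessionToken? CheckNow(string currentVersion)
    {
        EnsureOpen();
        return replicator.CheckForUpdate(currentVersion);
    }
}

// Item 3: the async client always has a non-null async replicator.
public class AsyncReplicationClient : ReplicationClientBase
{
    private readonly IAsyncReplicator asyncReplicator;

    public AsyncReplicationClient(IAsyncReplicator asyncReplicator)
        => this.asyncReplicator = asyncReplicator ?? throw new ArgumentNullException(nameof(asyncReplicator));

    public Task<SessionToken?> CheckNowAsync(string currentVersion, CancellationToken cancellationToken = default)
    {
        EnsureOpen();
        return asyncReplicator.CheckForUpdateAsync(currentVersion, cancellationToken);
    }
}
```

The point of the split is that neither client needs a runtime "replicator not initialized" check, because each constructor guarantees the one field its methods use.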
I suppose it is unlikely a user will require both async and synchronous methods at the same time. And it would clarify the StartAsyncUpdateLoop vs StartUpdateThread() logic. Although, I think both of those could be potentially combined.
I am hesitant to get on board with having completely separate implementations, though. That is not typically how concrete components in the BCL evolve. Usually, the synchronous and asynchronous methods exist side by side.
I agree with @NightOwl888's suggestion.
I guess we should stick with one ReplicationClient that implements both IReplicator and IAsyncReplicator. Maintaining and testing it seems tough initially, but I guess our future target is async only.
So maybe what we can do right now:
Mark sync APIs with [Obsolete("Prefer async methods to avoid deadlocks.")] to gently guide users toward async.
Long term: async becomes the default; sync kept only for backward compatibility.
That said, I’m also flexible — if you both feel strongly about splitting into separate sync/async clients right now to minimize maintenance and testing overhead, I can adapt to that approach too.
> Mark sync APIs with [Obsolete("Prefer async methods to avoid deadlocks.")] to gently guide users toward async.
> Long term: async becomes the default; sync kept only for backward compatibility.
No, synchronous programming isn't going away any time soon. It is just in the space of HTTP servers, that it is no longer considered a good practice. Also, keep in mind that when debugging we will be comparing our implementation against the synchronous Java code. So, we should try to minimize changes to these methods.
ReplicationClient is not sealed. It is designed to be inherited to provide additional functionality. So, in that regard, it is the abstraction and can provide the base implementation of both the synchronous and asynchronous sides. The question is since ReplicationClient can already be mocked by inheritance, do we need to add interfaces?
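As a rough illustration of mocking by inheritance with the sync and async members side by side, consider the sketch below. It assumes the relevant members are virtual, which this thread does not confirm, and `RecordingReplicationClient` is a hypothetical test double.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Simplified stand-in: one unsealed class exposing both sync and async members.
public class ReplicationClient
{
    // Synchronous path, kept close to the Java original.
    public virtual void UpdateNow() { /* real update logic would go here */ }

    // Asynchronous counterpart living next to the synchronous method.
    public virtual Task UpdateNowAsync(CancellationToken cancellationToken = default)
        => Task.CompletedTask;
}

// Hypothetical test double: overrides both paths without needing an interface.
public class RecordingReplicationClient : ReplicationClient
{
    public int SyncCalls { get; private set; }
    public int AsyncCalls { get; private set; }

    public override void UpdateNow() => SyncCalls++;

    public override Task UpdateNowAsync(CancellationToken cancellationToken = default)
    {
        AsyncCalls++;
        return Task.CompletedTask;
    }
}
```

A test can pass a `RecordingReplicationClient` anywhere a `ReplicationClient` is expected and assert on the two counters.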
Thanks for clarifying! You’re right — marking sync methods [Obsolete] was premature. Sync APIs are still important for backward compatibility and debugging against Java. Since ReplicationClient can already be extended and mocked via inheritance, we don’t need separate interfaces for now.
| EnsureOpen(); | ||
| 
               | 
          ||
| // Acquire the same update lock to prevent concurrent updates | ||
| updateLock.Lock(); | 
Question for @NightOwl888: is this async-safe?
According to ChatGPT, no this is not. Calling async code while using a synchronous lock can lead to deadlocks.
Do note that in other places where the call is to Task.Run(), this is fine to do.
Options
- Use SemaphoreSlim instead of ReentrantLock. Note that SemaphoreSlim is not re-entrant, though.
  - Safe across await.
  - Prevents reentrant calls, which may actually be a feature (forces clear lock discipline).
  - But if you really need recursion/reentrancy, this won’t do.
- Implement an async-compatible reentrant lock
  - This is tricky but possible.
  - You’d track:
    - Owning Thread or Task.Id
    - A recursion count
    - A SemaphoreSlim for waiting
  - On EnterAsync, if the current context already owns the lock, increment recursion count and continue immediately. Otherwise, wait.
  - On Exit, decrement recursion count and release only when it reaches 0.
Using SemaphoreSlim is certainly easier, but I have no idea whether reentrancy is actually required by the design, or if that is just something that happened by accident because ReentrantLock was the primitive that was chosen for synchronization. It would be possible to refactor the design to provide protected methods that are not protected by SemaphoreSlim so subclasses can avoid reentrancy. It would be harder to implement subclasses, but possible. I am not sure whether that covers general callers, though.
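A minimal sketch of the SemaphoreSlim option, including the unguarded protected method mentioned above, might look like this. `GuardedUpdater`, `DoUpdateCore`, and `DoUpdateCoreAsync` are hypothetical names, not the actual ReplicationClient members.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class showing one non-reentrant lock shared by the sync and async paths.
public class GuardedUpdater
{
    // Initial count 1, maximum count 1: at most one update runs at a time.
    private readonly SemaphoreSlim updateLock = new SemaphoreSlim(1, 1);

    public async Task UpdateNowAsync(CancellationToken cancellationToken = default)
    {
        // WaitAsync does not block a thread while waiting, and the semaphore has no
        // thread affinity, so it can be held across awaits.
        await updateLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            await DoUpdateCoreAsync(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            updateLock.Release();
        }
    }

    public void UpdateNow()
    {
        updateLock.Wait();
        try
        {
            DoUpdateCore();
        }
        finally
        {
            updateLock.Release();
        }
    }

    // Not guarded by the semaphore. Code that already holds the lock (for example an
    // override) calls these instead of UpdateNow/UpdateNowAsync, because re-entering
    // the public methods while holding the lock would wait forever.
    protected virtual void DoUpdateCore() { }

    protected virtual Task DoUpdateCoreAsync(CancellationToken cancellationToken)
        => Task.CompletedTask;
}
```

The trade-off is the one described above: subclasses must know to call the unguarded core methods, since the lock itself gives no protection against accidental reentrancy.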
In the long run, implementing an AsyncReentrantLock could prove to be very useful for converting other parts of Lucene to be async, and would cause less pain for places where it is unknown whether reentrancy is a requirement.
Here is what such an implementation could look like. Note that I haven't tested this, it is straight out of ChatGPT. There may be a smarter way to do it than using a lock, but if we keep it, we would need to change lock (this) to UninterruptableMonitor.Enter() and UninterruptableMonitor.Exit() in a try/catch, to ensure it doesn't throw on thread interrupt.
AsyncReentrantLock
using System;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// A lightweight async-compatible reentrant lock.
/// </summary>
public sealed class AsyncReentrantLock
{
    private readonly SemaphoreSlim _semaphore = new(1, 1);
    // Owner + recursion count
    private int? _ownerThreadId;
    private int _recursionCount;
    // Track waiting requests
    private int _waitingCount;
    /// <summary> Number of threads/tasks currently waiting for the lock. </summary>
    public int WaitingCount => Volatile.Read(ref _waitingCount);
    /// <summary> True if the lock is currently held. </summary>
    public bool IsLocked => _ownerThreadId != null;
    /// <summary> Thread ID of the owner, or null if none. </summary>
    public int? OwnerThreadId => _ownerThreadId;
    /// <summary> Recursion depth of the current owner. </summary>
    public int RecursionCount => _recursionCount;
    // -----------------------
    // Sync API
    // -----------------------
    public void Lock(CancellationToken cancellationToken = default)
    {
        int currentId = Thread.CurrentThread.ManagedThreadId;
        lock (this)
        {
            if (_ownerThreadId == currentId)
            {
                _recursionCount++;
                return;
            }
        }
        Interlocked.Increment(ref _waitingCount);
        try
        {
            _semaphore.Wait(cancellationToken);
        }
        finally
        {
            Interlocked.Decrement(ref _waitingCount);
        }
        lock (this)
        {
            _ownerThreadId = currentId;
            _recursionCount = 1;
        }
    }
    // -----------------------
    // Async API
    // -----------------------
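    // Caveat: ownership is keyed to the thread that called LockAsync. After an await
    // the caller may resume on a different thread, where Unlock() would then throw.
    public async Task LockAsync(CancellationToken cancellationToken = default)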
    {
        int currentId = Thread.CurrentThread.ManagedThreadId;
        lock (this)
        {
            if (_ownerThreadId == currentId)
            {
                _recursionCount++;
                return;
            }
        }
        Interlocked.Increment(ref _waitingCount);
        try
        {
            await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            Interlocked.Decrement(ref _waitingCount);
        }
        lock (this)
        {
            _ownerThreadId = currentId;
            _recursionCount = 1;
        }
    }
    // -----------------------
    // Unlock
    // -----------------------
    public void Unlock()
    {
        int currentId = Thread.CurrentThread.ManagedThreadId;
        lock (this)
        {
            if (_ownerThreadId != currentId)
                throw new SynchronizationLockException("Current thread does not own the lock.");
            _recursionCount--;
            if (_recursionCount == 0)
            {
                _ownerThreadId = null;
                _semaphore.Release();
            }
        }
    }
}
AsyncReentrantLockTests
using NUnit.Framework;
using System;
using System.Threading;
using System.Threading.Tasks;
[TestFixture]
public class AsyncReentrantLockTests
{
    [Test]
    public void SyncLock_IsReentrant()
    {
        var l = new AsyncReentrantLock();
        l.Lock();
        Assert.That(l.IsLocked, Is.True);
        Assert.That(l.RecursionCount, Is.EqualTo(1));
        l.Lock();
        Assert.That(l.RecursionCount, Is.EqualTo(2));
        l.Unlock();
        Assert.That(l.RecursionCount, Is.EqualTo(1));
        l.Unlock();
        Assert.That(l.IsLocked, Is.False);
    }
    [Test]
    public async Task AsyncLock_IsReentrant()
    {
        var l = new AsyncReentrantLock();
        await l.LockAsync();
        Assert.That(l.IsLocked, Is.True);
        await l.LockAsync();
        Assert.That(l.RecursionCount, Is.EqualTo(2));
        l.Unlock();
        Assert.That(l.RecursionCount, Is.EqualTo(1));
        l.Unlock();
        Assert.That(l.IsLocked, Is.False);
    }
    [Test]
    public void ThrowsIfUnlockedByOtherThread()
    {
        var l = new AsyncReentrantLock();
        l.Lock();
        var ex = Assert.Throws<SynchronizationLockException>(() =>
        {
            var t = new Thread(() => l.Unlock());
            t.Start();
            t.Join();
        });
        Assert.That(ex.Message, Does.Contain("does not own"));
    }
    [Test]
    public async Task LockAsync_BlocksUntilReleased()
    {
        var l = new AsyncReentrantLock();
        l.Lock();
        var task = Task.Run(async () =>
        {
            await Task.Delay(100);
            l.Unlock();
        });
        await l.LockAsync(); // should block until Unlock is called
        Assert.That(l.IsLocked, Is.True);
        l.Unlock();
        await task;
    }
    [Test]
    public async Task WaitingCount_IsAccurate()
    {
        var l = new AsyncReentrantLock();
        l.Lock();
        var t1 = l.LockAsync();
        var t2 = l.LockAsync();
        await Task.Delay(50); // let them enqueue
        Assert.That(l.WaitingCount, Is.EqualTo(2));
        l.Unlock(); // release once
        await Task.WhenAll(t1, t2);
        l.Unlock(); // release final owner
    }
}
Yes, my bad — I was just trying to stay aligned with the sync version at that time.
But after looking more carefully, I think we don’t actually need ReentrantLock for the sync version either. From what I see, inside doUpdate or doUpdateAsync there aren’t any reentrant scenarios, so it should be safe to use SemaphoreSlim in both cases.
@NightOwl888  @paulirwin
Please correct me if I’m missing something or if there are still chances of reentrancy in the sync version.
…safe); fix all warnings and nullable issues
…REAM_CANCELLATIONTOKEN defines for .NET 8+ and fix all warnings
@paulirwin @NightOwl888
25f65ff to 8135524
@NightOwl888 Reason: The merge brought in changes from master that we do not want in this feature branch. All intended feature commits are still intact. This keeps the branch history clean and focused on the feature work. If any changes from master are important to include, please let me know, and we can merge them properly.
I’m seeing the check-editorconfig CI fail due to a “final newline expected” error. Locally, git diff --check doesn’t show any issues, so I suspect it might be related to line endings (CRLF vs LF) or some subtle trailing whitespace. Could you please guide me on the safest way to fix this, such as how to identify files with extra trailing whitespace or missing newlines, so the CI passes without affecting other files?
That error means that after all of the content in the file, there is no newline character. So, it just needs to be added. I know it is a bit confusing: remove all trailing whitespace, except that at the end of the file you add a line break.
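One way to locate the affected files locally is a small throwaway console program like the sketch below. It is an illustration only, not part of the repository's tooling: `FinalNewlineCheck` is a hypothetical name, and it assumes only *.cs files matter and that empty files are acceptable.

```csharp
using System;
using System.IO;

public static class FinalNewlineCheck
{
    // Returns true when the last byte of the file is '\n' (this also covers CRLF endings).
    public static bool EndsWithNewline(string path)
    {
        using var stream = File.OpenRead(path);
        if (stream.Length == 0)
            return true; // assumption: empty files are not reported
        stream.Seek(-1, SeekOrigin.End);
        return stream.ReadByte() == '\n';
    }

    public static void Main(string[] args)
    {
        // Root directory to scan; defaults to the current directory.
        string root = args.Length > 0 ? args[0] : ".";
        foreach (string file in Directory.EnumerateFiles(root, "*.cs", SearchOption.AllDirectories))
        {
            if (!EndsWithNewline(file))
                Console.WriteLine(file); // this file is missing its final newline
        }
    }
}
```

Running it from the repository root prints each file that needs a line break appended, so only those files have to be touched.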
    
@paulirwin @NightOwl888
Fixes:
Fixes #1181
Summary of the changes:
Implemented IAsyncReplicator and async methods in ReplicationClient for non-blocking replication operations.
Description
This PR introduces an async Task-based API for the replication client, allowing operations such as checking for updates, obtaining files, and releasing sessions to be executed asynchronously.
Key changes:
- IAsyncReplicator interface.
- HttpReplicator to implement async versions of the operations (CheckForUpdateAsync, ObtainFileAsync, ReleaseAsync, PublishAsync).
- HttpReplicator to wrap HTTP requests using HttpClient.
- ReplicationClient now exposes async methods that call into IAsyncReplicator.
This avoids synchronous HTTP calls that could deadlock or cause performance issues, while keeping IReplicationHandler synchronous, as Lucene.NET APIs currently do not have async equivalents.
Additional context:
This implementation has been tested and works successfully when using UpdateNowAsync in the GSoC project by referencing the Lucene.NET repository in the GSoC extensions project.