Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select / Switch race condition #229

Closed
ninjaoxygen opened this issue Jun 25, 2024 · 4 comments
Closed

Select / Switch race condition #229

ninjaoxygen opened this issue Jun 25, 2024 · 4 comments

Comments

@ninjaoxygen
Copy link

There seems to be a race in Switch similar to issue #228

Below is a distilled test-case of the code that triggers it. I'm not sure what the guarantees are about threaded calls to OnNext, we basically ported from dotnet/reactive to R3 and came across the issue very intermittently.

Uncommenting the await Task.Delay(10); makes it almost always go away.

First three runs of the code for me produce different results:

dotnet run
entering Subscribe for 111
Subscribe dto is not null. DeviceId = UNCHANGED
Setting value for 111 to GEN-111-A
Subscribe dto is not null. DeviceId = GEN-111-A

dotnet run
Subscribe dto is not null. DeviceId = UNCHANGED
entering Subscribe for 111
Setting value for 111 to GEN-111-A
Subscribe dto is not null. DeviceId = GEN-111-A

dotnet run
entering Subscribe for 111
Setting value for 111 to GEN-111-A
Subscribe dto is not null. DeviceId = UNCHANGED

using R3;

SyncFactory factory = new();

ReactiveProperty<string?> udid = new("111");

var subscription = udid
    .GenerateObservable<string, ExampleDto?>(factory.Observe)
    .Subscribe(
        (x) =>
        {
            var output = string.Format("Subscribe dto {0} null. DeviceId = {1}", x is null ? "is" : "is not", x?.DeviceId ?? "(null)");
            Console.Out.WriteLine(output);
        });

//udid.Value = "222";

await Task.Delay(30000);


public static class ObserverGenerators
{
    /// <summary>
    /// Generate an observable from a key source.
    /// </summary>
    /// <typeparam name="TKey">Type of key used as generator parameter.</typeparam>
    /// <typeparam name="TValue">Type of value emitted by the generated Observable.</typeparam>
    /// <param name="keyObservable">Observable responsible for emitting keys.</param>
    /// <param name="observableFactoryFunction">Factory function used to create Observable for each key.</param>
    /// <returns>Observable of TValue?.</returns>
    public static Observable<TValue?> GenerateObservable<TKey, TValue>(this Observable<TKey?> keyObservable, Func<TKey, Observable<TValue?>> observableFactoryFunction)
    {
        var x = keyObservable.Select(
            (key) =>
            {
                if (key is null)
                {
                    return Observable.Return<TValue?>(default);
                }

                var value = observableFactoryFunction(key);

                return value;
            }).Switch();

        return x;
    }
}

public class SyncFactory
{
    public ReadOnlyReactiveProperty<ExampleDto?> Observe(string udid)
    {
        var provider = new SyncProvider(udid);

        return provider.Observable;
    }
}

public class SyncProvider
{
    private readonly string _udid;
    private readonly ReactiveProperty<ExampleDto?> _property;
    private readonly ReadOnlyReactiveProperty<ExampleDto?> _readOnlyProperty;

    public SyncProvider(string udid)
    {
        _udid = udid;
        _property = new(new ExampleDto(false, "UNCHANGED")); // establish property with a default value until async subscribe completes
        _readOnlyProperty = _property.ToReadOnlyReactiveProperty();

        // start async Subscribe process, so initially our Property will have the value from above.
        _ = Task.Run(SubscribeAsync);
    }

    public ReadOnlyReactiveProperty<ExampleDto?> Observable => _readOnlyProperty;

    /// <summary>
    /// Fake subscription to remote service.
    /// In reality, this is an Orleans grain Observer, our first value
    /// is the return from a Task<ExampleDto> Subscribe() call which is immediately delivered to OnNext.
    /// </summary>
    /// <returns>Task.</returns>
    public async Task SubscribeAsync()
    {
        Console.WriteLine("entering Subscribe for {0}", _udid);
        // await Task.Delay(10); // simulate Subscribe call round-trip time
        var value = $"GEN-{_udid}-A";
        Console.WriteLine("Setting value for {0} to {1}", _udid, value);
        _property.OnNext(new ExampleDto(true, value));
    }
}

public record ExampleDto(
    bool Configured,
    string? DeviceId);
@nepolak
Copy link

nepolak commented Jun 25, 2024

You could try building and linking R3 package from my forked repo from the pull request with fixed ReactiveProperty and see if it helps.

@ninjaoxygen
Copy link
Author

You could try building and linking R3 package from my forked repo from the pull request with fixed ReactiveProperty and see if it helps.

Tried your fork, it definitely solves it in the test case, so looks like the same underlying cause. I will try it on our full code base.

Many thanks for your issue and patch, I had started down the same lines of investigation and was scattering logging and extra locks in Select/Switch without it helping, they just made the problem occur less frequently.

I will check with our full code and post results, happy to close as a dupe of #228.

@ninjaoxygen
Copy link
Author

After testing with our wider codebase, the PR from @nepolak with the locks does fix the issue.

I understand the reluctance to use heavy locking, it might be possible to add a fast-path flag that skips the lock once the window where the race condition has passed?

As #228 is closed without merging the above mentioned PR, this still stands as a separate reproducible bug.

@neuecc
Copy link
Member

neuecc commented Jul 17, 2024

Finally, we added SynchronizedReactiveProperty in v1.1.15.

@neuecc neuecc closed this as completed Jul 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants