RegisterAsStreamProducer failed: fullkey field of StreamId is null #9224

Open
nkosi23 opened this issue Nov 10, 2024 · 3 comments

nkosi23 commented Nov 10, 2024

I have implemented a custom PersistentStreamProvider by following the example of the Azure Queue Provider, and while the core logic seems to be working fine, I am getting the following exception:

fail: Orleans.Streams.MY_STREAM_PROVIDER[103317]
      RegisterAsStreamProducer failed
      System.ArgumentNullException: Value cannot be null. (Parameter 'bytes')
         at System.ArgumentNullException.Throw(String paramName)
         at System.Text.Encoding.GetCharCount(Byte[] bytes)
         at Orleans.Runtime.StreamId.System.ISpanFormattable.TryFormat(Span`1 destination, Int32& charsWritten, ReadOnlySpan`1 format, IFormatProvider provider) in /_/src/Orleans.Streaming/StreamId.cs:line 183
         at System.Runtime.CompilerServices.DefaultInterpolatedStringHandler.AppendFormatted[T](T value)
         at Orleans.Runtime.QualifiedStreamId.ToString() in /_/src/Orleans.Streaming/InternalStreamId.cs:line 50
         at Orleans.Streams.GrainBasedPubSubRuntime.GetRendezvousGrain(QualifiedStreamId streamId) in /_/src/Orleans.Streaming/PubSub/GrainBasedPubSubRuntime.cs:line 62
         at Orleans.Streams.GrainBasedPubSubRuntime.RegisterProducer(QualifiedStreamId streamId, GrainId streamProducer) in /_/src/Orleans.Streaming/PubSub/GrainBasedPubSubRuntime.cs:line 20
         at Orleans.Streams.StreamPubSubImpl.RegisterProducer(QualifiedStreamId streamId, GrainId streamProducer) in /_/src/Orleans.Streaming/PubSub/StreamPubSubImpl.cs:line 33
         at Orleans.Streams.PersistentStreamPullingAgent.PubsubRegisterProducer(IStreamPubSub pubSub, QualifiedStreamId streamId, GrainId meAsStreamProducer, ILogger logger) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 802

Apparently the fullkey field of StreamId is null, while it is supposed to be serialized by the native serializer.

This exception occurs just after IList<IBatchContainer> IQueueAdapterReceiver.GetQueueMessagesAsync(maxCount) returns the list of IBatchContainers. As a result, none of the consumers receive any messages and the rest of the pipeline is not executed (for example, IQueueAdapterReceiver.MessagesDeliveredAsync(messages) is never called).

I have verified that the message batches are correctly serialized and deserialized. Upon deserialization, their content looks like StreamId:SERVERS_STREAM/ef3fc375fd2b490e96c20859d23a24fb,Context:,SequenceToken:[EventSequenceToken: SeqNum=3, EventIndex=0]

So the StreamId is at least set on the batch. However, what caught my attention is that the Events list is null upon deserialization: not empty, but null, even though I have confirmed that it contains events when it is serialized. Here is the IBatchContainer implementation (in F#):

[<Serializable>]
[<GenerateSerializer>]
type MyBatchContainer
    [<JsonConstructor>]
    (
        streamId: StreamId,
        events: List<obj>,
        requestContext: Dictionary<string, obj>,
        realSequenceToken: EventSequenceToken
    ) =

    [<Id(0u)>]
    let mutable sequenceToken: EventSequenceToken = null


    do
        throwIfNull events

        sequenceToken <- realSequenceToken


    new(streamId: StreamId, events: List<obj>, requestContext: Dictionary<string, obj>) =
        MyBatchContainer(streamId, events, requestContext, null)


    member this.RealSequenceToken
        with get () = sequenceToken
        and set (token: EventSequenceToken) = sequenceToken <- token

    [<Id(1u)>]
    member val StreamId = streamId with get, set

    [<Id(2u)>]
    member val Events = events with get, set

    [<Id(3u)>]
    member val RequestContext = requestContext with get, set

    interface IBatchContainer with
        member this.GetEvents<'T>() =
            events
            |> Seq.filter (fun x -> x :? 'T)
            |> Seq.map (fun x -> x :?> 'T)
            |> Seq.mapi (fun i e ->
                Tuple.Create<'T, StreamSequenceToken>(e, sequenceToken.CreateSequenceTokenForEvent(i)))

        member this.ImportRequestContext() =
            match requestContext <> null with
            | true ->
                RequestContextExtensions.Import(requestContext)
                true
            | false -> false

        member this.StreamId = streamId

        member this.SequenceToken: StreamSequenceToken = sequenceToken

And here is the generated code, which looks fine to me:

public sealed class Copier_MyBatchContainer : global::Orleans.Serialization.Cloning.IDeepCopier<global::Corp.MyBatchContainer>, global::Orleans.Serialization.Cloning.IBaseCopier<global::Corp.MyBatchContainer>
    {
        private readonly global::Orleans.Serialization.Activators.IActivator<global::Corp.MyBatchContainer> _activator;
        private readonly global::Orleans.Serialization.Codecs.ListCopier<object> _copier0;
        private readonly global::Orleans.Serialization.Codecs.DictionaryCopier<string, object> _copier1;
        [global::System.Runtime.CompilerServices.MethodImplAttribute(global::System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
        public global::Corp.MyBatchContainer DeepCopy(global::Corp.MyBatchContainer original, global::Orleans.Serialization.Cloning.CopyContext context)
        {
            if (context.TryGetCopy(original, out global::Corp.MyBatchContainer existing))
                return existing;
            if (original.GetType() != typeof(global::Corp.MyBatchContainer))
                return context.DeepCopy(original);
            var result = _activator.Create();
            context.RecordCopy(original, result);
            DeepCopy(original, result, context);
            return result;
        }

        public Copier_MyBatchContainer(global::Orleans.Serialization.Activators.IActivator<global::Corp.MyBatchContainer> _activator, global::Orleans.Serialization.Serializers.ICodecProvider codecProvider)
        {
            this._activator = OrleansGeneratedCodeHelper.UnwrapService(this, _activator);
            _copier0 = OrleansGeneratedCodeHelper.GetService<global::Orleans.Serialization.Codecs.ListCopier<object>>(this, codecProvider);
            _copier1 = OrleansGeneratedCodeHelper.GetService<global::Orleans.Serialization.Codecs.DictionaryCopier<string, object>>(this, codecProvider);
        }

        [global::System.Runtime.CompilerServices.MethodImplAttribute(global::System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
        public void DeepCopy(global::Corp.MyBatchContainer input, global::Corp.MyBatchContainer output, global::Orleans.Serialization.Cloning.CopyContext context)
        {
            output.StreamId = input.StreamId;
            output.Events = _copier0.DeepCopy(input.Events, context);
            output.RequestContext = _copier1.DeepCopy(input.RequestContext, context);
        }
    }

I serialize the messages using Orleans.Serialization.Serializer.SerializeToArray(batchMessage), and I have confirmed that the new native serializer is being used (i.e. the one relying on the GenerateSerializer attributes).

What is particularly painful to me is that since RegisterAsStreamProducer is invoked by the Orleans framework, I have no idea where it gets the StreamId from, and therefore where the source of the issue could be. My guess is that the problem with the empty events should be unrelated since events do not even contain a StreamId, but this is not very helpful.

On top of that, because of the slow-debugging problem that will only be fixed in .NET 9, inspecting external code during a debugging session is essentially impossible (exceptions all over the place due to timeouts, debug symbols not loaded, etc.).

Any pointer from someone familiar with the inner working of streams would be greatly appreciated. Just an educated guess on the root cause of the RegisterAsStreamProducer failed would already be massively helpful.

But since I do not even know where Orleans.Streams.PersistentStreamPullingAgent.PubsubRegisterProducer(IStreamPubSub pubSub, QualifiedStreamId streamId, GrainId meAsStreamProducer, ILogger logger) gets this streamId, investigating has been a very painful experience. I have been on it for close to 2 days already.

nkosi23 commented Nov 11, 2024

Okay, apparently the action is taking place here in the PersistentStreamPullingAgent class:

            IList<IBatchContainer> multiBatch = await rcvr.GetQueueMessagesAsync(maxCacheAddCount); // <---------------------------------


            if (multiBatch == null || multiBatch.Count == 0) return false; // queue is empty. Exit the loop. Will attempt again in the next timer callback.


            queueCache?.AddToCache(multiBatch);
            numMessages += multiBatch.Count;
            StreamInstruments.PersistentStreamReadMessages.Add(multiBatch.Count);
            if (logger.IsEnabled(LogLevel.Trace))
                logger.LogTrace(
                    (int)ErrorCode.PersistentStreamPullingAgent_11,
                    "Got {ReceivedCount} messages from queue {Queue}. So far {MessageCount} messages from this queue.",
                    multiBatch.Count,
                    myQueueId.ToStringWithHashCode(),
                    numMessages);


            foreach (var group in
                multiBatch
                .Where(m => m != null)
                .GroupBy(container => container.StreamId))
            {
                var streamId = new QualifiedStreamId(queueAdapter.Name, group.Key); // <---------------------------------
                StreamSequenceToken startToken = group.First().SequenceToken;
                StreamConsumerCollection streamData;
                if (pubSubCache.TryGetValue(streamId, out streamData))
                {
                    streamData.RefreshActivity(now);
                    StartInactiveCursors(streamData, startToken);


                }
                else
                {
                    // And finally the call to RegisterAsStreamProducer is below
                    RegisterStream(streamId, startToken, now).Ignore(); // if this is a new stream register as producer of stream in pub sub system
                }
            }
            return true;
        }

I think my brain got so melted eliminating possible root causes, that I lacked the freshness of mind required to simply walk up the method call tree in the source code 😅 This gives me some pointers.

nkosi23 commented Nov 11, 2024

Okay, I got it solved. It turned out I had stumbled on a very nasty situation specific to F#. Regarding the null StreamId, the problem was these lines:

    [<Id(1u)>]
    member val StreamId = streamId with get, set

    [<Id(2u)>]
    member val Events = events with get, set
    ....

    interface IBatchContainer with
        member this.StreamId = streamId

When debugging using logs, I noticed that the Events property is actually correctly populated when accessed from outside the object (before that I was logging from within the object), which gave me the decisive clue. Unlike in C#, in F# the parameters of the primary constructor are accessible across the whole class; they are not scoped to the constructor alone. In practice, this is a nice convenience made possible by the fact that everything is immutable by default in F#. But this is also what tripped me up here.

I then realized that Orleans' native serializer does not call any constructor. As a result, IBatchContainer.StreamId was bound to an empty value (namely streamId, the constructor parameter, which Orleans does not populate on deserialization), while the main StreamId property was correctly deserialized by Orleans. Both members exist side by side because F# only allows explicit interface implementations. Hence the confusion.
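To make the pitfall concrete, here is a minimal, Orleans-free sketch. It assumes the deserializer creates the object without running any constructor and then assigns the settable properties; RuntimeHelpers.GetUninitializedObject stands in for that behavior. The names IHasStreamId, CapturesParameter, ReadsProperty and materialize are hypothetical and exist only for this illustration.

```fsharp
open System.Runtime.CompilerServices

// Hypothetical stand-in for the StreamId part of IBatchContainer.
type IHasStreamId =
    abstract StreamId: string

// Shape from the issue: the interface member reads the captured constructor parameter.
type CapturesParameter(streamId: string) =
    member val StreamId = streamId with get, set
    interface IHasStreamId with
        member this.StreamId = streamId

// Corrected shape: the interface member reads the property that gets populated.
type ReadsProperty(streamId: string) =
    member val StreamId = streamId with get, set
    interface IHasStreamId with
        member this.StreamId = this.StreamId

// Simulates a deserializer that bypasses the constructor and then sets properties.
let materialize<'T> (populate: 'T -> unit) : 'T =
    let instance = RuntimeHelpers.GetUninitializedObject(typeof<'T>) :?> 'T
    populate instance
    instance

let broken = materialize<CapturesParameter> (fun b -> b.StreamId <- "SERVERS_STREAM/ef3f")
let repaired = materialize<ReadsProperty> (fun b -> b.StreamId <- "SERVERS_STREAM/ef3f")

printfn "property on broken type: %s" broken.StreamId
printfn "interface read on broken type is null: %b" (isNull (broken :> IHasStreamId).StreamId)
printfn "interface read on repaired type: %s" (repaired :> IHasStreamId).StreamId
```

In the first type the property and the captured parameter live in two separate backing fields, so populating the property leaves the interface member reading a field that was never initialized.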

The problem with the empty events was the same, but when fixing it, I stumbled across another null issue in:

interface IBatchContainer with
        member this.GetEvents<'T>() =
            this.Events
            |> Seq.filter (fun x -> x :? 'T)
            |> Seq.map (fun x -> x :?> 'T)
            |> Seq.mapi (fun i e ->
                Tuple.Create<'T, StreamSequenceToken>(e, sequenceToken.CreateSequenceTokenForEvent(i)))

For some reason, the sequenceToken field was null. And upon further inspection, it looks like Orleans does not generate any code for F# fields:

    [<Id(0u)>]
    let mutable sequenceToken: EventSequenceToken = null

I solved the problem by turning the field into a public property, but this looks like a bug to me. Serialization matters here since the batchContainer is passed across grain method calls.
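Put together, the fixes described in this comment might look roughly like the sketch below. It is a reconstruction from the snippets in this thread, not the author's final code. The open declarations are an assumption about where these types live, the Id numbering is carried over from the original, and the [<JsonConstructor>] attribute and the throwIfNull check are left out for brevity. It needs the Orleans streaming packages referenced to compile.

```fsharp
open System
open System.Collections.Generic
open Orleans
open Orleans.Runtime
open Orleans.Streams
open Orleans.Providers.Streams.Common

[<Serializable>]
[<GenerateSerializer>]
type MyBatchContainer
    (
        streamId: StreamId,
        events: List<obj>,
        requestContext: Dictionary<string, obj>,
        realSequenceToken: EventSequenceToken
    ) =

    new(streamId: StreamId, events: List<obj>, requestContext: Dictionary<string, obj>) =
        MyBatchContainer(streamId, events, requestContext, null)

    // Formerly a private `let mutable` field; a public property is visible to the generator.
    [<Id(0u)>]
    member val RealSequenceToken: EventSequenceToken = realSequenceToken with get, set

    [<Id(1u)>]
    member val StreamId = streamId with get, set

    [<Id(2u)>]
    member val Events = events with get, set

    [<Id(3u)>]
    member val RequestContext = requestContext with get, set

    interface IBatchContainer with
        // Every member goes through `this.<Property>`, never the constructor parameters.
        member this.GetEvents<'T>() =
            this.Events
            |> Seq.filter (fun x -> x :? 'T)
            |> Seq.map (fun x -> x :?> 'T)
            |> Seq.mapi (fun i e ->
                Tuple.Create<'T, StreamSequenceToken>(e, this.RealSequenceToken.CreateSequenceTokenForEvent(i)))

        member this.ImportRequestContext() =
            if isNull this.RequestContext then
                false
            else
                RequestContextExtensions.Import(this.RequestContext)
                true

        member this.StreamId = this.StreamId

        member this.SequenceToken = this.RealSequenceToken :> StreamSequenceToken
```

The constructor parameters are now used only to initialize the properties, so nothing in the type depends on state that a constructor-bypassing deserializer would leave unset.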

So all in all there were a bunch of serialization issues in different places for different reasons, and untangling what was going on hasn't been a fun experience at all.

ReubenBond (Member) commented

> And upon further inspection, it looks like Orleans does not generate any code for F# fields

This might be an issue with the compiler using "ref assemblies", which strip private metadata before the source generator sees the assembly. There is very little which Orleans can do about that. You might be able to set <ProduceReferenceAssembly>false</ProduceReferenceAssembly> to change this behavior.
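For reference, the suggested setting would go into a PropertyGroup of the project file, presumably the .fsproj that defines the serializable types. This is only a sketch of where the property lives. Nobody in this thread confirms that it makes the private F# field visible to the source generator.

```xml
<!-- Assumed placement: the .fsproj of the project containing MyBatchContainer -->
<PropertyGroup>
  <ProduceReferenceAssembly>false</ProduceReferenceAssembly>
</PropertyGroup>
```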
