- 
                Notifications
    You must be signed in to change notification settings 
- Fork 362
Add .NET client for dynamic pubsub subscriptions #1346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
          
     Closed
      
      
    
                
     Closed
            
            
          Conversation
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
    …n using streaming subscriptions). Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…apr prototypes Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…n extension methods. Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
      
        
      
      
  
    38 tasks
  
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 11, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…ing XML comment. Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…ded flag to ensure double-subscription doesn't happen to cancel stream (with multiple initial requests), updated message draining during disposal and updated unit tests. Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
| Codecov ReportAttention: Patch coverage is  
 Additional details and impacted files@@            Coverage Diff             @@
##           master    #1346      +/-   ##
==========================================
- Coverage   67.28%   65.91%   -1.37%     
==========================================
  Files         174      184      +10     
  Lines        6025     6241     +216     
  Branches      671      698      +27     
==========================================
+ Hits         4054     4114      +60     
- Misses       1802     1957     +155     
- Partials      169      170       +1     
 Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. | 
…le try/catch block instead of having more than one Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…cancellation token throws. Minor perf improvments as spotted. Added/fixed comments. Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
… operators Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 25, 2024 
                  
              
              
            
            
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 25, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      …be constrained by a configurable timespan in how long it waits. Added some try/catch blocks to handle messages being written when the writer has been completed in case a Disposal happens mid-processing. Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 25, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 25, 2024 
                  
              
              
            
            
… the GrpcClient creates the PublishSubscribeReceiver, then calls SubscribeAsync internally. Renamed Register to `SubscribeAsync` and updated return type + example Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…tantiation. Added support to let developer specify a maximum number of messages that can be queued for processing (blocking new Dapr from submitting more to the replica) and tweaked how messages are written in the subscription loop to accommodate this. Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
              
                    philliphoff
  
              
              reviewed
              
                  
                    Sep 25, 2024 
                  
              
              
            
            
        
          
                src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
              
                Outdated
          
            Show resolved
            Hide resolved
        
      Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
      
        
      
      
  
    3 tasks
  
              
                    philliphoff
  
              
              reviewed
              
                  
                    Oct 15, 2024 
                  
              
              
            
            
              
                    philliphoff
  
              
              reviewed
              
                  
                    Oct 15, 2024 
                  
              
              
            
            
              
                    philliphoff
  
              
              reviewed
              
                  
                    Oct 15, 2024 
                  
              
              
            
            
  This was referenced Oct 24, 2024 
      
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
      
  Add this suggestion to a batch that can be applied as a single commit.
  This suggestion is invalid because no changes were made to the code.
  Suggestions cannot be applied while the pull request is closed.
  Suggestions cannot be applied while viewing a subset of changes.
  Only one suggestion per line can be applied in a batch.
  Add this suggestion to a batch that can be applied as a single commit.
  Applying suggestions on deleted lines is not supported.
  You must change the existing code in this line in order to create a valid suggestion.
  Outdated suggestions cannot be applied.
  This suggestion has been applied or marked resolved.
  Suggestions cannot be applied from pending reviews.
  Suggestions cannot be applied on multi-line comments.
  Suggestions cannot be applied while the pull request is queued to merge.
  Suggestion cannot be applied right now. Please check back later.
  
    
  
    
Description
Implements streaming subscription support as a discrete Dapr.Messaging package.
This was based on the work done by @philliphoff here. The requirements call for a mechanism to return an unbounded stream of message from the subscription and a follow-up call to the sidecar for each message to convey an action that should be taken on it (e.g. drop, retry or mark as completed).
While I think his use of a delegate to force the developer to process each message and immediately notate the action that should be taken on the message, I instead wanted to expose a method returning an IAsyncEnumerable as I think there's some interesting opportunities for combining this with some Rx goodness.
My approach then also requires that in addition to setting up the initial subscription, the developer callAcknowledgeMessageAsyncwith the message ID and the action to take, but also bakes in a policy so that a default configurable action is taken after a timeout window in which the developer fails to otherwise indicate success or failure.Because a separate connection will be created with the sidecar for every combination of pubsub component and topic, the actual implementation is an internal class calledPublishSubscribeReceiverthat maintains a single streaming connection to the sidecar for each instance in theConnectionManager. In order to facilitate some future backpressure support and otherwise decouple the receipt of messages from the sidecar from the subscription, I write inbound messages to aChannel<TopicMessage>and read from it from the subscription method implementingIAsyncEnumerable<TopicMessage>. As a side effect of each message read out, it also registers the message identifier with aTaskCompletionSourceand aCancellationTokenSourcebound to the provided cancellation token so that if the developer fails to acknowledge the message within the configured timeout window, it the default message handler action will be taken automatically.Update: After hammering out a concept of the above, I ultimately agreed with @philliphoff that the approach isn't as smooth as his original approach, so I've modified it accordingly and detailed it more in this comment.
Finally, I opted to implement the extensions property on the
TopicMessagewith aDictionary<string, Value>where theValueis the Protobuf Value struct (meaning that a developer is going to have to query the type of the struct and retrieve the appropriate property with the matching value). While I considered just strongly converting all values to a string, there are some Values types this can represent that would make that cumbersome (e.g. List or other structs). Ultimately, if the developer is opting to use this, they can figure out how to retrieve the Value they intend to.Additional notes
This should ideally be merged after #1331 as it takes a dependency on a package introduced (Dapr.Common which introduces a generic client builder class and contains shared exceptions) in that PR. I created a stub for it here, but it'd be nice to avoid the conflict later on.
This implementation also creates a separate Dapr.Protos package that each of the existing libraries can take advantage of once this is merged in that ensures that there's only one place that Protos have to be updated at instead of each library having its own copy.
I think there's an opportunity to improve on how we do logging across all these distinct projects using a static logging type. As the internal GrpcClient isn't DI-injected, it eliminates several of the simpler approaches to just use an
ILogger<>orILoggerFactory. I removed the use of the configuration options as a carrier of the logging implementation as it just didn't feel like a great fit there, but I think it's worth exploring a more concrete and centrally-defined approach to more uniform logging going forward for the .NET client. But perhaps in another issue.Issue reference
We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.
Please reference the issue this PR will close: #1324
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: