Skip to content

Commit

Permalink
Add namespace support for c# sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Jul 9, 2024
1 parent 6292e81 commit 3925413
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 25 deletions.
1 change: 1 addition & 0 deletions csharp/rocketmq-client-csharp/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ private async Task<TopicRouteData> FetchTopicRoute0(string topic)
{
Topic = new Proto::Resource
{
ResourceNamespace = ClientConfig.Namespace,
Name = topic
},
Endpoints = Endpoints.ToProtobuf()
Expand Down
14 changes: 12 additions & 2 deletions csharp/rocketmq-client-csharp/ClientConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ namespace Org.Apache.Rocketmq
public class ClientConfig
{
private ClientConfig(ISessionCredentialsProvider sessionCredentialsProvider, TimeSpan requestTimeout,
string endpoints, bool sslEnabled)
string endpoints, bool sslEnabled, string namespaceName)
{
SessionCredentialsProvider = sessionCredentialsProvider;
RequestTimeout = requestTimeout;
Endpoints = endpoints;
SslEnabled = sslEnabled;
Namespace = namespaceName;
}

public ISessionCredentialsProvider SessionCredentialsProvider { get; }
Expand All @@ -37,13 +38,16 @@ private ClientConfig(ISessionCredentialsProvider sessionCredentialsProvider, Tim
public string Endpoints { get; }

public bool SslEnabled { get; }

public string Namespace { get; }

public class Builder
{
private ISessionCredentialsProvider _sessionCredentialsProvider;
private TimeSpan _requestTimeout = TimeSpan.FromSeconds(3);
private string _endpoints;
private bool _sslEnabled = true;
private string _namespace = "";

public Builder SetCredentialsProvider(ISessionCredentialsProvider sessionCredentialsProvider)
{
Expand All @@ -68,10 +72,16 @@ public Builder EnableSsl(bool sslEnabled)
_sslEnabled = sslEnabled;
return this;
}

public Builder SetNamespace(string namespaceName)
{
_namespace = namespaceName;
return this;
}

public ClientConfig Build()
{
return new ClientConfig(_sessionCredentialsProvider, _requestTimeout, _endpoints, _sslEnabled);
return new ClientConfig(_sessionCredentialsProvider, _requestTimeout, _endpoints, _sslEnabled, _namespace);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions csharp/rocketmq-client-csharp/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, M
{
var group = new Proto.Resource
{
ResourceNamespace = ClientConfig.Namespace,
Name = ConsumerGroup
};
return new Proto.ReceiveMessageRequest
Expand All @@ -109,6 +110,7 @@ internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, Me
{
var group = new Proto.Resource
{
ResourceNamespace = ClientConfig.Namespace,
Name = ConsumerGroup
};
return new Proto.ReceiveMessageRequest
Expand Down
7 changes: 4 additions & 3 deletions csharp/rocketmq-client-csharp/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> p
int maxAttempts, ITransactionChecker checker) : base(clientConfig)
{
var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
PublishingSettings = new PublishingSettings(ClientId, Endpoints, retryPolicy,
PublishingSettings = new PublishingSettings(ClientConfig.Namespace, ClientId, Endpoints, retryPolicy,
clientConfig.RequestTimeout, publishingTopics);
_publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
_publishingTopics = publishingTopics;
Expand Down Expand Up @@ -192,11 +192,11 @@ public async Task<SendReceipt> Send(Message message, ITransaction transaction)
return sendReceipt;
}

private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
private Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
{
return new Proto.SendMessageRequest
{
Messages = { message.ToProtobuf(mq.QueueId) }
Messages = { message.ToProtobuf(ClientConfig.Namespace, mq.QueueId) }
};
}

Expand Down Expand Up @@ -331,6 +331,7 @@ internal async Task EndTransaction(Endpoints endpoints, string topic, string mes
{
var topicResource = new Proto.Resource
{
ResourceNamespace = ClientConfig.Namespace,
Name = topic
};
var request = new Proto.EndTransactionRequest
Expand Down
3 changes: 2 additions & 1 deletion csharp/rocketmq-client-csharp/PublishingMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public PublishingMessage(Message message, PublishingSettings publishingSettings,
MessageType = MessageType.Transaction;
}

public Proto::Message ToProtobuf(int queueId)
public Proto::Message ToProtobuf(string namespaceName, int queueId)
{
var systemProperties = new Proto.SystemProperties
{
Expand Down Expand Up @@ -105,6 +105,7 @@ public PublishingMessage(Message message, PublishingSettings publishingSettings,

var topicResource = new Proto.Resource
{
ResourceNamespace = namespaceName,
Name = Topic
};
return new Proto.Message
Expand Down
7 changes: 4 additions & 3 deletions csharp/rocketmq-client-csharp/PublishingSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class PublishingSettings : Settings
private volatile int _maxBodySizeBytes = 4 * 1024 * 1024;
private volatile bool _validateMessageType = true;

public PublishingSettings(string clientId, Endpoints endpoints, IRetryPolicy retryPolicy,
TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer,
public PublishingSettings(string namespaceName, string clientId, Endpoints endpoints, IRetryPolicy retryPolicy,
TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(namespaceName, clientId, ClientType.Producer,
endpoints, retryPolicy, requestTimeout)
{
Topics = topics;
Expand Down Expand Up @@ -66,7 +66,8 @@ public override void Sync(Proto::Settings settings)

public override Proto.Settings ToProtobuf()
{
var topics = Topics.Select(topic => new Proto.Resource { Name = topic.Key }).ToList();
var topics = Topics.Select(topic =>
new Proto.Resource { ResourceNamespace = Namespace, Name = topic.Key }).ToList();

var publishing = new Proto.Publishing();
publishing.Topics.Add(topics);
Expand Down
7 changes: 6 additions & 1 deletion csharp/rocketmq-client-csharp/PushConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public PushConsumer(ClientConfig clientConfig, string consumerGroup,
_clientConfig = clientConfig;
_consumerGroup = consumerGroup;
_subscriptionExpressions = subscriptionExpressions;
_pushSubscriptionSettings = new PushSubscriptionSettings(ClientId, Endpoints, consumerGroup,
_pushSubscriptionSettings = new PushSubscriptionSettings(_clientConfig.Namespace, ClientId, Endpoints, consumerGroup,
clientConfig.RequestTimeout, subscriptionExpressions);
_cacheAssignments = new ConcurrentDictionary<string, Assignments>();
_messageListener = messageListener;
Expand Down Expand Up @@ -320,6 +320,7 @@ private QueryAssignmentRequest WrapQueryAssignmentRequest(string topic)
{
var topicResource = new Proto.Resource
{
ResourceNamespace = _clientConfig.Namespace,
Name = topic
};
return new QueryAssignmentRequest
Expand Down Expand Up @@ -399,6 +400,7 @@ protected internal ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(Me
{
var topicResource = new Proto.Resource
{
ResourceNamespace = _clientConfig.Namespace,
Name = messageView.Topic
};
return new Proto.ChangeInvisibleDurationRequest
Expand All @@ -415,6 +417,7 @@ protected internal AckMessageRequest WrapAckMessageRequest(MessageView messageVi
{
var topicResource = new Proto.Resource
{
ResourceNamespace = _clientConfig.Namespace,
Name = messageView.Topic
};
var entry = new Proto.AckMessageEntry
Expand All @@ -434,6 +437,7 @@ protected internal ForwardMessageToDeadLetterQueueRequest WrapForwardMessageToDe
{
var topicResource = new Proto.Resource
{
ResourceNamespace = _clientConfig.Namespace,
Name = messageView.Topic
};

Expand Down Expand Up @@ -560,6 +564,7 @@ private Proto.Resource GetProtobufGroup()
{
return new Proto.Resource()
{
ResourceNamespace = _clientConfig.Namespace,
Name = ConsumerGroup
};
}
Expand Down
7 changes: 4 additions & 3 deletions csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public class PushSubscriptionSettings : Settings
private volatile int _receiveBatchSize = 32;
private TimeSpan _longPollingTimeout = TimeSpan.FromSeconds(30);

public PushSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup,
public PushSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup,
TimeSpan requestTimeout, ConcurrentDictionary<string, FilterExpression> subscriptionExpressions)
: base(clientId, ClientType.PushConsumer, endpoints, requestTimeout)
: base(namespaceName, clientId, ClientType.PushConsumer, endpoints, requestTimeout)
{
_group = new Resource(consumerGroup);
_group = new Resource(namespaceName, consumerGroup);
_subscriptionExpressions = subscriptionExpressions;
}

Expand All @@ -64,6 +64,7 @@ public override Proto.Settings ToProtobuf()
{
var topic = new Proto.Resource()
{
ResourceNamespace = Namespace,
Name = key
};
var filterExpression = new Proto.FilterExpression()
Expand Down
48 changes: 46 additions & 2 deletions csharp/rocketmq-client-csharp/Resource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
* limitations under the License.
*/

using System;
using Proto = Apache.Rocketmq.V2;

namespace Org.Apache.Rocketmq
{
public class Resource
public class Resource : IEquatable<Resource>
{
public Resource(string namespaceName, string name)
{
Namespace = namespaceName;
Name = name;
}

public Resource(Proto.Resource resource)
{
Namespace = resource.ResourceNamespace;
Expand All @@ -33,7 +40,7 @@ public Resource(string name)
Name = name;
}

private string Namespace { get; }
public string Namespace { get; }
public string Name { get; }

public Proto.Resource ToProtobuf()
Expand All @@ -45,9 +52,46 @@ public Proto.Resource ToProtobuf()
};
}

public bool Equals(Resource other)
{
if (ReferenceEquals(null, other))
{
return false;
}

if (ReferenceEquals(this, other))
{
return true;
}

return Name == other.Name && Namespace == other.Namespace;
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj))
{
return false;
}

if (ReferenceEquals(this, obj))
{
return true;
}

return obj.GetType() == GetType() && Equals((Resource) obj);
}

public override int GetHashCode()
{
return HashCode.Combine(Namespace, Name);
}

public override string ToString()
{
return string.IsNullOrEmpty(Namespace) ? Name : $"{Namespace}.{Name}";
}


}
}
7 changes: 5 additions & 2 deletions csharp/rocketmq-client-csharp/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,27 @@ namespace Org.Apache.Rocketmq
{
public abstract class Settings
{
protected readonly string Namespace;
protected readonly string ClientId;
protected readonly ClientType ClientType;
protected readonly Endpoints Endpoints;
protected volatile IRetryPolicy RetryPolicy;
protected readonly TimeSpan RequestTimeout;

protected Settings(string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
protected Settings(string namespaceName, string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
TimeSpan requestTimeout)
{
Namespace = namespaceName;
ClientId = clientId;
ClientType = clientType;
Endpoints = endpoints;
RetryPolicy = retryPolicy;
RequestTimeout = requestTimeout;
}

protected Settings(string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
protected Settings(string namespaceName, string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
{
Namespace = namespaceName;
ClientId = clientId;
ClientType = clientType;
Endpoints = endpoints;
Expand Down
8 changes: 7 additions & 1 deletion csharp/rocketmq-client-csharp/SimpleConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class SimpleConsumer : Consumer, IAsyncDisposable, IDisposable
private readonly TimeSpan _awaitDuration;
private readonly SimpleSubscriptionSettings _simpleSubscriptionSettings;
private int _topicRoundRobinIndex;

private readonly ClientConfig _clientConfig;

public SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
Dictionary<string, FilterExpression> subscriptionExpressions) : this(clientConfig, consumerGroup,
Expand All @@ -48,9 +50,10 @@ private SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan
_awaitDuration = awaitDuration;
_subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>();
_subscriptionExpressions = subscriptionExpressions;
_simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, Endpoints,
_simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfig.Namespace, ClientId, Endpoints,
ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions);
_topicRoundRobinIndex = 0;
_clientConfig = clientConfig;
}

public async Task Subscribe(string topic, FilterExpression filterExpression)
Expand Down Expand Up @@ -240,6 +243,7 @@ private Proto.AckMessageRequest WrapAckMessageRequest(MessageView messageView)
{
var topicResource = new Proto.Resource
{
ResourceNamespace = _clientConfig.Namespace,
Name = messageView.Topic
};
var entry = new Proto.AckMessageEntry
Expand All @@ -260,6 +264,7 @@ private Proto.ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(Message
{
var topicResource = new Proto.Resource
{
ResourceNamespace = _clientConfig.Namespace,
Name = messageView.Topic
};
return new Proto.ChangeInvisibleDurationRequest
Expand All @@ -276,6 +281,7 @@ private Proto.Resource GetProtobufGroup()
{
return new Proto.Resource()
{
ResourceNamespace = _clientConfig.Namespace,
Name = ConsumerGroup
};
}
Expand Down
7 changes: 4 additions & 3 deletions csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public class SimpleSubscriptionSettings : Settings
private readonly TimeSpan _longPollingTimeout;
private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;

public SimpleSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup,
public SimpleSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup,
TimeSpan requestTimeout, TimeSpan longPollingTimeout,
ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(
clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
namespaceName, clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
{
_group = new Resource(consumerGroup);
_group = new Resource(namespaceName, consumerGroup);
_longPollingTimeout = longPollingTimeout;
_subscriptionExpressions = subscriptionExpressions;
}
Expand All @@ -58,6 +58,7 @@ public override Proto.Settings ToProtobuf()
{
var topic = new Proto.Resource()
{
ResourceNamespace = Namespace,
Name = key,
};
var subscriptionEntry = new Proto.SubscriptionEntry();
Expand Down
Loading

0 comments on commit 3925413

Please sign in to comment.