diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs index d353f743..2a5fc75a 100644 --- a/csharp/examples/ProducerTransactionMessageExample.cs +++ b/csharp/examples/ProducerTransactionMessageExample.cs @@ -76,9 +76,9 @@ internal static async Task QuickStart() var sendReceipt = await producer.Send(message, transaction); Logger.LogInformation("Send transaction message successfully, messageId={}", sendReceipt.MessageId); // Commit the transaction. - transaction.Commit(); + await transaction.Commit(); // Or rollback the transaction. - // transaction.Rollback(); + // await transaction.Rollback(); // Close the producer if you don't need it anymore. await producer.DisposeAsync(); diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index cf55936c..25fcad65 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -21,10 +21,13 @@ using System.Threading; using System; using System.Linq; +using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; using Proto = Apache.Rocketmq.V2; using grpc = Grpc.Core; +[assembly:InternalsVisibleTo("tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] namespace Org.Apache.Rocketmq { public abstract class Client @@ -49,7 +52,7 @@ public abstract class Client protected readonly ClientConfig ClientConfig; protected readonly Endpoints Endpoints; - protected readonly IClientManager ClientManager; + protected IClientManager ClientManager; protected readonly string ClientId; protected readonly ClientMeterManager ClientMeterManager; @@ -151,11 +154,11 @@ private protected (bool, Session) GetSession(Endpoints endpoints) protected abstract IEnumerable GetTopics(); - protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest(); + internal abstract Proto::HeartbeatRequest WrapHeartbeatRequest(); protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData); - private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData) + internal async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData) { var routeEndpoints = new HashSet(); foreach (var mq in topicRouteData.MessageQueues) @@ -398,7 +401,7 @@ internal grpc.Metadata Sign() return metadata; } - protected abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest(); + internal abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest(); private async void NotifyClientTermination() { @@ -438,6 +441,12 @@ internal IClientManager GetClientManager() return ClientManager; } + // Only for testing + internal void SetClientManager(IClientManager clientManager) + { + ClientManager = clientManager; + } + internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command) { diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs index 2ccb8ef6..2b3c71bb 100644 --- a/csharp/rocketmq-client-csharp/Consumer.cs +++ b/csharp/rocketmq-client-csharp/Consumer.cs @@ -18,11 +18,14 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Text.RegularExpressions; using System.Threading.Tasks; using Google.Protobuf.WellKnownTypes; using Proto = Apache.Rocketmq.V2; +[assembly:InternalsVisibleTo("tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] namespace Org.Apache.Rocketmq { public abstract class Consumer : Client @@ -85,7 +88,7 @@ private static Proto.FilterExpression WrapFilterExpression(FilterExpression filt }; } - protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq, + internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq, FilterExpression filterExpression, TimeSpan awaitDuration, TimeSpan invisibleDuration) { var group = new Proto.Resource diff --git a/csharp/rocketmq-client-csharp/ITransaction.cs b/csharp/rocketmq-client-csharp/ITransaction.cs index 27c770b1..7abd5daa 100644 --- a/csharp/rocketmq-client-csharp/ITransaction.cs +++ b/csharp/rocketmq-client-csharp/ITransaction.cs @@ -15,12 +15,14 @@ * limitations under the License. */ +using System.Threading.Tasks; + namespace Org.Apache.Rocketmq { public interface ITransaction { - void Commit(); + Task Commit(); - void Rollback(); + Task Rollback(); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ProcessQueue.cs b/csharp/rocketmq-client-csharp/ProcessQueue.cs index 7a5ab18e..cc085ceb 100644 --- a/csharp/rocketmq-client-csharp/ProcessQueue.cs +++ b/csharp/rocketmq-client-csharp/ProcessQueue.cs @@ -37,9 +37,9 @@ public class ProcessQueue { private static readonly ILogger Logger = MqLogManager.CreateLogger(); - private static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1); - private static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1); - private static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1); + internal static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1); + internal static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1); + internal static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1); private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20); private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1); diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 0367a1cb..3f86bef9 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -21,24 +21,27 @@ using System.Diagnostics; using System.Diagnostics.Metrics; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Proto = Apache.Rocketmq.V2; using Org.Apache.Rocketmq.Error; +[assembly:InternalsVisibleTo("tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] namespace Org.Apache.Rocketmq { public class Producer : Client, IAsyncDisposable, IDisposable { private static readonly ILogger Logger = MqLogManager.CreateLogger(); - private readonly ConcurrentDictionary _publishingRouteDataCache; + internal readonly ConcurrentDictionary _publishingRouteDataCache; internal readonly PublishingSettings PublishingSettings; private readonly ConcurrentDictionary _publishingTopics; private readonly ITransactionChecker _checker; private readonly Histogram _sendCostTimeHistogram; - private Producer(ClientConfig clientConfig, ConcurrentDictionary publishingTopics, + internal Producer(ClientConfig clientConfig, ConcurrentDictionary publishingTopics, int maxAttempts, ITransactionChecker checker) : base(clientConfig) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); @@ -102,7 +105,7 @@ protected override async Task Shutdown() } } - protected override Proto::HeartbeatRequest WrapHeartbeatRequest() + internal override Proto::HeartbeatRequest WrapHeartbeatRequest() { return new Proto::HeartbeatRequest { @@ -110,7 +113,7 @@ protected override async Task Shutdown() }; } - protected override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() + internal override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() { return new Proto::NotifyClientTerminationRequest(); } diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs index d54afc50..cc67476f 100644 --- a/csharp/rocketmq-client-csharp/PushConsumer.cs +++ b/csharp/rocketmq-client-csharp/PushConsumer.cs @@ -19,6 +19,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Schedulers; @@ -27,6 +28,8 @@ using Proto = Apache.Rocketmq.V2; using Microsoft.Extensions.Logging; +[assembly:InternalsVisibleTo("tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] namespace Org.Apache.Rocketmq { public class PushConsumer : Consumer, IAsyncDisposable, IDisposable @@ -195,7 +198,7 @@ public void Unsubscribe(string topic) _subscriptionExpressions.TryRemove(topic, out _); } - private async void ScanAssignments() + internal async void ScanAssignments() { try { @@ -296,7 +299,7 @@ private async Task SyncProcessQueue(string topic, Assignments assignments, } } - private async Task QueryAssignment(string topic) + internal async Task QueryAssignment(string topic) { var endpoints = await PickEndpointsToQueryAssignments(topic); var request = WrapQueryAssignmentRequest(topic); @@ -386,7 +389,7 @@ protected override IEnumerable GetTopics() return _subscriptionExpressions.Keys; } - protected override Proto.HeartbeatRequest WrapHeartbeatRequest() + internal override Proto.HeartbeatRequest WrapHeartbeatRequest() { return new Proto::HeartbeatRequest { @@ -490,7 +493,7 @@ internal override async void OnVerifyMessageCommand(Endpoints endpoints, VerifyM } } - protected override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() + internal override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() { return new NotifyClientTerminationRequest() { diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index bc8ac66d..948c100a 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -128,7 +128,7 @@ protected override IEnumerable GetTopics() return _subscriptionExpressions.Keys; } - protected override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() + internal override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() { return new Proto.NotifyClientTerminationRequest() { @@ -136,7 +136,7 @@ protected override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminat }; } - protected override Proto.HeartbeatRequest WrapHeartbeatRequest() + internal override Proto.HeartbeatRequest WrapHeartbeatRequest() { return new Proto::HeartbeatRequest { @@ -212,7 +212,7 @@ public async Task> Receive(int maxMessageNum, TimeSpan invisib return receiveMessageResult.Messages; } - public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration) + public async Task ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration) { if (State.Running != State) { diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs index 3dc5d05d..c5a0dfa8 100644 --- a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs +++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs @@ -78,6 +78,7 @@ public override Proto.Settings ToProtobuf() filterExpression.Expression = value.Expression; subscriptionEntry.Topic = topic; + subscriptionEntry.Expression = filterExpression; subscriptionEntries.Add(subscriptionEntry); } diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs index 71d5b748..8a4df4ac 100644 --- a/csharp/rocketmq-client-csharp/Transaction.cs +++ b/csharp/rocketmq-client-csharp/Transaction.cs @@ -19,6 +19,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; namespace Org.Apache.Rocketmq { @@ -44,7 +45,7 @@ public PublishingMessage TryAddMessage(Message message) _messagesLock.EnterReadLock(); try { - if (_messages.Count > MaxMessageNum) + if (_messages.Count >= MaxMessageNum) { throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}"); } @@ -57,7 +58,7 @@ public PublishingMessage TryAddMessage(Message message) _messagesLock.EnterWriteLock(); try { - if (_messages.Count > MaxMessageNum) + if (_messages.Count >= MaxMessageNum) { throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}"); } @@ -90,7 +91,7 @@ public void TryAddReceipt(PublishingMessage publishingMessage, SendReceipt sendR } } - public async void Commit() + public async Task Commit() { if (State.Running != _producer.State) { @@ -109,7 +110,7 @@ await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, s } } - public async void Rollback() + public async Task Rollback() { if (State.Running != _producer.State) { diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs new file mode 100644 index 00000000..40d4e4ea --- /dev/null +++ b/csharp/tests/ClientManagerTest.cs @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using Apache.Rocketmq.V2; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using Endpoints = Org.Apache.Rocketmq.Endpoints; + +namespace tests +{ + [TestClass] + public class ClientManagerTest + { + private const string ClientId = "fakeClientId"; + private static IClientManager CLIENT_MANAGER; + private readonly ClientConfig clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:8080").Build(); + private static readonly Endpoints FAKE_ENDPOINTS = new Endpoints("127.0.0.1:8080"); + + [TestMethod] + public void TestHeartbeat() + { + CLIENT_MANAGER = new ClientManager(CreateTestClient()); + var request = new HeartbeatRequest(); + CLIENT_MANAGER.Heartbeat(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.Heartbeat(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestSendMessage() + { + CLIENT_MANAGER = new ClientManager(CreateTestClient()); + var request = new SendMessageRequest(); + CLIENT_MANAGER.SendMessage(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.SendMessage(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestQueryAssignment() + { + var request = new QueryAssignmentRequest(); + CLIENT_MANAGER.QueryAssignment(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.QueryAssignment(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestReceiveMessage() + { + var request = new ReceiveMessageRequest(); + CLIENT_MANAGER.ReceiveMessage(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.ReceiveMessage(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestAckMessage() + { + var request = new AckMessageRequest(); + CLIENT_MANAGER.AckMessage(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.AckMessage(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestChangeInvisibleDuration() + { + var request = new ChangeInvisibleDurationRequest(); + CLIENT_MANAGER.ChangeInvisibleDuration(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.ChangeInvisibleDuration(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestForwardMessageToDeadLetterQueue() + { + var request = new ForwardMessageToDeadLetterQueueRequest(); + CLIENT_MANAGER.ForwardMessageToDeadLetterQueue(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.ForwardMessageToDeadLetterQueue(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestEndTransaction() + { + var request = new EndTransactionRequest(); + CLIENT_MANAGER.EndTransaction(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.EndTransaction(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + [TestMethod] + public void TestNotifyClientTermination() + { + var request = new NotifyClientTerminationRequest(); + CLIENT_MANAGER.NotifyClientTermination(FAKE_ENDPOINTS, request, TimeSpan.FromSeconds(1)); + CLIENT_MANAGER.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + + private Client CreateTestClient() + { + return new Producer(clientConfig, new ConcurrentDictionary(), 1, null); + } + } +} \ No newline at end of file diff --git a/csharp/tests/ClientMeterManagerTest.cs b/csharp/tests/ClientMeterManagerTest.cs new file mode 100644 index 00000000..4432c2fa --- /dev/null +++ b/csharp/tests/ClientMeterManagerTest.cs @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Collections.Concurrent; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using Metric = Org.Apache.Rocketmq.Metric; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class ClientMeterManagerTest + { + private const string FakeEndpoint = "127.0.0.1:8080"; + + [TestMethod] + public void TestResetWithMetricOn() + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(FakeEndpoint) + .Build(); + var meterManager = new ClientMeterManager(CreateTestClient()); + var endpoints0 = new Proto.Endpoints { Scheme = Proto.AddressScheme.Ipv4, + Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 }}}; + var metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints0 }); + meterManager.Reset(metric); + Assert.IsTrue(meterManager.IsEnabled()); + } + + [TestMethod] + public void TestResetWithMetricOff() + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(FakeEndpoint) + .Build(); + var meterManager = new ClientMeterManager(CreateTestClient()); + var endpoints0 = new Proto.Endpoints { Scheme = Proto.AddressScheme.Ipv4, + Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 }}}; + var metric = new Metric(new Proto.Metric { On = false, Endpoints = endpoints0 }); + meterManager.Reset(metric); + Assert.IsFalse(meterManager.IsEnabled()); + } + + private Client CreateTestClient() + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(FakeEndpoint) + .Build(); + return new PushConsumer(clientConfig, "testGroup", + new ConcurrentDictionary(), new TestMessageListener(), + 0, 0, 1); + } + + private class TestMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + return ConsumeResult.SUCCESS; + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/ClientMeterTest.cs b/csharp/tests/ClientMeterTest.cs new file mode 100644 index 00000000..00e500f9 --- /dev/null +++ b/csharp/tests/ClientMeterTest.cs @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using OpenTelemetry; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; +using Org.Apache.Rocketmq; +using Metric = Org.Apache.Rocketmq.Metric; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class ClientMeterTest + { + private const string MeterName = "test"; + + private MeterProvider CreateMeterProvider() + { + return Sdk.CreateMeterProviderBuilder() + .SetResourceBuilder(ResourceBuilder.CreateEmpty()) + .Build(); + } + + [TestMethod] + public void TestShutdownWithEnabledMeter() + { + var endpoints = new Endpoints(new Proto.Endpoints { Scheme = Proto.AddressScheme.Ipv4, + Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 }}}); + var provider = CreateMeterProvider(); + var clientId = "testClientId"; + var clientMeter = new ClientMeter(endpoints, provider, clientId); + Assert.IsTrue(clientMeter.Enabled); + clientMeter.Shutdown(); + } + + [TestMethod] + public void TestShutdownWithDisabledMeter() + { + var clientId = "testClientId"; + var clientMeter = ClientMeter.DisabledInstance(clientId); + Assert.IsFalse(clientMeter.Enabled); + clientMeter.Shutdown(); + } + + [TestMethod] + public void TestSatisfy() + { + var clientId = "testClientId"; + var clientMeter = ClientMeter.DisabledInstance(clientId); + + var metric = new Metric(new Proto.Metric { On = false }); + Assert.IsTrue(clientMeter.Satisfy(metric)); + + metric = new Metric(new Proto.Metric { On = true }); + Assert.IsTrue(clientMeter.Satisfy(metric)); + + var endpoints0 = new Proto.Endpoints { Scheme = Proto.AddressScheme.Ipv4, + Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 }}}; + + metric = new Metric(new Proto.Metric { On = false, Endpoints = endpoints0 }); + Assert.IsTrue(clientMeter.Satisfy(metric)); + + metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints0 }); + Assert.IsFalse(clientMeter.Satisfy(metric)); + + var endpoints = new Endpoints(endpoints0); + var provider = CreateMeterProvider(); + clientMeter = new ClientMeter(endpoints, provider, clientId); + + metric = new Metric(new Proto.Metric { On = false }); + Assert.IsFalse(clientMeter.Satisfy(metric)); + + metric = new Metric(new Proto.Metric { On = true }); + Assert.IsFalse(clientMeter.Satisfy(metric)); + + metric = new Metric(new Proto.Metric { On = false, Endpoints = endpoints0 }); + Assert.IsFalse(clientMeter.Satisfy(metric)); + + metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints0 }); + Assert.IsTrue(clientMeter.Satisfy(metric)); + + var endpoints1 = new Proto.Endpoints { Scheme = Proto.AddressScheme.Ipv4, + Addresses = { new Proto.Address { Host = "127.0.0.2", Port = 8081 }}}; + metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints1 }); + Assert.IsFalse(clientMeter.Satisfy(metric)); + } + } +} \ No newline at end of file diff --git a/csharp/tests/ClientTest.cs b/csharp/tests/ClientTest.cs new file mode 100644 index 00000000..d010400e --- /dev/null +++ b/csharp/tests/ClientTest.cs @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Apache.Rocketmq.V2; +using Grpc.Core; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Endpoints = Org.Apache.Rocketmq.Endpoints; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class ClientTest + { + private readonly ClientConfig clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(); + + + [TestMethod] + public async Task TestOnVerifyMessageCommand() + { + var testClient = CreateTestClient(); + var endpoints = new Endpoints("testEndpoints"); + var command = new VerifyMessageCommand { Nonce = "testNonce" }; + + var mockCall = new AsyncDuplexStreamingCall( + new MockClientStreamWriter(), + new MockAsyncStreamReader(), + null, + null, + null, + null); + var mockClientManager = new Mock(); + mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall); + + testClient.SetClientManager(mockClientManager.Object); + + testClient.OnVerifyMessageCommand(endpoints, command); + + mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once); + } + + [TestMethod] + public async Task TestOnTopicRouteDataFetchedFailure() + { + var testClient = CreateTestClient(); + var endpoints = new Endpoints("testEndpoints"); + var mq = new Proto.MessageQueue + { + Topic = new Proto::Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic" + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Broker = new Proto::Broker + { + Name = "testBroker", + Id = 0, + Endpoints = new Proto.Endpoints { Scheme = Proto.AddressScheme.Ipv4, + Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 }}} + } + }; + var topicRouteData = new TopicRouteData(new[] { mq }); + + var mockCall = new AsyncDuplexStreamingCall( + new MockClientStreamWriter(), + new MockAsyncStreamReader(), + null, + null, + null, + null); + var mockClientManager = new Mock(); + mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall); + + testClient.SetClientManager(mockClientManager.Object); + + try + { + await testClient.OnTopicRouteDataFetched("testTopic", topicRouteData); + Assert.Fail(); + } + catch (Exception e) + { + mockClientManager.Verify(cm => cm.Telemetry(It.IsAny()), Times.Once); + } + } + + [TestMethod] + public async Task TestOnPrintThreadStackTraceCommand() + { + var testClient = CreateTestClient(); + var endpoints = new Endpoints("testEndpoints"); + var command = new PrintThreadStackTraceCommand { Nonce = "testNonce" }; + var mockCall = new AsyncDuplexStreamingCall( + new MockClientStreamWriter(), + new MockAsyncStreamReader(), + null, + null, + null, + null); + + var mockClientManager = new Mock(); + mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall); + + testClient.SetClientManager(mockClientManager.Object); + + // Act + testClient.OnPrintThreadStackTraceCommand(endpoints, command); + + // Assert + mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once); + } + + private Client CreateTestClient() + { + return new Producer(clientConfig, new ConcurrentDictionary(), 1, null); + } + + private class MockClientStreamWriter : IClientStreamWriter + { + public Task WriteAsync(T message) + { + // Simulate async operation + return Task.CompletedTask; + } + + public WriteOptions WriteOptions { get; set; } + + public Task CompleteAsync() + { + throw new NotImplementedException(); + } + } + + private class MockAsyncStreamReader : IAsyncStreamReader + { + public Task MoveNext(CancellationToken cancellationToken) + { + throw new System.NotImplementedException(); + } + + public T Current => throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/csharp/tests/ConsumeServiceTest.cs b/csharp/tests/ConsumeServiceTest.cs new file mode 100644 index 00000000..c7e8243a --- /dev/null +++ b/csharp/tests/ConsumeServiceTest.cs @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Schedulers; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class ConsumeServiceTest + { + [TestMethod] + public void TestConsumeSuccess() + { + var messageListener = new TestSuccessMessageListener(); + var consumeService = new TestConsumeService("testClientId", messageListener, + new CurrentThreadTaskScheduler(), new CancellationToken()); + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1:8080", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = "testTopic" }, + Body = body + }; + var messageView = MessageView.FromProtobuf(message); + Assert.AreEqual(ConsumeResult.SUCCESS, consumeService.Consume(messageView).Result); + } + + private class TestSuccessMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + return ConsumeResult.SUCCESS; + } + } + + [TestMethod] + public void TestConsumeFailure() + { + var messageListener = new TestFailureMessageListener(); + var consumeService = new TestConsumeService("testClientId", messageListener, + new CurrentThreadTaskScheduler(), new CancellationToken()); + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1:8080", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = "testTopic" }, + Body = body + }; + var messageView = MessageView.FromProtobuf(message); + Assert.AreEqual(ConsumeResult.FAILURE, consumeService.Consume(messageView).Result); + } + + private class TestFailureMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + return ConsumeResult.FAILURE; + } + } + + [TestMethod] + public void TestConsumeWithException() + { + var messageListener = new TestExceptionMessageListener(); + var consumeService = new TestConsumeService("testClientId", messageListener, + new CurrentThreadTaskScheduler(), new CancellationToken()); + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1:8080", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = "testTopic" }, + Body = body + }; + var messageView = MessageView.FromProtobuf(message); + Assert.AreEqual(ConsumeResult.FAILURE, consumeService.Consume(messageView).Result); + } + + private class TestExceptionMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + throw new Exception(); + } + } + + [TestMethod] + public void TestConsumeWithDelay() + { + var messageListener = new TestSuccessMessageListener(); + var consumeService = new TestConsumeService("testClientId", messageListener, + new CurrentThreadTaskScheduler(), new CancellationToken()); + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1:8080", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = "testTopic" }, + Body = body + }; + var messageView = MessageView.FromProtobuf(message); + Assert.AreEqual(ConsumeResult.SUCCESS, + consumeService.Consume(messageView, TimeSpan.FromMilliseconds(500)).Result); + } + + private class TestConsumeService : ConsumeService + { + public TestConsumeService(string clientId, IMessageListener messageListener, + TaskScheduler consumptionTaskScheduler, CancellationToken consumptionCtsToken) : + base(clientId, messageListener, consumptionTaskScheduler, consumptionCtsToken) + { + } + + public override Task Consume(ProcessQueue pq, List messageViews) + { + return Task.FromResult(0); + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/ConsumerTest.cs b/csharp/tests/ConsumerTest.cs new file mode 100644 index 00000000..a4b1494b --- /dev/null +++ b/csharp/tests/ConsumerTest.cs @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +[assembly:InternalsVisibleTo("tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] +namespace tests +{ + [TestClass] + public class ConsumerTest + { + [TestMethod] + public async Task TestReceiveMessage() + { + var maxCacheMessageCount = 8; + var maxCacheMessageSizeInBytes = 1024; + var consumptionThreadCount = 4; + + var consumer = + CreateTestClient(maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount); + var mockClientManager = new Mock(); + consumer.SetClientManager(mockClientManager.Object); + + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "00000000" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic" + }, + Body = body + }; + var receiveMessageResponse0 = new Proto.ReceiveMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + } + }; + var receiveMessageResponse1 = new Proto.ReceiveMessageResponse + { + Message = message + }; + var metadata = consumer.Sign(); + var receiveMessageResponseList = new List + { receiveMessageResponse0, receiveMessageResponse1 }; + var receiveMessageInvocation = + new RpcInvocation>(null, + receiveMessageResponseList, metadata); + mockClientManager.Setup(cm => cm.ReceiveMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(receiveMessageInvocation)); + + var receivedMessageCount = 1; + var mq = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + }; + var request = consumer.WrapReceiveMessageRequest(1, new MessageQueue(mq), new FilterExpression("*"), + TimeSpan.FromSeconds(15), Guid.NewGuid().ToString()); + var receiveMessageResult = await consumer.ReceiveMessage(request, new MessageQueue(mq), + TimeSpan.FromSeconds(15)); + Assert.AreEqual(receiveMessageResult.Messages.Count, receivedMessageCount); + } + + private PushConsumer CreateTestClient(int maxCacheMessageCount, int maxCacheMessageSizeInBytes, + int consumptionThreadCount) + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints("127.0.0.1:9876") + .Build(); + return new PushConsumer(clientConfig, "testGroup", + new ConcurrentDictionary(), new TestMessageListener(), + maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount); + } + + private class TestMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + return ConsumeResult.SUCCESS; + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/CustomizedBackoffRetryPolicyTest.cs b/csharp/tests/CustomizedBackoffRetryPolicyTest.cs new file mode 100644 index 00000000..6ab58a10 --- /dev/null +++ b/csharp/tests/CustomizedBackoffRetryPolicyTest.cs @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using Apache.Rocketmq.V2; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; + +namespace tests +{ + [TestClass] + public class CustomizedBackoffRetryPolicyTest + { + [TestMethod] + public void TestConstructWithValidDurationsAndMaxAttempts() + { + // Arrange + var durations = new List { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }; + var maxAttempts = 3; + + // Act + var policy = new CustomizedBackoffRetryPolicy(durations, maxAttempts); + + // Assert + Assert.AreEqual(maxAttempts, policy.GetMaxAttempts()); + CollectionAssert.AreEqual(durations, policy.GetDurations()); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestConstructWithEmptyDurations() + { + // Arrange & Act + new CustomizedBackoffRetryPolicy(new List(), 3); + + // Assert is handled by ExpectedException + } + + [TestMethod] + public void TestGetNextAttemptDelayWithValidAttempts() + { + // Arrange + var durations = new List { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(5) }; + var policy = new CustomizedBackoffRetryPolicy(durations, 5); + + // Act & Assert + Assert.AreEqual(TimeSpan.FromSeconds(1), policy.GetNextAttemptDelay(1)); + Assert.AreEqual(TimeSpan.FromSeconds(3), policy.GetNextAttemptDelay(2)); + Assert.AreEqual(TimeSpan.FromSeconds(5), policy.GetNextAttemptDelay(3)); + Assert.AreEqual(TimeSpan.FromSeconds(5), policy.GetNextAttemptDelay(4)); // Should inherit the last duration + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestGetNextAttemptDelayWithInvalidAttempt() + { + // Arrange + var durations = new List { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }; + var policy = new CustomizedBackoffRetryPolicy(durations, 3); + + // Act + policy.GetNextAttemptDelay(0); + + // Assert is handled by ExpectedException + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestGetNextAttemptDelayWithNegativeAttempt() + { + // Arrange + var durations = new List { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }; + var policy = new CustomizedBackoffRetryPolicy(durations, 3); + + // Act + policy.GetNextAttemptDelay(-1); + + // Assert is handled by ExpectedException + } + + [TestMethod] + public void TestFromProtobufWithValidRetryPolicy() + { + // Arrange + var protoDurations = new List + { + Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + Duration.FromTimeSpan(TimeSpan.FromSeconds(2)) + }; + var protoRetryPolicy = new RetryPolicy + { + MaxAttempts = 3, + CustomizedBackoff = new CustomizedBackoff { Next = { protoDurations } }, + }; + + // Act + var policy = CustomizedBackoffRetryPolicy.FromProtobuf(protoRetryPolicy); + + // Assert + Assert.AreEqual(3, policy.GetMaxAttempts()); + Assert.AreEqual(protoDurations.Count, policy.GetDurations().Count); + foreach (var (expected, actual) in protoDurations.Zip(policy.GetDurations(), Tuple.Create)) + { + Assert.AreEqual(expected.ToTimeSpan(), actual); + } + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestFromProtobufWithInvalidRetryPolicy() + { + var initialBackoff0 = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)); + var maxBackoff0 = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)); + var backoffMultiplier = 1.0f; + var maxAttempts = 3; + + var exponentialBackoff = new ExponentialBackoff + { + Initial = initialBackoff0, + Max = maxBackoff0, + Multiplier = backoffMultiplier + }; + + var retryPolicy = new RetryPolicy + { + MaxAttempts = maxAttempts, + ExponentialBackoff = exponentialBackoff + }; + + CustomizedBackoffRetryPolicy.FromProtobuf(retryPolicy); + } + + [TestMethod] + public void ToProtobuf_ShouldReturnCorrectProtobuf() + { + // Arrange + var durations = new List { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }; + var maxAttempts = 3; + var policy = new CustomizedBackoffRetryPolicy(durations, maxAttempts); + + // Act + var proto = policy.ToProtobuf(); + + // Assert + Assert.AreEqual(maxAttempts, proto.MaxAttempts); + Assert.AreEqual(durations.Count, proto.CustomizedBackoff.Next.Count); + for (var i = 0; i < durations.Count; i++) + { + Assert.AreEqual(durations[i], proto.CustomizedBackoff.Next[i].ToTimeSpan()); + } + } + + [TestMethod] + public void TestInheritBackoffWithValidCustomizedBackoffPolicy() + { + // Arrange + var originalDurations = new List { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3) }; + var newDurations = new List + { + Duration.FromTimeSpan(TimeSpan.FromSeconds(2)), + Duration.FromTimeSpan(TimeSpan.FromSeconds(4)) + }; + var backoff = new CustomizedBackoff { Next = { newDurations } }; + var retryPolicy = new RetryPolicy + { + MaxAttempts = 5, + CustomizedBackoff = backoff, + }; + var policy = new CustomizedBackoffRetryPolicy(originalDurations, 5); + + // Act + var inheritedPolicy = policy.InheritBackoff(retryPolicy); + + // Assert + Assert.IsTrue(inheritedPolicy is CustomizedBackoffRetryPolicy); + var customizedBackoffRetryPolicy = (CustomizedBackoffRetryPolicy) inheritedPolicy; + Assert.AreEqual(policy.GetMaxAttempts(), inheritedPolicy.GetMaxAttempts()); + var inheritedDurations = customizedBackoffRetryPolicy.GetDurations(); + Assert.AreEqual(newDurations.Count, inheritedDurations.Count); + for (var i = 0; i < newDurations.Count; i++) + { + Assert.AreEqual(newDurations[i].ToTimeSpan(), inheritedDurations[i]); + } + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void TestInheritBackoffWithInvalidPolicy() + { + var maxAttempt = 3; + + var durations1 = new List + { + TimeSpan.FromSeconds(3), + TimeSpan.FromSeconds(2), + TimeSpan.FromSeconds(1) + }; + + var retryPolicy0 = new CustomizedBackoffRetryPolicy(durations1, maxAttempt); + + var exponentialBackoff = new ExponentialBackoff(); + + var retryPolicy = new RetryPolicy + { + ExponentialBackoff = exponentialBackoff, + }; + + retryPolicy0.InheritBackoff(retryPolicy); + } + } +} \ No newline at end of file diff --git a/csharp/tests/EncodingTest.cs b/csharp/tests/EncodingTest.cs new file mode 100644 index 00000000..198fec92 --- /dev/null +++ b/csharp/tests/EncodingTest.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; + +namespace tests +{ + [TestClass] + public class EncodingTest + { + [TestMethod] + public void TestToProtobuf() + { + Assert.AreEqual(EncodingHelper.ToProtobuf(MqEncoding.Identity), Apache.Rocketmq.V2.Encoding.Identity); + Assert.AreEqual(EncodingHelper.ToProtobuf(MqEncoding.Gzip), Apache.Rocketmq.V2.Encoding.Gzip); + } + } +} \ No newline at end of file diff --git a/csharp/tests/ExponentialBackoffRetryPolicyTest.cs b/csharp/tests/ExponentialBackoffRetryPolicyTest.cs new file mode 100644 index 00000000..bf0a8e1c --- /dev/null +++ b/csharp/tests/ExponentialBackoffRetryPolicyTest.cs @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Apache.Rocketmq.V2; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; + +namespace tests +{ + [TestClass] + public class ExponentialBackoffRetryPolicyTest + { + + [TestMethod] + public void TestNextAttemptDelayForImmediatelyRetryPolicy() + { + var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3); + Assert.AreEqual(TimeSpan.Zero, retryPolicy.GetNextAttemptDelay(1)); + Assert.AreEqual(TimeSpan.Zero, retryPolicy.GetNextAttemptDelay(2)); + Assert.AreEqual(TimeSpan.Zero, retryPolicy.GetNextAttemptDelay(3)); + Assert.AreEqual(TimeSpan.Zero, retryPolicy.GetNextAttemptDelay(4)); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestGetNextAttemptDelayWithIllegalAttempt() + { + var initialBackoff = TimeSpan.FromMilliseconds(5); + var maxBackoff = TimeSpan.FromSeconds(1); + double backoffMultiplier = 5; + var retryPolicy = new ExponentialBackoffRetryPolicy(3, initialBackoff, maxBackoff, backoffMultiplier); + retryPolicy.GetNextAttemptDelay(0); + } + + [TestMethod] + public void TestGetNextAttemptDelay() + { + var initialBackoff = TimeSpan.FromMilliseconds(5); + var maxBackoff = TimeSpan.FromSeconds(1); + double backoffMultiplier = 5; + var retryPolicy = new ExponentialBackoffRetryPolicy(3, initialBackoff, maxBackoff, backoffMultiplier); + + Assert.AreEqual(TimeSpan.FromMilliseconds(5), retryPolicy.GetNextAttemptDelay(1)); + Assert.AreEqual(TimeSpan.FromMilliseconds(25), retryPolicy.GetNextAttemptDelay(2)); + Assert.AreEqual(TimeSpan.FromMilliseconds(125), retryPolicy.GetNextAttemptDelay(3)); + Assert.AreEqual(TimeSpan.FromMilliseconds(625), retryPolicy.GetNextAttemptDelay(4)); + Assert.AreEqual(TimeSpan.FromSeconds(1), retryPolicy.GetNextAttemptDelay(5)); + } + + [TestMethod] + public void TestFromProtobuf() + { + var initialBackoff = TimeSpan.FromMilliseconds(5); + var maxBackoff = TimeSpan.FromSeconds(1); + var initialBackoffProto = Duration.FromTimeSpan(initialBackoff); + var maxBackoffProto = Duration.FromTimeSpan(maxBackoff); + var backoffMultiplier = 5; + var maxAttempts = 3; + var exponentialBackoff = new ExponentialBackoff + { + Initial = initialBackoffProto, + Max = maxBackoffProto, + Multiplier = backoffMultiplier + }; + var retryPolicyProto = new RetryPolicy + { + MaxAttempts = maxAttempts, + ExponentialBackoff = exponentialBackoff + }; + + var policy = ExponentialBackoffRetryPolicy.FromProtobuf(retryPolicyProto); + Assert.AreEqual(maxAttempts, policy.GetMaxAttempts()); + Assert.AreEqual(initialBackoff, policy.InitialBackoff); + Assert.AreEqual(maxBackoff, policy.MaxBackoff); + Assert.AreEqual(backoffMultiplier, policy.BackoffMultiplier); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestFromProtobufWithoutExponentialBackoff() + { + var maxAttempts = 3; + var customizedBackoff = new CustomizedBackoff(); + var retryPolicyProto = new RetryPolicy + { + MaxAttempts = maxAttempts, + CustomizedBackoff = customizedBackoff + }; + + ExponentialBackoffRetryPolicy.FromProtobuf(retryPolicyProto); + } + + [TestMethod] + public void TestToProtobuf() + { + var initialBackoff = TimeSpan.FromMilliseconds(5); + var maxBackoff = TimeSpan.FromSeconds(1); + double backoffMultiplier = 5; + int maxAttempts = 3; + var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier); + var retryPolicyProto = retryPolicy.ToProtobuf(); + + Assert.IsNotNull(retryPolicyProto.ExponentialBackoff); + var exponentialBackoff = retryPolicyProto.ExponentialBackoff; + var initialBackoffProto = Duration.FromTimeSpan(initialBackoff); + var maxBackoffProto = Duration.FromTimeSpan(maxBackoff); + Assert.AreEqual(exponentialBackoff.Initial, initialBackoffProto); + Assert.AreEqual(exponentialBackoff.Max, maxBackoffProto); + Assert.AreEqual(exponentialBackoff.Multiplier, backoffMultiplier); + Assert.AreEqual(retryPolicyProto.MaxAttempts, maxAttempts); + } + + [TestMethod] + public void TestInheritBackoff() + { + var initialBackoff = TimeSpan.FromMilliseconds(5); + var maxBackoff = TimeSpan.FromSeconds(1); + double backoffMultiplier = 5; + var maxAttempts = 3; + var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier); + + var initialBackoffProto = TimeSpan.FromMilliseconds(10); + var maxBackoffProto = TimeSpan.FromSeconds(3); + double backoffMultiplierProto = 10; + var exponentialBackoff = new ExponentialBackoff + { + Initial = Duration.FromTimeSpan(initialBackoffProto), + Max = Duration.FromTimeSpan(maxBackoffProto), + Multiplier = (float)backoffMultiplierProto + }; + var retryPolicyProto = new RetryPolicy + { + ExponentialBackoff = exponentialBackoff + }; + + var inheritedRetryPolicy = retryPolicy.InheritBackoff(retryPolicyProto); + Assert.IsInstanceOfType(inheritedRetryPolicy, typeof(ExponentialBackoffRetryPolicy)); + var exponentialBackoffRetryPolicy = (ExponentialBackoffRetryPolicy)inheritedRetryPolicy; + + Assert.AreEqual(initialBackoffProto, exponentialBackoffRetryPolicy.InitialBackoff); + Assert.AreEqual(maxBackoffProto, exponentialBackoffRetryPolicy.MaxBackoff); + Assert.AreEqual(backoffMultiplierProto, exponentialBackoffRetryPolicy.BackoffMultiplier); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void TestInheritBackoffWithoutExponentialBackoff() + { + var maxAttempts = 3; + var customizedBackoff = new CustomizedBackoff(); + var retryPolicyProto = new RetryPolicy + { + MaxAttempts = maxAttempts, + CustomizedBackoff = customizedBackoff + }; + + var initialBackoff = TimeSpan.FromMilliseconds(5); + var maxBackoff = TimeSpan.FromSeconds(1); + double backoffMultiplier = 5; + var exponentialBackoffRetryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier); + + exponentialBackoffRetryPolicy.InheritBackoff(retryPolicyProto); + } + } +} \ No newline at end of file diff --git a/csharp/tests/MessageViewTest.cs b/csharp/tests/MessageViewTest.cs new file mode 100644 index 00000000..355497c3 --- /dev/null +++ b/csharp/tests/MessageViewTest.cs @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Text; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class MessageViewTests + { + private const string FakeHost = "127.0.0.1"; + private const string FakeTopic = "test-topic"; + + [TestMethod] + public void TestFromProtobufWithCrc32() + { + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = FakeHost, + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = FakeTopic }, + Body = body + }; + + var messageView = MessageView.FromProtobuf(message); + + CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body); + Assert.AreEqual(FakeTopic, messageView.Topic); + Assert.AreEqual(FakeHost, messageView.BornHost); + Assert.IsFalse(messageView.IsCorrupted()); + } + + [TestMethod] + public void TestFromProtobufWithWrongCrc32() + { + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "00000000" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = FakeHost, + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = FakeTopic }, + Body = body + }; + + var messageView = MessageView.FromProtobuf(message); + + CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body); + Assert.AreEqual(FakeTopic, messageView.Topic); + Assert.IsTrue(messageView.IsCorrupted()); + } + + [TestMethod] + public void TestFromProtobufWithMd5() + { + var digest = new Proto.Digest + { Type = Proto.DigestType.Md5, Checksum = "3858F62230AC3C915F300C664312C63F" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = FakeHost, + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = FakeTopic }, + Body = body + }; + + var messageView = MessageView.FromProtobuf(message); + + CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body); + Assert.AreEqual(FakeTopic, messageView.Topic); + Assert.IsFalse(messageView.IsCorrupted()); + } + + [TestMethod] + public void TestFromProtobufWithWrongMd5() + { + var digest = new Proto.Digest + { Type = Proto.DigestType.Md5, Checksum = "00000000000000000000000000000000" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = FakeHost, + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = FakeTopic }, + Body = body + }; + + var messageView = MessageView.FromProtobuf(message); + + CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body); + Assert.AreEqual(FakeTopic, messageView.Topic); + Assert.IsTrue(messageView.IsCorrupted()); + } + + [TestMethod] + public void TestFromProtobufWithSha1() + { + var digest = new Proto.Digest + { Type = Proto.DigestType.Sha1, Checksum = "8843D7F92416211DE9EBB963FF4CE28125932878" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = FakeHost, + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = FakeTopic }, + Body = body + }; + + var messageView = MessageView.FromProtobuf(message); + + CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body); + Assert.AreEqual(FakeTopic, messageView.Topic); + Assert.IsFalse(messageView.IsCorrupted()); + } + + [TestMethod] + public void TestFromProtobufWithWrongSha1() + { + var digest = new Proto.Digest + { Type = Proto.DigestType.Sha1, Checksum = "0000000000000000000000000000000000000000" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = FakeHost, + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = FakeTopic }, + Body = body + }; + + var messageView = MessageView.FromProtobuf(message); + + CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body); + Assert.AreEqual(FakeTopic, messageView.Topic); + Assert.IsTrue(messageView.IsCorrupted()); + } + } +} diff --git a/csharp/tests/ProcessQueueTest.cs b/csharp/tests/ProcessQueueTest.cs new file mode 100644 index 00000000..db7cc4b2 --- /dev/null +++ b/csharp/tests/ProcessQueueTest.cs @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class ProcessQueueTest + { + [TestMethod] + public void TestExpired() + { + var pushConsumer = CreatePushConsumer("testTopic", 8, 1024, + 4); + pushConsumer.State = State.Running; + var processQueue = CreateProcessQueue(pushConsumer); + Assert.IsFalse(processQueue.Expired()); + } + + [TestMethod] + public async Task TestReceiveMessageImmediately() + { + var pushConsumer = CreatePushConsumer("testTopic", 8, 1024, + 4); + pushConsumer.State = State.Running; + var processQueue = CreateProcessQueue(pushConsumer); + var mockClientManager = new Mock(); + pushConsumer.SetClientManager(mockClientManager.Object); + + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "00000000" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic" + }, + Body = body + }; + var receiveMessageResponse0 = new Proto.ReceiveMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + } + }; + var receiveMessageResponse1 = new Proto.ReceiveMessageResponse + { + Message = message + }; + var metadata = pushConsumer.Sign(); + var receiveMessageResponseList = new List + { receiveMessageResponse0, receiveMessageResponse1 }; + var receiveMessageInvocation = + new RpcInvocation>(null, + receiveMessageResponseList, metadata); + mockClientManager.Setup(cm => cm.ReceiveMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(receiveMessageInvocation)); + + var receivedMessageCount = 1; + var mq = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + }; + await Task.WhenAny(processQueue.FetchMessageImmediately(), Task.Delay(3000)); + Assert.AreEqual(processQueue.GetCachedMessageCount(), receivedMessageCount); + } + + [TestMethod] + public async Task TestEraseMessageWithConsumeOk() + { + var pushConsumer = CreatePushConsumer("testTopic", 8, 1024, + 4); + pushConsumer.State = State.Running; + var messageView = MessageView.FromProtobuf(CreateMessage(), new MessageQueue(new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + } + })); + var messageViewList = new List { messageView }; + var mockClientManager = new Mock(); + pushConsumer.SetClientManager(mockClientManager.Object); + var metadata = pushConsumer.Sign(); + var ackMessageResponse = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + } + }; + var ackMessageInvocation = new RpcInvocation(null, + ackMessageResponse, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation)); + var processQueue = CreateProcessQueue(pushConsumer); + processQueue.CacheMessages(messageViewList); + await processQueue.EraseMessage(messageView, ConsumeResult.SUCCESS); + mockClientManager.Verify(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public async Task TestEraseMessageWithFailure() + { + var pushConsumer = CreatePushConsumer("testTopic", 8, 1024, + 4); + pushConsumer.State = State.Running; + var messageView = MessageView.FromProtobuf(CreateMessage(), new MessageQueue(new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + } + })); + var messageViewList = new List { messageView }; + var mockClientManager = new Mock(); + pushConsumer.SetClientManager(mockClientManager.Object); + var metadata = pushConsumer.Sign(); + var ackMessageResponse = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.InternalServerError + } + }; + var ackMessageInvocation = new RpcInvocation(null, + ackMessageResponse, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation)); + var processQueue = CreateProcessQueue(pushConsumer); + processQueue.CacheMessages(messageViewList); + var ackTimes = 3; + await Task.WhenAny(processQueue.EraseMessage(messageView, ConsumeResult.SUCCESS), + Task.Delay(ProcessQueue.AckMessageFailureBackoffDelay.Multiply(2))); + mockClientManager.Verify(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Exactly(ackTimes)); + } + + private static ProcessQueue CreateProcessQueue(PushConsumer pushConsumer) + { + var mq = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + }; + var processQueue = new ProcessQueue(pushConsumer, new MessageQueue(mq), + pushConsumer.GetSubscriptionExpressions()["testTopic"], new CancellationTokenSource(), + new CancellationTokenSource(), new CancellationTokenSource(), + new CancellationTokenSource()); + return processQueue; + } + + private Proto.Message CreateMessage() + { + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = "testTopic" }, + Body = body + }; + return message; + } + + private PushConsumer CreatePushConsumer(string topic, int maxCacheMessageCount, int maxCacheMessageSizeInBytes, + int consumptionThreadCount) + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints("127.0.0.1:8080") + .Build(); + var subscription = new Dictionary + { { topic, new FilterExpression("*") } }; + return new PushConsumer(clientConfig, "testGroup", + new ConcurrentDictionary(subscription), new TestMessageListener(), + maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount); + } + + private class TestMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + return ConsumeResult.SUCCESS; + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/ProducerBuilderTest.cs b/csharp/tests/ProducerBuilderTest.cs new file mode 100644 index 00000000..1e49912e --- /dev/null +++ b/csharp/tests/ProducerBuilderTest.cs @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; + +namespace tests +{ + [TestClass] + public class ProducerBuilderTest + { + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetClientConfigurationWithNull() + { + var builder = new Producer.Builder(); + builder.SetClientConfig(null); + } + + [TestMethod] + [ExpectedException(typeof(NullReferenceException))] + public void TestSetTopicWithNull() + { + var builder = new Producer.Builder(); + builder.SetTopics(null); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetIllegalTopic() + { + var builder = new Producer.Builder(); + builder.SetTopics("\t"); + } + + [TestMethod] + public void TestSetTopic() + { + var builder = new Producer.Builder(); + builder.SetTopics("abc"); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetNegativeMaxAttempts() + { + var builder = new Producer.Builder(); + builder.SetMaxAttempts(-1); + } + + [TestMethod] + public void TestSetMaxAttempts() + { + var builder = new Producer.Builder(); + builder.SetMaxAttempts(3); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetTransactionCheckerWithNull() + { + var builder = new Producer.Builder(); + builder.SetTransactionChecker(null); + } + + [TestMethod] + public void TestSetTransactionChecker() + { + var builder = new Producer.Builder(); + builder.SetTransactionChecker(new TestTransactionChecker()); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public async Task TestBuildWithoutClientConfiguration() + { + var builder = new Producer.Builder(); + await builder.Build(); + } + + [TestMethod] + public void TestBuild() + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints("127.0.0.1:9876").Build(); + var builder = new Producer.Builder(); + builder.SetClientConfig(clientConfig).Build(); + } + + private class TestTransactionChecker : ITransactionChecker + { + public TransactionResolution Check(MessageView messageView) + { + return TransactionResolution.Commit; + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs new file mode 100644 index 00000000..364ac524 --- /dev/null +++ b/csharp/tests/ProducerTest.cs @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +using System.Collections.Concurrent; +using System.Text; +using System.Threading.Tasks; +using Moq; + +namespace tests +{ + [TestClass] + public class ProducerTest + { + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task TestSendBeforeStartup() + { + var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(); + var publishingTopics = new ConcurrentDictionary(); + publishingTopics.TryAdd("testTopic", true); + var producer = new Producer(clientConfig, publishingTopics, 1, null); + var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build(); + await producer.Send(message); + } + + [TestMethod] + public async Task TestSendWithTopic() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build(); + var metadata = producer.Sign(); + var sendResultEntry = new Proto.SendResultEntry + { + MessageId = "fakeMsgId", + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Offset = 1 + }; + var sendMessageResponse = new Proto.SendMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Entries = { sendResultEntry } + }; + var sendMessageInvocation = new RpcInvocation(null, + sendMessageResponse, metadata); + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + mockClientManager.Setup(cm => cm.SendMessage(It.IsAny(), + It.IsAny(), It.IsAny())).Returns(Task.FromResult(sendMessageInvocation)); + await producer.Send(message); + mockClientManager.Verify(cm => cm.SendMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public async Task TestSendFailureWithTopic() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build(); + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + var exception = new ArgumentException(); + mockClientManager.Setup(cm => cm.SendMessage(It.IsAny(), + It.IsAny(), It.IsAny())).Throws(exception); + await producer.Send(message); + var maxAttempts = producer.PublishingSettings.GetRetryPolicy().GetMaxAttempts(); + mockClientManager.Verify(cm => cm.SendMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Exactly(maxAttempts)); + } + + private Producer CreateTestClient() + { + const string host0 = "127.0.0.1"; + var mqs = new List(); + var mq0 = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = host0, + Port = 80 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "foo-bar-namespace", + Name = "testTopic", + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + }; + mqs.Add(mq0); + var topicRouteData = new TopicRouteData(mqs); + var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); + var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(); + var producer = new Producer(clientConfig, new ConcurrentDictionary(), + 1, null); + producer._publishingRouteDataCache.TryAdd("testTopic", publishingLoadBalancer); + return producer; + } + } +} \ No newline at end of file diff --git a/csharp/tests/PushConsumerBuilderTest.cs b/csharp/tests/PushConsumerBuilderTest.cs new file mode 100644 index 00000000..8358c7c1 --- /dev/null +++ b/csharp/tests/PushConsumerBuilderTest.cs @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; + +namespace tests +{ + [TestClass] + public class PushConsumerBuilderTest + { + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetClientConfigWithNull() + { + var builder = new PushConsumer.Builder(); + builder.SetClientConfig(null); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetConsumerGroupWithNull() + { + var builder = new PushConsumer.Builder(); + builder.SetConsumerGroup(null); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetConsumerGroupWithSpecialChar() + { + var builder = new PushConsumer.Builder(); + builder.SetConsumerGroup("#.testGroup#"); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestBuildWithoutExpressions() + { + var builder = new PushConsumer.Builder(); + builder.SetSubscriptionExpression(null); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestBuildWithEmptyExpressions() + { + var builder = new PushConsumer.Builder(); + builder.SetSubscriptionExpression(new Dictionary()); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestBuildWithNullMessageListener() + { + var builder = new PushConsumer.Builder(); + builder.SetMessageListener(null); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestNegativeMaxCacheMessageCount() + { + var builder = new PushConsumer.Builder(); + builder.SetMaxCacheMessageCount(-1); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestNegativeMaxCacheMessageSizeInBytes() + { + var builder = new PushConsumer.Builder(); + builder.SetMaxCacheMessageSizeInBytes(-1); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestNegativeConsumptionThreadCount() + { + var builder = new PushConsumer.Builder(); + builder.SetMaxCacheMessageCount(-1); + } + + [TestMethod] + public void TestBuild() + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints("127.0.0.1:9876").Build(); + var subscription = new Dictionary + {{ "fakeTopic", new FilterExpression("*") }}; + var builder = new PushConsumer.Builder(); + builder.SetClientConfig(clientConfig).SetSubscriptionExpression(subscription).SetConsumerGroup("testGroup") + .SetMessageListener(new TestMessageListener()).SetMaxCacheMessageCount(10) + .SetMaxCacheMessageSizeInBytes(10).SetConsumptionThreadCount(10).Build(); + } + + private class TestMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + // Handle the received message and return consume result. + return ConsumeResult.SUCCESS; + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs new file mode 100644 index 00000000..5ae021e6 --- /dev/null +++ b/csharp/tests/PushConsumerTest.cs @@ -0,0 +1,299 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class PushConsumerTest + { + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task TestSubscribeBeforeStartup() + { + var pushConsumer = CreatePushConsumer(); + await pushConsumer.Subscribe("testTopic", new FilterExpression("*")); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void TestUnsubscribeBeforeStartup() + { + var pushConsumer = CreatePushConsumer(); + pushConsumer.Unsubscribe("testTopic"); + } + + [TestMethod] + public async Task TestQueryAssignment() + { + var pushConsumer = CreatePushConsumer(); + var metadata = pushConsumer.Sign(); + var mq = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + }; + var queryRouteResponse = new Proto.QueryRouteResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + MessageQueues = { mq } + }; + var queryRouteInvocation = new RpcInvocation(null, + queryRouteResponse, metadata); + var mockClientManager = new Mock(); + mockClientManager.Setup(cm => + cm.QueryRoute(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(queryRouteInvocation)); + var mockCall = new AsyncDuplexStreamingCall( + new MockClientStreamWriter(), + new MockAsyncStreamReader(), + null, + null, + null, + null); + var queryAssignmentResponse = new Proto.QueryAssignmentResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Assignments = { } + }; + var queryAssignmentInvocation = + new RpcInvocation(null, + queryAssignmentResponse, metadata); + mockClientManager.Setup(cm => + cm.QueryAssignment(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(queryAssignmentInvocation)); + mockClientManager.Setup(cm => cm.Telemetry(It.IsAny())).Returns(mockCall); + pushConsumer.SetClientManager(mockClientManager.Object); + await pushConsumer.QueryAssignment("testTopic"); + } + + [TestMethod] + public async Task TestScanAssignments() + { + var pushConsumer = CreatePushConsumer(); + var metadata = pushConsumer.Sign(); + var mq = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + }; + var queryRouteResponse = new Proto.QueryRouteResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + MessageQueues = { mq } + }; + var queryRouteInvocation = new RpcInvocation(null, + queryRouteResponse, metadata); + var mockClientManager = new Mock(); + mockClientManager.Setup(cm => + cm.QueryRoute(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(queryRouteInvocation)); + var mockCall = new AsyncDuplexStreamingCall( + new MockClientStreamWriter(), + new MockAsyncStreamReader(), + null, + null, + null, + null); + var queryAssignmentResponse = new Proto.QueryAssignmentResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Assignments = { new Proto.Assignment + { + MessageQueue = mq + } } + }; + var queryAssignmentInvocation = + new RpcInvocation(null, + queryAssignmentResponse, metadata); + mockClientManager.Setup(cm => + cm.QueryAssignment(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(queryAssignmentInvocation)); + mockClientManager.Setup(cm => cm.Telemetry(It.IsAny())).Returns(mockCall); + pushConsumer.SetClientManager(mockClientManager.Object); + pushConsumer.State = State.Running; + await pushConsumer.Subscribe("testTopic", new FilterExpression("*")); + pushConsumer.ScanAssignments(); + } + + [TestMethod] + public async Task TestScanAssignmentsWithoutResults() + { + var pushConsumer = CreatePushConsumer(); + var metadata = pushConsumer.Sign(); + var mq = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + }; + var queryRouteResponse = new Proto.QueryRouteResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + MessageQueues = { mq } + }; + var queryRouteInvocation = new RpcInvocation(null, + queryRouteResponse, metadata); + var mockClientManager = new Mock(); + mockClientManager.Setup(cm => + cm.QueryRoute(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(queryRouteInvocation)); + var mockCall = new AsyncDuplexStreamingCall( + new MockClientStreamWriter(), + new MockAsyncStreamReader(), + null, + null, + null, + null); + var queryAssignmentResponse = new Proto.QueryAssignmentResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Assignments = { } + }; + var queryAssignmentInvocation = + new RpcInvocation(null, + queryAssignmentResponse, metadata); + mockClientManager.Setup(cm => + cm.QueryAssignment(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(queryAssignmentInvocation)); + mockClientManager.Setup(cm => cm.Telemetry(It.IsAny())).Returns(mockCall); + pushConsumer.SetClientManager(mockClientManager.Object); + pushConsumer.State = State.Running; + await pushConsumer.Subscribe("testTopic", new FilterExpression("*")); + pushConsumer.ScanAssignments(); + } + + private PushConsumer CreatePushConsumer() + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints("127.0.0.1") + .Build(); + return new PushConsumer(clientConfig, "testGroup", + new ConcurrentDictionary(), new TestMessageListener(), + 10, 10, 1); + } + + private class TestMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + return ConsumeResult.SUCCESS; + } + } + + private class MockClientStreamWriter : IClientStreamWriter + { + public Task WriteAsync(T message) + { + // Simulate async operation + return Task.CompletedTask; + } + + public WriteOptions WriteOptions { get; set; } + + public Task CompleteAsync() + { + throw new NotImplementedException(); + } + } + + private class MockAsyncStreamReader : IAsyncStreamReader + { + public Task MoveNext(CancellationToken cancellationToken) + { + throw new System.NotImplementedException(); + } + + public T Current => throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/csharp/tests/PushSubscriptionSettingsTest.cs b/csharp/tests/PushSubscriptionSettingsTest.cs new file mode 100644 index 00000000..22ed649a --- /dev/null +++ b/csharp/tests/PushSubscriptionSettingsTest.cs @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Castle.Core.Internal; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class PushSubscriptionSettingsTest + { + [TestMethod] + public void TestToProtobuf() + { + var groupResource = "testConsumerGroup"; + var clientId = "testClientId"; + var subscriptionExpression = new ConcurrentDictionary( + new Dictionary {{"testTopic", new FilterExpression("*")}}); + var requestTimeout = TimeSpan.FromSeconds(3); + var pushSubscriptionSettings = new PushSubscriptionSettings( + "testNamespace", + clientId, + new Endpoints("127.0.0.1:9876"), + groupResource, + requestTimeout, + subscriptionExpression + ); + var settings = pushSubscriptionSettings.ToProtobuf(); + + Assert.AreEqual(Proto.ClientType.PushConsumer, settings.ClientType); + Assert.AreEqual(Duration.FromTimeSpan(requestTimeout), settings.RequestTimeout); + Assert.IsFalse(settings.Subscription.Subscriptions.IsNullOrEmpty()); + var subscription = settings.Subscription; + Assert.AreEqual(subscription.Group, new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testConsumerGroup" + }); + Assert.IsFalse(subscription.Fifo); + var subscriptionsList = subscription.Subscriptions; + Assert.AreEqual(1, subscriptionsList.Count); + var subscriptionEntry = subscriptionsList[0]; + Assert.AreEqual(Proto.FilterType.Tag, subscriptionEntry.Expression.Type); + Assert.AreEqual(subscriptionEntry.Topic, new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic" + }); + } + + [TestMethod] + public void TestSync() + { + var duration0 = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)); + var duration1 = Duration.FromTimeSpan(TimeSpan.FromSeconds(2)); + var duration2 = Duration.FromTimeSpan(TimeSpan.FromSeconds(3)); + var durations = new List { duration0, duration1, duration2 }; + var customizedBackoff = new Proto.CustomizedBackoff + { + Next = { durations } + }; + var retryPolicy = new Proto.RetryPolicy + { + CustomizedBackoff = customizedBackoff, + MaxAttempts = 3 + }; + var receiveBatchSize = 96; + var longPollingTimeout = Duration.FromTimeSpan(TimeSpan.FromSeconds(60)); + var subscription = new Proto.Subscription + { + Fifo = true, + ReceiveBatchSize = receiveBatchSize, + LongPollingTimeout = longPollingTimeout + }; + var settings = new Proto.Settings + { + Subscription = subscription, + BackoffPolicy = retryPolicy + }; + var clientId = "testClientId"; + var subscriptionExpression = new Dictionary + { { "testTopic", new FilterExpression("*") } }; + var requestTimeout = TimeSpan.FromSeconds(3); + var pushSubscriptionSettings = new PushSubscriptionSettings("fakeNamespace", clientId, + new Endpoints("127.0.0.1:8080"), "testConsumerGroup", requestTimeout, + new ConcurrentDictionary(subscriptionExpression)); + pushSubscriptionSettings.Sync(settings); + } + } +} \ No newline at end of file diff --git a/csharp/tests/ResourceTest.cs b/csharp/tests/ResourceTest.cs new file mode 100644 index 00000000..006deb96 --- /dev/null +++ b/csharp/tests/ResourceTest.cs @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; + +namespace tests +{ + [TestClass] + public class ResourceTests + { + [TestMethod] + public void TestGetterAndSetter() + { + var resource = new Resource("foobar"); + Assert.AreEqual("foobar", resource.Name); + Assert.AreEqual(string.Empty, resource.Namespace); + + resource = new Resource("foo", "bar"); + Assert.AreEqual("bar", resource.Name); + Assert.AreEqual("foo", resource.Namespace); + } + + [TestMethod] + public void TestToProtobuf() + { + var resource = new Resource("foo", "bar"); + var protobuf = resource.ToProtobuf(); + Assert.AreEqual("foo", protobuf.ResourceNamespace); + Assert.AreEqual("bar", protobuf.Name); + } + + [TestMethod] + public void TestEqual() + { + var resource0 = new Resource("foo", "bar"); + var resource1 = new Resource("foo", "bar"); + Assert.AreEqual(resource0, resource1); + + var resource2 = new Resource("foo0", "bar"); + Assert.AreNotEqual(resource0, resource2); + } + } +} \ No newline at end of file diff --git a/csharp/tests/SessionTest.cs b/csharp/tests/SessionTest.cs new file mode 100644 index 00000000..31ac622c --- /dev/null +++ b/csharp/tests/SessionTest.cs @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Endpoints = Org.Apache.Rocketmq.Endpoints; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class SessionTests + { + private static Client CreateTestClient() + { + var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(); + return new Producer(clientConfig, new ConcurrentDictionary(), 1, null); + } + + [TestMethod] + public void TestSyncSettings() + { + var testClient = CreateTestClient(); + var endpoints = new Endpoints(testClient.GetClientConfig().Endpoints); + + var mockStreamWriter = new Mock>(); + var mockAsyncStreamReader = new Mock>(); + var mockClientManager = new Mock(); + var mockGrpcCall = new AsyncDuplexStreamingCall( + mockStreamWriter.Object, mockAsyncStreamReader.Object, null, null, null, null); + + mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockGrpcCall); + var session = new Session(endpoints, mockGrpcCall, testClient); + + var settings = new Proto.Settings(); + mockStreamWriter.Setup(m => m.WriteAsync(It.Is(tc => tc.Settings == settings))).Returns(Task.CompletedTask); + testClient.SetClientManager(mockClientManager.Object); + + session.SyncSettings(true); + + mockStreamWriter.Verify(m => m.WriteAsync(It.IsAny()), Times.Once); + } + } +} \ No newline at end of file diff --git a/csharp/tests/SimpleConsumerBuilderTest.cs b/csharp/tests/SimpleConsumerBuilderTest.cs new file mode 100644 index 00000000..80ad08d5 --- /dev/null +++ b/csharp/tests/SimpleConsumerBuilderTest.cs @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; + +namespace tests +{ + [TestClass] + public class SimpleConsumerBuilderTest + { + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetClientConfigurationWithNull() + { + var builder = new SimpleConsumer.Builder(); + builder.SetClientConfig(null); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestSetConsumerGroupWithNull() + { + var builder = new SimpleConsumer.Builder(); + builder.SetConsumerGroup(null); + } + + [TestMethod] + public void TestSetAwaitDuration() + { + var builder = new SimpleConsumer.Builder(); + builder.SetAwaitDuration(TimeSpan.FromSeconds(5)); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestBuildWithEmptyExpressions() + { + var builder = new SimpleConsumer.Builder(); + builder.SetSubscriptionExpression(new Dictionary()); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestBuildWithoutExpressions() + { + var builder = new SimpleConsumer.Builder(); + builder.SetSubscriptionExpression(null); + } + + [TestMethod] + public void TestBuild() + { + var clientConfig = new ClientConfig.Builder() + .SetEndpoints("127.0.0.1:9876").Build(); + var subscription = new Dictionary + {{ "fakeTopic", new FilterExpression("*") }}; + var builder = new SimpleConsumer.Builder(); + builder.SetClientConfig(clientConfig).SetConsumerGroup("fakeGroup"). + SetSubscriptionExpression(subscription).Build(); + } + } +} \ No newline at end of file diff --git a/csharp/tests/SimpleConsumerTest.cs b/csharp/tests/SimpleConsumerTest.cs new file mode 100644 index 00000000..7f15eea4 --- /dev/null +++ b/csharp/tests/SimpleConsumerTest.cs @@ -0,0 +1,840 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Org.Apache.Rocketmq.Error; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class SimpleConsumerTest + { + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task TestReceiveWithoutStart() + { + var consumer = CreateSimpleConsumer(); + await consumer.Receive(16, TimeSpan.FromSeconds(15)); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task TestAckWithoutStart() + { + var consumer = CreateSimpleConsumer(); + var messageView = MessageView.FromProtobuf(CreateMessage()); + await consumer.Ack(messageView); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task TestSubscribeWithoutStart() + { + var consumer = CreateSimpleConsumer(); + await consumer.Subscribe("testTopic", new FilterExpression("*")); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void TestUnsubscribeWithoutStart() + { + var consumer = CreateSimpleConsumer(); + consumer.Unsubscribe("testTopic"); + } + + [TestMethod] + [ExpectedException(typeof(InternalErrorException))] + public async Task TestReceiveWithZeroMaxMessageNum() + { + var consumer = CreateSimpleConsumer(); + consumer.State = State.Running; + await consumer.Receive(0, TimeSpan.FromSeconds(15)); + } + + [TestMethod] + public async Task TestAck() + { + var consumer = CreateSimpleConsumer(); + consumer.State = State.Running; + var messageView = MessageView.FromProtobuf(CreateMessage(), new MessageQueue(new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + } + })); + var mockClientManager = new Mock(); + consumer.SetClientManager(mockClientManager.Object); + + var metadata = consumer.Sign(); + var ackMessageResponse0 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + } + }; + var ackMessageInvocation0 = new RpcInvocation(null, + ackMessageResponse0, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation0)); + await consumer.Ack(messageView); + + + var ackMessageResponse1 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.BadRequest + } + }; + var ackMessageInvocation1 = new RpcInvocation(null, + ackMessageResponse1, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation1)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var ackMessageResponse2 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.IllegalTopic + } + }; + var ackMessageInvocation2 = new RpcInvocation(null, + ackMessageResponse2, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation2)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var ackMessageResponse3 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.IllegalConsumerGroup + } + }; + var ackMessageInvocation3 = new RpcInvocation(null, + ackMessageResponse3, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation3)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var ackMessageResponse4 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.InvalidReceiptHandle + } + }; + var ackMessageInvocation4 = new RpcInvocation(null, + ackMessageResponse4, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation4)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var ackMessageResponse5 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.ClientIdRequired + } + }; + var ackMessageInvocation5 = new RpcInvocation(null, + ackMessageResponse5, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation5)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var ackMessageResponse6 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Unauthorized + } + }; + var ackMessageInvocation6 = new RpcInvocation(null, + ackMessageResponse6, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation6)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(UnauthorizedException)); + } + + + var ackMessageResponse7 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Forbidden + } + }; + var ackMessageInvocation7 = new RpcInvocation(null, + ackMessageResponse7, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation7)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(ForbiddenException)); + } + + + var ackMessageResponse8 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.NotFound + } + }; + var ackMessageInvocation8 = new RpcInvocation(null, + ackMessageResponse8, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation8)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(NotFoundException)); + } + + + var ackMessageResponse9 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.TopicNotFound + } + }; + var ackMessageInvocation9 = new RpcInvocation(null, + ackMessageResponse9, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation9)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(NotFoundException)); + } + + + var ackMessageResponse10 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.TooManyRequests + } + }; + var ackMessageInvocation10 = new RpcInvocation(null, + ackMessageResponse10, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation10)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(TooManyRequestsException)); + } + + + var ackMessageResponse11 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.InternalError + } + }; + var ackMessageInvocation11 = new RpcInvocation(null, + ackMessageResponse11, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation11)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(InternalErrorException)); + } + + + var ackMessageResponse12 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.InternalServerError + } + }; + var ackMessageInvocation12 = new RpcInvocation(null, + ackMessageResponse12, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation12)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(InternalErrorException)); + } + + + var ackMessageResponse13 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.ProxyTimeout + } + }; + var ackMessageInvocation13 = new RpcInvocation(null, + ackMessageResponse13, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation13)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(ProxyTimeoutException)); + } + + + var ackMessageResponse14 = new Proto.AckMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Unsupported + } + }; + var ackMessageInvocation14 = new RpcInvocation(null, + ackMessageResponse14, metadata); + mockClientManager.Setup(cm => cm.AckMessage(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(ackMessageInvocation14)); + try + { + await consumer.Ack(messageView); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(UnsupportedException)); + } + } + + [TestMethod] + public async Task TestChangeInvisibleDuration() + { + var consumer = CreateSimpleConsumer(); + consumer.State = State.Running; + var messageView = MessageView.FromProtobuf(CreateMessage(), new MessageQueue(new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic", + } + })); + var invisibleDuration = TimeSpan.FromSeconds(3); + var mockClientManager = new Mock(); + consumer.SetClientManager(mockClientManager.Object); + + var metadata = consumer.Sign(); + var changeInvisibleTimeResponse0 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + } + }; + var changeInvisibleTimeInvocation0 = + new RpcInvocation(null, + changeInvisibleTimeResponse0, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation0)); + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + + + var changeInvisibleTimeResponse1 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.BadRequest + } + }; + var changeInvisibleTimeInvocation1 = + new RpcInvocation(null, + changeInvisibleTimeResponse1, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation1)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var changeInvisibleTimeResponse2 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.IllegalTopic + } + }; + var changeInvisibleTimeInvocation2 = + new RpcInvocation(null, + changeInvisibleTimeResponse2, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation2)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var changeInvisibleTimeResponse3 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.IllegalConsumerGroup + } + }; + var changeInvisibleTimeInvocation3 = + new RpcInvocation(null, + changeInvisibleTimeResponse3, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation3)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var changeInvisibleTimeResponse4 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.IllegalInvisibleTime + } + }; + var changeInvisibleTimeInvocation4 = + new RpcInvocation(null, + changeInvisibleTimeResponse4, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation4)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var changeInvisibleTimeResponse5 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.InvalidReceiptHandle + } + }; + var changeInvisibleTimeInvocation5 = + new RpcInvocation(null, + changeInvisibleTimeResponse5, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation5)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var changeInvisibleTimeResponse6 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.ClientIdRequired + } + }; + var changeInvisibleTimeInvocation6 = + new RpcInvocation(null, + changeInvisibleTimeResponse6, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation6)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(BadRequestException)); + } + + + var changeInvisibleTimeResponse7 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Unauthorized + } + }; + var changeInvisibleTimeInvocation7 = + new RpcInvocation(null, + changeInvisibleTimeResponse7, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation7)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(UnauthorizedException)); + } + + + var changeInvisibleTimeResponse8 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.NotFound + } + }; + var changeInvisibleTimeInvocation8 = + new RpcInvocation(null, + changeInvisibleTimeResponse8, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation8)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(NotFoundException)); + } + + + var changeInvisibleTimeResponse9 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.TopicNotFound + } + }; + var changeInvisibleTimeInvocation9 = + new RpcInvocation(null, + changeInvisibleTimeResponse9, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation9)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(NotFoundException)); + } + + + var changeInvisibleTimeResponse10 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.TooManyRequests + } + }; + var changeInvisibleTimeInvocation10 = + new RpcInvocation(null, + changeInvisibleTimeResponse10, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation10)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(TooManyRequestsException)); + } + + + var changeInvisibleTimeResponse11 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.InternalError + } + }; + var changeInvisibleTimeInvocation11 = + new RpcInvocation(null, + changeInvisibleTimeResponse11, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation11)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(InternalErrorException)); + } + + + var changeInvisibleTimeResponse12 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.InternalServerError + } + }; + var changeInvisibleTimeInvocation12 = + new RpcInvocation(null, + changeInvisibleTimeResponse12, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation12)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(InternalErrorException)); + } + + + var changeInvisibleTimeResponse13 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.ProxyTimeout + } + }; + var changeInvisibleTimeInvocation13 = + new RpcInvocation(null, + changeInvisibleTimeResponse13, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation13)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(ProxyTimeoutException)); + } + + + var changeInvisibleTimeResponse14 = new Proto.ChangeInvisibleDurationResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Unsupported + } + }; + var changeInvisibleTimeInvocation14 = + new RpcInvocation(null, + changeInvisibleTimeResponse14, metadata); + mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(changeInvisibleTimeInvocation14)); + try + { + await consumer.ChangeInvisibleDuration(messageView, invisibleDuration); + } + catch (Exception e) + { + Assert.IsInstanceOfType(e, typeof(UnsupportedException)); + } + } + + private SimpleConsumer CreateSimpleConsumer() + { + var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(); + var subscription = new Dictionary + { { "testTopic", new FilterExpression("*") } }; + var consumer = new SimpleConsumer(clientConfig, "testConsumerGroup", + TimeSpan.FromSeconds(15), subscription); + return consumer; + } + + private Proto.Message CreateMessage() + { + var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" }; + var systemProperties = new Proto.SystemProperties + { + MessageType = Proto.MessageType.Normal, + MessageId = MessageIdGenerator.GetInstance().Next(), + BornHost = "127.0.0.1", + BodyDigest = digest, + BornTimestamp = new Timestamp() + }; + var body = ByteString.CopyFrom("foobar", Encoding.UTF8); + var message = new Proto.Message + { + SystemProperties = systemProperties, + Topic = new Proto.Resource { Name = "testTopic" }, + Body = body + }; + return message; + } + } +} \ No newline at end of file diff --git a/csharp/tests/SimpleSubscriptionSettingsTest.cs b/csharp/tests/SimpleSubscriptionSettingsTest.cs new file mode 100644 index 00000000..3aa77c24 --- /dev/null +++ b/csharp/tests/SimpleSubscriptionSettingsTest.cs @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Apache.Rocketmq.V2; +using Castle.Core.Internal; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; +using Endpoints = Org.Apache.Rocketmq.Endpoints; +using FilterExpression = Org.Apache.Rocketmq.FilterExpression; + +namespace tests +{ + [TestClass] + public class SimpleSubscriptionSettingsTest + { + [TestMethod] + public void TestToProtobuf() + { + var groupResource = "testConsumerGroup"; + var clientId = "testClientId"; + var subscriptionExpression = new ConcurrentDictionary( + new Dictionary {{"testTopic", new FilterExpression("*")}}); + var requestTimeout = TimeSpan.FromSeconds(3); + var longPollingTimeout = TimeSpan.FromSeconds(15); + var simpleSubscriptionSettings = new SimpleSubscriptionSettings( + "testNamespace", + clientId, + new Endpoints("127.0.0.1:9876"), + groupResource, + requestTimeout, + longPollingTimeout, + subscriptionExpression + ); + var settings = simpleSubscriptionSettings.ToProtobuf(); + + Assert.AreEqual(Proto.ClientType.SimpleConsumer, settings.ClientType); + Assert.AreEqual(Duration.FromTimeSpan(requestTimeout), settings.RequestTimeout); + Assert.IsFalse(settings.Subscription.Subscriptions.IsNullOrEmpty()); + var subscription = settings.Subscription; + Assert.AreEqual(subscription.Group, new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testConsumerGroup" + }); + Assert.IsFalse(subscription.Fifo); + Assert.AreEqual(Duration.FromTimeSpan(longPollingTimeout), subscription.LongPollingTimeout); + var subscriptionsList = subscription.Subscriptions; + Assert.AreEqual(1, subscriptionsList.Count); + var subscriptionEntry = subscriptionsList[0]; + Assert.AreEqual(FilterType.Tag, subscriptionEntry.Expression.Type); + Assert.AreEqual(subscriptionEntry.Topic, new Proto.Resource + { + ResourceNamespace = "testNamespace", + Name = "testTopic" + }); + } + + [TestMethod] + public void TestSync() + { + var groupResource = "testConsumerGroup"; + var clientId = "testClientId"; + var subscriptionExpression = new ConcurrentDictionary( + new Dictionary {{"testTopic", new FilterExpression("*")}}); + var requestTimeout = TimeSpan.FromSeconds(3); + var longPollingTimeout = TimeSpan.FromSeconds(15); + var simpleSubscriptionSettings = new SimpleSubscriptionSettings( + "testNamespace", + clientId, + new Endpoints("127.0.0.1:9876"), + groupResource, + requestTimeout, + longPollingTimeout, + subscriptionExpression + ); + var subscription = new Proto.Subscription + { + Fifo = true + }; + var settings = new Proto.Settings + { + Subscription = subscription + }; + simpleSubscriptionSettings.Sync(settings); + } + } +} \ No newline at end of file diff --git a/csharp/tests/StatusCheckerTest.cs b/csharp/tests/StatusCheckerTest.cs new file mode 100644 index 00000000..b9215807 --- /dev/null +++ b/csharp/tests/StatusCheckerTest.cs @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using Org.Apache.Rocketmq.Error; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class StatusCheckerTests + { + [TestMethod] + public void TestCheckStatusOk() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.Ok, Message = "OK" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Exception exception = null; + try + { + StatusChecker.Check(status, request, requestId); + } + catch (Exception ex) + { + exception = ex; + } + + Assert.IsNull(exception, "Expected no exception to be thrown, but got: " + exception); + } + + [TestMethod] + public void TestCheckStatusMultipleResults() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.MultipleResults, Message = "Multiple Results" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + // Act & Assert + Exception exception = null; + try + { + StatusChecker.Check(status, request, requestId); + } + catch (Exception ex) + { + exception = ex; + } + + Assert.IsNull(exception, "Expected no exception to be thrown, but got: " + exception); + } + + [TestMethod] + public void TestCheckStatusBadRequest() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.BadRequest, Message = "Bad Request" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusUnauthorized() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.Unauthorized, Message = "Unauthorized" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusPaymentRequired() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.PaymentRequired, Message = "Payment Required" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusForbidden() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.Forbidden, Message = "Forbidden" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusMessageNotFoundForNonReceiveRequest() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.MessageNotFound, Message = "Message Not Found" }; + var request = new Proto.SendMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusNotFound() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.NotFound, Message = "Not Found" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusPayloadTooLarge() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.PayloadTooLarge, Message = "Payload Too Large" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusTooManyRequests() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.TooManyRequests, Message = "Too Many Requests" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusRequestHeaderFieldsTooLarge() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.RequestHeaderFieldsTooLarge, Message = "Request Header Fields Too Large" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusInternalError() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.InternalError, Message = "Internal Error" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusProxyTimeout() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.ProxyTimeout, Message = "Proxy Timeout" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusUnsupported() + { + // Arrange + var status = new Proto.Status { Code = Proto.Code.Unsupported, Message = "Unsupported" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + + [TestMethod] + public void TestCheckStatusUnrecognized() + { + // Arrange + var status = new Proto.Status { Code = (Proto.Code)999, Message = "Unrecognized" }; + var request = new Proto.ReceiveMessageRequest(); + var requestId = "requestId"; + + // Act & Assert + Assert.ThrowsException(() => StatusChecker.Check(status, request, requestId)); + } + } +} \ No newline at end of file diff --git a/csharp/tests/TransactionTest.cs b/csharp/tests/TransactionTest.cs new file mode 100644 index 00000000..3be592a8 --- /dev/null +++ b/csharp/tests/TransactionTest.cs @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; +using Org.Apache.Rocketmq; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + [TestClass] + public class TransactionTest + { + [TestMethod] + public void TestTryAddMessage() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + var bytes = Encoding.UTF8.GetBytes("fakeBytes"); + const string tag = "fakeTag"; + var message = new Message.Builder() + .SetTopic("fakeTopic") + .SetBody(bytes) + .SetTag(tag) + .SetKeys("fakeMsgKey") + .Build(); + var publishingMessage = transaction.TryAddMessage(message); + Assert.AreEqual(MessageType.Transaction, publishingMessage.MessageType); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestTryAddExceededMessages() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + var bytes = Encoding.UTF8.GetBytes("fakeBytes"); + const string tag = "fakeTag"; + var message0 = new Message.Builder() + .SetTopic("fakeTopic") + .SetBody(bytes) + .SetTag(tag) + .SetKeys("fakeMsgKey") + .Build(); + transaction.TryAddMessage(message0); + var message1 = new Message.Builder() + .SetTopic("fakeTopic") + .SetBody(bytes) + .SetTag(tag) + .SetKeys("fakeMsgKey") + .Build(); + transaction.TryAddMessage(message1); + } + + [TestMethod] + public void TestTryAddReceipt() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + var bytes = Encoding.UTF8.GetBytes("fakeBytes"); + const string tag = "fakeTag"; + var message = new Message.Builder() + .SetTopic("fakeTopic") + .SetBody(bytes) + .SetTag(tag) + .SetKeys("fakeMsgKey") + .Build(); + var publishingMessage = transaction.TryAddMessage(message); + var mq0 = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "foo-bar-namespace", + Name = "testTopic", + } + }; + var metadata = producer.Sign(); + var sendResultEntry = new Proto.SendResultEntry + { + MessageId = "fakeMsgId", + TransactionId = "fakeTxId", + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Offset = 1 + }; + var sendMessageResponse = new Proto.SendMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Entries = { sendResultEntry } + }; + var invocation = new RpcInvocation(null, sendMessageResponse, metadata); + var sendReceipt = SendReceipt.ProcessSendMessageResponse(new MessageQueue(mq0), invocation); + transaction.TryAddReceipt(publishingMessage, sendReceipt.GetEnumerator().Current); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void TestTryAddReceiptNotContained() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + var bytes = Encoding.UTF8.GetBytes("fakeBytes"); + const string tag = "fakeTag"; + var message = new Message.Builder() + .SetTopic("fakeTopic") + .SetBody(bytes) + .SetTag(tag) + .SetKeys("fakeMsgKey") + .Build(); + var publishingMessage = new PublishingMessage(message, new PublishingSettings("fakeNamespace", + "fakeClientId", new Endpoints("fakeEndpoints"), new Mock().Object, + TimeSpan.FromSeconds(10), new ConcurrentDictionary()), true); + var mq0 = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "foo-bar-namespace", + Name = "TestTopic", + } + }; + var metadata = producer.Sign(); + var sendResultEntry = new Proto.SendResultEntry + { + MessageId = "fakeMsgId", + TransactionId = "fakeTxId", + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Offset = 1 + }; + var sendMessageResponse = new Proto.SendMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Entries = { sendResultEntry } + }; + var invocation = new RpcInvocation(null, sendMessageResponse, metadata); + var sendReceipt = SendReceipt.ProcessSendMessageResponse(new MessageQueue(mq0), invocation); + transaction.TryAddReceipt(publishingMessage, sendReceipt.GetEnumerator().Current); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task TestCommitWithNoReceipts() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + await transaction.Commit(); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task TestRollbackWithNoReceipts() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + await transaction.Rollback(); + } + + [TestMethod] + public async Task TestCommit() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + var bytes = Encoding.UTF8.GetBytes("fakeBytes"); + const string tag = "fakeTag"; + var message = new Message.Builder() + .SetTopic("fakeTopic") + .SetBody(bytes) + .SetTag(tag) + .SetKeys("fakeMsgKey") + .Build(); + var publishingMessage = transaction.TryAddMessage(message); + var mq0 = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "foo-bar-namespace", + Name = "TestTopic", + } + }; + var metadata = producer.Sign(); + var sendResultEntry = new Proto.SendResultEntry + { + MessageId = "fakeMsgId", + TransactionId = "fakeTxId", + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Offset = 1 + }; + var sendMessageResponse = new Proto.SendMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Entries = { sendResultEntry } + }; + var sendMessageInvocation = new RpcInvocation(null, + sendMessageResponse, metadata); + var sendReceipt = SendReceipt.ProcessSendMessageResponse(new MessageQueue(mq0), + sendMessageInvocation); + transaction.TryAddReceipt(publishingMessage, sendReceipt.First()); + + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + var endTransactionMetadata = producer.Sign(); + var endTransactionResponse = new Proto.EndTransactionResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + } + }; + var endTransactionInvocation = new RpcInvocation(null, + endTransactionResponse, endTransactionMetadata); + mockClientManager.Setup(cm => cm.EndTransaction(It.IsAny(), + It.IsAny(), It.IsAny())).Returns(Task.FromResult(endTransactionInvocation)); + + producer.State = State.Running; + await transaction.Commit(); + } + + [TestMethod] + public async Task TestRollback() + { + var producer = CreateTestClient(); + var transaction = new Transaction(producer); + var bytes = Encoding.UTF8.GetBytes("fakeBytes"); + const string tag = "fakeTag"; + var message = new Message.Builder() + .SetTopic("fakeTopic") + .SetBody(bytes) + .SetTag(tag) + .SetKeys("fakeMsgKey") + .Build(); + var publishingMessage = transaction.TryAddMessage(message); + var mq0 = new Proto.MessageQueue + { + Broker = new Proto.Broker + { + Name = "broker0", + Endpoints = new Proto.Endpoints + { + Scheme = Proto.AddressScheme.Ipv4, + Addresses = + { + new Proto.Address + { + Host = "127.0.0.1", + Port = 8080 + } + } + } + }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Topic = new Proto.Resource + { + ResourceNamespace = "foo-bar-namespace", + Name = "TestTopic", + } + }; + var metadata = producer.Sign(); + var sendResultEntry = new Proto.SendResultEntry + { + MessageId = "fakeMsgId", + TransactionId = "fakeTxId", + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Offset = 1 + }; + var sendMessageResponse = new Proto.SendMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + Entries = { sendResultEntry } + }; + var sendMessageInvocation = new RpcInvocation(null, + sendMessageResponse, metadata); + var sendReceipt = SendReceipt.ProcessSendMessageResponse(new MessageQueue(mq0), + sendMessageInvocation); + transaction.TryAddReceipt(publishingMessage, sendReceipt.First()); + + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + var endTransactionMetadata = producer.Sign(); + var endTransactionResponse = new Proto.EndTransactionResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + } + }; + var endTransactionInvocation = new RpcInvocation(null, + endTransactionResponse, endTransactionMetadata); + mockClientManager.Setup(cm => cm.EndTransaction(It.IsAny(), + It.IsAny(), It.IsAny())).Returns(Task.FromResult(endTransactionInvocation)); + + producer.State = State.Running; + await transaction.Rollback(); + } + + private Producer CreateTestClient() + { + var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(); + return new Producer(clientConfig, new ConcurrentDictionary(), + 1, null); + } + } +} \ No newline at end of file