Skip to content

Commit

Permalink
test(proxy): add unit test for ProxyService
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <admin@lv5.moe>
  • Loading branch information
ShadowySpirits committed Dec 8, 2023
1 parent e284f99 commit ab29ca5
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 15 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -61,28 +60,28 @@ public class GrpcProxyClient implements ProxyClient {

private final GrpcClientConfig clientConfig;

private final ConcurrentHashMap<String, ProxyServiceGrpc.ProxyServiceFutureStub> stubs;
private final ConcurrentHashMap<String, ProxyServiceGrpc.ProxyServiceFutureStub> stubMap;

public GrpcProxyClient(GrpcClientConfig clientConfig) {
this.clientConfig = clientConfig;
stubs = new ConcurrentHashMap<>();
stubMap = new ConcurrentHashMap<>();
}

private ProxyServiceGrpc.ProxyServiceFutureStub getOrCreateStubForTarget(String target) {
if (Strings.isNullOrEmpty(target)) {
throw new IllegalArgumentException("target is null or empty");
}

if (!stubs.containsKey(target)) {
if (!stubMap.containsKey(target)) {
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
ProxyServiceGrpc.ProxyServiceFutureStub stub = ProxyServiceGrpc.newFutureStub(channel);
stubs.putIfAbsent(target, stub);
stubMap.putIfAbsent(target, stub);

Check warning on line 79 in proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/client/GrpcProxyClient.java#L79

Added line #L79 was not covered by tests
}
Duration timeout = clientConfig.rpcTimeout();
long rpcTimeout = TimeUnit.SECONDS.toMillis(timeout.getSeconds())
+ TimeUnit.NANOSECONDS.toMillis(timeout.getNanos());
return stubs.get(target).withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
return stubMap.get(target).withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -233,15 +232,15 @@ public void onFailure(Throwable t) {
}

@Override
public void close() throws IOException {
for (Map.Entry<String, ProxyServiceGrpc.ProxyServiceFutureStub> entry : stubs.entrySet()) {
public void close() {
for (Map.Entry<String, ProxyServiceGrpc.ProxyServiceFutureStub> entry : stubMap.entrySet()) {
Channel channel = entry.getValue().getChannel();
if (channel instanceof ManagedChannel managedChannel) {
managedChannel.shutdownNow();
}
}

for (Map.Entry<String, ProxyServiceGrpc.ProxyServiceFutureStub> entry : stubs.entrySet()) {
for (Map.Entry<String, ProxyServiceGrpc.ProxyServiceFutureStub> entry : stubMap.entrySet()) {
Channel channel = entry.getValue().getChannel();
if (channel instanceof ManagedChannel managedChannel) {
try {
Expand Down
104 changes: 104 additions & 0 deletions proxy/src/test/java/com/automq/rocketmq/proxy/grpc/GrpcServerRule.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.proxy.grpc;

import com.google.common.base.Preconditions;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.util.MutableHandlerRegistry;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public final class GrpcServerRule implements BeforeEachCallback, AfterEachCallback {
private ManagedChannel channel;
private Server server;
private String serverName;
private MutableHandlerRegistry serviceRegistry;
private boolean useDirectExecutor;

public GrpcServerRule() {
}

public GrpcServerRule directExecutor() {
Preconditions.checkState(this.serverName == null, "directExecutor() can only be called at the rule instantiation");
this.useDirectExecutor = true;
return this;
}

public ManagedChannel getChannel() {
return this.channel;
}

public Server getServer() {
return this.server;
}

public String getServerName() {
return this.serverName;
}

public MutableHandlerRegistry getServiceRegistry() {
return this.serviceRegistry;
}

@Override
public void afterEach(ExtensionContext context) throws Exception {
this.serverName = null;
this.serviceRegistry = null;
this.channel.shutdown();
this.server.shutdown();

try {
this.channel.awaitTermination(1L, TimeUnit.MINUTES);
this.server.awaitTermination(1L, TimeUnit.MINUTES);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
throw new RuntimeException(var5);
} finally {
this.channel.shutdownNow();
this.channel = null;
this.server.shutdownNow();
this.server = null;
}

}

@Override
public void beforeEach(ExtensionContext context) throws Exception {
this.serverName = UUID.randomUUID().toString();
this.serviceRegistry = new MutableHandlerRegistry();
InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(this.serverName).fallbackHandlerRegistry(this.serviceRegistry);
if (this.useDirectExecutor) {
serverBuilder.directExecutor();
}

this.server = serverBuilder.build().start();
InProcessChannelBuilder channelBuilder = InProcessChannelBuilder.forName(this.serverName);
if (this.useDirectExecutor) {
channelBuilder.directExecutor();
}

this.channel = channelBuilder.build();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.proxy.grpc;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.proxy.v1.ProxyServiceGrpc;
import apache.rocketmq.proxy.v1.Status;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import com.automq.rocketmq.proxy.mock.MockMessageUtil;
import com.automq.rocketmq.proxy.service.ExtendMessageService;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.model.message.PutResult;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ProxyServiceImplTest {
private static final String TARGET = "target";

@RegisterExtension
public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
GrpcProxyClient proxyClient;

@BeforeEach
@SuppressWarnings({"unchecked", "rawtypes"})
public void setup() throws NoSuchFieldException, IllegalAccessException {
MessageStore messageStore = mock(MessageStore.class);
when(messageStore.put(any(), any())).thenReturn(CompletableFuture.completedFuture(new PutResult(PutResult.Status.PUT_OK, 0)));

ExtendMessageService messageService = mock(ExtendMessageService.class);
ProducerManager producerManager = mock(ProducerManager.class);
ConsumerManager consumerManager = mock(ConsumerManager.class);
ProxyServiceImpl server = new ProxyServiceImpl(messageStore, messageService, producerManager, consumerManager);
grpcServerRule.getServiceRegistry().addService(server);

ProxyServiceGrpc.ProxyServiceFutureStub stub = ProxyServiceGrpc.newFutureStub(grpcServerRule.getChannel());
proxyClient = new GrpcProxyClient(new BrokerConfig());

Field field = proxyClient.getClass().getDeclaredField("stubMap");
field.setAccessible(true);
ConcurrentMap<String, ProxyServiceGrpc.ProxyServiceFutureStub> stubMap = (ConcurrentMap) field.get(proxyClient);
stubMap.put(TARGET, stub);
}

@Test
void relay() {
FlatMessageExt messageExt = MockMessageUtil.buildMessage(0, 1, "");
Status status = proxyClient.relayMessage(TARGET, messageExt.message()).join();
assertEquals(Code.OK, status.getCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ public class StoreMetricsManager extends ServiceThread implements MetricsManager

public static ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();

// TODO: implement consumerLagLatency
public static ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
public static ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
// TODO: implement consumerQueueingLatency
public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,6 @@ public AckCommitter(long ackOffset, Consumer<Long> ackAdvanceFn, ByteBuffer seri

public synchronized void commitAck(long offset) {
if (offset >= ackOffset) {
// TODO: how to handle overflow?
int offsetInBitmap = (int) (offset - baseOffset);
bitmap.add(offsetInBitmap);
boolean advance = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ private Stream openStream(long streamId) {
if (openedStreams.containsKey(streamId)) {
throw new IllegalStateException("Stream " + streamId + " already opened.");
}
// TODO: Build a real OpenStreamOptions
return openedStreams.computeIfAbsent(streamId, id -> streamClient.openStream(id, null).join());
}

Expand All @@ -129,12 +128,12 @@ private void closeStream(long streamId) {
}

@Override
public void start() throws Exception {
public void start() {

}

@Override
public void shutdown() throws Exception {
public void shutdown() {

}
}

0 comments on commit ab29ca5

Please sign in to comment.