Skip to content

Commit

Permalink
Add trivial describeNamespace test server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Spikhalskiy committed Apr 21, 2022
1 parent 3db50b6 commit 91945b2
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
package io.temporal.internal.testservice;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.temporal.api.enums.v1.IndexedValueType;
import io.temporal.api.operatorservice.v1.*;
import java.io.Closeable;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* In memory implementation of the Operator Service. To be used for testing purposes only.
Expand All @@ -33,6 +36,7 @@
*/
final class TestOperatorService extends OperatorServiceGrpc.OperatorServiceImplBase
implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TestOperatorService.class);

private final TestVisibilityStore visibilityStore;

Expand All @@ -44,47 +48,56 @@ public TestOperatorService(TestVisibilityStore visibilityStore) {
public void addSearchAttributes(
AddSearchAttributesRequest request,
StreamObserver<AddSearchAttributesResponse> responseObserver) {
Map<String, IndexedValueType> registeredSearchAttributes =
visibilityStore.getRegisteredSearchAttributes();
request.getSearchAttributesMap().keySet().stream()
.filter(registeredSearchAttributes::containsKey)
.findFirst()
.ifPresent(
sa -> {
throw Status.ALREADY_EXISTS
.withDescription("Search attribute " + sa + " already exists.")
.asRuntimeException();
});
request.getSearchAttributesMap().forEach(visibilityStore::addSearchAttribute);
responseObserver.onNext(AddSearchAttributesResponse.newBuilder().build());
responseObserver.onCompleted();
try {
Map<String, IndexedValueType> registeredSearchAttributes =
visibilityStore.getRegisteredSearchAttributes();
request.getSearchAttributesMap().keySet().stream()
.filter(registeredSearchAttributes::containsKey)
.findFirst()
.ifPresent(
sa -> {
throw Status.ALREADY_EXISTS
.withDescription("Search attribute " + sa + " already exists.")
.asRuntimeException();
});
request.getSearchAttributesMap().forEach(visibilityStore::addSearchAttribute);
responseObserver.onNext(AddSearchAttributesResponse.newBuilder().build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void removeSearchAttributes(
RemoveSearchAttributesRequest request,
StreamObserver<RemoveSearchAttributesResponse> responseObserver) {
Map<String, IndexedValueType> registeredSearchAttributes =
visibilityStore.getRegisteredSearchAttributes();
request.getSearchAttributesList().stream()
.filter(k -> !registeredSearchAttributes.containsKey(k))
.findFirst()
.ifPresent(
sa -> {
throw Status.NOT_FOUND
.withDescription("Search attribute " + sa + " doesn't exist.")
.asRuntimeException();
});
request.getSearchAttributesList().forEach(visibilityStore::removeSearchAttribute);
responseObserver.onNext(RemoveSearchAttributesResponse.newBuilder().build());
responseObserver.onCompleted();
try {
Map<String, IndexedValueType> registeredSearchAttributes =
visibilityStore.getRegisteredSearchAttributes();
request.getSearchAttributesList().stream()
.filter(k -> !registeredSearchAttributes.containsKey(k))
.findFirst()
.ifPresent(
sa -> {
throw Status.NOT_FOUND
.withDescription("Search attribute " + sa + " doesn't exist.")
.asRuntimeException();
});
request.getSearchAttributesList().forEach(visibilityStore::removeSearchAttribute);
responseObserver.onNext(RemoveSearchAttributesResponse.newBuilder().build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void listSearchAttributes(
ListSearchAttributesRequest request,
StreamObserver<ListSearchAttributesResponse> responseObserver) {
super.listSearchAttributes(request, responseObserver);
private void handleStatusRuntimeException(
StatusRuntimeException e, StreamObserver<?> responseObserver) {
if (e.getStatus().getCode() == Status.Code.INTERNAL) {
log.error("unexpected", e);
}
responseObserver.onError(e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,63 +32,19 @@
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.RetryPolicy;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.NamespaceState;
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
import io.temporal.api.testservice.v1.SleepRequest;
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.api.workflowservice.v1.QueryWorkflowRequest;
import io.temporal.api.workflowservice.v1.QueryWorkflowResponse;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatByIdRequest;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatByIdResponse;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledByIdRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledByIdResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedByIdRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedByIdResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedByIdRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedByIdResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedResponse;
import io.temporal.api.workflowservice.v1.RespondQueryTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondQueryTaskCompletedResponse;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedResponse;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskFailedRequest;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskFailedResponse;
import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.TerminateWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.TerminateWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
import io.temporal.serviceclient.StatusUtils;
Expand Down Expand Up @@ -119,7 +75,6 @@
*/
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
implements Closeable {

private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
// key->WorkflowId
Expand Down Expand Up @@ -1026,6 +981,34 @@ public void describeWorkflowExecution(
}
}

/**
* This method doesn't make much sense for test server, it accepts all namespaces as existent and
* registered. so, it's a trivial implementation just returning an info that a namespace is
* registered irrespectively of the input
*/
@Override
public void describeNamespace(
DescribeNamespaceRequest request,
StreamObserver<DescribeNamespaceResponse> responseObserver) {
try {
// generating a stable UUID for name
String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
DescribeNamespaceResponse result =
DescribeNamespaceResponse.newBuilder()
.setNamespaceInfo(
NamespaceInfo.newBuilder()
.setName(request.getNamespace())
.setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
.setId(namespaceId)
.build())
.build();
responseObserver.onNext(result);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

private <R> R requireNotNull(String fieldName, R value) {
if (value == null) {
throw Status.INVALID_ARGUMENT
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.testserver.functional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import io.temporal.api.enums.v1.NamespaceState;
import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.internal.docker.RegisterTestNamespace;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import org.junit.Rule;
import org.junit.Test;

public class DescribeNamespaceTest {
@Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().build();

@Test
public void testDescribeNamespace() {
DescribeNamespaceResponse describeNamespaceResponse =
testWorkflowRule
.getWorkflowServiceStubs()
.blockingStub()
.describeNamespace(
DescribeNamespaceRequest.newBuilder()
.setNamespace(RegisterTestNamespace.NAMESPACE)
.build());
assertEquals(
NamespaceState.NAMESPACE_STATE_REGISTERED,
describeNamespaceResponse.getNamespaceInfo().getState());
assertEquals(
RegisterTestNamespace.NAMESPACE, describeNamespaceResponse.getNamespaceInfo().getName());
assertTrue(describeNamespaceResponse.getNamespaceInfo().getId().length() > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DescribeTest {
public class DescribeWorkflowExecutionTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand Down

0 comments on commit 91945b2

Please sign in to comment.