diff --git a/build.gradle b/build.gradle index aaa5c9ae5..79c6c8584 100644 --- a/build.gradle +++ b/build.gradle @@ -24,6 +24,7 @@ plugins { allprojects { repositories { + mavenLocal() mavenCentral() } } @@ -32,7 +33,7 @@ ext { // Platforms grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.2.0-alpha' + nexusVersion = '0.3.0' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java index 095117f96..85b4d6663 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java @@ -21,10 +21,7 @@ package io.temporal.opentracing; import io.temporal.common.interceptors.*; -import io.temporal.opentracing.internal.ContextAccessor; -import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor; -import io.temporal.opentracing.internal.OpenTracingWorkflowInboundCallsInterceptor; -import io.temporal.opentracing.internal.SpanFactory; +import io.temporal.opentracing.internal.*; public class OpenTracingWorkerInterceptor implements WorkerInterceptor { private final OpenTracingOptions options; @@ -52,4 +49,11 @@ public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInt return new OpenTracingActivityInboundCallsInterceptor( next, options, spanFactory, contextAccessor); } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + NexusOperationInboundCallsInterceptor next) { + return new OpenTracingNexusOperationInboundCallsInterceptor( + next, options, spanFactory, contextAccessor); + } } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 513606865..5e10ae45a 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -34,7 +34,10 @@ public enum SpanOperationType { UPDATE_WORKFLOW("UpdateWorkflow"), HANDLE_QUERY("HandleQuery"), HANDLE_SIGNAL("HandleSignal"), - HANDLE_UPDATE("HandleUpdate"); + HANDLE_UPDATE("HandleUpdate"), + EXECUTE_NEXUS_OPERATION("ExecuteNexusOperation"), + START_NEXUS_OPERATION("StartNexusOperation"), + CANCEL_NEXUS_OPERATION("CancelNexusOperation"); private final String defaultPrefix; diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 43d226226..cebc07621 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -100,6 +100,13 @@ protected Map getSpanTags(SpanCreationContext context) { return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case EXECUTE_NEXUS_OPERATION: + return ImmutableMap.of( + StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), + StandardTagNames.RUN_ID, context.getRunId()); + case START_NEXUS_OPERATION: + case CANCEL_NEXUS_OPERATION: + return ImmutableMap.of(); case HANDLE_QUERY: return ImmutableMap.of(); } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java index 617150856..9cb601c5b 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java @@ -24,6 +24,8 @@ import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMapAdapter; import io.temporal.api.common.v1.Payload; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.common.converter.StdConverterBackwardsCompatAdapter; @@ -72,4 +74,20 @@ public SpanContext readSpanContextFromHeader(Header header, Tracer tracer) { payload, HashMap.class, HASH_MAP_STRING_STRING_TYPE); return codec.decode(serializedSpanContext, tracer); } + + public Span writeSpanContextToHeader( + Supplier spanSupplier, Map header, Tracer tracer) { + Span span = spanSupplier.get(); + writeSpanContextToHeader(span.context(), header, tracer); + return span; + } + + public void writeSpanContextToHeader( + SpanContext spanContext, Map header, Tracer tracer) { + tracer.inject(spanContext, Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header)); + } + + public SpanContext readSpanContextFromHeader(Map header, Tracer tracer) { + return tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header)); + } } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java new file mode 100644 index 000000000..f5420c7b7 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; + +public class OpenTracingNexusOperationInboundCallsInterceptor + extends NexusOperationInboundCallsInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + private final Tracer tracer; + private final ContextAccessor contextAccessor; + + public OpenTracingNexusOperationInboundCallsInterceptor( + NexusOperationInboundCallsInterceptor next, + OpenTracingOptions options, + SpanFactory spanFactory, + ContextAccessor contextAccessor) { + super(next); + this.options = options; + this.spanFactory = spanFactory; + this.tracer = options.getTracer(); + this.contextAccessor = contextAccessor; + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer); + + Span operationStartSpan = + spanFactory + .createStartNexusOperationSpan( + tracer, + input.getOperationContext().getService(), + input.getOperationContext().getOperation(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(operationStartSpan)) { + return super.startOperation(input); + } catch (Throwable t) { + spanFactory.logFail(operationStartSpan, t); + throw t; + } finally { + operationStartSpan.finish(); + } + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer); + + Span operationCancelSpan = + spanFactory + .createCancelNexusOperationSpan( + tracer, + input.getOperationContext().getService(), + input.getOperationContext().getOperation(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(operationCancelSpan)) { + return super.cancelOperation(input); + } catch (Throwable t) { + spanFactory.logFail(operationCancelSpan, t); + throw t; + } finally { + operationCancelSpan.finish(); + } + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 7b6add447..40b4ac42e 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -30,6 +30,8 @@ import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInfo; import io.temporal.workflow.unsafe.WorkflowUnsafe; +import java.util.HashMap; +import java.util.Map; public class OpenTracingWorkflowOutboundCallsInterceptor extends WorkflowOutboundCallsInterceptorBase { @@ -100,6 +102,33 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp } } + @Override + public ExecuteNexusOperationOutput executeNexusOperation( + ExecuteNexusOperationInput input) { + if (!WorkflowUnsafe.isReplaying()) { + Map headers = new HashMap(input.getHeaders()); + Span nexusOperationExecuteSpan = + contextAccessor.writeSpanContextToHeader( + () -> createNexusOperationExecuteSpanBuilder(input).start(), headers, tracer); + try (Scope ignored = tracer.scopeManager().activate(nexusOperationExecuteSpan)) { + return super.executeNexusOperation( + new ExecuteNexusOperationInput( + input.getEndpoint(), + input.getService(), + input.getOperation(), + input.getResultClass(), + input.getResultType(), + input.getArg(), + input.getOptions(), + headers)); + } finally { + nexusOperationExecuteSpan.finish(); + } + } else { + return super.executeNexusOperation(input); + } + } + @Override public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { if (!WorkflowUnsafe.isReplaying()) { @@ -176,6 +205,17 @@ private Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflow parentWorkflowInfo.getRunId()); } + private Tracer.SpanBuilder createNexusOperationExecuteSpanBuilder( + ExecuteNexusOperationInput input) { + WorkflowInfo parentWorkflowInfo = Workflow.getInfo(); + return spanFactory.createNexusOperationExecuteSpan( + tracer, + input.getService(), + input.getOperation(), + parentWorkflowInfo.getWorkflowId(), + parentWorkflowInfo.getRunId()); + } + private Tracer.SpanBuilder createContinueAsNewWorkflowStartSpanBuilder(ContinueAsNewInput input) { WorkflowInfo continuedWorkflowInfo = Workflow.getInfo(); return spanFactory.createContinueAsNewWorkflowStartSpan( diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 9e0e79ed9..22346976f 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -59,6 +59,18 @@ public Tracer.SpanBuilder createWorkflowStartSpan( return createSpan(context, tracer, null, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createNexusOperationExecuteSpan( + Tracer tracer, String serviceName, String operationName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.EXECUTE_NEXUS_OPERATION) + .setActionName(serviceName + "." + operationName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.CHILD_OF); + } + public Tracer.SpanBuilder createChildWorkflowStartSpan( Tracer tracer, String childWorkflowType, @@ -173,6 +185,26 @@ public Tracer.SpanBuilder createActivityRunSpan( return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createStartNexusOperationSpan( + Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.START_NEXUS_OPERATION) + .setActionName(serviceName + "." + operationName) + .build(); + return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createCancelNexusOperationSpan( + Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.CANCEL_NEXUS_OPERATION) + .setActionName(serviceName + "." + operationName) + .build(); + return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM); + } + public Tracer.SpanBuilder createWorkflowStartUpdateSpan( Tracer tracer, String updateName, String workflowId, String runId) { SpanCreationContext context = diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java new file mode 100644 index 000000000..a72e4dc6d --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java @@ -0,0 +1,197 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.List; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +public class NexusOperationTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions OT_OPTIONS = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS)) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS)) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class, OtherWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Service + public interface TestNexusService { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusService.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return WorkflowClientOperationHandlers.fromWorkflowMethod( + (context, details, client, input) -> + client.newWorkflowStub( + TestOtherWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + ::workflow); + } + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String workflow(String input); + } + + @WorkflowInterface + public interface TestOtherWorkflow { + @WorkflowMethod + String workflow(String input); + } + + public static class OtherWorkflowImpl implements TestOtherWorkflow { + @Override + public String workflow(String input) { + return "Hello, " + input + "!"; + } + } + + public static class WorkflowImpl implements TestWorkflow { + private final TestNexusService nexusService = + Workflow.newNexusServiceStub( + TestNexusService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public String workflow(String input) { + return nexusService.operation(input); + } + } + + /* + * We are checking that spans structure looks like this: + * ClientFunction + * | + * child + * v + * StartWorkflow:TestWorkflow -follow> RunWorkflow:TestWorkflow + * | + * child + * v + * ExecuteNexusOperation:TestNexusService.operation -follow> StartNexusOperation:TestNexusService.operation + * | + * child + * v + * StartWorkflow:TestOtherWorkflow -follow> RunWorkflow:TestOtherWorkflow + */ + @Test + public void testNexusOperation() { + MockSpan span = mockTracer.buildSpan("ClientFunction").start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + try (Scope scope = mockTracer.scopeManager().activate(span)) { + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + assertEquals("Hello, input!", workflow.workflow("input")); + } finally { + span.finish(); + } + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + + MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction"); + + MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0); + assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); + assertEquals("StartWorkflow:TestWorkflow", workflowStartSpan.operationName()); + + MockSpan workflowRunSpan = spansHelper.getByParentSpan(workflowStartSpan).get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + + MockSpan executeNexusOperationSpan = spansHelper.getByParentSpan(workflowRunSpan).get(0); + assertEquals(workflowRunSpan.context().spanId(), executeNexusOperationSpan.parentId()); + assertEquals( + "ExecuteNexusOperation:TestNexusService.operation", + executeNexusOperationSpan.operationName()); + + List startNexusOperationSpans = + spansHelper.getByParentSpan(executeNexusOperationSpan); + + MockSpan startNexusOperationSpan = startNexusOperationSpans.get(0); + assertEquals(executeNexusOperationSpan.context().spanId(), startNexusOperationSpan.parentId()); + assertEquals( + "StartNexusOperation:TestNexusService.operation", startNexusOperationSpan.operationName()); + + MockSpan startOtherWorkflowSpan = spansHelper.getByParentSpan(startNexusOperationSpan).get(0); + assertEquals(startNexusOperationSpan.context().spanId(), startOtherWorkflowSpan.parentId()); + assertEquals("StartWorkflow:TestOtherWorkflow", startOtherWorkflowSpan.operationName()); + + MockSpan otherWorkflowRunSpan = spansHelper.getByParentSpan(startOtherWorkflowSpan).get(0); + assertEquals(startOtherWorkflowSpan.context().spanId(), otherWorkflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestOtherWorkflow", otherWorkflowRunSpan.operationName()); + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java index f324cade3..b2ce28ff9 100644 --- a/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java @@ -42,9 +42,7 @@ import io.temporal.opentracing.OpenTracingWorkerInterceptor; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.WorkerFactoryOptions; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.*; import java.time.Duration; import java.util.List; import org.junit.After; @@ -55,7 +53,7 @@ public class JaegerTest { private final InMemoryReporter reporter = new InMemoryReporter(); private final Sampler sampler = new ConstSampler(true); - ; + private final Tracer tracer = new JaegerTracer.Builder("temporal-test").withReporter(reporter).withSampler(sampler).build(); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java new file mode 100644 index 000000000..cf8af8d95 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.*; +import io.temporal.common.Experimental; + +/** + * Intercepts inbound calls to a Nexus operation on the worker side. + * + *

An instance should be created in {@link + * WorkerInterceptor#interceptNexusOperation(NexusOperationInboundCallsInterceptor)}. + * + *

Prefer extending {@link NexusOperationInboundCallsInterceptorBase} and overriding only the + * methods you need instead of implementing this interface directly. {@link + * NexusOperationInboundCallsInterceptorBase} provides correct default implementations to all the + * methods of this interface. + */ +@Experimental +public interface NexusOperationInboundCallsInterceptor { + final class StartOperationInput { + private final OperationContext operationContext; + private final OperationStartDetails startDetails; + private final Object input; + + public StartOperationInput( + OperationContext operationContext, OperationStartDetails startDetails, Object input) { + this.operationContext = operationContext; + this.startDetails = startDetails; + this.input = input; + } + + public OperationContext getOperationContext() { + return operationContext; + } + + public OperationStartDetails getStartDetails() { + return startDetails; + } + + public Object getInput() { + return input; + } + } + + final class StartOperationOutput { + private final OperationStartResult result; + + public StartOperationOutput(OperationStartResult result) { + this.result = result; + } + + public OperationStartResult getResult() { + return result; + } + } + + final class CancelOperationInput { + private final OperationContext operationContext; + private final OperationCancelDetails cancelDetails; + + public CancelOperationInput( + OperationContext operationContext, OperationCancelDetails cancelDetails) { + this.operationContext = operationContext; + this.cancelDetails = cancelDetails; + } + + public OperationContext getOperationContext() { + return operationContext; + } + + public OperationCancelDetails getCancelDetails() { + return cancelDetails; + } + } + + final class CancelOperationOutput {} + + void init(NexusOperationOutboundCallsInterceptor outboundCalls); + + /** + * Intercepts a call to start a Nexus operation. + * + * @param input input to the operation start. + * @return result of the operation start. + * @throws OperationUnsuccessfulException if the operation start failed. + */ + StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException; + + /** + * Intercepts a call to cancel a Nexus operation. + * + * @param input input to the operation cancel. + * @return result of the operation cancel. + */ + CancelOperationOutput cancelOperation(CancelOperationInput input); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java new file mode 100644 index 000000000..b523a9c6c --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.temporal.common.Experimental; + +/** Convenience base class for {@link NexusOperationInboundCallsInterceptor} implementations. */ +@Experimental +public class NexusOperationInboundCallsInterceptorBase + implements NexusOperationInboundCallsInterceptor { + private final NexusOperationInboundCallsInterceptor next; + + public NexusOperationInboundCallsInterceptorBase(NexusOperationInboundCallsInterceptor next) { + this.next = next; + } + + @Override + public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { + next.init(outboundCalls); + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + return next.startOperation(input); + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + return next.cancelOperation(input); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java new file mode 100644 index 000000000..1f3d80015 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; + +/** + * Can be used to intercept calls from a Nexus operation into the Temporal APIs. + * + *

Prefer extending {@link NexusOperationOutboundCallsInterceptorBase} and overriding only the + * methods you need instead of implementing this interface directly. {@link + * NexusOperationOutboundCallsInterceptorBase} provides correct default implementations to all the + * methods of this interface. + * + *

An instance may be created in {@link + * NexusOperationInboundCallsInterceptor#init(NexusOperationOutboundCallsInterceptor)} and set by + * passing it into {@code init} method of the {@code next} {@link + * NexusOperationInboundCallsInterceptor} The implementation must forward all the calls to the + * outbound interceptor passed as a {@code outboundCalls} parameter to the {@code init} call. + */ +@Experimental +public interface NexusOperationOutboundCallsInterceptor { + /** Intercepts call to get the metric scope in a Nexus operation. */ + Scope getMetricsScope(); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java new file mode 100644 index 000000000..26ad01481 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; + +/** Convenience base class for {@link NexusOperationOutboundCallsInterceptor} implementations. */ +@Experimental +public class NexusOperationOutboundCallsInterceptorBase + implements NexusOperationOutboundCallsInterceptor { + private final NexusOperationOutboundCallsInterceptor next; + + public NexusOperationOutboundCallsInterceptorBase(NexusOperationOutboundCallsInterceptor next) { + this.next = next; + } + + @Override + public Scope getMetricsScope() { + return next.getMetricsScope(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java index e5aabbfe9..d2caf91c7 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java @@ -95,4 +95,16 @@ public interface WorkerInterceptor { WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next); ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next); + + /** + * Called when Nexus task is received. May create a {@link NexusOperationInboundCallsInterceptor} + * instance. The instance must forward all the calls to {@code next} {@link + * NexusOperationInboundCallsInterceptor}, but it may change the input parameters. + * + * @param next an existing interceptor instance to be proxied by the interceptor created inside + * this method + * @return an interceptor that passes all the calls to {@code next} + */ + NexusOperationInboundCallsInterceptor interceptNexusOperation( + NexusOperationInboundCallsInterceptor next); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java index 2c09d188e..5a862e701 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java @@ -30,4 +30,10 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return next; } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + NexusOperationInboundCallsInterceptor next) { + return next; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java index b54c0867d..e159d775e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java @@ -22,25 +22,29 @@ import com.uber.m3.tally.Scope; import io.temporal.client.WorkflowClient; +import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; import io.temporal.nexus.NexusOperationContext; public class NexusOperationContextImpl implements NexusOperationContext { private final String namespace; private final String taskQueue; private final WorkflowClient client; - private final Scope metricsScope; + NexusOperationOutboundCallsInterceptor outboundCalls; public NexusOperationContextImpl( - String namespace, String taskQueue, WorkflowClient client, Scope metricsScope) { + String namespace, + String taskQueue, + WorkflowClient client, + NexusOperationOutboundCallsInterceptor outboundCalls) { this.namespace = namespace; this.taskQueue = taskQueue; this.client = client; - this.metricsScope = metricsScope; + this.outboundCalls = outboundCalls; } @Override public Scope getMetricsScope() { - return metricsScope; + return outboundCalls.getMetricsScope(); } public WorkflowClient getWorkflowClient() { @@ -54,4 +58,8 @@ public String getTaskQueue() { public String getNamespace() { return namespace; } + + public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) { + this.outboundCalls = outboundCalls; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusServiceInterceptorConverter.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusServiceInterceptorConverter.java new file mode 100644 index 000000000..4c44e8c6e --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusServiceInterceptorConverter.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import io.nexusrpc.OperationInfo; +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.*; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptor; + +public class NexusServiceInterceptorConverter implements ServiceHandlerInterceptor { + private final WorkerInterceptor[] interceptors; + RootNexusOperationInboundCallsInterceptor rootInboundCallsInterceptor; + + public NexusServiceInterceptorConverter(WorkerInterceptor[] interceptors) { + this.interceptors = interceptors; + } + + @Override + public OperationInterceptor interceptOperation(OperationInterceptor operationInterceptor) { + rootInboundCallsInterceptor = + new RootNexusOperationInboundCallsInterceptor(operationInterceptor); + NexusOperationInboundCallsInterceptor inboundCallsInterceptor = rootInboundCallsInterceptor; + for (WorkerInterceptor interceptor : interceptors) { + inboundCallsInterceptor = interceptor.interceptNexusOperation(inboundCallsInterceptor); + } + + inboundCallsInterceptor.init( + new RootNexusOperationOutboundCallsInterceptor( + CurrentNexusOperationContext.get().getMetricsScope())); + return new OperationInterceptorConverter(inboundCallsInterceptor); + } + + class OperationInterceptorConverter implements OperationInterceptor { + private final NexusOperationInboundCallsInterceptor next; + + public OperationInterceptorConverter(NexusOperationInboundCallsInterceptor next) { + this.next = next; + } + + @Override + public OperationStartResult start( + OperationContext operationContext, OperationStartDetails operationStartDetails, Object o) + throws OperationUnsuccessfulException { + return next.startOperation( + new NexusOperationInboundCallsInterceptor.StartOperationInput( + operationContext, operationStartDetails, o)) + .getResult(); + } + + @Override + public Object fetchResult( + OperationContext operationContext, OperationFetchResultDetails operationFetchResultDetails) + throws OperationHandlerException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public OperationInfo fetchInfo( + OperationContext operationContext, OperationFetchInfoDetails operationFetchInfoDetails) + throws OperationHandlerException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void cancel( + OperationContext operationContext, OperationCancelDetails operationCancelDetails) { + next.cancelOperation( + new NexusOperationInboundCallsInterceptor.CancelOperationInput( + operationContext, operationCancelDetails)); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 0714d472e..0b0048d7e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -33,6 +33,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowException; import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.failure.ApplicationFailure; import io.temporal.internal.common.NexusUtil; import io.temporal.internal.worker.NexusTask; @@ -46,6 +47,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +61,21 @@ public class NexusTaskHandlerImpl implements NexusTaskHandler { private final Map serviceImplInstances = Collections.synchronizedMap(new HashMap<>()); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final WorkerInterceptor[] interceptors; + private final NexusServiceInterceptorConverter nexusServiceInterceptor; public NexusTaskHandlerImpl( - WorkflowClient client, String namespace, String taskQueue, DataConverter dataConverter) { - this.client = client; - this.namespace = namespace; - this.taskQueue = taskQueue; - this.dataConverter = dataConverter; + @Nonnull WorkflowClient client, + @Nonnull String namespace, + @Nonnull String taskQueue, + @Nonnull DataConverter dataConverter, + @Nonnull WorkerInterceptor[] interceptors) { + this.client = Objects.requireNonNull(client); + this.namespace = Objects.requireNonNull(namespace); + this.taskQueue = Objects.requireNonNull(taskQueue); + this.dataConverter = Objects.requireNonNull(dataConverter); + this.interceptors = Objects.requireNonNull(interceptors); + this.nexusServiceInterceptor = new NexusServiceInterceptorConverter(interceptors); } @Override @@ -76,6 +86,7 @@ public boolean start() { ServiceHandler.Builder serviceHandlerBuilder = ServiceHandler.newBuilder().setSerializer(new PayloadSerializer(dataConverter)); serviceImplInstances.forEach((name, instance) -> serviceHandlerBuilder.addInstance(instance)); + serviceHandlerBuilder.setInterceptors(nexusServiceInterceptor); serviceHandler = serviceHandlerBuilder.build(); return true; } @@ -119,7 +130,11 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } CurrentNexusOperationContext.set( - new NexusOperationContextImpl(namespace, taskQueue, client, metricsScope)); + new NexusOperationContextImpl( + namespace, + taskQueue, + client, + new RootNexusOperationOutboundCallsInterceptor(metricsScope))); switch (request.getVariantCase()) { case START_OPERATION: diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java new file mode 100644 index 000000000..b3b9c06b5 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.OperationInterceptor; +import io.nexusrpc.handler.OperationStartResult; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; +import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; + +public class RootNexusOperationInboundCallsInterceptor + implements NexusOperationInboundCallsInterceptor { + private final OperationInterceptor operationInterceptor; + + RootNexusOperationInboundCallsInterceptor(OperationInterceptor operationInterceptor) { + this.operationInterceptor = operationInterceptor; + } + + @Override + public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { + CurrentNexusOperationContext.get().setOutboundInterceptor(outboundCalls); + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + OperationStartResult result = + operationInterceptor.start( + input.getOperationContext(), input.getStartDetails(), input.getInput()); + return new StartOperationOutput(result); + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + operationInterceptor.cancel(input.getOperationContext(), input.getCancelDetails()); + return new CancelOperationOutput(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java new file mode 100644 index 000000000..1b16f87fe --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import com.uber.m3.tally.Scope; +import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; + +public class RootNexusOperationOutboundCallsInterceptor + implements NexusOperationOutboundCallsInterceptor { + private final Scope scope; + + RootNexusOperationOutboundCallsInterceptor(Scope scope) { + this.scope = scope; + } + + @Override + public Scope getMetricsScope() { + return scope; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java index c213dafc2..d452b7312 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java @@ -49,7 +49,12 @@ public SyncNexusWorker( this.taskQueue = taskQueue; this.taskHandler = - new NexusTaskHandlerImpl(client, namespace, taskQueue, options.getDataConverter()); + new NexusTaskHandlerImpl( + client, + namespace, + taskQueue, + options.getDataConverter(), + options.getWorkerInterceptors()); this.worker = new NexusWorker( client.getWorkflowServiceStubs(), diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java index 6a1a85eb4..f0f1ab11e 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java @@ -20,6 +20,8 @@ package io.temporal.internal.nexus; +import static org.mockito.Mockito.mock; + import com.google.protobuf.ByteString; import com.uber.m3.tally.RootScopeBuilder; import com.uber.m3.tally.Scope; @@ -32,8 +34,10 @@ import io.temporal.api.nexus.v1.Request; import io.temporal.api.nexus.v1.StartOperationRequest; import io.temporal.api.workflowservice.v1.PollNexusTaskQueueResponse; +import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.common.reporter.TestStatsReporter; import io.temporal.internal.worker.NexusTask; import io.temporal.internal.worker.NexusTaskHandler; @@ -59,16 +63,20 @@ public void setUp() { @Test public void nexusTaskHandlerImplStartNoService() { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); // Verify if no service is registered, start should return false Assert.assertFalse(nexusTaskHandlerImpl.start()); } @Test public void nexusTaskHandlerImplStart() { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImpl()}); // Verify if any services are registered, start should return true @@ -77,8 +85,10 @@ public void nexusTaskHandlerImplStart() { @Test public void startSyncTask() throws TimeoutException { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImpl()}); nexusTaskHandlerImpl.start(); @@ -115,8 +125,10 @@ public void startSyncTask() throws TimeoutException { @Test public void syncTimeoutTask() { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImpl2()}); nexusTaskHandlerImpl.start(); @@ -140,8 +152,10 @@ public void syncTimeoutTask() { @Test public void startAsyncSyncOperation() throws TimeoutException { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImplAsync()}); nexusTaskHandlerImpl.start(); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java index eb0bd1e42..5c8aad867 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java @@ -29,6 +29,7 @@ import io.temporal.failure.NexusOperationFailure; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.testing.internal.TracingWorkerInterceptor; import io.temporal.workflow.*; import io.temporal.workflow.shared.TestNexusServices; import io.temporal.workflow.shared.TestWorkflows; @@ -58,6 +59,15 @@ public void asyncOperationImmediatelyCancelled() { CanceledFailure canceledFailure = (CanceledFailure) nexusFailure.getCause(); Assert.assertEquals( "operation canceled before it was started", canceledFailure.getOriginalMessage()); + + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "newThread workflow-method", + "executeNexusOperation TestNexusService1.operation", + "startNexusOperation TestNexusService1.operation", + "cancelNexusOperation TestNexusService1.operation"); } @Test @@ -69,6 +79,19 @@ public void asyncOperationCancelled() { Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure); + + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "newThread workflow-method", + "executeNexusOperation TestNexusService1.operation", + "startNexusOperation TestNexusService1.operation", + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "registerSignalHandlers unblock", + "newThread workflow-method", + "await await", + "cancelNexusOperation TestNexusService1.operation"); } public static class TestNexus implements TestWorkflows.TestWorkflow1 { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java index 875f4ac83..d67bff8a8 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java @@ -34,6 +34,7 @@ import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.serviceclient.MetricsTag; import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.testing.internal.TracingWorkerInterceptor; import io.temporal.worker.MetricsType; import io.temporal.worker.WorkerMetricsTag; import io.temporal.workflow.*; @@ -63,7 +64,14 @@ public void syncClientOperationSuccess() { TestUpdatedWorkflow workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestUpdatedWorkflow.class); Assert.assertTrue(workflowStub.execute(false).startsWith("Update ID:")); - + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "registerUpdateHandlers update", + "newThread workflow-method", + "executeNexusOperation TestNexusService1.operation", + "startNexusOperation TestNexusService1.operation"); // Test metrics all tasks should have Map nexusWorkerTags = ImmutableMap.builder() diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 88bba8b45..d3e1bfb12 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import com.uber.m3.tally.Scope; +import io.nexusrpc.OperationUnsuccessfulException; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.ActivityCompletionException; import io.temporal.common.SearchAttributeUpdate; @@ -129,6 +130,12 @@ public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInt return new TracingActivityInboundCallsInterceptor(trace, next); } + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + NexusOperationInboundCallsInterceptor next) { + return new TracingNexusOperationInboundCallsInterceptor(trace, next); + } + public static class FilteredTrace { private final List impl = Collections.synchronizedList(new ArrayList<>()); @@ -189,7 +196,7 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp public ExecuteNexusOperationOutput executeNexusOperation( ExecuteNexusOperationInput input) { if (!WorkflowUnsafe.isReplaying()) { - trace.add("executeNexusOperation " + input.getOperation()); + trace.add("executeNexusOperation " + input.getService() + "." + input.getOperation()); } return next.executeNexusOperation(input); } @@ -467,4 +474,42 @@ public ActivityOutput execute(ActivityInput input) { return next.execute(input); } } + + private static class TracingNexusOperationInboundCallsInterceptor + implements NexusOperationInboundCallsInterceptor { + private final NexusOperationInboundCallsInterceptor next; + private final FilteredTrace trace; + + public TracingNexusOperationInboundCallsInterceptor( + FilteredTrace trace, NexusOperationInboundCallsInterceptor next) { + this.trace = trace; + this.next = next; + } + + @Override + public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { + next.init(outboundCalls); + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + trace.add( + "startNexusOperation " + + input.getOperationContext().getService() + + "." + + input.getOperationContext().getOperation()); + return next.startOperation(input); + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + trace.add( + "cancelNexusOperation " + + input.getOperationContext().getService() + + "." + + input.getOperationContext().getOperation()); + return next.cancelOperation(input); + } + } }