Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

oss-1840 #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async-trait = "0.1"
base64 = "0.21"
console-subscriber = { version = "0.1", optional = true }
crossbeam = "0.8"
dashmap = "5.0"
dashmap = "5.5"
derive_builder = "0.12"
derive_more = "0.99"
enum_dispatch = "0.3"
Expand Down
6 changes: 6 additions & 0 deletions core/src/internal_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl InternalFlags {
WorkflowTaskCompletedMetadata {
core_used_flags: core_newly_used,
lang_used_flags: lang_newly_used,
sdk_name: "".to_string(),
sdk_version: "".to_string(),
}
}
Self::Disabled => WorkflowTaskCompletedMetadata::default(),
Expand Down Expand Up @@ -182,6 +184,8 @@ mod tests {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
core_used_flags: vec![1],
lang_used_flags: vec![],
sdk_name: "".to_string(),
sdk_version: "".to_string(),
}),
..Default::default()
});
Expand Down Expand Up @@ -215,6 +219,8 @@ mod tests {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
core_used_flags: vec![2],
lang_used_flags: vec![2],
sdk_name: "".to_string(),
sdk_version: "".to_string(),
}),
..Default::default()
});
Expand Down
1 change: 1 addition & 0 deletions core/src/worker/client/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub(crate) static DEFAULT_TEST_CAPABILITIES: &Capabilities = &Capabilities {
upsert_memo: true,
eager_workflow_start: true,
sdk_metadata: true,
count_group_by_execution_status: false,
};

#[cfg(test)]
Expand Down
36 changes: 35 additions & 1 deletion core/src/worker/workflow/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::{
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
};
use crate::{
abstractions::dbg_panic,
internal_flags::CoreInternalFlags,
worker::workflow::{machines::HistEventData, InternalFlagsRef},
};
Expand Down Expand Up @@ -44,6 +45,7 @@ fsm! {
ScheduleCommandCreated --(ActivityTaskScheduled(ActTaskScheduledData),
shared on_activity_task_scheduled) --> ScheduledEventRecorded;
ScheduleCommandCreated --(Cancel, shared on_canceled) --> Canceled;
ScheduleCommandCreated --(Abandon, shared on_abandoned) --> Canceled;

ScheduledEventRecorded --(ActivityTaskStarted(i64), shared on_task_started) --> Started;
ScheduledEventRecorded --(ActivityTaskTimedOut(ActivityTaskTimedOutEventAttributes),
Expand Down Expand Up @@ -407,10 +409,18 @@ impl ScheduleCommandCreated {
pub(super) fn on_canceled(self, dat: &mut SharedState) -> ActivityMachineTransition<Canceled> {
dat.cancelled_before_sent = true;
match dat.cancellation_type {
ActivityCancellationType::Abandon => ActivityMachineTransition::default(),
ActivityCancellationType::Abandon => {
dbg_panic!("Can't get on_canceled transition with Abandon cancelation type");
ActivityMachineTransition::default()
}
_ => notify_lang_activity_cancelled(None),
}
}

pub(super) fn on_abandoned(self, dat: &mut SharedState) -> ActivityMachineTransition<Canceled> {
dat.cancelled_before_sent = true;
ActivityMachineTransition::default()
}
}

#[derive(Default, Clone)]
Expand Down Expand Up @@ -807,6 +817,7 @@ mod test {
internal_flags::InternalFlags,
replay::TestHistoryBuilder,
test_help::{build_fake_sdk, MockPollCfg, ResponseType},
worker::workflow::machines::Machines,
};
use std::{cell::RefCell, mem::discriminant, rc::Rc};
use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext, WorkflowFunction};
Expand Down Expand Up @@ -886,4 +897,27 @@ mod test {
assert_eq!(discriminant(&state), discriminant(s.state()));
}
}

#[test]
fn cancel_in_schedule_command_created_for_abandon() {
let s = ActivityMachine::new_scheduled(
ScheduleActivity {
activity_id: "1".to_string(),
activity_type: "foo".to_string(),
cancellation_type: ActivityCancellationType::Abandon.into(),
..Default::default()
},
Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
true,
);
let mut s = if let Machines::ActivityMachine(am) = s.machine {
am
} else {
panic!("Wrong machine type");
};
let cmds = s.cancel().unwrap();
assert_eq!(cmds.len(), 0);
let curstate = s.state();
assert!(matches!(curstate, &ActivityMachineState::Canceled(_)));
}
}
3 changes: 1 addition & 2 deletions core/src/worker/workflow/machines/update_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub(super) enum UpdateMachineCommand {
pub(super) struct SharedState {
message_id: String,
instance_id: String,
#[allow(dead_code)]
event_seq_id: i64,
request: UpdateRequest,
}
Expand Down Expand Up @@ -236,7 +237,6 @@ impl WFMachinesAdapter for UpdateMachine {
format!("{}/accept", self.shared_state.message_id),
UpdateMsg::Accept(Acceptance {
accepted_request_message_id: self.shared_state.message_id.clone(),
accepted_request_sequencing_event_id: self.shared_state.event_seq_id,
..Default::default()
}),
)?,
Expand All @@ -245,7 +245,6 @@ impl WFMachinesAdapter for UpdateMachine {
format!("{}/reject", self.shared_state.message_id),
UpdateMsg::Reject(Rejection {
rejected_request_message_id: self.shared_state.message_id.clone(),
rejected_request_sequencing_event_id: self.shared_state.event_seq_id,
failure: Some(fail),
..Default::default()
}),
Expand Down
4 changes: 2 additions & 2 deletions sdk-core-protos/protos/api_upstream/.buildkite/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
FROM temporalio/base-ci-builder:1.5.0
WORKDIR /temporal
FROM temporalio/base-ci-builder:1.10.3
WORKDIR /temporal
3 changes: 2 additions & 1 deletion sdk-core-protos/protos/api_upstream/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/.idea
/.gen
/.vscode
/.vscode
/.stamp
63 changes: 41 additions & 22 deletions sdk-core-protos/protos/api_upstream/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,58 +15,77 @@ GOPATH := $(shell go env GOPATH)
endif

GOBIN := $(if $(shell go env GOBIN),$(shell go env GOBIN),$(GOPATH)/bin)
SHELL := PATH=$(GOBIN):$(PATH) /bin/sh
PATH := $(GOBIN):$(PATH)
STAMPDIR := .stamp

COLOR := "\e[1;36m%s\e[0m\n"

# Only prints output if the exit code is non-zero
define silent_exec
@output=$$($(1) 2>&1); \
status=$$?; \
if [ $$status -ne 0 ]; then \
echo "$$output"; \
fi; \
exit $$status
endef

PROTO_ROOT := .
PROTO_FILES = $(shell find $(PROTO_ROOT) -name "*.proto")
PROTO_FILES = $(shell find temporal -name "*.proto")
PROTO_DIRS = $(sort $(dir $(PROTO_FILES)))
PROTO_OUT := .gen
PROTO_IMPORTS = -I=$(PROTO_ROOT) -I=$(shell go list -modfile build/go.mod -m -f '{{.Dir}}' github.com/temporalio/gogo-protobuf)/protobuf
PROTO_IMPORTS = \
-I=$(PROTO_ROOT)
PROTO_PATHS = paths=source_relative:$(PROTO_OUT)

$(PROTO_OUT):
mkdir $(PROTO_OUT)

##### Compile proto files for go #####
grpc: buf-lint api-linter buf-breaking gogo-grpc fix-path
grpc: buf-lint api-linter buf-breaking clean go-grpc fix-path

go-grpc: clean $(PROTO_OUT)
printf $(COLOR) "Compile for go-gRPC..."
$(foreach PROTO_DIR,$(PROTO_DIRS),protoc --fatal_warnings $(PROTO_IMPORTS) --go_out=plugins=grpc,paths=source_relative:$(PROTO_OUT) $(PROTO_DIR)*.proto;)

gogo-grpc: clean $(PROTO_OUT)
printf $(COLOR) "Compile for gogo-gRPC..."
$(foreach PROTO_DIR,$(PROTO_DIRS),protoc --fatal_warnings $(PROTO_IMPORTS) --gogoslick_out=Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/descriptor.proto=github.com/golang/protobuf/protoc-gen-go/descriptor,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,plugins=grpc,paths=source_relative:$(PROTO_OUT) $(PROTO_DIR)*.proto;)
$(foreach PROTO_DIR,$(PROTO_DIRS),\
protoc --fatal_warnings $(PROTO_IMPORTS) \
--go_out=$(PROTO_PATHS) \
--grpc-gateway_out=allow_patch_feature=false,$(PROTO_PATHS)\
--doc_out=html,index.html,source_relative:$(PROTO_OUT) \
$(PROTO_DIR)*.proto;)

fix-path:
mv -f $(PROTO_OUT)/temporal/api/* $(PROTO_OUT) && rm -rf $(PROTO_OUT)/temporal

##### Plugins & tools #####
grpc-install: gogo-protobuf-install
printf $(COLOR) "Install/update gRPC plugins..."
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

gogo-protobuf-install: go-protobuf-install
go install -modfile build/go.mod github.com/temporalio/gogo-protobuf/protoc-gen-gogoslick

go-protobuf-install:
go install github.com/golang/protobuf/protoc-gen-go@v1.5.2
grpc-install:
@printf $(COLOR) "Install/update protoc and plugins..."
@go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
@go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
@go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
@go install github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc@latest

api-linter-install:
printf $(COLOR) "Install/update api-linter..."
go install github.com/googleapis/api-linter/cmd/api-linter@v1.32.3

buf-install:
printf $(COLOR) "Install/update buf..."
go install github.com/bufbuild/buf/cmd/buf@v1.6.0
go install github.com/bufbuild/buf/cmd/buf@v1.27.0

##### Linters #####
api-linter:
printf $(COLOR) "Run api-linter..."
api-linter --set-exit-status $(PROTO_IMPORTS) --config $(PROTO_ROOT)/api-linter.yaml $(PROTO_FILES)
$(call silent_exec, api-linter --set-exit-status $(PROTO_IMPORTS) --config $(PROTO_ROOT)/api-linter.yaml $(PROTO_FILES))

$(STAMPDIR):
mkdir $@

$(STAMPDIR)/buf-mod-prune: $(STAMPDIR) buf.yaml
printf $(COLOR) "Pruning buf module"
buf mod prune
touch $@

buf-lint:
buf-lint: $(STAMPDIR)/buf-mod-prune
printf $(COLOR) "Run buf linter..."
(cd $(PROTO_ROOT) && buf lint)

Expand All @@ -77,4 +96,4 @@ buf-breaking:
##### Clean #####
clean:
printf $(COLOR) "Delete generated go files..."
rm -rf $(PROTO_OUT)
rm -rf $(PROTO_OUT) $(BUF_DEPS)
68 changes: 42 additions & 26 deletions sdk-core-protos/protos/api_upstream/api-linter.yaml
Original file line number Diff line number Diff line change
@@ -1,40 +1,56 @@
- included_paths:
- '**/*.proto'
- "**/*.proto"
disabled_rules:
- 'core::0192::has-comments'
- "core::0192::has-comments"

- included_paths:
- '**/message.proto'
- "**/message.proto"
disabled_rules:
- 'core::0122::name-suffix'
- 'core::0123::resource-annotation'
- "core::0122::name-suffix"
- "core::0123::resource-annotation"

- included_paths:
- '**/workflowservice/v1/request_response.proto'
- '**/operatorservice/v1/request_response.proto'
- "**/workflowservice/v1/request_response.proto"
- "**/operatorservice/v1/request_response.proto"
disabled_rules:
- 'core::0122::name-suffix'
- 'core::0131::request-name-required'
- 'core::0131::request-unknown-fields'
- 'core::0132::request-parent-required'
- 'core::0132::request-unknown-fields'
- 'core::0132::response-unknown-fields'
- 'core::0134::request-unknown-fields'
- 'core::0158::request-page-size-field'
- 'core::0158::request-page-token-field'
- 'core::0158::response-next-page-token-field'
- 'core::0158::response-plural-first-field'
- 'core::0158::response-repeated-first-field'
- "core::0122::name-suffix"
- "core::0131::request-name-required"
- "core::0131::request-unknown-fields"
- "core::0132::request-parent-required"
- "core::0132::request-unknown-fields"
- "core::0132::response-unknown-fields"
- "core::0134::request-unknown-fields"
- "core::0158::request-page-size-field"
- "core::0158::request-page-token-field"
- "core::0158::response-next-page-token-field"
- "core::0158::response-plural-first-field"
- "core::0158::response-repeated-first-field"

- included_paths:
- '**/workflowservice/v1/service.proto'
- '**/operatorservice/v1/service.proto'
- "**/workflowservice/v1/service.proto"
- "**/operatorservice/v1/service.proto"
disabled_rules:
- 'core::0127::http-annotation'
- 'core::0131::method-signature'
- 'core::0131::response-message-name'
# We extract specific fields in URL since the gRPC API predates the HTTP API
- "core::0127::resource-name-extraction"

# We do not require specific "Get", "Create", "Update", or "Delete" RPC
# rules just because we happen to use a known RPC name prefix
- "core::0131"
- "core::0133"
- "core::0134"
- "core::0135"

# We don't require HTTP calls to be suffixed with the same name as the gRPC
# name
- "core::0136::http-uri-suffix"

- included_paths:
- "**/operatorservice/v1/service.proto"
disabled_rules:
# Do not require HTTP annotations on OperatorService calls at this time
- "core::0127::http-annotation"

- included_paths:
- 'dependencies/gogoproto/gogo.proto'
- "google/**/*.proto"
disabled_rules:
- 'all'
- "all"
20 changes: 20 additions & 0 deletions sdk-core-protos/protos/api_upstream/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
version: v1
plugins:
- plugin: buf.build/protocolbuffers/go:v1.31.0
out: ./
opt:
- paths=source_relative
- plugin: buf.build/grpc/go:v1.3.0
out: ./
opt:
- paths=source_relative
- plugin: buf.build/grpc-ecosystem/gateway:v2.18.0
out: ./
opt:
- paths=source_relative
- allow_patch_feature=false
- name: go-helpers
out: ./
path: ["go", "run", "./protoc-gen-go-helpers"]
opt:
- paths=source_relative
11 changes: 11 additions & 0 deletions sdk-core-protos/protos/api_upstream/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: googleapis
repository: googleapis
commit: 28151c0d0a1641bf938a7672c500e01d
- remote: buf.build
owner: grpc-ecosystem
repository: grpc-gateway
commit: 048ae6ff94ca4476b3225904b1078fad
11 changes: 10 additions & 1 deletion sdk-core-protos/protos/api_upstream/buf.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
version: v1
deps:
- buf.build/grpc-ecosystem/grpc-gateway
build:
excludes:
# Buf won't accept a local dependency on the google protos but we need them
# to run api-linter, so just tell buf it ignore it
- google
breaking:
use:
- WIRE_JSON
ignore:
- google
lint:
use:
- DEFAULT
ignore:
- dependencies
- google
Loading
Loading