Skip to content

Commit

Permalink
Add SDK version and name to history (#1245)
Browse files Browse the repository at this point in the history
Add SDK version and name to history
  • Loading branch information
Quinn-With-Two-Ns authored Sep 25, 2023
1 parent ff9b3f4 commit 59d8488
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 10 deletions.
20 changes: 20 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ type (
contextPropagators []ContextPropagator
deadlockDetectionTimeout time.Duration
sdkFlags *sdkFlags
sdkVersionUpdated bool
sdkVersion string
sdkNameUpdated bool
sdkName string

protocols *protocol.Registry
}
Expand Down Expand Up @@ -350,6 +354,22 @@ func (wc *workflowEnvironmentImpl) Send(msg *protocolpb.Message, opts ...msgSend
wc.outbox = append(wc.outbox, outboxEntry{msg: msg, eventPredicate: sendCfg.pred})
}

func (wc *workflowEnvironmentImpl) getNewSdkNameAndReset() string {
if wc.sdkNameUpdated {
wc.sdkNameUpdated = false
return wc.sdkName
}
return ""
}

func (wc *workflowEnvironmentImpl) getNewSdkVersionAndReset() string {
if wc.sdkVersionUpdated {
wc.sdkVersionUpdated = false
return wc.sdkVersion
}
return ""
}

func (wc *workflowEnvironmentImpl) getNextLocalActivityID() string {
wc.localActivityCounterID++
return getStringID(wc.localActivityCounterID)
Expand Down
65 changes: 55 additions & 10 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ type (
next []*historypb.HistoryEvent
nextFlags []sdkFlag
binaryChecksum string
sdkVersion string
sdkName string
}

workflowTaskHeartbeatError struct {
Expand All @@ -195,6 +197,16 @@ type (
flags []sdkFlag
msgs []*protocolpb.Message
binaryChecksum string
sdkVersion string
sdkName string
}

finishedTask struct {
isFailed bool
binaryChecksum string
flags []sdkFlag
sdkVersion string
sdkName string
}
)

Expand Down Expand Up @@ -237,15 +249,15 @@ func (eh *history) IsReplayEvent(event *historypb.HistoryEvent) bool {
return event.GetEventId() <= eh.workflowTask.task.GetPreviousStartedEventId() || isCommandEvent(event.GetEventType())
}

// IsNextWorkflowTaskFailed checks if the workflow task failed or completed. If it did complete returns some information
// isNextWorkflowTaskFailed checks if the workflow task failed or completed. If it did complete returns some information
// on the completed workflow task.
func (eh *history) IsNextWorkflowTaskFailed() (isFailed bool, binaryChecksum string, flags []sdkFlag, err error) {
func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
nextIndex := eh.currentIndex + 1
// Server can return an empty page so if we need the next event we must keep checking until we either get it
// or know we have no more pages to check
for nextIndex >= len(eh.loadedEvents) && eh.hasMoreEvents() { // current page ends and there is more pages
if err := eh.loadMoreEvents(); err != nil {
return false, "", nil, err
return finishedTask{}, err
}
}

Expand All @@ -262,14 +274,20 @@ func (eh *history) IsNextWorkflowTaskFailed() (isFailed bool, binaryChecksum str
f := sdkFlagFromUint(flag)
if !f.isValid() {
// If a flag is not recognized (value is too high or not defined), it must fail the workflow task
return false, "", nil, errors.New("could not recognize SDK flag")
return finishedTask{}, errors.New("could not recognize SDK flag")
}
flags = append(flags, f)
}
}
return isFailed, binaryChecksum, flags, nil
return finishedTask{
isFailed: isFailed,
binaryChecksum: binaryChecksum,
flags: flags,
sdkName: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName(),
sdkVersion: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkVersion(),
}, nil
}
return false, "", nil, nil
return finishedTask{}, nil
}

func (eh *history) loadMoreEvents() error {
Expand Down Expand Up @@ -318,11 +336,15 @@ func (eh *history) nextTask() (*preparedTask, error) {
}
eh.next = firstTask.events
eh.nextFlags = firstTask.flags
eh.sdkName = firstTask.sdkName
eh.sdkVersion = firstTask.sdkVersion
}

result := eh.next
checksum := eh.binaryChecksum
sdkFlags := eh.nextFlags
sdkName := eh.sdkName
sdkVersion := eh.sdkVersion

var markers []*historypb.HistoryEvent
var msgs []*protocolpb.Message
Expand All @@ -333,6 +355,8 @@ func (eh *history) nextTask() (*preparedTask, error) {
}
eh.next = nextTaskEvents.events
eh.nextFlags = nextTaskEvents.flags
eh.sdkName = nextTaskEvents.sdkName
eh.sdkVersion = nextTaskEvents.sdkVersion
markers = nextTaskEvents.markers
msgs = nextTaskEvents.msgs
}
Expand All @@ -342,6 +366,8 @@ func (eh *history) nextTask() (*preparedTask, error) {
flags: sdkFlags,
msgs: msgs,
binaryChecksum: checksum,
sdkName: sdkName,
sdkVersion: sdkVersion,
}, nil
}

Expand Down Expand Up @@ -408,16 +434,22 @@ OrderEvents:

switch event.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
isFailed, binaryChecksum, newFlags, err1 := eh.IsNextWorkflowTaskFailed()
finishedTask, err1 := eh.isNextWorkflowTaskFailed()
if err1 != nil {
err := err1
return nil, err
}
if !isFailed {
eh.binaryChecksum = binaryChecksum
if !finishedTask.isFailed {
eh.binaryChecksum = finishedTask.binaryChecksum
eh.currentIndex++
taskEvents.events = append(taskEvents.events, event)
taskEvents.flags = append(taskEvents.flags, newFlags...)
taskEvents.flags = append(taskEvents.flags, finishedTask.flags...)
if finishedTask.sdkName != "" {
taskEvents.sdkName = finishedTask.sdkName
}
if finishedTask.sdkVersion != "" {
taskEvents.sdkVersion = finishedTask.sdkVersion
}
break OrderEvents
}
case enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
Expand Down Expand Up @@ -959,9 +991,20 @@ ProcessEvents:
var msgs *eventMsgIndex
if isReplay {
msgs = indexMessagesByEventID(historyMessages)

eventHandler.sdkVersion = nextTask.sdkVersion
eventHandler.sdkName = nextTask.sdkName
} else {
msgs = indexMessagesByEventID(taskMessages)
taskMessages = []*protocolpb.Message{}
if eventHandler.sdkVersion != SDKVersion {
eventHandler.sdkVersionUpdated = true
eventHandler.sdkVersion = SDKVersion
}
if eventHandler.sdkName != SDKName {
eventHandler.sdkNameUpdated = true
eventHandler.sdkName = SDKName
}
}

eventHandler.sdkFlags.set(flags...)
Expand Down Expand Up @@ -1729,6 +1772,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
MeteringMetadata: &commonpb.MeteringMetadata{NonfirstLocalActivityExecutionAttempts: nonfirstLAAttempts},
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: langUsedFlags,
SdkName: eventHandler.getNewSdkNameAndReset(),
SdkVersion: eventHandler.getNewSdkVersionAndReset(),
},
WorkerVersionStamp: &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_task_handlers_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommandsSdkFlags() {
StartedEventId: 3,
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: []uint32{SDKFlagLimitChangeVersionSASize},
SdkName: SDKName,
SdkVersion: "1.0",
},
}),
createTestEventVersionMarker(5, 4, "test-id", 1),
Expand Down Expand Up @@ -241,6 +243,8 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommandsSdkFlags() {
// function is run.
s.Equal(1, len(nextTask.flags))
s.EqualValues(SDKFlagLimitChangeVersionSASize, nextTask.flags[0])
s.EqualValues(SDKName, nextTask.sdkName)
s.EqualValues("1.0", nextTask.sdkVersion)

nextTask, err = eh.nextTask()

Expand Down
3 changes: 3 additions & 0 deletions internal/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (
// Server validates if SDKVersion fits its supported range and rejects request if it doesn't.
SDKVersion = "1.24.0"

// SDKName represents the name of the SDK.
SDKName = clientNameHeaderValue

// SupportedServerVersions is a semver rages (https://github.com/blang/semver#ranges) of server versions that
// are supported by this Temporal SDK.
// Server validates if its version fits into SupportedServerVersions range and rejects request if it doesn't.
Expand Down
34 changes: 34 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,40 @@ func (ts *IntegrationTestSuite) TestPanicActivityWorkflow() {
}, res)
}

func (ts *IntegrationTestSuite) TestSDKNameAndVersionWritten() {
const wfID = "test-sdk-name-and-version"
wfOpts := ts.startWorkflowOptions(wfID)
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

run, err := ts.client.ExecuteWorkflow(ctx, wfOpts, ts.workflows.sleep, time.Second)
ts.NoError(err)

var result int
err = run.Get(ctx, &result)
ts.NoError(err)

iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
var firstTaskFound bool
for iter.HasNext() {
event, err := iter.Next()
ts.NoError(err)
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
sdkName := event.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName()
sdkVersion := event.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkVersion()
if !firstTaskFound {
firstTaskFound = true
// The name and version should only be written once if they don't change
ts.Equal(internal.SDKName, sdkName)
ts.Equal(internal.SDKVersion, sdkVersion)
} else {
ts.Equal("", sdkName)
ts.Equal("", sdkVersion)
}
}
}
}

func (ts *IntegrationTestSuite) TestDeadlockDetection() {
var expected []string
wfOpts := ts.startWorkflowOptions("test-deadlock")
Expand Down

0 comments on commit 59d8488

Please sign in to comment.