Skip to content

Commit

Permalink
Prefix some errors with rule identifiers (#1281)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Feb 1, 2024
1 parent e2bec16 commit a7c9528
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 23 deletions.
18 changes: 9 additions & 9 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func panicIllegalState(message string) {

func (d *commandStateMachineBase) failStateTransition(event string) {
// this is when we detect illegal state transition, likely due to ill history sequence or nondeterministic workflow code
panicIllegalState(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d))
panicIllegalState(fmt.Sprintf("[TMPRL1100] invalid state transition: attempt to %v, %v", event, d))
}

func (d *commandStateMachineBase) handleCommandSent() {
Expand Down Expand Up @@ -918,7 +918,7 @@ func (h *commandsHelper) incrementNextCommandEventIDIfVersionMarker() {
func (h *commandsHelper) getCommand(id commandID) commandStateMachine {
command, ok := h.commands[id]
if !ok {
panicMsg := fmt.Sprintf("unknown command %v, possible causes are nondeterministic workflow definition code"+
panicMsg := fmt.Sprintf("[TMPRL1100] unknown command %v, possible causes are nondeterministic workflow definition code"+
" or incompatible change in the workflow definition", id)
panicIllegalState(panicMsg)
}
Expand All @@ -927,7 +927,7 @@ func (h *commandsHelper) getCommand(id commandID) commandStateMachine {

func (h *commandsHelper) addCommand(command commandStateMachine) {
if _, ok := h.commands[command.getID()]; ok {
panicMsg := fmt.Sprintf("adding duplicate command %v", command)
panicMsg := fmt.Sprintf("[TMPRL1100] adding duplicate command %v", command)
panicIllegalState(panicMsg)
}
element := h.orderedCommands.PushBack(command)
Expand Down Expand Up @@ -957,7 +957,7 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) {
func (h *commandsHelper) moveCommandToBack(command commandStateMachine) {
elem := h.commands[command.getID()]
if elem == nil {
panicIllegalState(fmt.Sprintf("moving command not present %v", command))
panicIllegalState(fmt.Sprintf("[TMPRL1100] moving command not present %v", command))
}
h.orderedCommands.Remove(elem)
h.commands[command.getID()] = h.orderedCommands.PushBack(command)
Expand Down Expand Up @@ -993,7 +993,7 @@ func (h *commandsHelper) handleActivityTaskClosed(activityID string, scheduledEv

func (h *commandsHelper) handleActivityTaskScheduled(activityID string, scheduledEventID int64) {
if _, ok := h.scheduledEventIDToActivityID[scheduledEventID]; !ok {
panicMsg := fmt.Sprintf("lookup failed for scheduledEventID to activityID: scheduleEventID: %v, activityID: %v",
panicMsg := fmt.Sprintf("[TMPRL1100] lookup failed for scheduledEventID to activityID: scheduleEventID: %v, activityID: %v",
scheduledEventID, activityID)
panicIllegalState(panicMsg)
}
Expand All @@ -1005,7 +1005,7 @@ func (h *commandsHelper) handleActivityTaskScheduled(activityID string, schedule
func (h *commandsHelper) handleActivityTaskCancelRequested(scheduledEventID int64) {
activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("unable to find activityID for the scheduledEventID: %v", scheduledEventID))
panicIllegalState(fmt.Sprintf("[TMPRL1100] unable to find activityID for the scheduledEventID: %v", scheduledEventID))
}
command := h.getCommand(makeCommandID(commandTypeActivity, activityID))
command.handleCancelInitiatedEvent()
Expand All @@ -1030,12 +1030,12 @@ func (h *commandsHelper) getActivityAndScheduledEventIDs(event *historypb.Histor
case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
scheduledEventID = event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventId()
default:
panicIllegalState(fmt.Sprintf("unexpected event type: %v", event.GetEventType()))
panicIllegalState(fmt.Sprintf("[TMPRL1100] unexpected event type: %v", event.GetEventType()))
}

activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("unable to find activityID for the event: %v", util.HistoryEventToString(event)))
panicIllegalState(fmt.Sprintf("[TMPRL1100] unable to find activityID for the event: %v", util.HistoryEventToString(event)))
}
return activityID, scheduledEventID
}
Expand Down Expand Up @@ -1076,7 +1076,7 @@ func (h *commandsHelper) recordVersionMarker(changeID string, version Version, d

func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string, searchAttrUpdated bool) {
if _, ok := h.versionMarkerLookup[eventID]; ok {
panicMsg := fmt.Sprintf("marker event already exists for eventID in lookup: eventID: %v, changeID: %v",
panicMsg := fmt.Sprintf("[TMPRL1100] marker event already exists for eventID in lookup: eventID: %v, changeID: %v",
eventID, changeID)
panicIllegalState(panicMsg)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ func TestDeadlockDetectorAndAwaitRace(t *testing.T) {
defer d.Close()
// Expecting deadlock detection timeout instead of a data race.
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
require.EqualError(t, err, "Potential deadlock detected: workflow goroutine \"root\" didn't yield for over a second")
require.EqualError(t, err, "[TMPRL1101] Potential deadlock detected: workflow goroutine \"root\" didn't yield for over a second")
}

func TestAwaitCancellation(t *testing.T) {
Expand Down Expand Up @@ -1670,7 +1670,7 @@ func TestDeadlockDetectorStackTrace(t *testing.T) {
c.Receive(ctx, nil) // blocked forever
})
GoNamed(ctx, "sleeper", func(ctx Context) {
time.Sleep(defaultDeadlockDetectionTimeout + 100 * time.Millisecond)
time.Sleep(defaultDeadlockDetectionTimeout + 100*time.Millisecond)
})
c.Receive(ctx, nil) // blocked forever
})
Expand All @@ -1679,7 +1679,7 @@ func TestDeadlockDetectorStackTrace(t *testing.T) {

var wfPanic *workflowPanicError
require.ErrorAs(t, err, &wfPanic)
require.Equal(t, `Potential deadlock detected: workflow goroutine "sleeper" didn't yield for over a second`, wfPanic.Error())
require.Equal(t, `[TMPRL1101] Potential deadlock detected: workflow goroutine "sleeper" didn't yield for over a second`, wfPanic.Error())
require.Regexp(t, `^coroutine sleeper \[running\]:\ntime\.Sleep\(0x[\da-f]+\)\n`, wfPanic.StackTrace())
require.Equal(t, 4, strings.Count(wfPanic.StackTrace(), "\n"), "2 stack frames expected")
}
10 changes: 5 additions & 5 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,12 +783,12 @@ func (wc *workflowEnvironmentImpl) RequestCancelTimer(timerID TimerID) {

func validateVersion(changeID string, version, minSupported, maxSupported Version) {
if version < minSupported {
panicIllegalState(fmt.Sprintf("Workflow code removed support of version %v. "+
panicIllegalState(fmt.Sprintf("[TMPRL1100] Workflow code removed support of version %v. "+
"for \"%v\" changeID. The oldest supported version is %v",
version, changeID, minSupported))
}
if version > maxSupported {
panicIllegalState(fmt.Sprintf("Workflow code is too old to support version %v "+
panicIllegalState(fmt.Sprintf("[TMPRL1100] Workflow code is too old to support version %v "+
"for \"%v\" changeID. The maximum supported version is %v",
version, changeID, maxSupported))
}
Expand Down Expand Up @@ -864,7 +864,7 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro
for k := range wc.sideEffectResult {
keys = append(keys, k)
}
panicIllegalState(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v",
panicIllegalState(fmt.Sprintf("[TMPRL1100] No cached result found for side effectID=%v. KnownSideEffects=%v",
sideEffectID, keys))
}

Expand Down Expand Up @@ -976,7 +976,7 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa

if wc.isReplay {
// This should not happen
panicIllegalState(fmt.Sprintf("Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
panicIllegalState(fmt.Sprintf("[TMPRL1100] Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
}

return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f()))
Expand Down Expand Up @@ -1566,7 +1566,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(details
if la, ok := weh.pendingLaTasks[lamd.ActivityID]; ok {
if len(lamd.ActivityType) > 0 && lamd.ActivityType != la.params.ActivityType {
// history marker mismatch to the current code.
panicMsg := fmt.Sprintf("code execute local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, markerData)
panicMsg := fmt.Sprintf("[TMPRL1100] code execute local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, markerData)
panicIllegalState(panicMsg)
}
weh.commandsHelper.recordLocalActivityMarker(lamd.ActivityID, details, failure)
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,15 +1453,15 @@ matchLoop:
}

if d == nil {
return historyMismatchErrorf("nondeterministic workflow: missing replay command for %s", util.HistoryEventToString(e))
return historyMismatchErrorf("[TMPRL1100] nondeterministic workflow: missing replay command for %s", util.HistoryEventToString(e))
}

if e == nil {
return historyMismatchErrorf("nondeterministic workflow: extra replay command for %s", util.CommandToString(d))
return historyMismatchErrorf("[TMPRL1100] nondeterministic workflow: extra replay command for %s", util.CommandToString(d))
}

if !isCommandMatchEvent(d, e, msgs) {
return historyMismatchErrorf("nondeterministic workflow: history event is %s, replay command is %s",
return historyMismatchErrorf("[TMPRL1100] nondeterministic workflow: history event is %s, replay command is %s",
util.HistoryEventToString(e), util.CommandToString(d))
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (up *updateProtocol) requireState(action string, valid ...updateState) {
return
}
}
panicIllegalState(fmt.Sprintf("invalid action %q in update protocol %+v", action, up))
panicIllegalState(fmt.Sprintf("[TMPRL1100] invalid action %q in update protocol %+v", action, up))
}

func (up *updateProtocol) HandleMessage(msg *protocolpb.Message) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ var _ WaitGroup = (*waitGroupImpl)(nil)
var _ dispatcher = (*dispatcherImpl)(nil)

// 1MB buffer to fit combined stack trace of all active goroutines
var stackBuf [1024*1024]byte
var stackBuf [1024 * 1024]byte

var (
errCoroStackNotFound = errors.New("coroutine stack not found")
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func (s *coroutineState) call(timeout time.Duration) {
if err != nil {
st = fmt.Sprintf("<%s>", err)
}
msg := fmt.Sprintf("Potential deadlock detected: "+
msg := fmt.Sprintf("[TMPRL1101] Potential deadlock detected: "+
"workflow goroutine %q didn't yield for over a second", s.name)
s.closed.Store(true)
s.panicError = newWorkflowPanicError(msg, st)
Expand Down

0 comments on commit a7c9528

Please sign in to comment.