Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Jul 17, 2018
1 parent 8fd74cf commit 440a3a5
Show file tree
Hide file tree
Showing 16 changed files with 59 additions and 66 deletions.
3 changes: 2 additions & 1 deletion service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package history
import (
"os"
"testing"
"time"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -185,7 +186,7 @@ func (s *conflictResolverSuite) TestGetHistory() {

func (s *conflictResolverSuite) TestReset() {
sourceCluster := "some random source cluster"
startTime := common.NewRealTimeSource().Now()
startTime := time.Now()
domainID := s.mockContext.domainID
execution := s.mockContext.workflowExecution
nextEventID := int64(2)
Expand Down
16 changes: 5 additions & 11 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
replicationTasks = append(replicationTasks, replicationTask)
}
}
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks)
setTransferTaskTimestamp(common.NewRealTimeSource().Now(), transferTasks)
setTaskInfo(msBuilder.GetCurrentVersion(), time.Now().UTC(), transferTasks, timerTasks)

createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) {
_, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
Expand Down Expand Up @@ -1999,8 +1998,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx context.Context
replicationTasks = append(replicationTasks, replicationTask)
}
}
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks)
setTransferTaskTimestamp(common.NewRealTimeSource().Now(), transferTasks)
setTaskInfo(msBuilder.GetCurrentVersion(), time.Now().UTC(), transferTasks, timerTasks)

createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) {
_, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
Expand Down Expand Up @@ -2761,17 +2759,13 @@ func getStartRequest(domainID string,
return startRequest
}

func setTaskVersion(version int64, transferTasks []persistence.Task, timerTasks []persistence.Task) {
func setTaskInfo(version int64, timestamp time.Time, transferTasks []persistence.Task, timerTasks []persistence.Task) {
// set both the task version, as well as the timestamp on the transfer tasks
for _, task := range transferTasks {
task.SetVersion(version)
task.SetVisibilityTimestamp(timestamp)
}
for _, task := range timerTasks {
task.SetVersion(version)
}
}

func setTransferTaskTimestamp(timestamp time.Time, transferTasks []persistence.Task) {
for _, task := range transferTasks {
task.SetVisibilityTimestamp(timestamp)
}
}
4 changes: 2 additions & 2 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,13 @@ func (s *engineSuite) TestGetMutableStateLongPoll() {

// long poll, new event happen before long poll timeout
go asycWorkflowUpdate(time.Second * 2)
start := common.NewRealTimeSource().Now()
start := time.Now()
response, err = s.mockHistoryEngine.GetMutableState(ctx, &history.GetMutableStateRequest{
DomainUUID: common.StringPtr(domainID),
Execution: &execution,
ExpectedNextEventId: common.Int64Ptr(4),
})
s.True(common.NewRealTimeSource().Now().After(start.Add(time.Second * 1)))
s.True(time.Now().After(start.Add(time.Second * 1)))
s.Nil(err)
s.Equal(int64(5), *response.NextEventId)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEventNotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (notifier *historyEventNotifierImpl) dispatchHistoryEventNotification(event

func (notifier *historyEventNotifierImpl) enqueueHistoryEventNotification(event *historyEventNotification) {
// set the timestamp just before enqueuing the event
event.timestamp = common.NewRealTimeSource().Now()
event.timestamp = time.Now()
select {
case notifier.eventsChan <- event:
default:
Expand Down
8 changes: 6 additions & 2 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,12 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, contex
}
transferTasks := sBuilder.getTransferTasks()
timerTasks := sBuilder.getTimerTasks()
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks)
setTransferTaskTimestamp(common.NewFakeTimeSource().Update(time.Unix(0, lastEvent.GetTimestamp())).Now(), transferTasks)
setTaskInfo(
msBuilder.GetCurrentVersion(),
common.NewFakeTimeSource().Update(time.Unix(0, lastEvent.GetTimestamp())).Now(),
transferTasks,
timerTasks,
)

createWorkflow := func(isBrandNew bool, prevRunID string) error {
_, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
Expand Down
15 changes: 8 additions & 7 deletions service/history/historyReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"reflect"
"testing"
"time"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -309,7 +310,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre
}},
History: &shared.History{},
}
startTimeStamp := common.NewRealTimeSource().Now()
startTimeStamp := time.Now().UTC()
msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
LastWriteVersion: currentLastWriteVersion,
LastWriteEventID: currentLastEventID,
Expand Down Expand Up @@ -359,7 +360,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre
}},
History: &shared.History{},
}
startTimeStamp := common.NewRealTimeSource().Now()
startTimeStamp := time.Now().UTC()
msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
LastWriteVersion: currentLastWriteVersion,
LastWriteEventID: currentLastEventID,
Expand Down Expand Up @@ -398,7 +399,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre
}},
History: &shared.History{},
}
startTimeStamp := common.NewRealTimeSource().Now()
startTimeStamp := time.Now().UTC()
msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
LastWriteVersion: currentLastWriteVersion,
LastWriteEventID: currentLastEventID,
Expand Down Expand Up @@ -625,7 +626,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_BrandNew() {
}
sBuilder := &mockStateBuilder{}
requestID := uuid.New()
now := common.NewRealTimeSource().Now()
now := time.Now().UTC()
history := &shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())},
Expand Down Expand Up @@ -1000,7 +1001,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In
}
sBuilder := &mockStateBuilder{}
requestID := uuid.New()
now := common.NewRealTimeSource().Now()
now := time.Now().UTC()
history := &shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())},
Expand Down Expand Up @@ -1154,7 +1155,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In
}
sBuilder := &mockStateBuilder{}
requestID := uuid.New()
now := common.NewRealTimeSource().Now()
now := time.Now().UTC()
history := &shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())},
Expand Down Expand Up @@ -1508,7 +1509,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc
}
sBuilder := &mockStateBuilder{}
requestID := uuid.New()
now := common.NewRealTimeSource().Now()
now := time.Now().UTC()
history := &shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())},
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (s *TestShardContext) UpdateWorkflowExecution(request *persistence.UpdateWo
}
task.SetTaskID(seqID)
s.logger.Infof("%v: TestShardContext: Assigning timer (timestamp: %v, seq: %v)",
common.NewRealTimeSource().Now(), persistence.GetVisibilityTSFrom(task).UTC(), task.GetTaskID())
time.Now(), persistence.GetVisibilityTSFrom(task).UTC(), task.GetTaskID())
}
return s.executionMgr.UpdateWorkflowExecution(request)
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func (s *TestShardContext) GetCurrentTime(cluster string) time.Time {
if cluster != s.GetService().GetClusterMetadata().GetCurrentClusterName() {
return s.standbyClusterCurrentTime[cluster]
}
return common.NewRealTimeSource().Now()
return time.Now()
}

// SetupWorkflowStoreWithOptions to setup workflow test base
Expand Down
16 changes: 10 additions & 6 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ func (e *mutableStateBuilder) IsStickyTaskListEnabled() bool {
}

func (e *mutableStateBuilder) CreateNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent {
return e.CreateNewHistoryEventWithTimestamp(eventType, common.NewRealTimeSource().Now().UnixNano())
return e.CreateNewHistoryEventWithTimestamp(eventType, time.Now().UnixNano())
}

func (e *mutableStateBuilder) CreateNewHistoryEventWithTimestamp(eventType workflow.EventType,
Expand Down Expand Up @@ -1057,7 +1057,7 @@ func (e *mutableStateBuilder) HasParentExecution() bool {
func (e *mutableStateBuilder) UpdateActivityProgress(ai *persistence.ActivityInfo,
request *workflow.RecordActivityTaskHeartbeatRequest) {
ai.Details = request.Details
ai.LastHeartBeatUpdatedTime = common.NewRealTimeSource().Now()
ai.LastHeartBeatUpdatedTime = time.Now()
e.updateActivityInfos[ai] = struct{}{}
}

Expand Down Expand Up @@ -1459,7 +1459,7 @@ func (e *mutableStateBuilder) AddDecisionTaskStartedEvent(scheduleEventID int64,
scheduleID := di.ScheduleID
startedID := scheduleID + 1
tasklist := request.TaskList.GetName()
timestamp := common.NewRealTimeSource().Now().UnixNano()
timestamp := time.Now().UnixNano()
// First check to see if new events came since transient decision was scheduled
if di.Attempt > 0 && di.ScheduleID != e.GetNextEventID() {
// Also create a new DecisionTaskScheduledEvent since new events came in when it was scheduled
Expand Down Expand Up @@ -1713,7 +1713,7 @@ func (e *mutableStateBuilder) AddActivityTaskStartedEvent(ai *persistence.Activi
// instead update mutable state and will record started event when activity task is closed
ai.StartedID = common.TransientEventID
ai.RequestID = requestID
ai.StartedTime = common.NewRealTimeSource().Now()
ai.StartedTime = time.Now()
ai.StartedIdentity = identity
e.UpdateActivity(ai)
return nil
Expand Down Expand Up @@ -2337,8 +2337,12 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour
TaskList: newExecutionInfo.TaskList,
ScheduleID: di.ScheduleID,
}}
setTaskVersion(newStateBuilder.GetCurrentVersion(), newTransferTasks, nil)
setTransferTaskTimestamp(common.NewFakeTimeSource().Update(time.Unix(0, startedEvent.GetTimestamp())).Now(), newTransferTasks)
setTaskInfo(
newStateBuilder.GetCurrentVersion(),
common.NewFakeTimeSource().Update(time.Unix(0, startedEvent.GetTimestamp())).Now(),
newTransferTasks,
nil,
)

e.continueAsNew = &persistence.CreateWorkflowExecutionRequest{
// NOTE: there is no replication task for the start / decision scheduled event,
Expand Down
9 changes: 4 additions & 5 deletions service/history/queueAckMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
"github.com/uber-go/tally"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/messaging"
Expand Down Expand Up @@ -124,8 +123,8 @@ func (s *queueAckMgrSuite) SetupTest() {
ShardID: 0,
RangeID: 1,
ClusterTimerAckLevel: map[string]time.Time{
cluster.TestCurrentClusterName: common.NewRealTimeSource().Now().Add(-8 * time.Second),
cluster.TestAlternativeClusterName: common.NewRealTimeSource().Now().Add(-10 * time.Second),
cluster.TestCurrentClusterName: time.Now().Add(-8 * time.Second),
cluster.TestAlternativeClusterName: time.Now().Add(-10 * time.Second),
},
}),
transferSequenceNumber: 1,
Expand Down Expand Up @@ -333,8 +332,8 @@ func (s *queueFailoverAckMgrSuite) SetupTest() {
ShardID: 0,
RangeID: 1,
ClusterTimerAckLevel: map[string]time.Time{
cluster.TestCurrentClusterName: common.NewRealTimeSource().Now(),
cluster.TestAlternativeClusterName: common.NewRealTimeSource().Now().Add(-10 * time.Second),
cluster.TestCurrentClusterName: time.Now(),
cluster.TestAlternativeClusterName: time.Now().Add(-10 * time.Second),
},
}),
transferSequenceNumber: 1,
Expand Down
16 changes: 5 additions & 11 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ processorPumpLoop:
p.processBatch(tasksCh)
case <-pollTimer.C:
pollTimer.Reset(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(common.NewRealTimeSource().Now()) {
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) {
p.processBatch(tasksCh)
}
case <-updateAckTimer.C:
Expand All @@ -196,7 +196,7 @@ func (p *queueProcessorBase) processBatch(tasksCh chan<- queueTaskInfo) {
return
}

p.lastPollTime = common.NewRealTimeSource().Now()
p.lastPollTime = time.Now()
tasks, more, err := p.ackMgr.readQueueTasks()

if err != nil {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{},

var logger bark.Logger
var err error
startTime := common.NewRealTimeSource().Now()
startTime := time.Now()

retryCount := 0
op := func() error {
Expand Down Expand Up @@ -283,14 +283,8 @@ ProcessRetryLoop:
if err != nil {
if err == ErrTaskRetry {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter)
timestamp := task.GetVisibilityTimestamp()
for {
<-notificationChan
if p.shard.GetCurrentTime(p.clusterName).After(timestamp) {
continue ProcessRetryLoop
}
}
} else if _, ok := err.(*workflow.DomainNotActiveError); ok && common.NewRealTimeSource().Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
<-notificationChan
} else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func (s *shardContextImpl) allocateTimerIDsLocked(timerTasks []persistence.Task)
// This is not a common scenario, the shard can move and new host might have a time SKU.
// We generate a new timer ID that is above the ack level with an offset.
s.logger.Warnf("%v: New timer generated is less than ack level. timestamp: %v, ackLevel: %v",
common.NewRealTimeSource().Now(), ts, s.shardInfo.TimerAckLevel)
time.Now(), ts, s.shardInfo.TimerAckLevel)
newTimestamp := s.shardInfo.TimerAckLevel
persistence.SetVisibilityTSFrom(task, newTimestamp.Add(time.Second))
}
Expand Down Expand Up @@ -658,7 +658,7 @@ func (s *shardContextImpl) GetCurrentTime(cluster string) time.Time {
if cluster != s.GetService().GetClusterMetadata().GetCurrentClusterName() {
return s.standbyClusterCurrentTime[cluster]
}
return common.NewRealTimeSource().Now()
return time.Now()
}

// TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter.
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (tb *timerBuilder) AddActivityTimeoutTask(scheduleID int64,

timeOutTask := tb.createActivityTimeoutTask(fireTimeout, timeoutType, scheduleID, baseTime)
tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, TimeoutType: %v, EventID: %v",
common.NewRealTimeSource().Now(), fireTimeout, timeoutType.String(), timeOutTask.EventID)
time.Now(), fireTimeout, timeoutType.String(), timeOutTask.EventID)
return timeOutTask
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (tb *timerBuilder) GetActivityTimerTaskIfNeeded(msBuilder mutableState) per
msBuilder.UpdateActivity(ai)

tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v",
common.NewRealTimeSource().Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID)
time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID)
}
return timerTask
}
Expand Down
4 changes: 1 addition & 3 deletions service/history/timerGate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ package history
import (
"sync"
"time"

"github.com/uber/cadence/common"
)

type (
Expand Down Expand Up @@ -133,7 +131,7 @@ func (timerGate *LocalTimerGateImpl) FireAfter(now time.Time) bool {
// success means timer is idle or timer is set with a sooner time to fire
func (timerGate *LocalTimerGateImpl) Update(nextTime time.Time) bool {
// NOTE: negative duration will make the timer fire immediately
now := common.NewRealTimeSource().Now()
now := time.Now()

if timerGate.timer.Stop() && timerGate.nextWakeupTime.Before(nextTime) {
// this means the timer, before stopped, is active && next wake up time do not have to be updated
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ Update_History_Loop:
createNewTimer = true

t.logger.Debugf("Ignore ActivityTimeout (%v) as retry is needed. New attempt: %v, retry backoff duration: %v.",
timeoutType, ai.Attempt, retryTask.(*persistence.RetryTimerTask).VisibilityTimestamp.Sub(common.NewRealTimeSource().Now()))
timeoutType, ai.Attempt, retryTask.(*persistence.RetryTimerTask).VisibilityTimestamp.Sub(time.Now()))

continue
}
Expand Down Expand Up @@ -421,7 +421,7 @@ Update_History_Loop:
createNewTimer = true

t.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v",
common.NewRealTimeSource().Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID)
time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID)
}

// Done!
Expand Down
Loading

0 comments on commit 440a3a5

Please sign in to comment.