Skip to content

Commit

Permalink
Add double read for latency comparison for Pinot Migration (#5927)
Browse files Browse the repository at this point in the history
* Add double read for latency comparison for Pinot Migration

* use go routine to make sure the primary read will not fail; update unit tests

* reformat
  • Loading branch information
bowenxia authored Apr 24, 2024
1 parent 882796c commit 0b41007
Show file tree
Hide file tree
Showing 8 changed files with 1,047 additions and 222 deletions.
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,12 @@ const (
// Default value: true
// Allowed filters: DomainName
EnableReadVisibilityFromPinot
// EnableVisibilityDoubleRead is the key for enable double read for a latency comparison
// KeyName: system.EnableVisibilityDoubleRead
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableVisibilityDoubleRead
// EnableLogCustomerQueryParameter is key for enable log customer query parameters
// KeyName: system.enableLogCustomerQueryParameter
// Value type: Bool
Expand Down Expand Up @@ -3863,6 +3869,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableReadVisibilityFromPinot is key for enable read from pinot or db visibility, usually using with AdvancedVisibilityWritingMode for seamless migration from db visibility to advanced visibility",
DefaultValue: true,
},
EnableVisibilityDoubleRead: {
KeyName: "system.enableVisibilityDoubleRead",
Filters: []Filter{DomainName},
Description: "EnableVisibilityDoubleRead is key for enable read for both elastic search and Pinot for a latency comparison",
DefaultValue: false,
},
EnableLogCustomerQueryParameter: {
KeyName: "system.enableLogCustomerQueryParameter",
Filters: []Filter{DomainName},
Expand Down
1 change: 1 addition & 0 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (f *factoryImpl) NewVisibilityManager(
resourceConfig.EnableReadVisibilityFromES,
resourceConfig.AdvancedVisibilityWritingMode,
resourceConfig.EnableLogCustomerQueryParameter,
resourceConfig.EnableVisibilityDoubleRead,
f.logger,
), nil
} else if params.PersistenceConfig.AdvancedVisibilityStore != "" {
Expand Down
135 changes: 135 additions & 0 deletions common/persistence/pinot_visibility_triple_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter
writeMode dynamicconfig.StringPropertyFn
logCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter
readModeIsDouble dynamicconfig.BoolPropertyFnWithDomainFilter
}
)

Expand Down Expand Up @@ -77,6 +78,7 @@ func NewPinotVisibilityTripleManager(
readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter,
visWritingMode dynamicconfig.StringPropertyFn,
logCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter,
readModeIsDouble dynamicconfig.BoolPropertyFnWithDomainFilter,
logger log.Logger,
) VisibilityManager {
if dbVisibilityManager == nil && pinotVisibilityManager == nil && esVisibilityManager == nil {
Expand All @@ -92,6 +94,7 @@ func NewPinotVisibilityTripleManager(
writeMode: visWritingMode,
logger: logger,
logCustomerQueryParameter: logCustomerQueryParameter,
readModeIsDouble: readModeIsDouble,
}
}

Expand Down Expand Up @@ -360,6 +363,34 @@ func filterAttrPrefix(str string) string {
return strings.Replace(str, "`", "", -1)
}

func (v *pinotVisibilityTripleManager) getShadowMgrForDoubleRead(domain string) VisibilityManager {
// invalid cases:
// case0: when it is not double read
if !v.readModeIsDouble(domain) {
return nil
}
// case1: when it is double read, and both advanced visibility are not available
if v.pinotVisibilityManager == nil && v.esVisibilityManager == nil {
return nil
}
// case2: when it is double read, and only one of advanced visibility is available
if v.pinotVisibilityManager == nil || v.esVisibilityManager == nil {
return nil
}

// Valid cases:
// case3: when it is double read, and both advanced visibility are available, and read mode is from Pinot
if v.readModeIsFromPinot(domain) {
return v.esVisibilityManager
}
// case4: when it is double read, and both advanced visibility are available, and read mode is from ES
if v.readModeIsFromES(domain) {
return v.pinotVisibilityManager
}
// exclude all other cases
return nil
}

func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsRequest,
Expand All @@ -373,7 +404,15 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutions(
latestTime: request.LatestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListOpenWorkflowExecutions, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
// return result from primary
return manager.ListOpenWorkflowExecutions(ctx, request)
}

Expand All @@ -389,6 +428,14 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutions(
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListClosedWorkflowExecutions, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutions(ctx, request)
}
Expand All @@ -406,6 +453,14 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByType(
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListOpenWorkflowExecutionsByType, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListOpenWorkflowExecutionsByType(ctx, request)
}
Expand All @@ -423,6 +478,14 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByType(
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListClosedWorkflowExecutionsByType, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByType(ctx, request)
}
Expand All @@ -440,6 +503,14 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByWorkflowID(
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListOpenWorkflowExecutionsByWorkflowID, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
}
Expand All @@ -457,6 +528,14 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByWorkflowID(
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListClosedWorkflowExecutionsByWorkflowID, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
}
Expand All @@ -473,6 +552,14 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByStatus(
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListClosedWorkflowExecutionsByStatus, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByStatus(ctx, request)
}
Expand All @@ -492,6 +579,14 @@ func (v *pinotVisibilityTripleManager) GetClosedWorkflowExecution(
earliestTime: earlistTime,
latestTime: latestTime,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.GetClosedWorkflowExecution, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.GetClosedWorkflowExecution(ctx, request)
}
Expand All @@ -509,6 +604,14 @@ func (v *pinotVisibilityTripleManager) ListWorkflowExecutions(
earliestTime: -1,
latestTime: -1,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ListWorkflowExecutions, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListWorkflowExecutions(ctx, request)
}
Expand All @@ -526,6 +629,14 @@ func (v *pinotVisibilityTripleManager) ScanWorkflowExecutions(
earliestTime: -1,
latestTime: -1,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.ScanWorkflowExecutions, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ScanWorkflowExecutions(ctx, request)
}
Expand All @@ -543,6 +654,14 @@ func (v *pinotVisibilityTripleManager) CountWorkflowExecutions(
earliestTime: -1,
latestTime: -1,
}, request.Domain, override != nil)

// get another manager for double read
shadowMgr := v.getShadowMgrForDoubleRead(request.Domain)
// call the API for latency comparison
if shadowMgr != nil {
go shadow(shadowMgr.CountWorkflowExecutions, request, v.logger)
}

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.CountWorkflowExecutions(ctx, request)
}
Expand Down Expand Up @@ -584,3 +703,19 @@ func (v *pinotVisibilityTripleManager) chooseVisibilityManagerForRead(ctx contex
}
return visibilityMgr
}

func shadow[ReqT any, ResT any](f func(ctx context.Context, request ReqT) (ResT, error), request ReqT, logger log.Logger) {
ctxNew, cancel := context.WithTimeout(context.Background(), 2*time.Minute) // don't want f to run too long

defer cancel()
defer func() {
if r := recover(); r != nil {
logger.Info(fmt.Sprintf("Recovered in Shadow function in double read: %v", r))
}
}()

_, err := f(ctxNew, request)
if err != nil {
logger.Error(fmt.Sprintf("Error in Shadow function in double read: %s", err.Error()))
}
}
Loading

0 comments on commit 0b41007

Please sign in to comment.