Skip to content

Allow any of the subscribed GruleEngineListener to break out of Cycle during the start of a Cycle. #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions engine/GruleEngine.go
Original file line number Diff line number Diff line change
@@ -17,11 +17,12 @@ package engine
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
"sort"
"time"

"github.com/sirupsen/logrus"
"go.uber.org/zap"

"github.com/hyperjumptech/grule-rule-engine/ast"
"github.com/hyperjumptech/grule-rule-engine/logger"
)
@@ -48,14 +49,12 @@ func SetLogger(externalLog interface{}) {
case *zap.Logger:
log, ok := externalLog.(*zap.Logger)
if !ok {

return
}
entry = logger.NewZap(log)
case *logrus.Logger:
log, ok := externalLog.(*logrus.Logger)
if !ok {

return
}
entry = logger.NewLogrus(log)
@@ -70,7 +69,6 @@ func SetLogger(externalLog interface{}) {
// NewGruleEngine will create new instance of GruleEngine struct.
// It will set the max cycle to 5000
func NewGruleEngine() *GruleEngine {

return &GruleEngine{
MaxCycle: DefaultCycleCount,
}
@@ -85,7 +83,6 @@ type GruleEngine struct {

// Execute function is the same as ExecuteWithContext(context.Background())
func (g *GruleEngine) Execute(dataCtx ast.IDataContext, knowledge *ast.KnowledgeBase) error {

return g.ExecuteWithContext(context.Background(), dataCtx, knowledge)
}

@@ -121,7 +118,6 @@ func (g *GruleEngine) notifyBeginCycle(cycle uint64) {
// The engine also do conflict resolution of which rule to execute.
func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataContext, knowledge *ast.KnowledgeBase) error {
if knowledge == nil || dataCtx == nil {

return fmt.Errorf("nil KnowledgeBase or DataContext is not allowed")
}

@@ -163,6 +159,11 @@ func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataC

g.notifyBeginCycle(cycle + 1)

// If any listener wants to abort the cycle, we will break the loop.
if dataCtx.IsComplete() {
break
}

// Select all rule entry that can be executed.
log.Tracef("Select all rule entry that can be executed.")
runnable := make([]*ast.RuleEntry, 0)
@@ -178,7 +179,6 @@ func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataC
if err != nil {
log.Errorf("Failed testing condition for rule : %s. Got error %v", ruleEntry.RuleName, err)
if g.ReturnErrOnFailedRuleEvaluation {

return err
}
}
@@ -249,7 +249,6 @@ func (g *GruleEngine) ExecuteWithContext(ctx context.Context, dataCtx ast.IDataC
// Returns []*ast.RuleEntry order by salience
func (g *GruleEngine) FetchMatchingRules(dataCtx ast.IDataContext, knowledge *ast.KnowledgeBase) ([]*ast.RuleEntry, error) {
if knowledge == nil || dataCtx == nil {

return nil, fmt.Errorf("nil KnowledgeBase or DataContext is not allowed")
}

@@ -269,7 +268,7 @@ func (g *GruleEngine) FetchMatchingRules(dataCtx ast.IDataContext, knowledge *as
log.Debugf("Initializing Context")
knowledge.InitializeContext(dataCtx)

//Loop through all the rule entries available in the knowledge base and add to the response list if it is able to evaluate
// Loop through all the rule entries available in the knowledge base and add to the response list if it is able to evaluate
// Select all rule entry that can be executed.
log.Tracef("Select all rule entry that can be executed.")
runnable := make([]*ast.RuleEntry, 0)
@@ -292,7 +291,6 @@ func (g *GruleEngine) FetchMatchingRules(dataCtx ast.IDataContext, knowledge *as
log.Debugf("Matching rules length %d.", len(runnable))
if len(runnable) > 1 {
sort.SliceStable(runnable, func(i, j int) bool {

return runnable[i].Salience > runnable[j].Salience
})
}
120 changes: 107 additions & 13 deletions engine/GruleEngine_test.go
Original file line number Diff line number Diff line change
@@ -237,7 +237,6 @@ const complexRule2 = `rule ComplexRule "test complex rule" salience 10 {
}`

func TestEngine_ComplexRule2(t *testing.T) {

ts := &TestStruct{
Param1: false,
Param2: false,
@@ -274,7 +273,6 @@ const complexRule3 = `rule ComplexRule "test complex rule" salience 10 {
}`

func TestEngine_ComplexRule3(t *testing.T) {

ts := &TestStruct{
Param1: false,
Param2: false,
@@ -312,7 +310,6 @@ const complexRule4 = `rule ComplexRule "test complex rule" salience 10 {
}`

func TestEngine_ComplexRule4(t *testing.T) {

ts := &TestStruct{
Param1: true,
Param2: false,
@@ -347,7 +344,6 @@ const OpPresedenceRule = `rule OpPresedenceRule "test operator presedence" salie
}`

func TestEngine_OperatorPrecedence(t *testing.T) {

ts := &TestStruct{}

dctx := ast.NewDataContext()
@@ -511,7 +507,7 @@ Then
}`

func TestGruleEngine_FetchMatchingRules_Having_Same_Salience(t *testing.T) {
//Given
// Given
fact := &Fact{
Distance: 6000,
Duration: 123,
@@ -526,11 +522,11 @@ func TestGruleEngine_FetchMatchingRules_Having_Same_Salience(t *testing.T) {
kb, err := lib.NewKnowledgeBaseInstance("conflict_rules_test", "0.1.1")
assert.NoError(t, err)

//When
// When
engine := NewGruleEngine()
ruleEntries, err := engine.FetchMatchingRules(dctx, kb)

//Then
// Then
assert.NoError(t, err)
assert.Equal(t, 5, len(ruleEntries))
}
@@ -597,7 +593,7 @@ Then
}`

func TestGruleEngine_FetchMatchingRules_Having_Diff_Salience(t *testing.T) {
//Given
// Given
fact := &Fact{
Distance: 6000,
Duration: 121,
@@ -612,11 +608,11 @@ func TestGruleEngine_FetchMatchingRules_Having_Diff_Salience(t *testing.T) {
kb, err := lib.NewKnowledgeBaseInstance("conflict_rules_test", "0.1.1")
assert.NoError(t, err)

//When
// When
engine := NewGruleEngine()
ruleEntries, err := engine.FetchMatchingRules(dctx, kb)

//Then
// Then
assert.NoError(t, err)
assert.Equal(t, 4, len(ruleEntries))
assert.Equal(t, 8, ruleEntries[0].Salience)
@@ -659,7 +655,7 @@ type LogicalOperatorRuleFact struct {
}

func TestGruleEngine_Follows_logical_operator_precedence(t *testing.T) {
//Given
// Given
fact := &LogicalOperatorRuleFact{
Distance: 2000,
Duration: 121,
@@ -676,12 +672,110 @@ func TestGruleEngine_Follows_logical_operator_precedence(t *testing.T) {
kb, err := lib.NewKnowledgeBaseInstance("logical_operator_rules_test", "0.1.1")
assert.NoError(t, err)

//When
// When
engine := NewGruleEngine()
err = engine.Execute(dctx, kb)

//Then
// Then
assert.NoError(t, err)
assert.Equal(t, fact.Result, true)
assert.Equal(t, fact.NetAmount, float32(143.32))
}

const LevelRuleWithListener = `
rule ProcessThresholds "ProcessThresholds" Salience 1 {
when
DP.Index < DP.Thresholds.Len()
then
DP.ProcessThresholds(DP.Level.Value, DP.Thresholds[DP.Index]);
DP.Index = DP.Index + 1;
Log("Index: " + DP.Index);
}`

type Level struct {
Value int64
}

type Threshold struct {
MinLevel int64
}

type DataPoint struct {
Level *Level // Level that needs to be monitored.
Thresholds []*Threshold // Threshold configuration levels.
TurnedOn int64 // Turn something on for every threshold that breached.
Index int64 // Running index of the threshold that is being processed.
}

func (d *DataPoint) ProcessThresholds(level int64, t *Threshold) {
if d == nil {
return
}
if level > t.MinLevel {
d.TurnedOn++
}
}

type LevelListener struct {
kb *ast.KnowledgeBase
dc ast.IDataContext
dp *DataPoint
}

const MaxTurnOns = int64(2)

func (l *LevelListener) BeginCycle(cycle uint64) {
if l.dp.TurnedOn == MaxTurnOns { // Stop at MaxTurnOns.
l.dc.Complete() // Stop Cycle.
}
}

func (c *LevelListener) EvaluateRuleEntry(cycle uint64, entry *ast.RuleEntry, candidate bool) {
}

func (c *LevelListener) ExecuteRuleEntry(cycle uint64, entry *ast.RuleEntry) {
}

// TestGruleListener is a test function that verifies the behavior of the GruleEngineListener.
// It creates a DataPoint with some thresholds, sets up a GruleEngine with a custom Listener,
// and executes the engine. The test then verifies that the execution has aborted when a global
// condition is met and that rule cycle gets stopped, by validating DataPoint's TurnedOn and Index
// values are set to the expected MaxTurnOns value.
func TestGruleListener(t *testing.T) {
// Given
datapoint := &DataPoint{
Level: &Level{
Value: 1000,
},
Thresholds: []*Threshold{
{MinLevel: 200},
{MinLevel: 400},
{MinLevel: 600},
{MinLevel: 800},
},
Index: 0, // Initialize index to 0.
TurnedOn: int64(0), // Nothing turned on yet.
}

dctx := ast.NewDataContext()
err := dctx.Add("DP", datapoint)
assert.NoError(t, err)

lib := ast.NewKnowledgeLibrary()
rb := builder.NewRuleBuilder(lib)
err = rb.BuildRuleFromResource("TestListener", "0.0.1", pkg.NewBytesResource([]byte(LevelRuleWithListener)))
assert.NoError(t, err)
kb, err := lib.NewKnowledgeBaseInstance("TestListener", "0.0.1")
assert.NoError(t, err)

engine := NewGruleEngine()

// When
engine.Listeners = append(engine.Listeners, &LevelListener{kb: kb, dc: dctx, dp: datapoint})
err = engine.Execute(dctx, kb)
assert.NoError(t, err)

// Then
assert.Equal(t, MaxTurnOns, datapoint.TurnedOn) // Limit at MaxTurnOns.
assert.Equal(t, MaxTurnOns, datapoint.Index)
}