Skip to content

Commit

Permalink
fix: check rule exists before create (#2741)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <disxiaofei@163.com>
  • Loading branch information
Yisaer authored and ngjaying committed Mar 29, 2024
1 parent a52d94f commit 7404c57
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
6 changes: 6 additions & 0 deletions internal/processor/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ func clone(opt api.RuleOption) *api.RuleOption {
}
}

func (p *RuleProcessor) ExecExists(name string) bool {
var s1 string
f, _ := p.db.Get(name, &s1)
return f
}

func (p *RuleProcessor) ExecDesc(name string) (string, error) {
var s1 string
f, _ := p.db.Get(name, &s1)
Expand Down
23 changes: 23 additions & 0 deletions internal/server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,3 +793,26 @@ func (suite *RestTestSuite) TestCreateRuleReplacePasswd() {
require.True(suite.T(), ok)
require.Equal(suite.T(), "4444", c["password"])
}

func (suite *RestTestSuite) TestCreateDuplicateRule() {
buf1 := bytes.NewBuffer([]byte(`{"sql":"CREATE stream demo123() WITH (DATASOURCE=\"0\", TYPE=\"mqtt\")"}`))
req1, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf1)
w1 := httptest.NewRecorder()
suite.r.ServeHTTP(w1, req1)

ruleJson2 := `{"id":"test12345","triggered":false,"sql":"select * from demo123","actions":[{"log":{}}]}`
buf2 := bytes.NewBuffer([]byte(ruleJson2))
req2, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf2)
w2 := httptest.NewRecorder()
suite.r.ServeHTTP(w2, req2)
require.Equal(suite.T(), http.StatusCreated, w2.Code)

buf2 = bytes.NewBuffer([]byte(ruleJson2))
req2, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf2)
w2 = httptest.NewRecorder()
suite.r.ServeHTTP(w2, req2)
require.Equal(suite.T(), http.StatusBadRequest, w2.Code)
var returnVal []byte
returnVal, _ = io.ReadAll(w2.Result().Body)
require.Equal(suite.T(), `{"error":1000,"message":"rule test12345 already exists"}`+"\n", string(returnVal))
}
15 changes: 11 additions & 4 deletions internal/server/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,19 @@ func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
return result, ok
}

func createRule(name, ruleJson string) (string, error) {
func createRule(name, ruleJson string) (id string, err error) {
var rs *rule.RuleState = nil
var err error = nil

// Validate the rule json
r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
if err != nil {
return "", fmt.Errorf("invalid rule json: %v", err)
}

if exists := ruleProcessor.ExecExists(r.Id); exists {
return r.Id, fmt.Errorf("rule %v already exists", r.Id)
}

// Validate the topo
err = infra.SafeRun(func() error {
rs, err = createRuleState(r)
Expand All @@ -92,12 +95,16 @@ func createRule(name, ruleJson string) (string, error) {
if err != nil {
return r.Id, err
}
defer func() {
if err != nil {
// Do not store to registry so also delete the KV
deleteRule(id)
}
}()

// Store to KV
err = ruleProcessor.ExecCreate(r.Id, ruleJson)
if err != nil {
// Do not store to registry so also delete the KV
deleteRule(r.Id)
return r.Id, fmt.Errorf("store the rule error: %v", err)
}

Expand Down

0 comments on commit 7404c57

Please sign in to comment.