From 7404c57c71c3b16125192c927d852919295ee1ad Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 27 Mar 2024 16:51:52 +0800 Subject: [PATCH] fix: check rule exists before create (#2741) Signed-off-by: yisaer --- internal/processor/rule.go | 6 ++++++ internal/server/rest_test.go | 23 +++++++++++++++++++++++ internal/server/rule_manager.go | 15 +++++++++++---- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/internal/processor/rule.go b/internal/processor/rule.go index 5a677585a5..4458702e78 100644 --- a/internal/processor/rule.go +++ b/internal/processor/rule.go @@ -235,6 +235,12 @@ func clone(opt api.RuleOption) *api.RuleOption { } } +func (p *RuleProcessor) ExecExists(name string) bool { + var s1 string + f, _ := p.db.Get(name, &s1) + return f +} + func (p *RuleProcessor) ExecDesc(name string) (string, error) { var s1 string f, _ := p.db.Get(name, &s1) diff --git a/internal/server/rest_test.go b/internal/server/rest_test.go index 6a5a8d7ca5..afe9710b21 100644 --- a/internal/server/rest_test.go +++ b/internal/server/rest_test.go @@ -793,3 +793,26 @@ func (suite *RestTestSuite) TestCreateRuleReplacePasswd() { require.True(suite.T(), ok) require.Equal(suite.T(), "4444", c["password"]) } + +func (suite *RestTestSuite) TestCreateDuplicateRule() { + buf1 := bytes.NewBuffer([]byte(`{"sql":"CREATE stream demo123() WITH (DATASOURCE=\"0\", TYPE=\"mqtt\")"}`)) + req1, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf1) + w1 := httptest.NewRecorder() + suite.r.ServeHTTP(w1, req1) + + ruleJson2 := `{"id":"test12345","triggered":false,"sql":"select * from demo123","actions":[{"log":{}}]}` + buf2 := bytes.NewBuffer([]byte(ruleJson2)) + req2, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf2) + w2 := httptest.NewRecorder() + suite.r.ServeHTTP(w2, req2) + require.Equal(suite.T(), http.StatusCreated, w2.Code) + + buf2 = bytes.NewBuffer([]byte(ruleJson2)) + req2, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf2) + w2 = httptest.NewRecorder() + suite.r.ServeHTTP(w2, req2) + require.Equal(suite.T(), http.StatusBadRequest, w2.Code) + var returnVal []byte + returnVal, _ = io.ReadAll(w2.Result().Body) + require.Equal(suite.T(), `{"error":1000,"message":"rule test12345 already exists"}`+"\n", string(returnVal)) +} diff --git a/internal/server/rule_manager.go b/internal/server/rule_manager.go index ea8a1b48ba..6faf00bd8b 100644 --- a/internal/server/rule_manager.go +++ b/internal/server/rule_manager.go @@ -74,9 +74,8 @@ func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) { return result, ok } -func createRule(name, ruleJson string) (string, error) { +func createRule(name, ruleJson string) (id string, err error) { var rs *rule.RuleState = nil - var err error = nil // Validate the rule json r, err := ruleProcessor.GetRuleByJson(name, ruleJson) @@ -84,6 +83,10 @@ func createRule(name, ruleJson string) (string, error) { return "", fmt.Errorf("invalid rule json: %v", err) } + if exists := ruleProcessor.ExecExists(r.Id); exists { + return r.Id, fmt.Errorf("rule %v already exists", r.Id) + } + // Validate the topo err = infra.SafeRun(func() error { rs, err = createRuleState(r) @@ -92,12 +95,16 @@ func createRule(name, ruleJson string) (string, error) { if err != nil { return r.Id, err } + defer func() { + if err != nil { + // Do not store to registry so also delete the KV + deleteRule(id) + } + }() // Store to KV err = ruleProcessor.ExecCreate(r.Id, ruleJson) if err != nil { - // Do not store to registry so also delete the KV - deleteRule(r.Id) return r.Id, fmt.Errorf("store the rule error: %v", err) }