Skip to content

Commit

Permalink
feature: add blacklist
Browse files Browse the repository at this point in the history
  • Loading branch information
kernelai committed May 27, 2020
1 parent 8d484c7 commit 6d6be67
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 14 deletions.
15 changes: 13 additions & 2 deletions cmd/admin/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func (t *cmdDashboard) Main(d map[string]interface{}) {
case d["--get-table-meta"].(bool):
fallthrough
case d["--set-table-meta"].(bool):
fallthrough
case d["--set-table-block"].(bool):
t.handleTableCommand(d)

case d["--sentinel-add"].(bool):
Expand Down Expand Up @@ -753,9 +755,8 @@ func (t *cmdDashboard) handleTableCommand(d map[string]interface{}) {
log.Debugf("call rpc table-list OK")

for _, t := range s.Table {
fmt.Printf("table ID: %d, table name: %s, slots num: %d, auth: %s\n", t.Id, t.Name, t.MaxSlotMum, t.Auth)
fmt.Printf("table ID: %d, table name: %s, slots num: %d, auth: %s, blocked: %t\n", t.Id, t.Name, t.MaxSlotMum, t.Auth, t.IsBlocked)
}
fmt.Println()
case d["--set-table-meta"].(bool):

tid := utils.ArgumentIntegerMust(d, "--tid")
Expand All @@ -778,6 +779,16 @@ func (t *cmdDashboard) handleTableCommand(d map[string]interface{}) {
log.PanicErrorf(err, "json marshal failed")
}
fmt.Println(string(b))
case d["--set-table-block"].(bool):

tid := utils.ArgumentIntegerMust(d, "--tid")
value := d["--enable"].(bool)

log.Debugf("call rpc set-table-block to dashboard %s", t.addr)
if err := c.SetTableBlock(tid, value); err != nil {
log.PanicErrorf(err, "call rpc set-table-block to dashboard %s failed", t.addr)
}
log.Debugf("call rpc set-table-block OK")
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/admin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Usage:
codis-admin [-v] --dashboard=ADDR --remove-table --tid=ID
codis-admin [-v] --dashboard=ADDR --get-table-meta
codis-admin [-v] --dashboard=ADDR --set-table-meta --tid=ID
codis-admin [-v] --dashboard=ADDR --set-table-block --tid=ID (--enable|--disable)
codis-admin [-v] --dashboard=ADDR --list-group
codis-admin [-v] --dashboard=ADDR --create-group --gid=ID
codis-admin [-v] --dashboard=ADDR --remove-group --gid=ID
Expand Down
3 changes: 1 addition & 2 deletions pkg/models/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ type Table struct {
Name string `json:"name"`
MaxSlotMum int `json:"max_slot_mum"`
Auth string `json:"auth,omitempty"`
// Slots []*Slot `json:"slots"`
// Group []*Group `json:"groups"`
IsBlocked bool `json:"is_blocked,omitempty"`
}

type TableMeta struct {
Expand Down
11 changes: 11 additions & 0 deletions pkg/proxy/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ var (
ErrInvalidSlotId = errors.New("use of invalid slot id")
ErrInvalidTableId = errors.New("use of invalid table id")
ErrInvalidMethod = errors.New("use of invalid forwarder method")
ErrTableBlocked = errors.New("table is blocked")
)


Expand All @@ -135,6 +136,16 @@ func (s *Router) GetTable(tid int) *models.Table {
return nil
}

func (s *Router) isBlocked(tid int) (bool, error) {
s.tableMu.RLock()
defer s.tableMu.RUnlock()
if t, ok := s.table[tid]; ok == true {
return t.IsBlocked, nil
} else {
return false, ErrInvalidTableId
}
}

func (s *Router) getSlotNum(tid int) (int, error) {
s.tableMu.RLock()
defer s.tableMu.RUnlock()
Expand Down
22 changes: 14 additions & 8 deletions pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Session struct {
start sync.Once

broken atomic2.Bool
blocked bool
config *Config

authorized bool
Expand Down Expand Up @@ -76,7 +77,7 @@ func NewSession(sock net.Conn, config *Config) *Session {
s := &Session{
Conn: c, config: config,
CreateUnix: time.Now().Unix(),
authorizeTable: -1,
authorizeTable: -1, blocked:false,
}
s.stats.opmap = make(map[string]*opStats, 16)
log.Infof("session [%p] create: %s", s, s)
Expand Down Expand Up @@ -194,6 +195,9 @@ func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
}
} else {
tasks.PushBack(r)
if s.blocked == true {
return nil
}
}
}
return nil
Expand Down Expand Up @@ -283,14 +287,16 @@ func (s *Session) handleRequest(r *Request, d *Router) error {
r.Resp = redis.NewErrorf("DB-[%d] NOAUTH Authentication required", s.database)
return nil
}
// if !s.authorized {
// if s.config.SessionAuth != "" {
// r.Resp = redis.NewErrorf("NOAUTH Authentication required")
// return nil
// }
// s.authorized = true
// }

if b, err := d.isBlocked(r.Database); err != nil {
return err
} else {
s.blocked = b
if b == true {
r.Resp = redis.NewErrorf("DB-[%d] is blocked because of in the blacklist", s.database)
return nil
}
}
switch opstr {
case "PING":
return s.handleRequestPing(r, d)
Expand Down
34 changes: 34 additions & 0 deletions pkg/topom/topom_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func newApiServer(t *Topom) http.Handler {
r.Put("/remove/:xauth/:tid", api.RemoveTable)
r.Put("/rename/:xauth/:tid/:name/:auth", api.RenameTable)
r.Put("/meta/:xauth/:tid", api.SetTableMeta)
r.Put("/block/:xauth/:tid/:block", api.SetTableBlock)
r.Get("/list/:xauth/:tid", api.ListTable)
r.Get("/get/:xauth/:tid", api.GetTable)
r.Get("/meta/:xauth", api.GetTableMeta)
Expand Down Expand Up @@ -608,6 +609,34 @@ func (s *apiServer) SetTableMeta(params martini.Params) (int, string) {
}
}

func (s *apiServer) SetTableBlock(params martini.Params) (int, string) {
if err := s.verifyXAuth(params); err != nil {
return rpc.ApiResponseError(err)
}
tid, err := s.parseInteger(params, "tid")
if err != nil {
return rpc.ApiResponseError(err)
}
text, err := s.parseString(params, "block")
if err != nil {
return rpc.ApiResponseError(err)
}
var block bool
switch text {
case "true":
block = true
case "false":
block = false
default:
return rpc.ApiResponseError(fmt.Errorf("block accept bool type"))
}
if err := s.topom.SetTableBlock(tid, block); err != nil {
return rpc.ApiResponseError(err)
} else {
return rpc.ApiResponseJson("OK")
}
}

func (s *apiServer) AddSentinel(params martini.Params) (int, string) {
if err := s.verifyXAuth(params); err != nil {
return rpc.ApiResponseError(err)
Expand Down Expand Up @@ -1167,6 +1196,11 @@ func (c *ApiClient) SetTableMeta( tid int) error {
return rpc.ApiPutJson(url, nil, nil)
}

func (c *ApiClient) SetTableBlock(tid int, block bool) error {
url := c.encodeURL("/api/topom/table/block/%s/%d/%t", c.xauth, tid, block)
return rpc.ApiPutJson(url, nil, nil)
}

func (c *ApiClient) AddSentinel(addr string) error {
url := c.encodeURL("/api/topom/sentinels/add/%s/%s", c.xauth, addr)
return rpc.ApiPutJson(url, nil, nil)
Expand Down
25 changes: 23 additions & 2 deletions pkg/topom/topom_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (s *Topom) CreateTable(name string, num ,id int, auth string) error {
Name: name,
MaxSlotMum: num,
Auth: auth,
IsBlocked: false,
}
if err := s.storeCreateTable(t); err != nil {
return err
Expand Down Expand Up @@ -173,5 +174,25 @@ func (s *Topom) SetTableMeta(id int) error{
return s.storeCreateTableMeta(tm)
}



func (s *Topom) SetTableBlock(tid int, isBlock bool) error {
s.mu.Lock()
defer s.mu.Unlock()
ctx, err := s.newContext()
if err != nil {
return err
}
t, ok := ctx.table[tid]
if ok == false {
return errors.Errorf("table-[%d] dose not exist", tid)
}
defer s.dirtyTableCache(tid)
t.IsBlocked = isBlock
if err := s.storeUpdateTable(t); err != nil {
return err
}
if err := s.syncFillTable(ctx, t); err != nil {
log.Warnf("table-[%s] tid-[%d] sync to proxy failed", t.Name, t.Id)
return err
}
return nil
}

0 comments on commit 6d6be67

Please sign in to comment.