Skip to content

Commit

Permalink
processor,pkg(ticdc): add Span and package spanz (#7722)
Browse files Browse the repository at this point in the history
ref #7720
  • Loading branch information
overvenus authored Nov 26, 2022
1 parent 6eeec0a commit 5dc3c50
Show file tree
Hide file tree
Showing 9 changed files with 1,030 additions and 43 deletions.
79 changes: 79 additions & 0 deletions cdc/processor/tablepb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
package tablepb

import (
"bytes"
"encoding"
"encoding/hex"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync/atomic"

"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -61,3 +68,75 @@ type TablePipeline interface {
// RemainEvents return the amount of kv events remain in sorter.
RemainEvents() int64
}

// Key is a custom type for bytes in proto.
type Key []byte

var (
_ fmt.Stringer = Key{}
_ fmt.Stringer = (*Key)(nil)
)

func (k Key) String() string {
return hex.EncodeToString(k)
}

var (
_ json.Marshaler = Key{}
_ json.Marshaler = (*Key)(nil)
)

// MarshalJSON implements json.Marshaler.
// It is helpful to format span in log.
func (k Key) MarshalJSON() ([]byte, error) {
return json.Marshal(k.String())
}

var (
_ encoding.TextMarshaler = Span{}
_ encoding.TextMarshaler = (*Span)(nil)
)

// MarshalText implements encoding.TextMarshaler (used in proto.CompactTextString).
// It is helpful to format span in log.
func (s Span) MarshalText() ([]byte, error) {
return []byte(s.String()), nil
}

func (s *Span) String() string {
length := len("{table_id:,start_key:,end_key:}")
length += 8 // for TableID
length += len(s.StartKey) + len(s.EndKey)
b := strings.Builder{}
b.Grow(length)
b.Write([]byte("{table_id:"))
b.Write([]byte(strconv.Itoa(int(s.TableID))))
if len(s.StartKey) > 0 {
b.Write([]byte(",start_key:"))
b.Write([]byte(s.StartKey.String()))
}
if len(s.EndKey) > 0 {
b.Write([]byte(",end_key:"))
b.Write([]byte(s.EndKey.String()))
}
b.Write([]byte("}"))
return b.String()
}

// Less compares two Spans, defines the order between spans.
func (s *Span) Less(b *Span) bool {
if s.TableID < b.TableID {
return true
}
if bytes.Compare(s.StartKey, b.StartKey) < 0 {
return true
}
return false
}

// Eq compares two Spans, defines the equality between spans.
func (s *Span) Eq(b *Span) bool {
return s.TableID == b.TableID &&
bytes.Equal(s.StartKey, b.StartKey) &&
bytes.Equal(s.EndKey, b.EndKey)
}
Loading

0 comments on commit 5dc3c50

Please sign in to comment.