Skip to content

Commit

Permalink
sink(ticdc): use generic slice allocator (#5038)
Browse files Browse the repository at this point in the history
close #5037
  • Loading branch information
sunxiaoguang authored Apr 1, 2022
1 parent c532bc1 commit 6948589
Showing 1 changed file with 28 additions and 326 deletions.
354 changes: 28 additions & 326 deletions cdc/sink/codec/craft/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,26 @@ func newBufferSize(oldSize int) int {
return newSize
}

// int slice allocator
type intSliceAllocator struct {
buffer []int
// generic slice allocator
type sliceAllocator[T any] struct {
buffer []T
offset int
}

//nolint:unused
func (b *intSliceAllocator) realloc(old []int, newSize int) []int {
func (b *sliceAllocator[T]) realloc(old []T, newSize int) []T {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *intSliceAllocator) alloc(size int) []int {
func (b *sliceAllocator[T]) alloc(size int) []T {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]int, size)
return make([]T, size)
}
b.buffer = make([]int, len(b.buffer))
b.buffer = make([]T, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
Expand All @@ -56,339 +56,41 @@ func (b *intSliceAllocator) alloc(size int) []int {
}

//nolint:unused
func (b *intSliceAllocator) one(x int) []int {
func (b *sliceAllocator[T]) one(x T) []T {
r := b.alloc(1)
r[0] = x
return r
}

func newIntSliceAllocator(batchSize int) *intSliceAllocator {
return &intSliceAllocator{buffer: make([]int, batchSize)}
}

// int64 slice allocator
type int64SliceAllocator struct {
buffer []int64
offset int
}

func (b *int64SliceAllocator) realloc(old []int64, newSize int) []int64 {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *int64SliceAllocator) alloc(size int) []int64 {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]int64, size)
}
b.buffer = make([]int64, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

//nolint:unused
func (b *int64SliceAllocator) one(x int64) []int64 {
r := b.alloc(1)
r[0] = x
return r
}

func newInt64SliceAllocator(batchSize int) *int64SliceAllocator {
return &int64SliceAllocator{buffer: make([]int64, batchSize)}
}

// uint64 slice allocator
type uint64SliceAllocator struct {
buffer []uint64
offset int
}

func (b *uint64SliceAllocator) realloc(old []uint64, newSize int) []uint64 {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *uint64SliceAllocator) alloc(size int) []uint64 {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]uint64, size)
}
b.buffer = make([]uint64, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

func (b *uint64SliceAllocator) one(x uint64) []uint64 {
r := b.alloc(1)
r[0] = x
return r
}

func newUint64SliceAllocator(batchSize int) *uint64SliceAllocator {
return &uint64SliceAllocator{buffer: make([]uint64, batchSize)}
}

// string slice allocator
type stringSliceAllocator struct {
buffer []string
offset int
}

//nolint:unused
func (b *stringSliceAllocator) realloc(old []string, newSize int) []string {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *stringSliceAllocator) alloc(size int) []string {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]string, size)
}
b.buffer = make([]string, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

//nolint:unused
func (b *stringSliceAllocator) one(x string) []string {
r := b.alloc(1)
r[0] = x
return r
}

func newStringSliceAllocator(batchSize int) *stringSliceAllocator {
return &stringSliceAllocator{buffer: make([]string, batchSize)}
}

// nullable string slice allocator
type nullableStringSliceAllocator struct {
buffer []*string
offset int
}

func (b *nullableStringSliceAllocator) realloc(old []*string, newSize int) []*string {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *nullableStringSliceAllocator) alloc(size int) []*string {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]*string, size)
}
b.buffer = make([]*string, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

func (b *nullableStringSliceAllocator) one(x *string) []*string {
r := b.alloc(1)
r[0] = x
return r
}

func newNullableStringSliceAllocator(batchSize int) *nullableStringSliceAllocator {
return &nullableStringSliceAllocator{buffer: make([]*string, batchSize)}
}

// byte slice allocator
type byteSliceAllocator struct {
buffer []byte
offset int
}

//nolint:unused
func (b *byteSliceAllocator) realloc(old []byte, newSize int) []byte {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *byteSliceAllocator) alloc(size int) []byte {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]byte, size)
}
b.buffer = make([]byte, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

//nolint:unused
func (b *byteSliceAllocator) one(x byte) []byte {
r := b.alloc(1)
r[0] = x
return r
}

func newByteSliceAllocator(batchSize int) *byteSliceAllocator {
return &byteSliceAllocator{buffer: make([]byte, batchSize)}
}

// bytes slice allocator
type bytesSliceAllocator struct {
buffer [][]byte
offset int
}

//nolint:unused
func (b *bytesSliceAllocator) realloc(old [][]byte, newSize int) [][]byte {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *bytesSliceAllocator) alloc(size int) [][]byte {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([][]byte, size)
}
b.buffer = make([][]byte, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

//nolint:unused
func (b *bytesSliceAllocator) one(x []byte) [][]byte {
r := b.alloc(1)
r[0] = x
return r
}

func newBytesSliceAllocator(batchSize int) *bytesSliceAllocator {
return &bytesSliceAllocator{buffer: make([][]byte, batchSize)}
}

// columnGroup slice allocator
type columnGroupSliceAllocator struct {
buffer []*columnGroup
offset int
}

//nolint:unused
func (b *columnGroupSliceAllocator) realloc(old []*columnGroup, newSize int) []*columnGroup {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *columnGroupSliceAllocator) alloc(size int) []*columnGroup {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]*columnGroup, size)
}
b.buffer = make([]*columnGroup, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

//nolint:unused
func (b *columnGroupSliceAllocator) one(x *columnGroup) []*columnGroup {
r := b.alloc(1)
r[0] = x
return r
}

func newColumnGroupSliceAllocator(batchSize int) *columnGroupSliceAllocator {
return &columnGroupSliceAllocator{buffer: make([]*columnGroup, batchSize)}
}

// rowChangedEvent slice allocator
type rowChangedEventSliceAllocator struct {
buffer []rowChangedEvent
offset int
}

func (b *rowChangedEventSliceAllocator) realloc(old []rowChangedEvent, newSize int) []rowChangedEvent {
n := b.alloc(newSize)
copy(n, old)
return n
}

func (b *rowChangedEventSliceAllocator) alloc(size int) []rowChangedEvent {
if len(b.buffer)-b.offset < size {
if size > len(b.buffer)/4 {
// large allocation
return make([]rowChangedEvent, size)
}
b.buffer = make([]rowChangedEvent, len(b.buffer))
b.offset = 0
}
result := b.buffer[b.offset : b.offset+size]
b.offset += size
return result
}

//nolint:unused
func (b *rowChangedEventSliceAllocator) one(x rowChangedEvent) []rowChangedEvent {
r := b.alloc(1)
r[0] = x
return r
}

func newRowChangedEventSliceAllocator(batchSize int) *rowChangedEventSliceAllocator {
return &rowChangedEventSliceAllocator{buffer: make([]rowChangedEvent, batchSize)}
func newGenericSliceAllocator[T any](batchSize int) *sliceAllocator[T] {
return &sliceAllocator[T]{buffer: make([]T, batchSize)}
}

// SliceAllocator for different slice types
type SliceAllocator struct {
intAllocator *intSliceAllocator
int64Allocator *int64SliceAllocator
uint64Allocator *uint64SliceAllocator
stringAllocator *stringSliceAllocator
nullableStringAllocator *nullableStringSliceAllocator
byteAllocator *byteSliceAllocator
bytesAllocator *bytesSliceAllocator
columnGroupAllocator *columnGroupSliceAllocator
rowChangedEventAllocator *rowChangedEventSliceAllocator
intAllocator *sliceAllocator[int]
int64Allocator *sliceAllocator[int64]
uint64Allocator *sliceAllocator[uint64]
stringAllocator *sliceAllocator[string]
nullableStringAllocator *sliceAllocator[*string]
byteAllocator *sliceAllocator[byte]
bytesAllocator *sliceAllocator[[]byte]
columnGroupAllocator *sliceAllocator[*columnGroup]
rowChangedEventAllocator *sliceAllocator[rowChangedEvent]
}

// NewSliceAllocator creates a new slice allocator with given batch allocation size.
func NewSliceAllocator(batchSize int) *SliceAllocator {
return &SliceAllocator{
intAllocator: newIntSliceAllocator(batchSize),
int64Allocator: newInt64SliceAllocator(batchSize),
uint64Allocator: newUint64SliceAllocator(batchSize),
stringAllocator: newStringSliceAllocator(batchSize),
nullableStringAllocator: newNullableStringSliceAllocator(batchSize),
byteAllocator: newByteSliceAllocator(batchSize),
bytesAllocator: newBytesSliceAllocator(batchSize),
columnGroupAllocator: newColumnGroupSliceAllocator(batchSize),
rowChangedEventAllocator: newRowChangedEventSliceAllocator(batchSize),
intAllocator: newGenericSliceAllocator[int](batchSize),
int64Allocator: newGenericSliceAllocator[int64](batchSize),
uint64Allocator: newGenericSliceAllocator[uint64](batchSize),
stringAllocator: newGenericSliceAllocator[string](batchSize),
nullableStringAllocator: newGenericSliceAllocator[*string](batchSize),
byteAllocator: newGenericSliceAllocator[byte](batchSize),
bytesAllocator: newGenericSliceAllocator[[]byte](batchSize),
columnGroupAllocator: newGenericSliceAllocator[*columnGroup](batchSize),
rowChangedEventAllocator: newGenericSliceAllocator[rowChangedEvent](batchSize),
}
}

Expand Down

0 comments on commit 6948589

Please sign in to comment.