Skip to content

Commit

Permalink
support the replicate id param for the cdc server
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Nov 21, 2024
1 parent 92962d9 commit bad5e1f
Show file tree
Hide file tree
Showing 21 changed files with 5,196 additions and 51 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ generate-mockery:
@echo "Generating mockery server mocks..."

@cd "$(PWD)/core"; mockery -r --name "$(shell echo $(strip $(CORE_API)) | tr ' ' '|')" --output ./mocks --case snake --with-expecter
@cd "$(PWD)/server"; mockery -r --name "$(shell echo $(strip $(SERVER_API)) | tr ' ' '|')" --output ./mocks --case snake --with-expecter
@cd "$(PWD)/server"; mockery -r --name "$(shell echo $(strip $(SERVER_API)) | tr ' ' '|')" --output ./mocks --case snake --with-expecter
@cd "$(PWD)/core"; mockery --srcpkg github.com/milvus-io/milvus-proto/go-api/v2/milvuspb --name MilvusServiceServer --output ./servermocks --case snake --with-expecter
1 change: 1 addition & 0 deletions core/config/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ package config
type WriterConfig struct {
MessageBufferSize int
Retry RetrySettings
ReplicateID string
}
2 changes: 1 addition & 1 deletion core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.10
github.com/confluentinc/confluent-kafka-go => github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
// github.com/milvus-io/milvus-sdk-go/v2 => github.com/SimFG/milvus-sdk-go/v2 v2.0.0-20231025090810-3c862f081ed2
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20240923040205-ef42b6b1a81b
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20241120065629-5a84afe5a345
github.com/milvus-io/milvus/pkg => github.com/SimFG/milvus/pkg v0.0.0-20240929115325-612f86d538e8
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => ./../rocksdb
Expand Down
4 changes: 2 additions & 2 deletions core/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7
github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20240923040205-ef42b6b1a81b h1:fWcTqPhsHX3m7PBFoENwrDWnqmdBDLa4Zkd0ZNkZjU4=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20240923040205-ef42b6b1a81b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20241120065629-5a84afe5a345 h1:hzf9web1XGcnx/PCs/gla1V3NPTQfitOrxQ6f9m7DqA=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20241120065629-5a84afe5a345/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/SimFG/milvus/pkg v0.0.0-20240929115325-612f86d538e8 h1:gRjXG9Io6qDDp7r3NyB4hCpEbA0+08VLA6h/A6A5NaY=
github.com/SimFG/milvus/pkg v0.0.0-20240929115325-612f86d538e8/go.mod h1:FStG8u3/hsi3Au3I36NBr9G5c6f1CQP9oKNIgvlCxm0=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
Expand Down
25 changes: 21 additions & 4 deletions core/model/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package model

import (
"sync"

"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/retry"

"github.com/zilliztech/milvus-cdc/core/util"
)

type SourceCollectionInfo struct {
Expand All @@ -46,8 +46,8 @@ type TargetCollectionInfo struct {
PartitionInfo map[string]int64
PChannel string
VChannel string
BarrierChan *util.OnceWriteChan[uint64]
PartitionBarrierChan map[int64]*util.OnceWriteChan[uint64] // id is the source partition id
BarrierChan *OnceWriteChan[uint64]
PartitionBarrierChan map[int64]*OnceWriteChan[uint64] // id is the source partition id
Dropped bool
DroppedPartition map[int64]struct{} // id is the source partition id
}
Expand Down Expand Up @@ -75,3 +75,20 @@ type DatabaseInfo struct {
Name string
Dropped bool
}

type OnceWriteChan[T any] struct {
once sync.Once
ch chan<- T
}

func NewOnceWriteChan[T any](c chan<- T) *OnceWriteChan[T] {
return &OnceWriteChan[T]{
ch: c,
}
}

func (o *OnceWriteChan[T]) Write(data T) {
o.once.Do(func() {
o.ch <- data
})
}
8 changes: 4 additions & 4 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, db *m
PartitionInfo: targetInfo.Partitions,
PChannel: targetPChannel,
VChannel: targetVChannel,
BarrierChan: util.NewOnceWriteChan(barrier.BarrierChan),
PartitionBarrierChan: make(map[int64]*util.OnceWriteChan[uint64]),
BarrierChan: model.NewOnceWriteChan(barrier.BarrierChan),
PartitionBarrierChan: make(map[int64]*model.OnceWriteChan[uint64]),
Dropped: targetInfo.Dropped,
DroppedPartition: make(map[int64]struct{}),
})
Expand Down Expand Up @@ -1036,7 +1036,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(taskID string, collectionInfo
r.recordLock.Unlock()
return nil
}
targetInfo.PartitionBarrierChan[partitionID] = util.NewOnceWriteChan(barrierChan)
targetInfo.PartitionBarrierChan[partitionID] = model.NewOnceWriteChan(barrierChan)
sourcePChannel := r.collectionNames[collectionName].PChannel
partitionLog.Info("add partition info done")
r.recordLock.Unlock()
Expand Down Expand Up @@ -1522,7 +1522,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
log.Warn("invalid drop partition message, empty partition name", zap.Any("msg", msg))
continue
}
var partitionBarrierChan *util.OnceWriteChan[uint64]
var partitionBarrierChan *model.OnceWriteChan[uint64]
retryErr := retry.Do(r.replicateCtx, func() error {
err = nil
r.recordLock.RLock()
Expand Down
20 changes: 10 additions & 10 deletions core/reader/replicate_channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ func TestStartReadCollectionForMilvus(t *testing.T) {
},
PChannel: "ttest_read_channel",
VChannel: "ttest_read_channel_v0",
BarrierChan: util.NewOnceWriteChan(make(chan<- uint64)),
PartitionBarrierChan: map[int64]*util.OnceWriteChan[uint64]{
1101: util.NewOnceWriteChan(make(chan<- uint64)),
BarrierChan: model.NewOnceWriteChan(make(chan<- uint64)),
PartitionBarrierChan: map[int64]*model.OnceWriteChan[uint64]{
1101: model.NewOnceWriteChan(make(chan<- uint64)),
},
})
assert.NoError(t, err)
Expand Down Expand Up @@ -449,9 +449,9 @@ func TestStartReadCollectionForKafka(t *testing.T) {
},
PChannel: "kafka_ttest_read_channel",
VChannel: "kafka_ttest_read_channel_v0",
BarrierChan: util.NewOnceWriteChan(make(chan<- uint64)),
PartitionBarrierChan: map[int64]*util.OnceWriteChan[uint64]{
1101: util.NewOnceWriteChan(make(chan<- uint64)),
BarrierChan: model.NewOnceWriteChan(make(chan<- uint64)),
PartitionBarrierChan: map[int64]*model.OnceWriteChan[uint64]{
1101: model.NewOnceWriteChan(make(chan<- uint64)),
},
})
assert.NoError(t, err)
Expand Down Expand Up @@ -702,8 +702,8 @@ func TestReplicateChannelHandler(t *testing.T) {
CollectionID: 2,
}, &model.TargetCollectionInfo{
CollectionName: "test2",
PartitionBarrierChan: map[int64]*util.OnceWriteChan[uint64]{
1001: util.NewOnceWriteChan(make(chan<- uint64)),
PartitionBarrierChan: map[int64]*model.OnceWriteChan[uint64]{
1001: model.NewOnceWriteChan(make(chan<- uint64)),
},
DroppedPartition: make(map[int64]struct{}),
})
Expand Down Expand Up @@ -779,8 +779,8 @@ func TestReplicateChannelHandler(t *testing.T) {
},
PChannel: "test_q",
VChannel: "test_q_v1",
BarrierChan: util.NewOnceWriteChan(barrierChan),
PartitionBarrierChan: map[int64]*util.OnceWriteChan[uint64]{},
BarrierChan: model.NewOnceWriteChan(barrierChan),
PartitionBarrierChan: map[int64]*model.OnceWriteChan[uint64]{},
DroppedPartition: make(map[int64]struct{}),
}, targetClient, &api.DefaultMetaOp{}, apiEventChan, &model.HandlerOpts{
Factory: factory,
Expand Down
3 changes: 2 additions & 1 deletion core/reader/target_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-sdk-go/v2/mocks"

mocks "github.com/zilliztech/milvus-cdc/core/servermocks"
)

func TestTargetClient(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions core/reader/ts_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"

"github.com/zilliztech/milvus-cdc/core/config"
Expand Down Expand Up @@ -96,3 +98,11 @@ func TestTS(t *testing.T) {
assert.EqualValues(t, 1, minTS)
m.EmptyTS()
}

func TestGetTT(t *testing.T) {
curTime := time.UnixMilli(1732111898778)
// curTime := time.Now()
tt := tsoutil.ComposeTSByTime(curTime, 0)
log.Info("compose ts by time", zap.Any("tt", tt))
log.Info("cur time", zap.Any("curTime", tsoutil.ComposeTS(1732111898778, 0)))
}
Loading

0 comments on commit bad5e1f

Please sign in to comment.