Skip to content

Commit

Permalink
enhance: Sync v1.0.x pr to main branch (part4) (#357)
Browse files Browse the repository at this point in the history
Related to #332 #334 #335 #336 #337 #338 #339 #340 #343 #344 #346

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Co-authored-by: yihao.dai <yihao.dai@zilliz.com>
Co-authored-by: wei liu <wei.liu@zilliz.com>
  • Loading branch information
3 people authored Feb 19, 2025
1 parent 9f0d33c commit f41743b
Show file tree
Hide file tree
Showing 35 changed files with 680 additions and 53 deletions.
1 change: 1 addition & 0 deletions .goreleaser-darwin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ before:
builds:
- env:
- CGO_ENABLED=1
main: ./cmd/birdwatcher
goos:
# - linux
#- windows
Expand Down
1 change: 1 addition & 0 deletions .goreleaser-linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ before:
builds:
- env:
- CGO_ENABLED=1
main: ./cmd/birdwatcher
goos:
- linux
#- windows
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ all: static-check birdwatcher
birdwatcher:
@echo "Compiling birdwatcher"
@mkdir -p bin
@CGO_ENABLED=0 go build -o bin/birdwatcher main.go
@CGO_ENABLED=0 go build -o bin/birdwatcher cmd/birdwatcher/main.go

birdwatcher_wkafka:
@echo "Compiling birdwatcher with kafka(CGO_ENABLED)"
@mkdir -p bin
@CGO_ENABLED=1 go build -o bin/birdwatcher_wkafka -tags WKAFKA main.go
@CGO_ENABLED=1 go build -o bin/birdwatcher_wkafka -tags WKAFKA cmd/birdwatcher/main.go

getdeps:
@mkdir -p $(INSTALL_PATH)
Expand Down
75 changes: 75 additions & 0 deletions cmd/birdwatcher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"os/exec"

_ "github.com/milvus-io/birdwatcher/asap"
"github.com/milvus-io/birdwatcher/bapps"
"github.com/milvus-io/birdwatcher/common"
"github.com/milvus-io/birdwatcher/configs"
"github.com/milvus-io/birdwatcher/states"
)

var (
oneLineCommand = flag.String("olc", "", "one line command execution mode")
simple = flag.Bool("simple", false, "use simple ui without suggestion and history")
restServer = flag.Bool("rest", false, "rest server address")
webPort = flag.Int("port", 8002, "listening port for web server")
printVersion = flag.Bool("version", false, "print version")
)

func main() {
flag.Parse()

var appFactory func(config *configs.Config) bapps.BApp

switch {
// Print current birdwatcher version
case *printVersion:
fmt.Println("Birdwatcher Version", common.Version)
return
case *simple:
appFactory = func(*configs.Config) bapps.BApp { return bapps.NewSimpleApp() }
case len(*oneLineCommand) > 0:
appFactory = func(*configs.Config) bapps.BApp { return bapps.NewOlcApp(*oneLineCommand) }
case *restServer:
appFactory = func(config *configs.Config) bapps.BApp { return bapps.NewWebServerApp(*webPort, config) }
default:
defer handleExit()
// open file and create if non-existent
file, err := os.OpenFile("bw_debug.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
log.Fatal(err)
}
defer file.Close()

logger := log.New(file, "Custom Log", log.LstdFlags)

appFactory = func(config *configs.Config) bapps.BApp {
return bapps.NewPromptApp(config, bapps.WithLogger(logger))
}
}

config, err := configs.NewConfig(".bw_config")
if err != nil {
// run by default, just printing warning.
fmt.Println("[WARN] load config file failed, running in default setting", err.Error())
}

start := states.Start(config)

app := appFactory(config)
app.Run(start)
}

// handleExit is the fix for go-prompt output hi-jack fix.
func handleExit() {
rawModeOff := exec.Command("/bin/stty", "-raw", "echo")
rawModeOff.Stdin = os.Stdin
_ = rawModeOff.Run()
rawModeOff.Wait()
}
4 changes: 2 additions & 2 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *embedEtcdMockState) SetInstance(instanceName string) {
s.SetLabel(fmt.Sprintf("Backup(%s)", instanceName))
s.instanceName = instanceName
rootPath := path.Join(instanceName, metaPath)
s.ComponentShow = show.NewComponent(s.client, s.config, rootPath)
s.ComponentShow = show.NewComponent(s.client, s.config, instanceName, metaPath)
s.ComponentRemove = remove.NewComponent(s.client, s.config, rootPath)
s.ComponentRepair = repair.NewComponent(s.client, s.config, rootPath)
s.SetupCommands()
Expand Down Expand Up @@ -171,7 +171,7 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli kv.MetaKV, instanceName string

state := &embedEtcdMockState{
CmdState: framework.NewCmdState(fmt.Sprintf("Backup(%s)", instanceName)),
ComponentShow: show.NewComponent(cli, config, basePath),
ComponentShow: show.NewComponent(cli, config, instanceName, metaPath),
ComponentRemove: remove.NewComponent(cli, config, basePath),
instanceName: instanceName,
server: server,
Expand Down
19 changes: 15 additions & 4 deletions states/etcd/common/bulkinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,25 @@ func ListImportJobs(ctx context.Context, cli kv.MetaKV, basePath string, filters
return nil, nil, err
}

return lo.FilterMap(jobs, func(job datapb.ImportJob, idx int) (*datapb.ImportJob, bool) {
resultJobs := make([]*datapb.ImportJob, 0, len(jobs))
resultKeys := make([]string, 0, len(keys))

filterFn := func(job datapb.ImportJob) bool {
for _, filter := range filters {
if !filter(&job) {
return nil, false
return false
}
}
return &job, true
}), keys, nil
return true
}
for i, job := range jobs {
if ok := filterFn(job); ok {
resultJobs = append(resultJobs, &jobs[i])
resultKeys = append(resultKeys, keys[i])
}
}

return resultJobs, resultKeys, nil
}

// ListPreImportTasks list pre-import tasks.
Expand Down
6 changes: 1 addition & 5 deletions states/etcd/common/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package common
import (
"context"
"path"
"time"

"github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/indexpb"
Expand All @@ -18,10 +17,7 @@ func ListIndex(ctx context.Context, cli kv.MetaKV, basePath string, filters ...f
}

// ListSegmentIndex list segment index info.
func ListSegmentIndex(cli kv.MetaKV, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

func ListSegmentIndex(ctx context.Context, cli kv.MetaKV, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) {
prefix := path.Join(basePath, "root-coord/segment-index") + "/"
result, _, err := ListProtoObjects(ctx, cli, prefix, filters...)
return result, err
Expand Down
60 changes: 60 additions & 0 deletions states/etcd/remove/dirty_importing_segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package remove

import (
"context"
"fmt"

"github.com/samber/lo"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

type DirtyImportingSegment struct {
framework.ParamBase `use:"remove dirty-importing-segment" desc:"remove dirty importing segments with 0 rows"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
Ts int64 `name:"ts" default:"0" desc:"only remove segments with ts less than this value"`
Run bool `name:"run" default:"false" desc:"flag to control actually run or dry"`
}

// DirtyImportingSegmentCommand returns command to remove
func (c *ComponentRemove) DirtyImportingSegmentCommand(ctx context.Context, p *DirtyImportingSegment) error {
fmt.Println("start to remove dirty importing segment")
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
return (p.CollectionID == 0 || segment.CollectionID == p.CollectionID)
})
if err != nil {
return err
}

groups := lo.GroupBy(segments, func(segment *models.Segment) int64 {
return segment.CollectionID
})

for collectionID, segments := range groups {
for _, segment := range segments {
if segment.State == models.SegmentStateImporting {
segmentTs := segment.GetDmlPosition().GetTimestamp()
if segmentTs == 0 {
segmentTs = segment.GetStartPosition().GetTimestamp()
}
if segment.NumOfRows == 0 && segmentTs < uint64(p.Ts) {
fmt.Printf("collection %d, segment %d is dirty importing with 0 rows, remove it\n", collectionID, segment.ID)
if p.Run {
err := common.RemoveSegmentByID(ctx, c.client, c.basePath, segment.CollectionID, segment.PartitionID, segment.ID)
if err != nil {
fmt.Printf("failed to remove segment %d, err: %s\n", segment.ID, err.Error())
}
}
} else {
fmt.Printf("collection %d, segment %d is dirty importing with %d rows, ts=%d, skip it\n", collectionID, segment.ID, segment.NumOfRows, segmentTs)
}
}
}
}

fmt.Println("finish to remove dirty importing segment")
return nil
}
6 changes: 5 additions & 1 deletion states/etcd/repair/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/golang/protobuf/proto"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -56,8 +57,11 @@ func SegmentCommand(cli kv.MetaKV, basePath string) *cobra.Command {
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

// use v1 meta for now
segmentIndexes, err := common.ListSegmentIndex(cli, basePath)
segmentIndexes, err := common.ListSegmentIndex(ctx, cli, basePath)
if err != nil {
fmt.Println(err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type AliasParam struct {

// AliasCommand implements `show alias` command.
func (c *ComponentShow) AliasCommand(ctx context.Context, p *AliasParam) (*Aliases, error) {
aliases, err := common.ListAliasVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(a *models.Alias) bool {
aliases, err := common.ListAliasVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(a *models.Alias) bool {
return p.DBID == -1 || p.DBID == a.DBID
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/bulkinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ImportJobParam struct {

// BulkInsertCommand returns show bulkinsert command.
func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam) error {
jobs, _, err := common.ListImportJobs(ctx, c.client, c.basePath, func(job *datapb.ImportJob) bool {
jobs, _, err := common.ListImportJobs(ctx, c.client, c.metaPath, func(job *datapb.ImportJob) bool {
return (p.JobID == 0 || job.GetJobID() == p.JobID) &&
(p.CollectionID == 0 || job.GetCollectionID() == p.CollectionID) &&
(p.State == "" || strings.EqualFold(job.GetState().String(), p.State))
Expand All @@ -51,7 +51,7 @@ func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam
fmt.Println("Please specify the job ID (-job={JobID}) to show detailed info.")
return nil
}
PrintDetailedImportJob(ctx, c.client, c.basePath, job, p.ShowAllFiles)
PrintDetailedImportJob(ctx, c.client, c.metaPath, job, p.ShowAllFiles)
} else {
PrintSimpleImportJob(job)
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/channel_watched.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ChannelWatchedParam struct {

// ChannelWatchedCommand return show channel-watched commands.
func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*framework.PresetResultSet, error) {
infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
infos, err := common.ListChannelWatch(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) && (!p.WithoutSchema || channel.Schema == nil)
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions states/etcd/show/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type CheckpointParam struct {

// CheckpointCommand returns show checkpoint command.
func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointParam) (*Checkpoints, error) {
coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err != nil {
return nil, errors.Wrap(err, "failed to get collection")
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (rs *Checkpoints) PrintAs(format framework.Format) string {
}

func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*models.MsgPosition, error) {
prefix := path.Join(c.basePath, "datacoord-meta", "channel-cp", channelName)
prefix := path.Join(c.metaPath, "datacoord-meta", "channel-cp", channelName)
results, _, err := common.ListProtoObjects[internalpb.MsgPosition](ctx, c.client, prefix)
if err != nil {
return nil, err
Expand All @@ -108,7 +108,7 @@ func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName st
}

func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*models.MsgPosition, int64, error) {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.Segment) bool {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.Segment) bool {
return info.CollectionID == collID && info.InsertChannel == vchannel
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionPara
// perform get by id to accelerate
if p.CollectionID > 0 {
var collection *models.Collection
collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err == nil {
collections = append(collections, collection)
}
} else {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(coll *models.Collection) bool {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(coll *models.Collection) bool {
if p.CollectionName != "" && coll.Schema.Name != p.CollectionName {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/collection_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect
}

// fetch current for now
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err != nil {
switch {
case errors.Is(err, common.ErrCollectionDropped):
Expand All @@ -43,7 +43,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect
Collection: collection,
}
// fetch history
items, err := common.ListCollectionHistory(ctx, c.client, c.basePath, etcdversion.GetVersion(), collection.DBID, p.CollectionID)
items, err := common.ListCollectionHistory(ctx, c.client, c.metaPath, etcdversion.GetVersion(), collection.DBID, p.CollectionID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type CollectionLoadedParam struct {
// CollectionLoadedCommand return show collection-loaded command.
func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) (*CollectionsLoaded, error) {
var total int
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
total++
return p.CollectionID == 0 || p.CollectionID == info.CollectionID
})
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *ComponentShow) CompactionTaskCommand(ctx context.Context, p *Compaction

// perform get by id to accelerate

compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.basePath, func(task *models.CompactionTask) bool {
compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.metaPath, func(task *models.CompactionTask) bool {
total++
if p.CollectionName != "" && task.GetSchema().GetName() != p.CollectionName {
return false
Expand Down
Loading

0 comments on commit f41743b

Please sign in to comment.