Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/destroy shards #309

Merged
merged 2 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,27 @@ func (m *MarketNodeImpl) DagstoreInitializeStorage(ctx context.Context, storageN
return m.dagstoreLoadShards(ctx, toInitialize, params.MaxConcurrency)
}

func (m *MarketNodeImpl) DagstoreDestroyShard(ctx context.Context, key string) error {
opts := dagstore.DestroyOpts{}
sr := make(chan dagstore.ShardResult, 1)

shardKey := shard.KeyFromString(key)
if _, err := m.DAGStore.GetShardInfo(shardKey); err != nil {
return fmt.Errorf("query shard failed: %v", err)
}

if err := m.DAGStore.DestroyShard(ctx, shardKey, sr, opts); err != nil {
return err
}

select {
case <-ctx.Done():
return ctx.Err()
case r := <-sr:
return r.Error
}
}

func (m *MarketNodeImpl) dagstoreLoadShards(ctx context.Context, toInitialize []string, concurrency int) (<-chan types.DagstoreInitializeAllEvent, error) {
// prepare the thottler tokens.
var throttle chan struct{}
Expand Down
80 changes: 76 additions & 4 deletions cli/dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var DagstoreCmd = &cli.Command{
dagstoreInitializeAllCmd,
dagstoreInitializeStorageCmd,
dagstoreGcCmd,
dagStoreDestroyShardCmd,
},
}

Expand All @@ -34,6 +35,20 @@ var dagstoreListShardsCmd = &cli.Command{
Usage: "use color in display output",
DefaultText: "depends on output being a TTY",
},
&cli.StringSliceFlag{
Name: "filter",
Usage: `Filter shards in specific states,
eg. ./venus-market dagstore list-shards --filter=ShardStateErrored --filter=ShardStateAvailable, will ignore Errored and Available shards.
all shard states:
ShardStateAvailable
ShardStateServing
ShardStateErrored
ShardStateNew
ShardStateInitializing
ShardStateRecovering
ShardStateUnknown
`,
},
},
Action: func(cctx *cli.Context) error {
if cctx.IsSet("color") {
Expand All @@ -56,20 +71,28 @@ var dagstoreListShardsCmd = &cli.Command{
return nil
}

filterStates := make(map[string]struct{})
for _, state := range cctx.StringSlice("filter") {
filterStates[state] = struct{}{}
}

tw := tablewriter.New(
tablewriter.Col("Key"),
tablewriter.Col("State"),
tablewriter.Col("Error"),
)

colors := map[string]color.Attribute{
"ShardStateAvailable": color.FgGreen,
"ShardStateServing": color.FgBlue,
"ShardStateErrored": color.FgRed,
"ShardStateNew": color.FgYellow,
types.ShardStateAvailable: color.FgGreen,
types.ShardStateServing: color.FgBlue,
types.ShardStateErrored: color.FgRed,
types.ShardStateNew: color.FgYellow,
}

for _, s := range shards {
if _, ok := filterStates[s.State]; ok {
continue
}
m := map[string]interface{}{
"Key": s.Key,
"State": func() string {
Expand Down Expand Up @@ -336,3 +359,52 @@ var dagstoreGcCmd = &cli.Command{
return nil
},
}

var dagStoreDestroyShardCmd = &cli.Command{
Name: "destroy-shard",
Usage: "Destroy shard",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "errored",
Usage: "Destroy all errored shards",
},
},
ArgsUsage: "[keys]",
Action: func(cliCtx *cli.Context) error {
marketsApi, closer, err := NewMarketNode(cliCtx)
if err != nil {
return err
}
defer closer()

ctx := ReqContext(cliCtx)
args := cliCtx.Args().Slice()
if len(args) == 0 && !cliCtx.IsSet("errored") {
return fmt.Errorf("must pass shard key or set --errored flag")
}

keys := args
if cliCtx.Bool("errored") {
shards, err := marketsApi.DagstoreListShards(ctx)
if err != nil {
return err
}
for _, shardInfo := range shards {
if shardInfo.State == types.ShardStateErrored {
keys = append(keys, shardInfo.Key)
}
}
}

fmt.Printf("Have %d shard need to destroy\n", len(keys))

for _, key := range keys {
if err := marketsApi.DagstoreDestroyShard(ctx, key); err != nil {
return fmt.Errorf("destroy %s failed: %v", key, err)
}
fmt.Printf("destroy %s success\n", key)
}

return nil
},
}
2 changes: 1 addition & 1 deletion cli/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewMarketNode(cctx *cli.Context) (marketapi.IMarket, jsonrpc.ClientCloser,
if err != nil {
return nil, nil, err
}
fmt.Println(homePath)

apiUrl, err := ioutil.ReadFile(path.Join(homePath, "api"))
if err != nil {
return nil, nil, err
Expand Down
3 changes: 3 additions & 0 deletions dagstore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func TestWrapperAcquireRecoveryDestroy(t *testing.T) {
dr := make(chan dagstore.ShardResult, 1)
err = w.DestroyShard(ctx, pieceCid, dr)
require.NoError(t, err)
res := <-dr
require.NoError(t, res.Error)
require.Equal(t, shard.KeyFromCID(pieceCid), res.Key)

dctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/docker/go-units v0.5.0
github.com/etherlabsio/healthcheck/v2 v2.0.0
github.com/fatih/color v1.13.0
github.com/filecoin-project/dagstore v0.5.2
github.com/filecoin-project/dagstore v0.6.0
github.com/filecoin-project/go-address v1.1.0
github.com/filecoin-project/go-bitfield v0.2.4
github.com/filecoin-project/go-cbor-util v0.0.1
Expand All @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.2-0.20230323084520-c7cd25dffe48
github.com/filecoin-project/venus v1.10.2-0.20230327054644-931bc77297de
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e
github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a
github.com/golang/mock v1.6.0
Expand All @@ -41,7 +41,7 @@ require (
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-cidutil v0.1.0
github.com/ipfs/go-datastore v0.6.0
Expand Down Expand Up @@ -171,6 +171,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.2.2 // indirect
Expand Down
Loading