diff --git a/.gitignore b/.gitignore index 4323a0c0..55cb21f5 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ badgerStAskDb/ *.puml /droplet-client /droplet +/index-tool sequence_chart.md .idea .vscode diff --git a/Makefile b/Makefile index 2a32f1e4..0fbf49f1 100644 --- a/Makefile +++ b/Makefile @@ -70,6 +70,10 @@ droplet-client: $(BUILD_DEPS) rm -f droplet-client go build -o ./droplet-client $(GOFLAGS) ./cmd/droplet-client +index: $(BUILD_DEPS) + rm -f index-tool + go build -o ./index-tool $(GOFLAGS) ./tools/index + add-debug-flag: GOFLAGS+=-gcflags="all=-N -l" diff --git a/dagstore/mongo_topindex.go b/dagstore/mongo_topindex.go index 5390a843..041e2914 100644 --- a/dagstore/mongo_topindex.go +++ b/dagstore/mongo_topindex.go @@ -25,7 +25,7 @@ type MongoTopIndex struct { indexCol *mongo.Collection } -func NewMongoTopIndex(ctx context.Context, url string) (index.Inverted, error) { +func NewMongoTopIndex(ctx context.Context, url string) (*MongoTopIndex, error) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() client, err := mongo.Connect(ctx, options.Client().ApplyURI(url)) @@ -72,3 +72,12 @@ func (mongoTopIndex *MongoTopIndex) GetShardsForMultihash(ctx context.Context, h } return shardKeys, nil } + +func (mongoTopIndex *MongoTopIndex) HasShard(ctx context.Context, shard shard.Key) (bool, error) { + count, err := mongoTopIndex.indexCol.CountDocuments(ctx, bson.M{"pieces": shard.String()}) + if err != nil { + return false, err + } + + return count > 0, nil +} diff --git a/dagstore/mongo_topindex_test.go b/dagstore/mongo_topindex_test.go index 956cb250..94f1a7ec 100644 --- a/dagstore/mongo_topindex_test.go +++ b/dagstore/mongo_topindex_test.go @@ -56,6 +56,12 @@ func TestAddMultihashesForShard(t *testing.T) { }) assert.Nil(t, err) } + + { + has, err := indexSaver.HasShard(ctx, shard.KeyFromString(entry.Name())) + assert.NoError(t, err) + assert.True(t, has) + } } } diff --git a/models/badger/shard.go b/models/badger/shard.go index 
e966a9ab..ec1f9bc7 100644 --- a/models/badger/shard.go +++ b/models/badger/shard.go @@ -14,6 +14,10 @@ func NewShardRepo() *Shard { return &Shard{} } +func (s *Shard) CreateShard(ctx context.Context, shard *dagstore.PersistedShard) error { + panic("implement me") +} + func (s *Shard) SaveShard(ctx context.Context, shard *dagstore.PersistedShard) error { panic("implement me") } diff --git a/models/mysql/shard.go b/models/mysql/shard.go index f6c63575..8d8d6f07 100644 --- a/models/mysql/shard.go +++ b/models/mysql/shard.go @@ -53,6 +53,10 @@ func from(s *dagstore.PersistedShard) *shard { } } +func (s *shardRepo) CreateShard(ctx context.Context, shard *dagstore.PersistedShard) error { + return s.DB.WithContext(ctx).Create(from(shard)).Error +} + func (s *shardRepo) SaveShard(ctx context.Context, shard *dagstore.PersistedShard) error { return s.DB.WithContext(ctx).Save(from(shard)).Error } diff --git a/models/repo/repo.go b/models/repo/repo.go index 76ca0fef..2d29eb2d 100644 --- a/models/repo/repo.go +++ b/models/repo/repo.go @@ -99,6 +99,7 @@ type ICidInfoRepo interface { } type IShardRepo interface { + CreateShard(ctx context.Context, shard *dagstore.PersistedShard) error dagstore.ShardRepo } diff --git a/tools/index/README.md b/tools/index/README.md new file mode 100644 index 00000000..a3894e04 --- /dev/null +++ b/tools/index/README.md @@ -0,0 +1,51 @@ +# 索引工具 + +主要有两个功能,一个是给未生成索引的 active 订单生成索引,另一个是迁移 top index 到 MongoDB,迁移 shard 到 MySQL。 + +### 编译 + +``` +make index +``` + +### 生成索引 + +先去 droplet 获取订单状态是 active 的订单,然后去遍历 car 文件,如果被 active 订单使用且未生成索引,则为其生成索引。 + +* --car-dir:存储 car 文件的目录。 +* --index-dir:存储索引文件的目录,`droplet` 默认在 `~/.droplet/dagstore/index`。 +* --mongo-url:MongoDB 的连接地址,用于存储 top index,数据库是 `market_index`,collection 是 `top_index`。 +* --mysql-url:MySQL 的连接地址,用于存储 shard 状态,要和 `droplet` 使用同一个数据库,表名是 `shards`。 +* --droplet-url:droplet 服务的 RPC 地址。 +* --droplet-token:droplet 服务的 token。 + +```bash +./index-tool gen-index \ +--car-dir= \ +--index-dir= \ 
+--mongo-url="mongodb://user:pass@host/?retryWrites=true&w=majority" \ +--mysql-url="user:pass@(127.0.0.1:3306)/venus-market?parseTime=true&loc=Local" \ +--droplet-url="/ip4/127.0.0.1/tcp/41235" \ +--droplet-token= +``` + +> 成功生成索引会输出类似日志:`generate index success: xxxxxx` + +### 迁移索引 + +目前 top index 和 shard 都是存储在 badger,这样多个 droplet 时不能共享,所以需要把 top index 存储到 MongoDB,shard 存储到 MySQL,方便共享数据。 + +* --index-dir:存储索引文件的目录,`droplet` 默认在 `~/.droplet/dagstore/index`。 +* --mongo-url:MongoDB 的连接地址,用于存储 top index,数据库是 `market_index`,`collection` 是 `top_index`。 +* --mysql-url:MySQL 的连接地址,用于存储 shard 状态,要和 `droplet` 使用同一个数据库,表名是 `shards`。 + +```bash +./index-tool migrate-index \ +--index-dir= \ +--mongo-url="mongodb://user:pass@host/?retryWrites=true&w=majority" \ +--mysql-url="user:pass@(127.0.0.1:3306)/venus-market?parseTime=true&loc=Local" \ +--droplet-url="/ip4/127.0.0.1/tcp/41235" \ +--droplet-token= +``` + +> 成功迁移索引会输出类似日志:`migrate xxxxx success` diff --git a/tools/index/main.go b/tools/index/main.go new file mode 100644 index 00000000..c6db1369 --- /dev/null +++ b/tools/index/main.go @@ -0,0 +1,327 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + + dagstore2 "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/index" + "github.com/filecoin-project/dagstore/shard" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-jsonrpc" + marketapi "github.com/filecoin-project/venus/venus-shared/api/market/v1" + "github.com/filecoin-project/venus/venus-shared/types/market" + "github.com/ipfs-force-community/droplet/v2/config" + "github.com/ipfs-force-community/droplet/v2/dagstore" + "github.com/ipfs-force-community/droplet/v2/models/mysql" + "github.com/ipfs-force-community/droplet/v2/models/repo" + "github.com/ipld/go-car/v2" + carindex "github.com/ipld/go-car/v2/index" + "github.com/multiformats/go-multihash" + "github.com/urfave/cli/v2" +) + +const indexSuffix =
".full.idx" + +var ( + mongoURLFlag = &cli.StringFlag{ + Name: "mongo-url", + Usage: "mongo url, use for store topIndex", + Required: true, + } + mysqlURLFlag = &cli.StringFlag{ + Name: "mysql-url", + Usage: "mysql url, use for store shard", + Required: true, + } + indexDirFlag = &cli.StringFlag{ + Name: "index-dir", + Usage: "The directory where the index is stored", + Required: true, + } + carDirFlag = &cli.StringFlag{ + Name: "car-dir", + Usage: "directory for car files", + Required: true, + } + dropletURLFlag = &cli.StringFlag{ + Name: "droplet-url", + Usage: "droplet url", + Required: true, + } + dropletTokenFlag = &cli.StringFlag{ + Name: "droplet-token", + Usage: "droplet token", + Required: true, + } +) + +func main() { + app := cli.App{ + Name: "index-tool", + Usage: "Used to generate indexes and migrate indexes", + Commands: []*cli.Command{ + generateIndexCmd, + migrateIndexCmd, + }, + } + + if err := app.Run(os.Args); err != nil { + log.Fatal(err) + } +} + +var generateIndexCmd = &cli.Command{ + Name: "gen-index", + Usage: "generate car index", + Flags: []cli.Flag{ + mongoURLFlag, + mysqlURLFlag, + indexDirFlag, + carDirFlag, + dropletTokenFlag, + dropletURLFlag, + }, + Action: func(cctx *cli.Context) error { + ctx := cctx.Context + carDir := cctx.String("car-dir") + indexDir := cctx.String(indexDirFlag.Name) + p, err := paramsFromContext(cctx) + if err != nil { + return err + } + + fmt.Println("car dir:", carDir, "index dir:", indexDir) + + return generateIndex(ctx, carDir, indexDir, p) + }, +} + +type params struct { + api marketapi.IMarket + close jsonrpc.ClientCloser + topIndexRepo *dagstore.MongoTopIndex + shardRepo repo.IShardRepo + pieces map[string]struct{} +} + +func paramsFromContext(cctx *cli.Context) (*params, error) { + ctx := cctx.Context + mongoURL := cctx.String(mongoURLFlag.Name) + mysqlURL := cctx.String(mysqlURLFlag.Name) + url := cctx.String(dropletURLFlag.Name) + token := cctx.String(dropletTokenFlag.Name) + + fmt.Println("mongo 
url:", mongoURL)
+	fmt.Println("mysql url:", mysqlURL)
+	// The token is a credential, so it is deliberately not echoed to stdout.
+	// NOTE(review): the mongo/mysql URLs printed above may embed passwords too.
+	fmt.Println("droplet url:", url)
+
+	api, close, err := marketapi.DialIMarketRPC(ctx, url, token, nil)
+	if err != nil {
+		return nil, err
+	}
+	// NOTE(review): the RPC client is closed when this function returns, so the
+	// api and close values stored in the result must not be used by callers.
+	defer close()
+
+	activeDeal := storagemarket.StorageDealActive
+	deals, err := api.MarketListIncompleteDeals(ctx, &market.StorageDealQueryParams{State: &activeDeal})
+	if err != nil {
+		return nil, fmt.Errorf("list deal failed: %v", err)
+	}
+	// Several deals can share one piece, so the set may be smaller than deals.
+	pieces := make(map[string]struct{}, len(deals))
+	for _, deal := range deals {
+		pieces[deal.Proposal.PieceCID.String()] = struct{}{}
+	}
+	fmt.Printf("had %d active deals, had %d piece\n", len(deals), len(pieces))
+
+	topIndexRepo, err := dagstore.NewMongoTopIndex(ctx, mongoURL)
+	if err != nil {
+		return nil, fmt.Errorf("connect to mongo failed: %v", err)
+	}
+
+	cfg := config.DefaultMarketConfig
+	cfg.Mysql.ConnectionString = mysqlURL
+	repo, err := mysql.InitMysql(&cfg.Mysql)
+	if err != nil {
+		return nil, fmt.Errorf("connect to mysql failed: %v", err)
+	}
+
+	return &params{
+		api:          api,
+		close:        close,
+		topIndexRepo: topIndexRepo,
+		shardRepo:    repo.ShardRepo(),
+		pieces:       pieces,
+	}, nil
+}
+
+// generateIndex walks the active-deal pieces and, for every piece that has no
+// index file in indexDir yet, generates one from the car file in carDir.
+// It stops at the first error from the index lookup or from persisting a
+// generated index.
+func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error {
+	for piece := range p.pieces {
+		has, err := hasIndex(ctx, piece, indexDir)
+		if err != nil {
+			return err
+		}
+		if has {
+			continue
+		}
+
+		if err := generateIndexForPiece(ctx, carDir, indexDir, piece, p); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// generateIndexForPiece indexes a single car file and persists the result to
+// the index directory, MongoDB and MySQL. A car file that cannot be opened or
+// indexed is reported on stdout and skipped (nil is returned); only failures
+// while saving are returned as errors. Keeping this in its own function lets
+// the deferred Close run per piece instead of piling up until the whole loop
+// in generateIndex has finished.
+func generateIndexForPiece(ctx context.Context, carDir string, indexDir string, piece string, p *params) error {
+	f, err := os.Open(filepath.Join(carDir, piece))
+	if err != nil {
+		fmt.Println(err)
+		return nil
+	}
+	defer f.Close() //nolint
+
+	idx, err := car.ReadOrGenerateIndex(f, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
+	if err != nil {
+		fmt.Printf("generate index failed, piece: %s, error: %v\n", piece, err)
+		return nil
+	}
+	fmt.Printf("generate index success: %s\n", piece)
+
+	if err := saveIndex(idx, indexDir, piece); err != nil {
+		return fmt.Errorf("save index failed, piece: %s, error: %w", piece, err)
+	}
+	if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil {
+		return fmt.Errorf("save top index to mongo failed, piece: %s, error: %w", piece, err)
+	}
+	if err := saveShardToMysql(ctx, piece, p.shardRepo); err != nil {
+		return fmt.Errorf("save shard to mysql failed, piece: %s, error: %w", piece, err)
+	}
+
+	return nil
+}
+
+// hasIndex reports whether the index file for piece exists in indexDir.
+// A missing file yields (false, nil); any other stat failure is returned.
+func hasIndex(ctx context.Context, piece string, indexDir string) (bool, error) {
+	_, err := os.Stat(filepath.Join(indexDir, piece+indexSuffix))
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+		return false, err
+	}
+
+	return true, nil
+}
+
+// saveIndex writes idx to "<dir>/<piece>.full.idx". If writing or closing the
+// file fails, the partial file is removed so that hasIndex does not mistake it
+// for a complete index on the next run.
+func saveIndex(idx carindex.Index, dir string, piece string) (err error) {
+	indexPath := filepath.Join(dir, piece+indexSuffix)
+	f, err := os.Create(indexPath)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// A failed Close can mean buffered data never reached the disk.
+		if cerr := f.Close(); err == nil {
+			err = cerr
+		}
+		if err != nil {
+			_ = os.Remove(indexPath) // best effort; the original error wins
+		}
+	}()
+
+	_, err = carindex.WriteTo(idx, f)
+	return err
+}
+
+// saveTopIndexToMongo registers every multihash of idx under the shard key in
+// the MongoDB top index. An index that is not iterable is skipped silently.
+func saveTopIndexToMongo(ctx context.Context, key string, idx carindex.Index, indexRepo *dagstore.MongoTopIndex) error {
+	// add all cids in the shard to the inverted (cid -> []Shard Keys) index.
+	iterableIdx, ok := idx.(carindex.IterableIndex)
+	if ok {
+		if err := indexRepo.AddMultihashesForShard(ctx, &mhIdx{iterableIdx}, shard.KeyFromString(key)); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// saveShardToMysql creates the shard record for piece, marked as available and
+// lazily initialised, with a "market://<piece>" URL.
+func saveShardToMysql(ctx context.Context, piece string, shardRepo repo.IShardRepo) error {
+	// Named persisted (not shard) to avoid shadowing the imported shard package.
+	persisted := dagstore2.PersistedShard{
+		Key:   piece,
+		URL:   fmt.Sprintf("market://%s", piece),
+		State: dagstore2.ShardStateAvailable,
+		Lazy:  true,
+		Error: "",
+	}
+
+	return shardRepo.CreateShard(ctx, &persisted)
+}
+
+// migrateIndexCmd is the "migrate-index" sub-command: it resolves the shared
+// parameters and then calls migrateIndex on the index directory.
+var migrateIndexCmd = &cli.Command{
+	Name:  "migrate-index",
+	Usage: "migrate top index to MongoDB and migrate shard state to mysql",
+	Flags: []cli.Flag{
+		mongoURLFlag,
+		mysqlURLFlag,
+		indexDirFlag,
+		dropletURLFlag,
+		dropletTokenFlag,
+	},
+	Action: func(cctx *cli.Context) error {
+		ctx := cctx.Context
+		indexDir := cctx.String(indexDirFlag.Name)
+		p, err := paramsFromContext(cctx)
+		if err != nil {
+			return err
+		}
+
+		fmt.Println("index dir:", indexDir)
+
+		return
migrateIndex(ctx, indexDir, p) + }, +} + +func migrateIndex(ctx context.Context, indexDir string, p *params) error { + return filepath.Walk(indexDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + name := info.Name() + if !strings.HasSuffix(name, indexSuffix) { + return nil + } + piece := name[:len(name)-len(indexSuffix)] + if _, ok := p.pieces[piece]; !ok { + return nil + } + indexPath := filepath.Join(indexDir, name) + has, err := p.shardRepo.HasShard(ctx, piece) + if err != nil { + return err + } + if has { + return nil + } + + f, err := os.Open(indexPath) + if err != nil { + return err + } + defer f.Close() //nolint + idx, err := carindex.ReadFrom(f) + if err != nil { + return err + } + + if err := saveTopIndexToMongo(ctx, piece, idx, p.topIndexRepo); err != nil { + return fmt.Errorf("save top index to mongo failed, piece: %s, error: %v", piece, err) + } + if err := saveShardToMysql(ctx, piece, p.shardRepo); err != nil { + return fmt.Errorf("save shard to mysql failed, piece: %s, error: %vs", piece, err) + } + fmt.Printf("migrate %s success\n", piece) + + return nil + }) +} + +// Convenience struct for converting from CAR index.IterableIndex to the +// iterator required by the dag store inverted index. +type mhIdx struct { + iterableIdx carindex.IterableIndex +} + +var _ index.MultihashIterator = (*mhIdx)(nil) + +func (it *mhIdx) ForEach(fn func(mh multihash.Multihash) error) error { + return it.iterableIdx.ForEach(func(mh multihash.Multihash, _ uint64) error { + return fn(mh) + }) +}