From 0c294756958df2e1c10649c8ed4e4d6a92263746 Mon Sep 17 00:00:00 2001 From: Nut He <18328704+hetao92@users.noreply.github.com> Date: Tue, 7 Mar 2023 10:13:16 +0800 Subject: [PATCH 1/4] mod: update importer version --- app/utils/import.ts | 267 ++++++++---------- server/api/studio/internal/model/taskInfo.go | 10 +- server/api/studio/internal/service/import.go | 93 +++--- .../internal/service/importer/importer.go | 194 +++++-------- .../studio/internal/service/importer/task.go | 19 +- .../internal/service/importer/taskmgr.go | 115 +++++--- server/api/studio/internal/types/types.go | 171 +++++------ server/api/studio/restapi/import.api | 170 +++++------ server/go.mod | 38 ++- server/go.sum | 74 +++-- 10 files changed, 565 insertions(+), 586 deletions(-) diff --git a/app/utils/import.ts b/app/utils/import.ts index f5e8a854..aaf3514c 100644 --- a/app/utils/import.ts +++ b/app/utils/import.ts @@ -26,32 +26,32 @@ export function configToJson(payload: IConfig) { retry, channelBufferSize } = payload; + console.log('tagConfig', tagConfig); + console.log('edgeConfig', edgeConfig); const vertexToJSON = tagDataToJSON( tagConfig, spaceVidType, - batchSize ); + console.log('vertexToJSON', vertexToJSON); const edgeToJSON = edgeDataToJSON( edgeConfig, spaceVidType, - batchSize ); - const files: any[] = [...vertexToJSON, ...edgeToJSON]; + const sources: any[] = [...vertexToJSON, ...edgeToJSON]; const configJson = { - version: 'v2', - description: 'studio import', - clientSettings: { + client: { + version: 'v3', + address: address.join(','), + user: username, + password, + concurrencyPerAddress: Number(concurrency ?? DEFAULT_IMPORT_CONFIG.concurrency), retry: Number(retry ?? DEFAULT_IMPORT_CONFIG.retry), - concurrency: Number(concurrency ?? DEFAULT_IMPORT_CONFIG.concurrency), - channelBufferSize: Number(channelBufferSize ?? DEFAULT_IMPORT_CONFIG.channelBufferSize), - space: handleEscape(space), - connection: { - user: username, - password, - address: address.join(',') - }, }, - files, + manager: { + spaceName: handleEscape(space), + batch: Number(batchSize) || DEFAULT_IMPORT_CONFIG.batchSize, + }, + sources }; return configJson; } @@ -59,7 +59,6 @@ export function configToJson(payload: IConfig) { export function edgeDataToJSON( configs: IEdgeItem[], spaceVidType: string, - batchSize?: string, ) { const result = configs.reduce((acc: any, cur) => { const { name, files } = cur; @@ -79,33 +78,30 @@ export function edgeDataToJSON( }); return acc; }, []); + const edges = [{ + name: handleEscape(name), + src: { + type: vidType, + index: srcIdIndex, + function: srcIdFunction, + }, + dst: { + type: vidType, + index: dstIdIndex, + function: dstIdFunction, + }, + rank: { index: rank.mapping }, + props: edgeProps, + }]; const edgeConfig = { - path: file.name, - batchSize: Number(batchSize) || DEFAULT_IMPORT_CONFIG.batchSize, - type: 'csv', + local: { + path: file.name, + }, csv: { withHeader: file.withHeader || false, - withLabel: false, delimiter: file.delimiter }, - schema: { - type: 'edge', - edge: { - name: handleEscape(name), - srcVID: { - index: srcIdIndex, - function: srcIdFunction, - type: vidType, - }, - dstVID: { - index: dstIdIndex, - function: dstIdFunction, - type: vidType, - }, - rank: { index: rank.mapping }, - props: edgeProps, - }, - }, + edges, }; return edgeConfig; }); @@ -118,12 +114,11 @@ export function edgeDataToJSON( export function tagDataToJSON( configs: ITagItem[], spaceVidType: string, - batchSize?: string ) { const result = configs.reduce((acc: any, cur) => { const { name, files } = cur; const _config = files.map(item => { - const { file, props, vidIndex, vidFunction, vidPrefix } = item; + const { file, props, vidIndex, vidFunction } = item; const _props = props.reduce((acc: any, cur) => { if (isEmpty(cur.mapping) && (cur.allowNull || cur.isDefault)) { return acc; @@ -138,29 +133,22 @@ export function tagDataToJSON( const tags = [{ name: handleEscape(name), + id: { + type: spaceVidType === 'INT64' ? 'int' : 'string', + index: vidIndex, + function: vidFunction, + }, props: _props.filter(prop => prop), }]; return { - path: file.name, - batchSize: Number(batchSize) || DEFAULT_IMPORT_CONFIG.batchSize, - type: 'csv', + local: { + path: file.name, + }, csv: { withHeader: file.withHeader || false, - withLabel: false, delimiter: file.delimiter }, - schema: { - type: 'vertex', - vertex: { - vid: { - index: vidIndex, - function: vidFunction, - type: spaceVidType === 'INT64' ? 'int' : 'string', - prefix: vidPrefix, - }, - tags, - }, - } + tags }; }); acc.push(..._config); @@ -170,141 +158,112 @@ export function tagDataToJSON( } export const exampleJson = { - 'version': 'v2', - 'description': 'web console import', - 'removeTempFiles': null, - 'clientSettings': { - 'retry': 3, - 'concurrency': 10, - 'channelBufferSize': 128, - 'space': 'sales', - 'connection': { - 'user': '', - 'password': '', - 'address': '' - }, - 'postStart': null, - 'preStop': null + 'client': { + 'version': 'v3', + 'user': '', + 'password': '', + 'address': '' }, - 'logPath': 'import.log', - 'files': [ + 'manager': { + 'spaceName': 'sales', + }, + 'sources': [ { 'path': 'item.csv', - 'batchSize': 60, - 'limit': null, - 'inOrder': null, - 'type': 'csv', 'csv': { 'withHeader': false, - 'withLabel': false, 'delimiter': null }, - 'schema': { - 'type': 'vertex', - 'edge': null, - 'vertex': { + 'tags': [ + { + 'name': 'item', 'vid': { 'index': 0, 'function': null, 'type': 'string', - 'prefix': null }, - 'tags': [ - { - 'name': 'item', - 'props': [ - { - 'name': 'id_single_item', - 'type': 'string', - 'index': 0 - }, - { - 'name': 'region', - 'type': 'string', - 'index': 1 - }, - { - 'name': 'country', - 'type': 'string', - 'index': 2 - }, - { - 'name': 'item_type', - 'type': 'string', - 'index': 3 - }, - { - 'name': 'sales_channel', - 'type': 'string', - 'index': 4 - } - ] - } - ] - } - } - }, - { - 'path': 'orderr.csv', - 'batchSize': 60, - 'limit': null, - 'inOrder': null, - 'type': 'csv', - 'csv': { - 'withHeader': false, - 'withLabel': false, - 'delimiter': null - }, - 'schema': { - 'type': 'edge', - 'edge': { - 'name': 'order', 'props': [ { - 'name': 'order_id', + 'name': 'id_single_item', 'type': 'string', 'index': 0 }, { - 'name': 'id_item', + 'name': 'region', 'type': 'string', - 'index': 0 + 'index': 1 }, { - 'name': 'unit_sold', + 'name': 'country', 'type': 'string', 'index': 2 }, { - 'name': 'unit_price', + 'name': 'item_type', 'type': 'string', 'index': 3 }, { - 'name': 'unit_cost', + 'name': 'sales_channel', 'type': 'string', 'index': 4 - }, - { - 'name': 'total_profit', - 'type': 'string', - 'index': 5 } - ], - 'srcVID': { - 'index': 1, - 'function': null, + ] + } + ] + }, + { + 'path': 'orderr.csv', + 'csv': { + 'withHeader': false, + 'delimiter': null + }, + 'edges': [{ + 'name': 'order', + 'props': [ + { + 'name': 'order_id', 'type': 'string', + 'index': 0 }, - 'dstVID': { - 'index': 1, - 'function': null, + { + 'name': 'id_item', + 'type': 'string', + 'index': 0 + }, + { + 'name': 'unit_sold', 'type': 'string', + 'index': 2 }, - 'rank': null + { + 'name': 'unit_price', + 'type': 'string', + 'index': 3 + }, + { + 'name': 'unit_cost', + 'type': 'string', + 'index': 4 + }, + { + 'name': 'total_profit', + 'type': 'string', + 'index': 5 + } + ], + 'src': { + 'index': 1, + 'function': null, + 'type': 'string', + }, + 'dst': { + 'index': 1, + 'function': null, + 'type': 'string', }, - 'vertex': null - } + 'rank': null + }], } ] }; \ No newline at end of file diff --git a/server/api/studio/internal/model/taskInfo.go b/server/api/studio/internal/model/taskInfo.go index 4d9a73f2..897ec39f 100644 --- a/server/api/studio/internal/model/taskInfo.go +++ b/server/api/studio/internal/model/taskInfo.go @@ -1,6 +1,10 @@ package db -import "github.com/vesoft-inc/nebula-importer/pkg/stats" +import ( + "time" + + "github.com/vesoft-inc/nebula-importer/v4/pkg/stats" +) type TaskInfo struct { ID int `json:"taskID" gorm:"primaryKey;autoIncrement"` @@ -8,8 +12,8 @@ type TaskInfo struct { Name string `json:"name"` Space string `json:"space"` ImportAddress string `json:"importAddress"` - CreatedTime int64 `json:"createdTime"` - UpdatedTime int64 `json:"updatedTime"` + CreateTime time.Time `json:"create_time" gorm:"autoCreateTime"` + UpdateTime time.Time `json:"update_time" gorm:"autoUpdateTime"` User string `json:"user"` TaskStatus string `json:"taskStatus"` TaskMessage string `json:"taskMessage"` diff --git a/server/api/studio/internal/service/import.go b/server/api/studio/internal/service/import.go index ad5f440f..390b29f3 100644 --- a/server/api/studio/internal/service/import.go +++ b/server/api/studio/internal/service/import.go @@ -14,16 +14,13 @@ import ( "sync" "github.com/vesoft-inc/go-pkg/middleware" - importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" - importererrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + configv3 "github.com/vesoft-inc/nebula-importer/v4/pkg/config/v3" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service/importer" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/auth" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" - "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/utils" "github.com/zeromicro/go-zero/core/logx" - "go.uber.org/zap" ) var ( @@ -66,59 +63,49 @@ func NewImportService(ctx context.Context, svcCtx *svc.ServiceContext) ImportSer } func (i *importService) CreateImportTask(req *types.CreateImportTaskRequest) (*types.CreateImportTaskData, error) { + // 校验 + // 初始化 + // 生成目录 + // 生成配置文件 + // 启动任务 jsons, err := json.Marshal(req.Config) if err != nil { return nil, ecode.WithErrorMessage(ecode.ErrParam, err) } - conf := importconfig.YAMLConfig{} + conf := configv3.Config{} err = json.Unmarshal(jsons, &conf) if err != nil { return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } - if err = validClientParams(&conf); err != nil { - err = importererrors.Wrap(importererrors.InvalidConfigPathOrFormat, err) - zap.L().Warn("client params is wrong", zap.Error(err)) + // init task + auth := i.ctx.Value(auth.CtxKeyUserInfo{}).(*auth.AuthData) + host := auth.Address + ":" + strconv.Itoa(auth.Port) + taskMgr := importer.GetTaskMgr() + task, taskID, err := taskMgr.NewTask(host, auth.Username, req.Name, conf) + if err != nil { + logx.Errorf("init task fail", err) return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } - - taskDir, err := importer.GetNewTaskDir(i.svcCtx.Config.File.TasksDir) + // create task dir + taskDir, err := importer.CreateTaskDir(i.svcCtx.Config.File.TasksDir, taskID) if err != nil { return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } - logPath := filepath.Join(taskDir, importLogName) - conf.LogPath = &logPath // create config file - if err := importer.CreateConfigFile(i.svcCtx.Config.File.UploadDir, taskDir, conf); err != nil { + if err := importer.CreateConfigFile(taskDir, conf); err != nil { return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } - // create err dir - taskErrDir := filepath.Join(taskDir, "err") - if err = utils.CreateDir(taskErrDir); err != nil { - return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) - } - - // import - address := *conf.NebulaClientSettings.Connection.Address - user := *conf.NebulaClientSettings.Connection.User - name := req.Name - space := *conf.NebulaClientSettings.Space - auth := i.ctx.Value(auth.CtxKeyUserInfo{}).(*auth.AuthData) - host := auth.Address + ":" + strconv.Itoa(auth.Port) - task, taskID, err := importer.GetTaskMgr().NewTask(host, address, user, name, space) - if err != nil { - zap.L().Warn("init task fail", zap.Error(err)) - return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) - } - if err = importer.Import(taskID, &conf); err != nil { + // start import + if err = importer.StartImport(taskID); err != nil { // task err: import task not start err task.TaskInfo.TaskStatus = importer.StatusAborted.String() err1 := importer.GetTaskMgr().AbortTask(taskID) if err != nil { - zap.L().Warn("finish task fail", zap.Error(err1)) + logx.Errorf("finish task fail", err1) } - zap.L().Error(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskID, err)) + logx.Errorf(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskID, err)) return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } @@ -126,24 +113,24 @@ func (i *importService) CreateImportTask(req *types.CreateImportTaskRequest) (*t muTaskId.Lock() taskIDBytes, err := ioutil.ReadFile(i.svcCtx.Config.File.TaskIdPath) if err != nil { - zap.L().Warn("read taskId file error", zap.Error(err)) + logx.Errorf("read taskId file error", err) return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } taskIdJSON := make(map[string]bool) if len(taskIDBytes) != 0 { if err := json.Unmarshal(taskIDBytes, &taskIdJSON); err != nil { - zap.L().Warn("read taskId file error", zap.Error(err)) + logx.Errorf("read taskId file error", err) return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } } taskIdJSON[taskID] = true bytes, err := json.Marshal(taskIdJSON) if err != nil { - zap.L().Warn("read taskId file error", zap.Error(err)) + logx.Errorf("read taskId file error", err) } err = ioutil.WriteFile(i.svcCtx.Config.File.TaskIdPath, bytes, 777) if err != nil { - zap.L().Warn("write taskId file error", zap.Error(err)) + logx.Errorf("write taskId file error", err) } defer muTaskId.Unlock() @@ -259,14 +246,14 @@ func (i *importService) GetManyImportTaskLog(req *types.GetManyImportTaskLogRequ taskIdBytes, err := ioutil.ReadFile(i.svcCtx.Config.File.TaskIdPath) muTaskId.RUnlock() if err != nil { - zap.L().Warn("read taskId file error", zap.Error(err)) + logx.Errorf("read taskId file error", err) return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } taskIdJSON := make(map[string]bool) if len(taskIdBytes) != 0 { err = json.Unmarshal(taskIdBytes, &taskIdJSON) if err != nil { - zap.L().Warn("parse taskId file error", zap.Error(err)) + logx.Errorf("parse taskId file error", err) return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } } @@ -290,24 +277,24 @@ func (i *importService) GetWorkingDir() (*types.GetWorkingDirResult, error) { }, nil } -func validClientParams(conf *importconfig.YAMLConfig) error { - if conf.NebulaClientSettings.Connection == nil || - conf.NebulaClientSettings.Connection.Address == nil || - *conf.NebulaClientSettings.Connection.Address == "" || - conf.NebulaClientSettings.Connection.User == nil || - *conf.NebulaClientSettings.Connection.User == "" || - conf.NebulaClientSettings.Space == nil || - *conf.NebulaClientSettings.Space == "" { - return ecode.WithCode(ecode.ErrParam, nil) - } +// func validClientParams(conf *importconfig.YAMLConfig) error { +// if conf.NebulaClientSettings.Connection == nil || +// conf.NebulaClientSettings.Connection.Address == nil || +// *conf.NebulaClientSettings.Connection.Address == "" || +// conf.NebulaClientSettings.Connection.User == nil || +// *conf.NebulaClientSettings.Connection.User == "" || +// conf.NebulaClientSettings.Space == nil || +// *conf.NebulaClientSettings.Space == "" { +// return ecode.WithCode(ecode.ErrParam, nil) +// } - return nil -} +// return nil +// } func readFileLines(path string, offset int64, limit int64) ([]string, error) { file, err := os.Open(path) if err != nil { - zap.L().Warn("open file error", zap.Error(err)) + logx.Errorf("open file error", err) return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err) } defer file.Close() diff --git a/server/api/studio/internal/service/importer/importer.go b/server/api/studio/internal/service/importer/importer.go index 223446c6..759221c2 100644 --- a/server/api/studio/internal/service/importer/importer.go +++ b/server/api/studio/internal/service/importer/importer.go @@ -3,21 +3,13 @@ package importer import ( "errors" "fmt" - "os" - "path/filepath" "regexp" "strconv" "time" - importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" - importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" - "github.com/vesoft-inc/nebula-importer/pkg/logger" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" - "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/utils" - "go.uber.org/zap" - - "gopkg.in/yaml.v2" + "github.com/zeromicro/go-zero/core/logx" ) type ImportResult struct { @@ -30,74 +22,21 @@ type ImportResult struct { } } -func GetNewTaskDir(tasksDir string) (string, error) { - taskId, err := GetTaskMgr().NewTaskID() - if err != nil { - return "", err - } - taskDir := filepath.Join(tasksDir, taskId) - return taskDir, nil -} - -func CreateConfigFile(uploadDir, taskdir string, config importconfig.YAMLConfig) error { - fileName := "config.yaml" - // err := utils.CreateDir(taskdir) - if err := utils.CreateDir(taskdir); err != nil { - return ecode.WithErrorMessage(ecode.ErrInternalServer, err) - } - path := filepath.Join(taskdir, fileName) - // erase user information - address := *config.NebulaClientSettings.Connection.Address - user := *config.NebulaClientSettings.Connection.User - password := *config.NebulaClientSettings.Connection.Password - *config.NebulaClientSettings.Connection.Address = "" - *config.NebulaClientSettings.Connection.User = "" - *config.NebulaClientSettings.Connection.Password = "" +func StartImport(taskID string) (err error) { + task, _ := GetTaskMgr().GetTask(taskID) + signal := make(chan struct{}, 1) - // erase path infomation - logPath := *config.LogPath - *config.LogPath = "import.log" - paths := make([]string, 0) - failDataPaths := make([]string, 0) - for _, file := range config.Files { - paths = append(paths, filepath.Join(uploadDir, *file.Path)) - failDataPaths = append(failDataPaths, filepath.Join(taskdir, "err")) - _, fileName := filepath.Split(*file.Path) - *file.Path = fileName - if file.FailDataPath == nil { - file.FailDataPath = new(string) + abort := func() { + task.TaskInfo.TaskStatus = StatusAborted.String() + task.TaskInfo.TaskMessage = err.Error() + err = GetTaskMgr().AbortTask(taskID) + if err != nil { + logx.Errorf("start task fail, %v", err) } - *file.FailDataPath = fileName - } - - outYaml, err := yaml.Marshal(config) - if err != nil { - return ecode.WithErrorMessage(ecode.ErrInternalServer, err) - } - if err := os.WriteFile(path, outYaml, 0o644); err != nil { - zap.L().Warn("write"+path+"file error", zap.Error(err)) - return ecode.WithErrorMessage(ecode.ErrInternalServer, err) - } - - *config.LogPath = logPath - *config.NebulaClientSettings.Connection.Address = address - *config.NebulaClientSettings.Connection.User = user - *config.NebulaClientSettings.Connection.Password = password - for i, file := range config.Files { - *file.Path = paths[i] - *file.FailDataPath = failDataPaths[i] - } - return nil -} - -func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { - runnerLogger := logger.NewRunnerLogger(*conf.LogPath) - if err := conf.ValidateAndReset("", runnerLogger); err != nil { - return ecode.WithErrorMessage(ecode.ErrInternalServer, err) + signal <- struct{}{} + return } - task, _ := GetTaskMgr().GetTask(taskID) - signal := make(chan struct{}, 1) go func() { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() @@ -106,7 +45,7 @@ func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { case <-ticker.C: err := GetTaskMgr().UpdateTaskInfo(taskID) if err != nil { - zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + logx.Errorf(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), err) } case <-signal: return @@ -114,52 +53,29 @@ func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { } }() go func() { - result := ImportResult{} - now := time.Now() - task.GetRunner().Run(conf) - timeCost := time.Since(now).Milliseconds() - result.TaskId = taskID - result.TimeCost = fmt.Sprintf("%dms", timeCost) - if rerrs := task.GetRunner().Errors(); len(rerrs) != 0 { - allErrIsNotCompleteError := true - for _, rerr := range rerrs { - err := rerr.(importerErrors.ImporterError) - if err.ErrCode != importerErrors.NotCompleteError { - allErrIsNotCompleteError = false - break - } - } - if allErrIsNotCompleteError { - task.TaskInfo.TaskStatus = StatusFinished.String() - result.FailedRows = task.GetRunner().NumFailed - err1 := GetTaskMgr().FinishTask(taskID) - if err1 != nil { - zap.L().Warn("finish task fail", zap.Error(err1)) - } - zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) - return - } - // TODO: return all errors + mgr := task.Client.Manager + cfg := task.Client.Cfg + if err = cfg.Build(); err != nil { + abort() + } + if err = mgr.Start(); err != nil { + abort() + } + err = mgr.Wait() + if err != nil { task.TaskInfo.TaskStatus = StatusAborted.String() - err, _ := rerrs[0].(importerErrors.ImporterError) - result.ErrorResult.ErrorCode = err.ErrCode - result.ErrorResult.ErrorMsg = err.ErrMsg.Error() - task.TaskInfo.TaskMessage = err.ErrMsg.Error() - err1 := GetTaskMgr().AbortTask(taskID) - if err1 != nil { - zap.L().Warn("finish task fail", zap.Error(err1)) - } - zap.L().Warn(fmt.Sprintf("Failed to finish a import task: `%s`, task result: `%v`", taskID, result)) - } else { - task.TaskInfo.TaskStatus = StatusFinished.String() - result.FailedRows = task.GetRunner().NumFailed - err := GetTaskMgr().FinishTask(taskID) + task.TaskInfo.TaskMessage = err.Error() + err = GetTaskMgr().AbortTask(taskID) if err != nil { - zap.L().Warn("finish task fail", zap.Error(err)) + logx.Errorf("finish task fail, %v", err) } - zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) + return + } + task.TaskInfo.TaskStatus = StatusFinished.String() + err = GetTaskMgr().FinishTask(taskID) + if err != nil { + logx.Errorf("finish task fail, %v", err) } - signal <- struct{}{} }() return nil @@ -167,12 +83,12 @@ func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { func DeleteImportTask(tasksDir, taskID, address, username string) error { if id, err := strconv.Atoi(taskID); err != nil { - zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + logx.Errorf(fmt.Sprintf("stop task fail, id : %s", taskID), err) return errors.New("task not existed") } else { _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) if err != nil { - zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + logx.Errorf(fmt.Sprintf("stop task fail, id : %s", taskID), err) return errors.New("task not existed") } } @@ -202,17 +118,29 @@ func GetImportTask(tasksDir, taskID, address, username string) (*types.GetImport if err != nil { return nil, err } + stats := task.TaskInfo.Stats result.Id = strconv.Itoa(t.TaskInfo.ID) result.Status = task.TaskInfo.TaskStatus result.Message = task.TaskInfo.TaskMessage - result.CreateTime = task.TaskInfo.CreatedTime - result.UpdateTime = task.TaskInfo.UpdatedTime + result.CreateTime = task.TaskInfo.CreateTime.UnixMilli() + result.UpdateTime = task.TaskInfo.UpdateTime.UnixMilli() result.Address = task.TaskInfo.Address result.ImportAddress = importAddress result.User = task.TaskInfo.User result.Name = task.TaskInfo.Name result.Space = task.TaskInfo.Space - result.Stats = types.ImportTaskStats(task.TaskInfo.Stats) + result.Stats = types.ImportTaskStats{ + TotalBytes: stats.TotalBytes, + ProcessedBytes: stats.ProcessedBytes, + FailedRecords: stats.FailedRecords, + TotalRecords: stats.TotalRecords, + TotalRequest: stats.TotalRequest, + FailedRequest: stats.FailedRequest, + TotalLatency: int64(stats.TotalLatency), + TotalRespTime: int64(stats.TotalRespTime), + FailedProcessed: stats.FailedProcessed, + TotalProcessed: stats.TotalProcessed, + } } return result, nil @@ -234,18 +162,30 @@ func GetManyImportTask(tasksDir, address, username string, pageIndex, pageSize i if err != nil { return nil, err } + stats := t.Stats data := types.GetImportTaskData{ Id: strconv.Itoa(t.ID), Status: t.TaskStatus, Message: t.TaskMessage, - CreateTime: t.CreatedTime, - UpdateTime: t.UpdatedTime, + CreateTime: t.CreateTime.UnixMilli(), + UpdateTime: t.UpdateTime.UnixMilli(), Address: t.Address, ImportAddress: importAddress, User: t.User, Name: t.Name, Space: t.Space, - Stats: types.ImportTaskStats(t.Stats), + Stats: types.ImportTaskStats{ + TotalBytes: stats.TotalBytes, + ProcessedBytes: stats.ProcessedBytes, + FailedRecords: stats.FailedRecords, + TotalRecords: stats.TotalRecords, + TotalRequest: stats.TotalRequest, + FailedRequest: stats.FailedRequest, + TotalLatency: int64(stats.TotalLatency), + TotalRespTime: int64(stats.TotalRespTime), + FailedProcessed: stats.FailedProcessed, + TotalProcessed: stats.TotalProcessed, + }, } result.List = append(result.List, data) } @@ -256,19 +196,19 @@ func GetManyImportTask(tasksDir, address, username string, pageIndex, pageSize i func StopImportTask(taskID, address, username string) error { if id, err := strconv.Atoi(taskID); err != nil { - zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + logx.Errorf(fmt.Sprintf("stop task fail, id : %s", taskID), err) return errors.New("task not existed") } else { _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) if err != nil { - zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + logx.Errorf(fmt.Sprintf("stop task fail, id : %s", taskID), err) return errors.New("task not existed") } } err := GetTaskMgr().StopTask(taskID) if err != nil { - zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + logx.Errorf(fmt.Sprintf("stop task fail, id : %s", taskID), err) return ecode.WithErrorMessage(ecode.ErrInternalServer, err) } else { return nil diff --git a/server/api/studio/internal/service/importer/task.go b/server/api/studio/internal/service/importer/task.go index 4dc2de11..27fa61c7 100644 --- a/server/api/studio/internal/service/importer/task.go +++ b/server/api/studio/internal/service/importer/task.go @@ -2,23 +2,24 @@ package importer import ( db "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/model" - "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" - "github.com/vesoft-inc/nebula-importer/pkg/cmd" - "github.com/zeromicro/go-zero/core/logx" + "github.com/vesoft-inc/nebula-importer/v4/pkg/config" + "github.com/vesoft-inc/nebula-importer/v4/pkg/logger" + "github.com/vesoft-inc/nebula-importer/v4/pkg/manager" ) +type Client struct { + Cfg config.Configurator `json:"cfg,omitempty"` + Logger logger.Logger `json:"logger,omitempty"` + Manager manager.Manager `json:"manager,omitempty"` +} type Task struct { - Runner *cmd.Runner `json:"runner,omitempty"` + Client Client `json:"client,omitempty"` TaskInfo *db.TaskInfo `json:"task_info,omitempty"` } func (t *Task) UpdateQueryStats() error { - stats, err := t.Runner.QueryStats() - if err != nil { - logx.Infof("query import stats fail: %s", err) - return ecode.WithErrorMessage(ecode.ErrInternalServer, err) - } + stats := t.Client.Manager.Stats() t.TaskInfo.Stats = *stats return nil } diff --git a/server/api/studio/internal/service/importer/taskmgr.go b/server/api/studio/internal/service/importer/taskmgr.go index 686fa23e..3fe9eead 100644 --- a/server/api/studio/internal/service/importer/taskmgr.go +++ b/server/api/studio/internal/service/importer/taskmgr.go @@ -1,19 +1,22 @@ package importer import ( + "encoding/json" "errors" "fmt" "os" "path/filepath" "strconv" "sync" - "time" + importConfig "github.com/vesoft-inc/nebula-importer/v4/pkg/config" + configv3 "github.com/vesoft-inc/nebula-importer/v4/pkg/config/v3" db "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/model" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" - - "github.com/vesoft-inc/nebula-importer/pkg/cmd" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/utils" "github.com/zeromicro/go-zero/core/logx" + "go.uber.org/zap" + "gopkg.in/yaml.v2" _ "github.com/mattn/go-sqlite3" ) @@ -32,51 +35,82 @@ type TaskMgr struct { db *TaskDb } -func newTask(host string, importAddress string, user string, name string, space string) *Task { - timeUnix := time.Now().Unix() - return &Task{ - Runner: &cmd.Runner{}, - TaskInfo: &db.TaskInfo{ - Name: name, - Address: host, - Space: space, - CreatedTime: timeUnix, - UpdatedTime: timeUnix, - TaskStatus: StatusProcessing.String(), - ImportAddress: importAddress, - User: user, - }, +func createTaskClient(conf configv3.Config) (client Client, err error) { + jsons, err := json.Marshal(conf) + if err != nil { + return Client{}, ecode.WithErrorMessage(ecode.ErrParam, err) + } + cfg, err := importConfig.FromBytes(jsons) + if err != nil { + return Client{}, ecode.WithErrorMessage(ecode.ErrInternalServer, err) + } + client = Client{ + Cfg: cfg, + Manager: cfg.GetManager(), + Logger: cfg.GetLogger(), } + return client, nil } -func (task *Task) GetRunner() *cmd.Runner { - return task.Runner +func CreateTaskDir(rootDir string, id string) (string, error) { + taskDir := filepath.Join(rootDir, id) + if err := utils.CreateDir(taskDir); err != nil { + return "", ecode.WithErrorMessage(ecode.ErrInternalServer, err) + } + return taskDir, nil } -func (mgr *TaskMgr) NewTaskID() (string, error) { - tid, err := mgr.db.LastId() +func CreateConfigFile(taskdir string, config configv3.Config) error { + fileName := "config.yaml" + path := filepath.Join(taskdir, fileName) + // erase user information + _config := config + _config.Client.User = "" + _config.Client.Password = "" + _config.Client.Address = "" + + // TODO hide data source access key and so on + + outYaml, err := yaml.Marshal(config) if err != nil { - return "", err + return ecode.WithErrorMessage(ecode.ErrInternalServer, err) } - taskID := fmt.Sprintf("%v", tid+1) - return taskID, nil + if err := os.WriteFile(path, outYaml, 0o644); err != nil { + zap.L().Warn("write"+path+"file error", zap.Error(err)) + return ecode.WithErrorMessage(ecode.ErrInternalServer, err) + } + return nil } -func (mgr *TaskMgr) NewTask(host string, importAddress string, user string, name string, space string) (*Task, string, error) { +func (mgr *TaskMgr) NewTask(host string, user string, taskName string, conf configv3.Config) (*Task, string, error) { mux.Lock() defer mux.Unlock() - task := newTask(host, importAddress, user, name, space) - if err := mgr.db.InsertTaskInfo(task.TaskInfo); err != nil { + // init importer client + client, err := createTaskClient(conf) + if err != nil { return nil, "", err } - tid, err := mgr.db.LastId() - if err != nil { + // init task db + taskInfo := &db.TaskInfo{ + Name: taskName, + Address: host, + Space: conf.Manager.GraphName, + TaskStatus: StatusProcessing.String(), + ImportAddress: conf.Client.Address, + User: user, + } + task := &Task{ + Client: client, + TaskInfo: taskInfo, + } + if err := mgr.db.InsertTaskInfo(task.TaskInfo); err != nil { return nil, "", err } - task.TaskInfo.ID = tid - taskID := fmt.Sprintf("%v", tid) - mgr.PutTask(taskID, task) - return task, taskID, nil + + // create task dir + id := string(taskInfo.ID) + mgr.PutTask(id, task) + return task, id, nil } func GetTaskMgr() *TaskMgr { @@ -117,8 +151,6 @@ func (mgr *TaskMgr) FinishTask(taskID string) (err error) { if err := task.UpdateQueryStats(); err != nil { return ecode.WithErrorMessage(ecode.ErrInternalServer, err) } - timeUnix := time.Now().Unix() - task.TaskInfo.UpdatedTime = timeUnix err = mgr.db.UpdateTaskInfo(task.TaskInfo) if err != nil { return ecode.WithErrorMessage(ecode.ErrInternalServer, err) @@ -132,8 +164,6 @@ func (mgr *TaskMgr) AbortTask(taskID string) (err error) { if !ok { return } - timeUnix := time.Now().Unix() - task.TaskInfo.UpdatedTime = timeUnix err = mgr.db.UpdateTaskInfo(task.TaskInfo) if err != nil { return ecode.WithErrorMessage(ecode.ErrInternalServer, err) @@ -170,8 +200,6 @@ func (mgr *TaskMgr) UpdateTaskInfo(taskID string) error { if err := task.UpdateQueryStats(); err != nil { return ecode.WithErrorMessage(ecode.ErrInternalServer, err) } - timeUnix := time.Now().Unix() - task.TaskInfo.UpdatedTime = timeUnix return mgr.db.UpdateTaskInfo(task.TaskInfo) } @@ -181,11 +209,10 @@ and then call FinishTask */ func (mgr *TaskMgr) StopTask(taskID string) error { if task, ok := mgr.getTaskFromMap(taskID); ok { - if task.GetRunner().Readers == nil { - return errors.New("task is not initialized") - } - for _, r := range task.GetRunner().Readers { - r.Stop() + manager := task.Client.Manager + err := manager.Stop() + if err != nil { + return errors.New("stop task fail") } task.TaskInfo.TaskStatus = StatusStoped.String() if err := mgr.FinishTask(taskID); err != nil { diff --git a/server/api/studio/internal/types/types.go b/server/api/studio/internal/types/types.go index 51f484ef..2840ab9d 100644 --- a/server/api/studio/internal/types/types.go +++ b/server/api/studio/internal/types/types.go @@ -48,108 +48,107 @@ type FileConfigUpdateRequest struct { Name string `json:"name" validate:"required"` } -type ImportTaskConnection struct { - User *string `json:"user" validate:"required"` - Password *string `json:"password" validate:"required"` - Address *string `json:"address" validate:"required"` -} - -type ImportTaskClientSettings struct { - Retry *int `json:"retry,optional"` - Concurrency *int `json:"concurrency,optional"` - ChannelBufferSize *int `json:"channelBufferSize,optional"` - Space *string `json:"space" validate:"required"` - Connection *ImportTaskConnection `json:"connection" validate:"required"` - PostStart *ImportTaskPostStart `json:"postStart,optional"` - PreStop *ImportTaskPreStop `json:"preStop,optional"` -} - -type ImportTaskPostStart struct { - Commands *string `json:"commands, optional"` - AfterPeriod *string `json:"afterPeriod, optional"` -} - -type ImportTaskPreStop struct { - Commands *string `json:"commands,optional"` -} - type ImportTaskCSV struct { WithHeader *bool `json:"withHeader,optional"` - WithLabel *bool `json:"withLabel,optional"` + LazyQuotes *bool `json:"lazyQuotes,optional"` Delimiter *string `json:"delimiter,optional"` } -type ImportTaskVID struct { - Index *int64 `json:"index" validate:"required"` - Type *string `json:"type" validate:"required"` - Function *string `json:"function,optional"` - Prefix *string `json:"prefix,optional"` +type NodeId struct { + Name string `json:"name" validate:"required"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index" validate:"required"` + ConcatItems []interface{} `json:"concatItems,optional"` + Function string `json:"function,optional"` } -type ImportTaskTagProp struct { - Name *string `json:"name" validate:"required"` - Type *string `json:"type" validate:"required"` - Index *int64 `json:"index, optional"` +type Tag struct { + Name string `json:"name" validate:"required"` + ID NodeId `json:"id" validate:"required"` + Props []Prop `json:"props" validate:"required"` + IgnoreExistedIndex bool `json:"ignoreExistedIndex,optional"` } -type ImportTaskTag struct { - Name *string `json:"name" validate:"required"` - Props []*ImportTaskTagProp `json:"props" validate:"required"` +type Edge struct { + Name string `json:"name" validate:"required"` + Src NodeId `json:"src" validate:"required"` + Dst NodeId `json:"dst" validate:"required"` + Props []Prop `json:"props" validate:"required"` + Rank EdgeRank `json:"rank, optional"` + IgnoreExistedIndex bool `json:"ignoreExistedIndex,optional"` } -type ImportTaskVertex struct { - VID *ImportTaskVID `json:"vid" validate:"required"` - Tags []*ImportTaskTag `json:"tags" validate:"required"` +type Prop struct { + Name string `json:"name" validate:"required"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index, optional"` + Nullable bool `json:"nullable, optional"` + NullValue string `json:"nullValue, optional"` + AlternativeIndices []int64 `json:"alternativeIndices, optional"` + DefaultValue string `json:"defaultValue, optional"` } -type ImportTaskEdgeID struct { - Index *int64 `json:"index" validate:"required"` - Function *string `json:"function,optional"` - Type *string `json:"type" validate:"required"` - Prefix *string `json:"prefix,optional"` +type EdgeRank struct { + Index *int64 `json:"index, optional"` } -type ImportTaskEdgeRank struct { - Index *int64 `json:"index, optional"` +type S3Config struct { + Endpoint string `json:"endpoint,omitempty"` + Region string `json:"region,omitempty"` + AccessKey string `json:"accessKey,omitempty"` + SecretKey string `json:"secretKey,omitempty"` + Token string `json:"token,omitempty"` + Bucket string `json:"bucket,omitempty"` + Key string `json:"key,omitempty"` } -type ImportTaskEdgeProp struct { - Name *string `json:"name"` - Type *string `json:"type"` - Index *int64 `json:"index, optional"` +type SFTPConfig struct { + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + KeyFile string `json:"keyFile,omitempty"` + KeyData string `json:"keyData,omitempty"` + Passphrase string `json:"passphrase,omitempty"` + Path string `json:"path,omitempty"` } -type ImportTaskEdge struct { - Name *string `json:"name" validate:"required"` - SrcVID *ImportTaskEdgeID `json:"srcVID" validate:"required"` - DstVID *ImportTaskEdgeID `json:"dstVID" validate:"required"` - Rank *ImportTaskEdgeRank `json:"rank, optional"` - Props []*ImportTaskEdgeProp `json:"props" validate:"required"` +type LocalConfig struct { + Path string `json:"path,omitempty"` } -type ImportTaskSchema struct { - Type *string `json:"type" validate:"required"` - Edge *ImportTaskEdge `json:"edge,optional"` - Vertex *ImportTaskVertex `json:"vertex,optional"` +type ImportTaskConfig struct { + Client Client `json:"client" validate:"required"` + Manager Manager `json:"manager" validate:"required"` + Sources []Sources `json:"sources" validate:"required"` } -type ImportTaskFile struct { - Path *string `json:"path" validate:"required"` - FailDataPath *string `json:"failDataPath,optional"` - BatchSize *int `json:"batchSize,optional"` - Limit *int `json:"limit, optional"` - InOrder *bool `json:"inOrder, optional"` - Type *string `json:"type" validate:"required"` - CSV *ImportTaskCSV `json:"csv" validate:"required"` - Schema *ImportTaskSchema `json:"schema" validate:"required"` +type Client struct { + Version string `json:"version,omitempty" validate:"required"` + Address string `json:"address,omitempty" validate:"required"` + User string `json:"user,omitempty" validate:"required"` + Password string `json:"password,omitempty" validate:"required"` + ConcurrencyPerAddress int `json:"concurrencyPerAddress,optional"` + ReconnectInitialInterval string `json:"reconnectInitialInterval,optional"` + Retry int `json:"retry,optional"` + RetryInitialInterval string `json:"retryInitialInterval,optional"` } -type ImportTaskConfig struct { - Version *string `json:"version" validate:"required"` - Description *string `json:"description,optional"` - RemoveTempFiles *bool `json:"removeTempFiles,optional"` - ClientSettings *ImportTaskClientSettings `json:"clientSettings" validate:"required"` - Files []*ImportTaskFile `json:"files" validate:"required"` +type Manager struct { + SpaceName string `json:"spaceName,omitempty" validate:"required"` + Batch int `json:"batch,omitempty, optional"` + ReaderConcurrency int `json:"readerConcurrency,omitempty, optional"` + ImporterConcurrency int `json:"importerConcurrency,omitempty, optional"` + StatsInterval string `json:"statsInterval,omitempty, optional"` +} + +type Sources struct { + CSV ImportTaskCSV `json:"csv" validate:"required"` + Local LocalConfig `json:"local, optional"` + S3 S3Config `json:"s3, optional"` + SFTP SFTPConfig `json:"sftpConfig, optional"` + Tags []Tag `json:"tags, optional"` + Edges []Edge `json:"edges, optional"` } type CreateImportTaskRequest struct { @@ -180,14 +179,16 @@ type GetImportTaskData struct { } type ImportTaskStats struct { - NumFailed int64 `json:"numFailed"` - NumReadFailed int64 `json:"numReadFailed"` - TotalCount int64 `json:"totalCount"` - TotalBatches int64 `json:"totalBatches"` - TotalLatency int64 `json:"totalLatency"` - TotalReqTime int64 `json:"totalReqTime"` - TotalBytes int64 `json:"totalBytes"` - TotalImportedBytes int64 `json:"totalImportedBytes"` + ProcessedBytes int64 `json:"processedBytes"` + TotalBytes int64 `json:"totalBytes"` + FailedRecords int64 `json:"failedRecords"` + TotalRecords int64 `json:"totalRecords"` + FailedRequest int64 `json:"failedRequest"` + TotalRequest int64 `json:"totalRequest"` + TotalLatency int64 `json:"totalLatency"` + TotalRespTime int64 `json:"totalRespTime"` + FailedProcessed int64 `json:"failedProcessed"` + TotalProcessed int64 `json:"totalProcessed"` } type GetManyImportTaskRequest struct { diff --git a/server/api/studio/restapi/import.api b/server/api/studio/restapi/import.api index 9b7a02a5..1f3f11cf 100644 --- a/server/api/studio/restapi/import.api +++ b/server/api/studio/restapi/import.api @@ -1,108 +1,106 @@ syntax = "v1" type ( - ImportTaskConnection { - User *string `json:"user" validate:"required"` - Password *string `json:"password" validate:"required"` - Address *string `json:"address" validate:"required"` - } - - ImportTaskClientSettings { - Retry *int `json:"retry,optional"` - Concurrency *int `json:"concurrency,optional"` - ChannelBufferSize *int `json:"channelBufferSize,optional"` - Space *string `json:"space" validate:"required"` - Connection *ImportTaskConnection `json:"connection" validate:"required"` - PostStart *ImportTaskPostStart `json:"postStart,optional"` - PreStop *ImportTaskPreStop `json:"preStop,optional"` - } - - ImportTaskPostStart { - Commands *string `json:"commands, optional"` - AfterPeriod *string `json:"afterPeriod, optional"` - } - - ImportTaskPreStop { - Commands *string `json:"commands,optional"` - } - ImportTaskCSV { WithHeader *bool `json:"withHeader,optional"` - WithLabel *bool `json:"withLabel,optional"` + LazyQuotes *bool `json:"lazyQuotes,optional"` Delimiter *string `json:"delimiter,optional"` } - ImportTaskVID { - Index *int64 `json:"index" validate:"required"` - Type *string `json:"type" validate:"required"` - Function *string `json:"function,optional"` - Prefix *string `json:"prefix,optional"` + NodeId { + Name string `json:"name,optional"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index" validate:"required"` + ConcatItems []interface{} `json:"concatItems,optional"` + Function string `json:"function,optional"` } - ImportTaskTagProp { - Name *string `json:"name" validate:"required"` - Type *string `json:"type" validate:"required"` - Index *int64 `json:"index, optional"` + Tag { + Name string `json:"name" validate:"required"` + ID NodeId `json:"id" validate:"required"` + Props []Prop `json:"props" validate:"required"` + IgnoreExistedIndex bool `json:"ignoreExistedIndex,optional"` + } + Edge { + Name string `json:"name" validate:"required"` + Src NodeId `json:"src" validate:"required"` + Dst NodeId `json:"dst" validate:"required"` + Props []Prop `json:"props" validate:"required"` + Rank EdgeRank `json:"rank, optional"` + IgnoreExistedIndex bool `json:"ignoreExistedIndex,optional"` } - ImportTaskTag { - Name *string `json:"name" validate:"required"` - Props []*ImportTaskTagProp `json:"props" validate:"required"` + Prop { + Name string `json:"name" validate:"required"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index, optional"` + Nullable bool `json:"nullable, optional"` + NullValue string `json:"nullValue, optional"` + AlternativeIndices []int64 `json:"alternativeIndices, optional"` + DefaultValue string `json:"defaultValue, optional"` } - ImportTaskVertex { - VID *ImportTaskVID `json:"vid" validate:"required"` - Tags []*ImportTaskTag `json:"tags" validate:"required"` + EdgeRank { + Index *int64 `json:"index, optional"` } - ImportTaskEdgeID { - Index *int64 `json:"index" validate:"required"` - Function *string `json:"function,optional"` - Type *string `json:"type" validate:"required"` - Prefix *string `json:"prefix,optional"` + S3Config { + Endpoint string `json:"endpoint,omitempty"` + Region string `json:"region,omitempty"` + AccessKey string `json:"accessKey,omitempty"` + SecretKey string `json:"secretKey,omitempty"` + Token string `json:"token,omitempty"` + Bucket string `json:"bucket,omitempty"` + Key string `json:"key,omitempty"` } - ImportTaskEdgeRank { - Index *int64 `json:"index, optional"` + SFTPConfig { + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + KeyFile string `json:"keyFile,omitempty"` + KeyData string `json:"keyData,omitempty"` + Passphrase string `json:"passphrase,omitempty"` + Path string `json:"path,omitempty"` } - ImportTaskEdgeProp { - Name *string `json:"name"` - Type *string `json:"type"` - Index *int64 `json:"index, optional"` + LocalConfig { + Path string `json:"path,omitempty"` } - ImportTaskEdge { - Name *string `json:"name" validate:"required"` - SrcVID *ImportTaskEdgeID `json:"srcVID" validate:"required"` - DstVID *ImportTaskEdgeID `json:"dstVID" validate:"required"` - Rank *ImportTaskEdgeRank `json:"rank, optional"` - Props []*ImportTaskEdgeProp `json:"props" validate:"required"` + ImportTaskConfig { + Client Client `json:"client" validate:"required"` + Manager Manager `json:"manager" validate:"required"` + Sources []Sources `json:"sources" validate:"required"` } - ImportTaskSchema { - Type *string `json:"type" validate:"required"` - Edge *ImportTaskEdge `json:"edge,optional"` - Vertex *ImportTaskVertex `json:"vertex,optional"` + Client { + Version string `json:"version,omitempty" validate:"required"` + Address string `json:"address,omitempty" validate:"required"` + User string `json:"user,omitempty" validate:"required"` + Password string `json:"password,omitempty" validate:"required"` + ConcurrencyPerAddress int `json:"concurrencyPerAddress,optional"` + ReconnectInitialInterval string `json:"reconnectInitialInterval,optional"` + Retry int `json:"retry,optional"` + RetryInitialInterval string `json:"retryInitialInterval,optional"` } - ImportTaskFile { - Path *string `json:"path" validate:"required"` - FailDataPath *string `json:"failDataPath,optional"` - BatchSize *int `json:"batchSize,optional"` - Limit *int `json:"limit, optional"` - InOrder *bool `json:"inOrder, optional"` - Type *string `json:"type" validate:"required"` - CSV *ImportTaskCSV `json:"csv" validate:"required"` - Schema *ImportTaskSchema `json:"schema" validate:"required"` + Manager { + SpaceName string `json:"spaceName,omitempty" validate:"required"` + Batch int `json:"batch,omitempty, optional"` + ReaderConcurrency int `json:"readerConcurrency,omitempty, optional"` + ImporterConcurrency int `json:"importerConcurrency,omitempty, optional"` + StatsInterval string `json:"statsInterval,omitempty, optional"` } - ImportTaskConfig { - Version *string `json:"version" validate:"required"` - Description *string `json:"description,optional"` - RemoveTempFiles *bool `json:"removeTempFiles,optional"` - ClientSettings *ImportTaskClientSettings `json:"clientSettings" validate:"required"` - Files []*ImportTaskFile `json:"files" validate:"required"` + Sources { + CSV ImportTaskCSV `json:"csv" validate:"required"` + Local LocalConfig `json:"local, optional"` + S3 S3Config `json:"s3, optional"` + SFTP SFTPConfig `json:"sftpConfig, optional"` + Tags []Tag `json:"tags, optional"` + Edges []Edge `json:"edges, optional"` } CreateImportTaskRequest { @@ -133,14 +131,16 @@ type ( } ImportTaskStats { - NumFailed int64 `json:"numFailed"` - NumReadFailed int64 `json:"numReadFailed"` - TotalCount int64 `json:"totalCount"` - TotalBatches int64 `json:"totalBatches"` - TotalLatency int64 `json:"totalLatency"` - TotalReqTime int64 `json:"totalReqTime"` - TotalBytes int64 `json:"totalBytes"` - TotalImportedBytes int64 `json:"totalImportedBytes"` + ProcessedBytes int64 `json:"processedBytes"` + TotalBytes int64 `json:"totalBytes"` + FailedRecords int64 `json:"failedRecords"` + TotalRecords int64 `json:"totalRecords"` + FailedRequest int64 `json:"failedRequest"` + TotalRequest int64 `json:"totalRequest"` + TotalLatency int64 `json:"totalLatency"` + TotalRespTime int64 `json:"totalRespTime"` + FailedProcessed int64 `json:"failedProcessed"` + TotalProcessed int64 `json:"totalProcessed"` } GetManyImportTaskRequest { diff --git a/server/go.mod b/server/go.mod index 0ecab4fb..6e902392 100644 --- a/server/go.mod +++ b/server/go.mod @@ -7,7 +7,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b github.com/vesoft-inc/go-pkg v0.0.0-20230307111816-a43d6a342d23 - github.com/vesoft-inc/nebula-importer v1.0.1-0.20220719030708-8e376665042e + github.com/vesoft-inc/nebula-importer/v4 v4.0.0-20230302074940-d97c824a6225 github.com/zeromicro/go-zero v1.3.3 ) @@ -15,7 +15,7 @@ require ( github.com/agiledragon/gomonkey/v2 v2.9.0 github.com/aws/aws-sdk-go v1.44.217 github.com/pkg/sftp v1.13.5 - github.com/stretchr/testify v1.7.1 + github.com/stretchr/testify v1.8.0 github.com/vesoft-inc/nebula-go/v3 v3.4.0 ) @@ -27,20 +27,40 @@ require ( ) require ( + github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible // indirect + github.com/cenkalti/backoff/v4 v4.1.3 // indirect + github.com/colinmarc/hdfs/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect + github.com/golang/mock v1.6.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.0.0 // indirect + github.com/jcmturner/goidentity/v6 v6.0.1 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/jlaffaye/ftp v0.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/fs v0.1.0 // indirect + github.com/panjf2000/ants v1.2.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect + golang.org/x/time v0.3.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace github.com/vesoft-inc/nebula-importer/v4 => github.com/veezhang/nebula-importer/v4 v4.0.0-20230302074940-d97c824a6225 + require ( github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/go-logr/logr v1.2.2 // indirect + github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect @@ -66,13 +86,13 @@ require ( go.opentelemetry.io/otel/sdk v1.3.0 // indirect go.opentelemetry.io/otel/trace v1.3.0 // indirect go.uber.org/automaxprocs v1.4.0 // indirect - go.uber.org/zap v1.21.0 - golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 - golang.org/x/net v0.1.0 // indirect - golang.org/x/sys v0.1.0 // indirect - golang.org/x/text v0.4.0 // indirect + go.uber.org/zap v1.23.0 + golang.org/x/crypto v0.5.0 + golang.org/x/net v0.5.0 // indirect + golang.org/x/sys v0.4.0 // indirect + golang.org/x/text v0.6.0 // indirect google.golang.org/grpc v1.46.0 // indirect - google.golang.org/protobuf v1.28.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect gopkg.in/yaml.v2 v2.4.0 gorm.io/driver/sqlite v1.3.2 ) diff --git a/server/go.sum b/server/go.sum index c3a71567..5ed4ade6 100644 --- a/server/go.sum +++ b/server/go.sum @@ -58,6 +58,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis/v2 v2.17.0/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I= +github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible h1:KXeJoM1wo9I/6xPTyt6qCxoSZnmASiAjlrr0dyTUKt8= +github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.44.217 h1:FcWC56MRl+k756aH3qeMQTylSdeJ58WN0iFz3fkyRz0= @@ -71,6 +73,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -87,6 +91,8 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/colinmarc/hdfs/v2 v2.3.0 h1:tMxOjXn6+7iPUlxAyup9Ha2hnmLe3Sv5DM2qqbSQ2VY= +github.com/colinmarc/hdfs/v2 v2.3.0/go.mod h1:nsyY1uyQOomU34KVQk9Qb/lDJobN1MQ/9WS6IqcVZno= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -98,6 +104,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= @@ -116,6 +123,8 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 h1:ZA+qQ3d2In0RNzVpk+D/nq1sjDSv+s1Wy2zrAPQAmsg= github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= +github.com/fclairamb/ftpserverlib v0.21.0 h1:QO4ex827FU6Y7FNi1cj4dmAs6bcmy+UtWcX5yzVzFAw= +github.com/fclairamb/go-log v0.4.1 h1:rLtdSG9x2pK41AIAnE8WYpl05xBJfw1ZyYxZaXFcBsM= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= @@ -137,8 +146,9 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7 github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= @@ -180,6 +190,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -212,8 +223,8 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -236,7 +247,9 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -245,23 +258,36 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jlaffaye/ftp v0.1.0 h1:DLGExl5nBoSFoNshAUHwXAezXwXBvFdx7/qwhucWNSE= +github.com/jlaffaye/ftp v0.1.0/go.mod h1:hhq4G4crv+nW2qXtNYcuzLeOudG92Ps37HEKeg2e3lE= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -333,15 +359,20 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg= github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ= github.com/openzipkin/zipkin-go v0.4.0 h1:CtfRrOVZtbDj8rt1WXjklw0kqqJQwICrCKmlfUuBUUw= github.com/openzipkin/zipkin-go v0.4.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ= +github.com/panjf2000/ants v1.2.1 h1:IlhLREssFi+YFOITnHdH3FHhulY6WDS0OB9e7+3fMHk= +github.com/panjf2000/ants v1.2.1/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -399,31 +430,33 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= +github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk= github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -github.com/vesoft-inc/go-pkg v0.0.0-20220714021138-c5edb9946837 h1:18LhwI/O7zbAboZC87N1YR+49EorwuCz65yrmEJqoGU= -github.com/vesoft-inc/go-pkg v0.0.0-20220714021138-c5edb9946837/go.mod h1:HCAXRhF2io+nPLQnl+RQ6XyVcp1Xdv6NgslXRBBCiEU= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/veezhang/nebula-importer/v4 v4.0.0-20230302074940-d97c824a6225 h1:BZv/ScJIgYQSeVVJWPDgJKrEhn5TQJ2ZngBpDvGbA9o= +github.com/veezhang/nebula-importer/v4 v4.0.0-20230302074940-d97c824a6225/go.mod h1:6Z0GAlZdHWMQCyLkq377Li6ibIHkcwkzmBTEwIorAoU= github.com/vesoft-inc/go-pkg v0.0.0-20230307111816-a43d6a342d23 h1:uiuSGlc1MTuEOy+VKBFxXA8WayxXeGigh6DT0On5r/I= github.com/vesoft-inc/go-pkg v0.0.0-20230307111816-a43d6a342d23/go.mod h1:HCAXRhF2io+nPLQnl+RQ6XyVcp1Xdv6NgslXRBBCiEU= -github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= github.com/vesoft-inc/nebula-go/v3 v3.4.0 h1:7q2DSW4QABwI2oGPSVuC+Ql7kGwj26G/YVPGD7gETys= github.com/vesoft-inc/nebula-go/v3 v3.4.0/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= -github.com/vesoft-inc/nebula-importer v1.0.1-0.20220719030708-8e376665042e h1:Xj3N5lfKv+mG59Fh2GoWZ/89kWEwQtW/W4EiKkD2yI0= -github.com/vesoft-inc/nebula-importer v1.0.1-0.20220719030708-8e376665042e/go.mod h1:8xAQi6KI2qe40Dop/GqDXmBEurt7qGp5Pjd1MESAVNA= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= @@ -469,8 +502,9 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= -go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= +go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= +go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -483,8 +517,9 @@ golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210920023735-84f357641f63/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= +golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -560,8 +595,9 @@ golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220421235706-1d1ef9303861/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -638,13 +674,14 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.1.0 h1:g6Z6vPFA9dYBAF7DWcH6sCcOntplXsDKcliusYijMlw= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -653,13 +690,16 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -713,7 +753,6 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -803,8 +842,9 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 830cd8de29b1047ab4db0adc9d60a800734d7d03 Mon Sep 17 00:00:00 2001 From: Nut He <18328704+hetao92@users.noreply.github.com> Date: Wed, 8 Mar 2023 17:35:37 +0800 Subject: [PATCH 2/4] mod: update import logic mod: code review --- app/config/locale/en-US.json | 5 +- app/config/locale/zh-CN.json | 5 +- app/interfaces/import.ts | 14 ++- .../FileList/UploadConfigModal/index.tsx | 3 +- app/pages/Import/TaskCreate/index.tsx | 12 -- app/pages/Import/TaskList/TaskItem/index.tsx | 19 ++- .../Import/TaskList/TemplateModal/index.tsx | 22 ++-- app/utils/constant.ts | 1 - app/utils/import.ts | 46 +++---- server/api/studio/internal/service/import.go | 117 ++++++++---------- .../internal/service/importer/importer.go | 83 +++++-------- .../studio/internal/service/importer/task.go | 2 +- .../internal/service/importer/taskmgr.go | 112 ++++++++--------- server/api/studio/internal/types/types.go | 103 ++++++++------- server/api/studio/restapi/import.api | 101 ++++++++------- server/go.mod | 6 +- server/go.sum | 4 +- 17 files changed, 312 insertions(+), 343 deletions(-) diff --git a/app/config/locale/en-US.json b/app/config/locale/en-US.json index 9b1ee5bb..ab65fdc1 100644 --- a/app/config/locale/en-US.json +++ b/app/config/locale/en-US.json @@ -190,8 +190,7 @@ "importFailed": "Failed", "importRunning": "Running", "importPending": "Pending", - "notImported": "{total} lines not imported", - "readFailed": "{total} lines read failed", + "notImported": "{total} records not imported", "selectFile": "Select file", "addTag": "Add Tag", "addEdge": "Add Edge Type", @@ -222,14 +221,12 @@ "concurrencyTip": "Number of NebulaGraph client concurrency.", "batchSizeTip": "The number of statements inserting data in a batch.", "retryTip": "Retry times of nGQL statement execution failures.", - "channelBufferSizeTip": "Cache queue size per NebulaGraph client.", "vidFunctionTip": "Function to generate VID. Currently only hash functions are supported.", "vidPrefixTip": "prefix added to the original vid.", "selectCsvColumn": "Select CSV Index", "graphAddress": "Graph service address", "concurrency": "Concurrency", "retry": "Retry", - "channelBufferSize": "Channel Buffer Size", "graphAddressTip": "The following Graph host will be used for data import", "currentHost": "Current connected host", "expandMoreConfig": "Expand more configurations", diff --git a/app/config/locale/zh-CN.json b/app/config/locale/zh-CN.json index 47201cce..8ea46787 100644 --- a/app/config/locale/zh-CN.json +++ b/app/config/locale/zh-CN.json @@ -190,8 +190,7 @@ "importFailed": "导入失败", "importRunning": "导入中", "importPending": "等待导入", - "notImported": "{total}行未导入", - "readFailed": "{total}行读取失败", + "notImported": "{total}条记录未导入", "selectFile": "选择绑定文件", "addTag": "添加 Tag", "addEdge": "添加 Edge Type", @@ -222,14 +221,12 @@ "concurrencyTip": "NebulaGraph 客户端并发数", "batchSizeTip": "单批次插入数据的语句数量", "retryTip": "nGQL 语句执行失败的重试次数", - "channelBufferSizeTip": "每个 NebulaGraph 客户端的缓存队列大小", "vidFunctionTip": "生成 VID 的函数。目前只支持 hash 函数", "vidPrefixTip": "给原始 VID 添加的前缀", "selectCsvColumn": "选择 CSV 列", "graphAddress": "Graph 服务地址", "concurrency": "并发数", "retry": "重试次数", - "channelBufferSize": "缓存队列大小", "graphAddressTip": "Graph 服务的地址和端口。将使用以下 Graph 节点进行数据导入", "currentHost": "当前登录的 Graph 节点", "expandMoreConfig": "展开更多配置", diff --git a/app/interfaces/import.ts b/app/interfaces/import.ts index c7c227a1..7201e3a7 100644 --- a/app/interfaces/import.ts +++ b/app/interfaces/import.ts @@ -10,13 +10,16 @@ export enum ITaskStatus { } export interface ITaskStats { - totalBatches: number; + processedBytes: number; totalBytes: number; - totalImportedBytes: number; + failedRecords: number; + totalRecords: number; + failedRequest: number; + totalRequest: number; totalLatency: number; - totalReqTime: number; - numFailed: number; - numReadFailed: number; + totalRespTime: number; + failedProcessed: number; // The number of nodes and edges that have failed to be processed. + totalProcessed: number; // The number of nodes and edges that have been processed. } export interface ITaskItem { id: number; @@ -52,7 +55,6 @@ export interface IBasicConfig { batchSize?: string; concurrency?: string; retry?: string; - channelBufferSize?: string; } export interface ILogDimension { diff --git a/app/pages/Import/FileList/UploadConfigModal/index.tsx b/app/pages/Import/FileList/UploadConfigModal/index.tsx index 5f91b9b4..344128c2 100644 --- a/app/pages/Import/FileList/UploadConfigModal/index.tsx +++ b/app/pages/Import/FileList/UploadConfigModal/index.tsx @@ -1,6 +1,6 @@ import Icon from '@app/components/Icon'; import { useI18n } from '@vesoft-inc/i18n'; -import { Button, Input, Modal, Table, Popconfirm, Dropdown } from 'antd'; +import { Button, Input, Modal, Table, Popconfirm, Dropdown, message } from 'antd'; import { v4 as uuidv4 } from 'uuid'; import React, { useCallback, useEffect, useState } from 'react'; import { usePapaParse } from 'react-papaparse'; @@ -158,6 +158,7 @@ const UploadConfigModal = (props: IProps) => { const res = await uploadFile(data); if(res.code === 0) { onConfirm(); + message.success(intl.get('import.uploadSuccessfully')); } setState({ uploading: false }); }, []); diff --git a/app/pages/Import/TaskCreate/index.tsx b/app/pages/Import/TaskCreate/index.tsx index e64b4ca2..e30f3dc5 100644 --- a/app/pages/Import/TaskCreate/index.tsx +++ b/app/pages/Import/TaskCreate/index.tsx @@ -206,18 +206,6 @@ const TaskCreate = (props: IProps) => { placeholder: DEFAULT_IMPORT_CONFIG.retry, description: intl.get('import.retryTip'), }, - { - label: intl.get('import.channelBufferSize'), - key: 'channelBufferSize', - rules: [ - { - pattern: POSITIVE_INTEGER_REGEX, - message: intl.get('formRules.numberRequired'), - }, - ], - placeholder: DEFAULT_IMPORT_CONFIG.channelBufferSize, - description: intl.get('import.channelBufferSizeTip'), - }, ], [currentLocale]); return (