From f0128a2ab17bfd64778e7420014b2867dc08fd2a Mon Sep 17 00:00:00 2001 From: The Technocrat Date: Mon, 23 Oct 2023 09:51:36 -0400 Subject: [PATCH] see if new swarm setting works (#715) Co-authored-by: alabdao <139594838+alabdao@users.noreply.github.com> --- internal/bacalhau/bacalhau.go | 39 ++++++++++++++++++++++++----------- internal/ipfs/ipfs.go | 5 +---- internal/ipwl/ipwl.go | 7 ++++++- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/internal/bacalhau/bacalhau.go b/internal/bacalhau/bacalhau.go index 13e646431..576aa9ce8 100644 --- a/internal/bacalhau/bacalhau.go +++ b/internal/bacalhau/bacalhau.go @@ -115,39 +115,57 @@ func CreateBacalhauJob(cid, container, cmd, selector string, maxTime, memory int return job, err } -func CreateBacalhauClient() *client.APIClient { +func CreateBacalhauClient() (*client.APIClient, error) { home, err := os.UserHomeDir() if err != nil { - log.Fatal(err) + return nil, err } bacalhauConfigDirPath := filepath.Join(home, ".bacalhau") + bacalhauConfig, err := config.Load(bacalhauConfigDirPath, "config", "yaml") + if err != nil { + return nil, err + } + if os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES") != "" { + swarmAddresses := []string{os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES")} + bacalhauConfig.Node.IPFS.SwarmAddresses = swarmAddresses + } config.SetUserKey(filepath.Join(bacalhauConfigDirPath, "user_id.pem")) config.SetLibp2pKey(filepath.Join(bacalhauConfigDirPath, "libp2p_private_key")) - defaultConfig := config.ForEnvironment() - _, err = config.Init(defaultConfig, filepath.Join(home, ".bacalhau"), "config", "yaml") + config.Set(bacalhauConfig) + + _, err = config.Init(bacalhauConfig, bacalhauConfigDirPath, "config", "yaml") if err != nil { - log.Fatal(err) + return nil, err } apiHost := GetBacalhauApiHost() apiPort := uint16(1234) client := client.NewAPIClient(apiHost, apiPort) - return client + return client, err } func SubmitBacalhauJob(job *model.Job) (submittedJob *model.Job, err error) { - client := CreateBacalhauClient() + client, err := CreateBacalhauClient() + if err != nil { + return nil, err + } submittedJob, err = client.Submit(context.Background(), job) return submittedJob, err } func GetBacalhauJobState(jobId string) (*model.JobWithInfo, error) { - client := CreateBacalhauClient() + client, err := CreateBacalhauClient() + if err != nil { + return nil, err + } updatedJob, _, err := client.Get(context.Background(), jobId) return updatedJob, err } func GetBacalhauJobResults(submittedJob *model.Job, showAnimation bool, maxTime int) (results []model.PublishedResult, err error) { - client := CreateBacalhauClient() + client, err := CreateBacalhauClient() + if err != nil { + return nil, err + } sleepConstant := 2 maxTrys := maxTime * 60 / sleepConstant @@ -197,9 +215,6 @@ func DownloadBacalhauResults(dir string, submittedJob *model.Job, results []mode Timeout: model.DefaultDownloadTimeout, OutputDir: dir, } - // if os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES") != "" { - // downloadSettings.IPFSSwarmAddrs = os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES") - // } downloadSettings.OutputDir = dir downloaderProvider := util.NewStandardDownloaders(cm, downloadSettings) err := downloader.DownloadResults(context.Background(), results, downloaderProvider, downloadSettings) diff --git a/internal/ipfs/ipfs.go b/internal/ipfs/ipfs.go index aa6a4ea53..ebd4a20c0 100644 --- a/internal/ipfs/ipfs.go +++ b/internal/ipfs/ipfs.go @@ -113,11 +113,8 @@ func DownloadToDirectory(cid, directory string) error { ipfsNodeUrl := DeriveIpfsNodeUrl() sh := shell.NewShell(ipfsNodeUrl) - // Construct the full directory path where the CID content will be downloaded - downloadPath := path.Join(directory, cid) - // Use the Get method to download the file or directory with the specified CID - err := sh.Get(cid, downloadPath) + err := sh.Get(cid, directory) if err != nil { return err } diff --git a/internal/ipwl/ipwl.go b/internal/ipwl/ipwl.go index 22dd6cb65..1c8cf9271 100644 --- a/internal/ipwl/ipwl.go +++ b/internal/ipwl/ipwl.go @@ -300,7 +300,12 @@ func processIOTask(ioEntry IO, index, maxTime int, jobDir, ioJsonPath, selector fmt.Println("Downloading Bacalhau job") fmt.Printf("Output dir of %s \n", outputsDirPath) } - err = bacalhau.DownloadBacalhauResults(outputsDirPath, submittedJob, results) + + for _, result := range results { + fmt.Printf("Downloading result %s to %s \n", result.Data.CID, outputsDirPath) + err = ipfs.DownloadToDirectory(result.Data.CID, outputsDirPath) + } + if err != nil { updateIOWithError(ioJsonPath, index, err, fileMutex) return fmt.Errorf("error downloading Bacalhau results: %w", err)