Skip to content

Commit

Permalink
see if new swarm setting works (#715)
Browse files Browse the repository at this point in the history
Co-authored-by: alabdao <139594838+alabdao@users.noreply.github.com>
  • Loading branch information
thetechnocrat-dev and alabdao authored Oct 23, 2023
1 parent 2e18c25 commit f0128a2
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 17 deletions.
39 changes: 27 additions & 12 deletions internal/bacalhau/bacalhau.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,39 +115,57 @@ func CreateBacalhauJob(cid, container, cmd, selector string, maxTime, memory int
return job, err
}

func CreateBacalhauClient() *client.APIClient {
func CreateBacalhauClient() (*client.APIClient, error) {
home, err := os.UserHomeDir()
if err != nil {
log.Fatal(err)
return nil, err
}
bacalhauConfigDirPath := filepath.Join(home, ".bacalhau")
bacalhauConfig, err := config.Load(bacalhauConfigDirPath, "config", "yaml")
if err != nil {
return nil, err
}
if os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES") != "" {
swarmAddresses := []string{os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES")}
bacalhauConfig.Node.IPFS.SwarmAddresses = swarmAddresses
}
config.SetUserKey(filepath.Join(bacalhauConfigDirPath, "user_id.pem"))
config.SetLibp2pKey(filepath.Join(bacalhauConfigDirPath, "libp2p_private_key"))
defaultConfig := config.ForEnvironment()
_, err = config.Init(defaultConfig, filepath.Join(home, ".bacalhau"), "config", "yaml")
config.Set(bacalhauConfig)

_, err = config.Init(bacalhauConfig, bacalhauConfigDirPath, "config", "yaml")
if err != nil {
log.Fatal(err)
return nil, err
}
apiHost := GetBacalhauApiHost()
apiPort := uint16(1234)
client := client.NewAPIClient(apiHost, apiPort)
return client
return client, err
}

func SubmitBacalhauJob(job *model.Job) (submittedJob *model.Job, err error) {
client := CreateBacalhauClient()
client, err := CreateBacalhauClient()
if err != nil {
return nil, err
}
submittedJob, err = client.Submit(context.Background(), job)
return submittedJob, err
}

func GetBacalhauJobState(jobId string) (*model.JobWithInfo, error) {
client := CreateBacalhauClient()
client, err := CreateBacalhauClient()
if err != nil {
return nil, err
}
updatedJob, _, err := client.Get(context.Background(), jobId)
return updatedJob, err
}

func GetBacalhauJobResults(submittedJob *model.Job, showAnimation bool, maxTime int) (results []model.PublishedResult, err error) {
client := CreateBacalhauClient()
client, err := CreateBacalhauClient()
if err != nil {
return nil, err
}

sleepConstant := 2
maxTrys := maxTime * 60 / sleepConstant
Expand Down Expand Up @@ -197,9 +215,6 @@ func DownloadBacalhauResults(dir string, submittedJob *model.Job, results []mode
Timeout: model.DefaultDownloadTimeout,
OutputDir: dir,
}
// if os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES") != "" {
// downloadSettings.IPFSSwarmAddrs = os.Getenv("BACALHAU_IPFS_SWARM_ADDRESSES")
// }
downloadSettings.OutputDir = dir
downloaderProvider := util.NewStandardDownloaders(cm, downloadSettings)
err := downloader.DownloadResults(context.Background(), results, downloaderProvider, downloadSettings)
Expand Down
5 changes: 1 addition & 4 deletions internal/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,8 @@ func DownloadToDirectory(cid, directory string) error {
ipfsNodeUrl := DeriveIpfsNodeUrl()
sh := shell.NewShell(ipfsNodeUrl)

// Construct the full directory path where the CID content will be downloaded
downloadPath := path.Join(directory, cid)

// Use the Get method to download the file or directory with the specified CID
err := sh.Get(cid, downloadPath)
err := sh.Get(cid, directory)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion internal/ipwl/ipwl.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,12 @@ func processIOTask(ioEntry IO, index, maxTime int, jobDir, ioJsonPath, selector
fmt.Println("Downloading Bacalhau job")
fmt.Printf("Output dir of %s \n", outputsDirPath)
}
err = bacalhau.DownloadBacalhauResults(outputsDirPath, submittedJob, results)

for _, result := range results {
fmt.Printf("Downloading result %s to %s \n", result.Data.CID, outputsDirPath)
err = ipfs.DownloadToDirectory(result.Data.CID, outputsDirPath)
}

if err != nil {
updateIOWithError(ioJsonPath, index, err, fileMutex)
return fmt.Errorf("error downloading Bacalhau results: %w", err)
Expand Down

0 comments on commit f0128a2

Please sign in to comment.