Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor Fetcher interface used for downloading migrations #8728

Merged
merged 7 commits into from
Feb 11, 2022
13 changes: 2 additions & 11 deletions repo/fsrepo/migrations/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
Expand Down Expand Up @@ -111,16 +110,8 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s
}
defer arcFile.Close()

// Open connection to download archive from ipfs path
rc, err := fetcher.Fetch(ctx, arcDistPath)
if err != nil {
return "", err
}
defer rc.Close()

// Write download data
_, err = io.Copy(arcFile, rc)
if err != nil {
// Open connection to download archive from ipfs path and write to file
if err := fetcher.Fetch(ctx, arcDistPath, arcFile); err != nil {
return "", err
}
arcFile.Close()
Expand Down
16 changes: 8 additions & 8 deletions repo/fsrepo/migrations/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migrations

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -96,14 +97,14 @@ func TestHttpFetch(t *testing.T) {

fetcher := NewHttpFetcher("", ts.URL, "", 0)

rc, err := fetcher.Fetch(ctx, "/versions")
var buf bytes.Buffer
err := fetcher.Fetch(ctx, "/versions", &buf)
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(&buf)
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand All @@ -120,7 +121,7 @@ func TestHttpFetch(t *testing.T) {
}

// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{})
if err == nil || !strings.Contains(err.Error(), "404") {
t.Fatal("expected error 404")
}
Expand Down Expand Up @@ -232,13 +233,12 @@ func TestMultiFetcher(t *testing.T) {

mf := NewMultiFetcher(badFetcher, fetcher)

rc, err := mf.Fetch(ctx, "/versions")
if err != nil {
var buf bytes.Buffer
if err := mf.Fetch(ctx, "/versions", &buf); err != nil {
t.Fatal(err)
}
defer rc.Close()

vers, err := ioutil.ReadAll(rc)
vers, err := ioutil.ReadAll(&buf)
if err != nil {
t.Fatal("could not read versions:", err)
}
Expand Down
15 changes: 10 additions & 5 deletions repo/fsrepo/migrations/fetcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package migrations

import (
"bytes"
"context"
"io"
"os"
Expand All @@ -21,7 +22,7 @@ const (
type Fetcher interface {
// Fetch attempts to fetch the file at the given ipfs path.
// Returns io.ReadCloser on success, which caller must close.
Fetch(ctx context.Context, filePath string) (io.ReadCloser, error)
Fetch(ctx context.Context, filePath string, writer io.Writer) error
// Close performs any cleanup after the fetcher is not longer needed.
Close() error
}
Expand Down Expand Up @@ -49,16 +50,20 @@ func NewMultiFetcher(f ...Fetcher) Fetcher {

// Fetch attempts to fetch the file at each of its fetchers until one succeeds.
// Returns io.ReadCloser on success, which caller must close.
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string, writer io.Writer) error {
var errs error
for _, fetcher := range f.fetchers {
rc, err := fetcher.Fetch(ctx, ipfsPath)
var buf bytes.Buffer
err := fetcher.Fetch(ctx, ipfsPath, &buf)
if err == nil {
return rc, nil
if _, err := io.Copy(writer, &buf); err != nil {
return err
}
return nil
}
errs = multierror.Append(errs, err)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, errs
return errs
}

func (f *MultiFetcher) Close() error {
Expand Down
22 changes: 15 additions & 7 deletions repo/fsrepo/migrations/httpfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http
// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error {
gwURL := f.gateway + path.Join(f.distPath, filePath)
fmt.Printf("Fetching with HTTP: %q\n", gwURL)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, gwURL, nil)
if err != nil {
return nil, fmt.Errorf("http.NewRequest error: %s", err)
return fmt.Errorf("http.NewRequest error: %s", err)
}

if f.userAgent != "" {
Expand All @@ -77,22 +77,30 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err)
return fmt.Errorf("http.DefaultClient.Do error: %s", err)
}

if resp.StatusCode >= 400 {
defer resp.Body.Close()
mes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading error body: %s", err)
return fmt.Errorf("error reading error body: %s", err)
}
return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
return fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
}

var rc io.ReadCloser
if f.limit != 0 {
return NewLimitReadCloser(resp.Body, f.limit), nil
rc = NewLimitReadCloser(resp.Body, f.limit)
} else {
rc = resp.Body
}
return resp.Body, nil
defer rc.Close()

if _, err := io.Copy(writer, rc); err != nil {
return err
}
return nil
}

func (f *HttpFetcher) Close() error {
Expand Down
24 changes: 17 additions & 7 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type IpfsFetcher struct {
addrInfo peer.AddrInfo
}

var _ migrations.Fetcher = (*IpfsFetcher)(nil)

// NewIpfsFetcher creates a new IpfsFetcher
//
// Specifying "" for distPath sets the default IPNS path.
Expand Down Expand Up @@ -87,7 +89,7 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe
// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error {
// Initialize and start IPFS node on first call to Fetch, since the fetcher
// may be created by not used.
f.openOnce.Do(func() {
Expand All @@ -103,30 +105,38 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
fmt.Printf("Fetching with IPFS: %q\n", filePath)

if f.openErr != nil {
return nil, f.openErr
return f.openErr
}

iPath, err := parsePath(path.Join(f.distPath, filePath))
if err != nil {
return nil, err
return err
}

nd, err := f.ipfs.Unixfs().Get(ctx, iPath)
if err != nil {
return nil, err
return err
}

f.recordFetched(iPath)

fileNode, ok := nd.(files.File)
if !ok {
return nil, fmt.Errorf("%q is not a file", filePath)
return fmt.Errorf("%q is not a file", filePath)
}

var rc io.ReadCloser
if f.limit != 0 {
return migrations.NewLimitReadCloser(fileNode, f.limit), nil
rc = migrations.NewLimitReadCloser(fileNode, f.limit)
} else {
rc = fileNode
}
return fileNode, nil
defer rc.Close()

if _, err := io.Copy(writer, rc); err != nil {
return err
}
return nil
}

func (f *IpfsFetcher) Close() error {
Expand Down
10 changes: 5 additions & 5 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipfsfetcher

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
Expand All @@ -28,14 +29,14 @@ func TestIpfsFetcher(t *testing.T) {
fetcher := NewIpfsFetcher("", 0, nil)
defer fetcher.Close()

rc, err := fetcher.Fetch(ctx, "go-ipfs/versions")
var buf bytes.Buffer
err := fetcher.Fetch(ctx, "go-ipfs/versions", &buf)
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(&buf)
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand All @@ -52,8 +53,7 @@ func TestIpfsFetcher(t *testing.T) {
}

// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
if err == nil {
if err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{}); err == nil {
t.Fatal("expected error 404")
}

Expand Down
6 changes: 4 additions & 2 deletions repo/fsrepo/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ func TestReadMigrationConfig(t *testing.T) {

type mockIpfsFetcher struct{}

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
return nil, nil
var _ Fetcher = (*mockIpfsFetcher)(nil)

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error {
return nil
}

func (m *mockIpfsFetcher) Close() error {
Expand Down
8 changes: 4 additions & 4 deletions repo/fsrepo/migrations/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migrations

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -39,16 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable
// available on the distriburion site. List is in ascending order, unless
// sortDesc is true.
func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) {
rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
if err != nil {
var buf bytes.Buffer
if err := fetcher.Fetch(ctx, path.Join(dist, distVersions), &buf); err != nil {
return nil, err
}
defer rc.Close()

prefix := "v"
var vers []semver.Version

scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(&buf)
for scan.Scan() {
ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix))
if err != nil {
Expand Down