Skip to content

Commit

Permalink
feat: refactor Fetcher interface used for downloading migrations (#8728)
Browse files Browse the repository at this point in the history
* feat: refactor Fetcher interface used for downloading migrations
* feat: add RetryFetcher for migration downloads
* feat: 3 retries for each HTTP migration download

(cherry picked from commit b1ffc87)
  • Loading branch information
aschmahmann committed Feb 17, 2022
1 parent 310a547 commit 687a834
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 44 deletions.
7 changes: 3 additions & 4 deletions repo/fsrepo/migrations/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,14 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s
}
defer arcFile.Close()

// Open connection to download archive from ipfs path
rc, err := fetcher.Fetch(ctx, arcDistPath)
// Open connection to download archive from ipfs path and write to file
arcBytes, err := fetcher.Fetch(ctx, arcDistPath)
if err != nil {
return "", err
}
defer rc.Close()

// Write download data
_, err = io.Copy(arcFile, rc)
_, err = io.Copy(arcFile, bytes.NewReader(arcBytes))
if err != nil {
return "", err
}
Expand Down
14 changes: 4 additions & 10 deletions repo/fsrepo/migrations/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package migrations

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -96,14 +96,13 @@ func TestHttpFetch(t *testing.T) {

fetcher := NewHttpFetcher("", ts.URL, "", 0)

rc, err := fetcher.Fetch(ctx, "/versions")
out, err := fetcher.Fetch(ctx, "/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(out))
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand Down Expand Up @@ -232,16 +231,11 @@ func TestMultiFetcher(t *testing.T) {

mf := NewMultiFetcher(badFetcher, fetcher)

rc, err := mf.Fetch(ctx, "/versions")
vers, err := mf.Fetch(ctx, "/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()

vers, err := ioutil.ReadAll(rc)
if err != nil {
t.Fatal("could not read versions:", err)
}
if len(vers) < 45 {
fmt.Println("unexpected more data")
}
Expand Down
10 changes: 4 additions & 6 deletions repo/fsrepo/migrations/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ const (

type Fetcher interface {
// Fetch attempts to fetch the file at the given ipfs path.
// Returns io.ReadCloser on success, which caller must close.
Fetch(ctx context.Context, filePath string) (io.ReadCloser, error)
Fetch(ctx context.Context, filePath string) ([]byte, error)
// Close performs any cleanup after the fetcher is not longer needed.
Close() error
}
Expand Down Expand Up @@ -50,13 +49,12 @@ func NewMultiFetcher(f ...Fetcher) *MultiFetcher {
}

// Fetch attempts to fetch the file at each of its fetchers until one succeeds.
// Returns io.ReadCloser on success, which caller must close.
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) ([]byte, error) {
var errs error
for _, fetcher := range f.fetchers {
rc, err := fetcher.Fetch(ctx, ipfsPath)
out, err := fetcher.Fetch(ctx, ipfsPath)
if err == nil {
return rc, nil
return out, nil
}
fmt.Printf("Error fetching: %s\n", err.Error())
errs = multierror.Append(errs, err)
Expand Down
14 changes: 9 additions & 5 deletions repo/fsrepo/migrations/httpfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http
}

// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
// site configured for this HttpFetcher.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
gwURL := f.gateway + path.Join(f.distPath, filePath)
fmt.Printf("Fetching with HTTP: %q\n", gwURL)

Expand All @@ -89,10 +88,15 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
}

var rc io.ReadCloser
if f.limit != 0 {
return NewLimitReadCloser(resp.Body, f.limit), nil
rc = NewLimitReadCloser(resp.Body, f.limit)
} else {
rc = resp.Body
}
return resp.Body, nil
defer rc.Close()

return ioutil.ReadAll(rc)
}

func (f *HttpFetcher) Close() error {
Expand Down
16 changes: 11 additions & 5 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type IpfsFetcher struct {
addrInfo peer.AddrInfo
}

var _ migrations.Fetcher = (*IpfsFetcher)(nil)

// NewIpfsFetcher creates a new IpfsFetcher
//
// Specifying "" for distPath sets the default IPNS path.
Expand Down Expand Up @@ -85,9 +87,8 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe
}

// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
// site configured for this HttpFetcher.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
// Initialize and start IPFS node on first call to Fetch, since the fetcher
// may be created by not used.
f.openOnce.Do(func() {
Expand Down Expand Up @@ -123,10 +124,15 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
return nil, fmt.Errorf("%q is not a file", filePath)
}

var rc io.ReadCloser
if f.limit != 0 {
return migrations.NewLimitReadCloser(fileNode, f.limit), nil
rc = migrations.NewLimitReadCloser(fileNode, f.limit)
} else {
rc = fileNode
}
return fileNode, nil
defer rc.Close()

return ioutil.ReadAll(rc)
}

func (f *IpfsFetcher) Close() error {
Expand Down
9 changes: 4 additions & 5 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipfsfetcher

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
Expand All @@ -28,14 +29,13 @@ func TestIpfsFetcher(t *testing.T) {
fetcher := NewIpfsFetcher("", 0, nil)
defer fetcher.Close()

rc, err := fetcher.Fetch(ctx, "go-ipfs/versions")
out, err := fetcher.Fetch(ctx, "go-ipfs/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(out))
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand All @@ -52,8 +52,7 @@ func TestIpfsFetcher(t *testing.T) {
}

// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
if err == nil {
if _, err = fetcher.Fetch(ctx, "/no_such_file"); err == nil {
t.Fatal("expected error 404")
}

Expand Down
5 changes: 3 additions & 2 deletions repo/fsrepo/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) {
// downloadSources,
func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) {
const httpUserAgent = "go-ipfs"
const numTriesPerHTTP = 3

var fetchers []Fetcher
for _, src := range downloadSources {
src := strings.TrimSpace(src)
switch src {
case "HTTPS", "https", "HTTP", "http":
fetchers = append(fetchers, NewHttpFetcher(distPath, "", httpUserAgent, 0))
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP})
case "IPFS", "ipfs":
if newIpfsFetcher != nil {
fetchers = append(fetchers, newIpfsFetcher(distPath))
Expand All @@ -178,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch
default:
return nil, errors.New("bad gateway address: url scheme must be http or https")
}
fetchers = append(fetchers, NewHttpFetcher(distPath, u.String(), httpUserAgent, 0))
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP})
case "":
// Ignore empty string
}
Expand Down
13 changes: 9 additions & 4 deletions repo/fsrepo/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package migrations
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -290,7 +289,9 @@ func TestReadMigrationConfig(t *testing.T) {

type mockIpfsFetcher struct{}

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
var _ Fetcher = (*mockIpfsFetcher)(nil)

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
return nil, nil
}

Expand Down Expand Up @@ -323,7 +324,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}

Expand All @@ -341,7 +344,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}

Expand Down
33 changes: 33 additions & 0 deletions repo/fsrepo/migrations/retryfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package migrations

import (
"context"
"fmt"
)

type RetryFetcher struct {
Fetcher
MaxTries int
}

var _ Fetcher = (*RetryFetcher)(nil)

func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
var lastErr error
for i := 0; i < r.MaxTries; i++ {
out, err := r.Fetcher.Fetch(ctx, filePath)
if err == nil {
return out, nil
}

if ctx.Err() != nil {
return nil, ctx.Err()
}
lastErr = err
}
return nil, fmt.Errorf("exceeded number of retries. last error was %w", lastErr)
}

func (r *RetryFetcher) Close() error {
return r.Fetcher.Close()
}
6 changes: 3 additions & 3 deletions repo/fsrepo/migrations/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migrations

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -39,16 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable
// available on the distriburion site. List is in ascending order, unless
// sortDesc is true.
func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) {
rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
versionBytes, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
if err != nil {
return nil, err
}
defer rc.Close()

prefix := "v"
var vers []semver.Version

scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(versionBytes))
for scan.Scan() {
ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix))
if err != nil {
Expand Down

0 comments on commit 687a834

Please sign in to comment.