Skip to content

Commit

Permalink
feat(dsref): ParseLoadResolve turns strings into datasets
Browse files Browse the repository at this point in the history
intoduce a new interface: dsfs.Loader which is capable of loading datasets
from a passed-in "source" parameter,

a high level ParseLoadResolve function composes the common stack of parsing
a reference, resolving, then loading (with a potential network pull for resolution
and loading) makes a useful inversion of control flow for subsystems that
need this highest-order functionality, while still providing configurability
at the instance level
  • Loading branch information
b5 committed May 30, 2020
1 parent b0c585c commit 0283fc0
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 146 deletions.
26 changes: 26 additions & 0 deletions base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,36 @@ import (
"github.com/qri-io/qfs"
"github.com/qri-io/qfs/cafs"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
)

// NewLocalDatasetLoader creates a dsfs.Loader that operates on a filestore
func NewLocalDatasetLoader(r repo.Repo) dsref.Loader {
return loader{r}
}

type loader struct {
r repo.Repo
}

func (l loader) LoadDataset(ctx context.Context, ref dsref.Ref, source string) (*dataset.Dataset, error) {
// LoadDataset fetches, derefences and opens a dataset from a reference
// implements the dsfs.Loader interface
if source != "" {
return nil, fmt.Errorf("only local datasets can be loaded")
}

ds, err := dsfs.LoadDataset(ctx, l.r.Store(), ref.Path)
if err != nil {
return nil, err
}

err = OpenDataset(ctx, l.r.Filesystem(), ds)
return ds, err
}

// OpenDataset prepares a dataset for use, checking each component
// for populated Path or Byte suffixed fields, consuming those fields to
// set File handlers that are ready for reading
Expand Down
12 changes: 11 additions & 1 deletion base/transform_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/qri-io/ioes"
"github.com/qri-io/qfs"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
"github.com/qri-io/qri/startf"
Expand All @@ -16,7 +17,15 @@ import (
// TODO(dustmop): Tests. Especially once the `apply` command exists.

// TransformApply applies the transform script to order to modify the changing dataset
func TransformApply(ctx context.Context, ds *dataset.Dataset, r repo.Repo, str ioes.IOStreams, scriptOut io.Writer, secrets map[string]string) error {
func TransformApply(
ctx context.Context,
ds *dataset.Dataset,
r repo.Repo,
loader dsref.ParseResolveLoad,
str ioes.IOStreams,
scriptOut io.Writer,
secrets map[string]string,
) error {
pro, err := r.Profile()
if err != nil {
return err
Expand Down Expand Up @@ -62,6 +71,7 @@ func TransformApply(ctx context.Context, ds *dataset.Dataset, r repo.Repo, str i
startf.AddMutateFieldCheck(mutateCheck),
startf.SetErrWriter(scriptOut),
startf.SetSecrets(secrets),
startf.AddDatasetLoader(loader),
}

if err = startf.ExecScript(ctx, target, head, opts...); err != nil {
Expand Down
12 changes: 10 additions & 2 deletions cmd/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ PostgreSQL in a few ways:
}

cmd.Flags().StringVarP(&o.Format, "format", "f", "table", "set output format [table]")
cmd.Flags().BoolVar(&o.Offline, "offline", false, "prevent network access")

return cmd
}
Expand All @@ -64,8 +65,9 @@ PostgreSQL in a few ways:
type SQLOptions struct {
ioes.IOStreams

Query string
Format string
Query string
Format string
Offline bool

SQLMethods *lib.SQLMethods
}
Expand All @@ -82,9 +84,15 @@ func (o *SQLOptions) Complete(f Factory, args []string) (err error) {
func (o *SQLOptions) Run() (err error) {
o.StartSpinner()

var rm string
if o.Offline {
rm = "local"
}

p := &lib.SQLQueryParams{
Query: o.Query,
OutputFormat: o.Format,
ResolverMode: rm,
}

res := []byte{}
Expand Down
3 changes: 1 addition & 2 deletions cmd/test_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,7 @@ func (run *TestRunner) AddDatasetToRefstore(ctx context.Context, t *testing.T, r
// No existing commit
emptyHeadRef := ""

_, err = base.SaveDataset(ctx, r, initID, emptyHeadRef, ds, base.SaveSwitches{})
if err != nil {
if _, err = base.SaveDataset(ctx, r, initID, emptyHeadRef, ds, base.SaveSwitches{}); err != nil {
t.Fatal(err)
}
}
47 changes: 47 additions & 0 deletions dsref/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package dsref

import (
"context"
"fmt"

"github.com/qri-io/dataset"
qerr "github.com/qri-io/qri/errors"
)

// Loader loads and opens a dataset. The only useful implementation of the
// loader interface is in github.com/qri-io/qri/lib.
// TODO(b5) - This interface is a work-in-progress
type Loader interface {
LoadDataset(ctx context.Context, ref Ref, source string) (*dataset.Dataset, error)
}

// ParseResolveLoad is the function type returned by NewParseResolveLoader
type ParseResolveLoad func(ctx context.Context, refStr string) (*dataset.Dataset, error)

// NewParseResolveLoadFunc composes a username, resolver, and loader into a
// higher-order function that converts strings to full datasets
// pass the empty string as a username to disable the "me" keyword in references
func NewParseResolveLoadFunc(username string, resolver Resolver, loader Loader) ParseResolveLoad {
return func(ctx context.Context, refStr string) (*dataset.Dataset, error) {
ref, err := Parse(refStr)
if err != nil {
return nil, err
}

if username == "" && ref.Username == "me" {
msg := fmt.Sprintf(`Can't use the "me" keyword to refer to a dataset in this context.
Replace "me" with your username for the reference:
%s`, refStr)
return nil, qerr.New(fmt.Errorf("invalid contextual reference"), msg)
} else if username != "" && ref.Username == "me" {
ref.Username = username
}

source, err := resolver.ResolveRef(ctx, &ref)
if err != nil {
return nil, err
}

return loader.LoadDataset(ctx, ref, source)
}
}
28 changes: 27 additions & 1 deletion dsref/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ type Resolver interface {
}

// ParallelResolver composes multiple resolvers into one resolver that runs
// in parallel when called, returning the first valid response
// in parallel when called, using the first resolver that doesn't return
// ErrorNotFound
func ParallelResolver(resolvers ...Resolver) Resolver {
return parallelResolver(resolvers)
}
Expand Down Expand Up @@ -82,3 +83,28 @@ func (rs parallelResolver) ResolveRef(ctx context.Context, ref *Ref) (string, er
}
}
}

// SequentialResolver composes multiple resolvers into one that runs each
// resolver in sequence, using the first resolver that doesn't return
// ErrorNotFound
func SequentialResolver(resolvers ...Resolver) Resolver {
return sequentialResolver(resolvers)
}

type sequentialResolver []Resolver

func (sr sequentialResolver) ResolveRef(ctx context.Context, ref *Ref) (string, error) {
for _, resolver := range sr {
resolvedSource, err := resolver.ResolveRef(ctx, ref)
if err != nil {
if errors.Is(err, ErrNotFound) {
continue
} else {
return "", err
}
}
return resolvedSource, nil
}

return "", ErrNotFound
}
4 changes: 4 additions & 0 deletions dsref/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ import (
func TestParallelResolver(t *testing.T) {
t.Skip("TODO(b5)")
}

func TestSequentialResolver(t *testing.T) {
t.Skip("TODO(b5)")
}
21 changes: 15 additions & 6 deletions lib/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ func (m *DatasetMethods) Get(p *GetParams, res *GetResult) error {
ctx := context.TODO()

var ds *dataset.Dataset
ref, _, err := m.inst.ParseAndResolveRefWithWorkingDir(ctx, p.Refstr, p.Remote)
ref, source, err := m.inst.ParseAndResolveRefWithWorkingDir(ctx, p.Refstr, p.Remote)
if err != nil {
return err
}
ds, err = m.inst.loadDataset(ctx, ref)
ds, err = m.inst.LoadDataset(ctx, ref, source)
if err != nil {
return err
}
Expand Down Expand Up @@ -632,7 +632,16 @@ func (m *DatasetMethods) Save(p *SaveParams, res *reporef.DatasetRef) error {
return nil
}
}
err := base.TransformApply(ctx, ds, r, str, scriptOut, secrets)

// create a loader so transforms can call `load_dataset`
// TODO(b5) - add a ResolverMode save parameter and call m.inst.resolverMode
// on the passed in mode string instead of just using the default resolver
// cmd can then define "remote" and "offline" flags, that set the ResolverMode
// string and control how transform functions
loader := dsref.NewParseResolveLoadFunc("", m.inst.defaultResolver(), m.inst)

// apply the transform
err := base.TransformApply(ctx, ds, r, loader, str, scriptOut, secrets)
if err != nil {
return err
}
Expand Down Expand Up @@ -1300,13 +1309,13 @@ func (m *DatasetMethods) Stats(p *StatsParams, res *StatsResponse) error {

if p.Dataset == nil {
// TODO (b5) - stats is currently local-only, supply a source parameter
ref, _, err := m.inst.ParseAndResolveRefWithWorkingDir(ctx, p.Ref, "local")
ref, source, err := m.inst.ParseAndResolveRefWithWorkingDir(ctx, p.Ref, "local")
if err != nil {
return err
}
p.Dataset, err = m.inst.loadDataset(ctx, ref)
p.Dataset, err = m.inst.LoadDataset(ctx, ref, source)
if err != nil {
return fmt.Errorf("loading dataset: %s", err)
return fmt.Errorf("loading dataset: %w", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/fsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (m *FSIMethods) Checkout(p *CheckoutParams, out *string) (err error) {
}

// Load dataset that is being checked out.
ds, err := m.inst.loadDataset(ctx, ref)
ds, err := m.inst.LoadDataset(ctx, ref, "")
if err != nil {
log.Debugf("Checkout, dsfs.LoadDataset failed, error: %s", err)
return err
Expand Down
74 changes: 74 additions & 0 deletions lib/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package lib

import (
"context"
"fmt"
"strings"

"github.com/qri-io/dataset"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/fsi"
reporef "github.com/qri-io/qri/repo/ref"
)

// LoadDataset fetches, derefences and opens a dataset from a reference
// implements the dsfs.Loader interface
func (inst *Instance) LoadDataset(ctx context.Context, ref dsref.Ref, source string) (*dataset.Dataset, error) {
if source == "" {
return inst.loadLocalDataset(ctx, ref)
}

// TODO(b5) - for now we're assuming any non-local source must fetch from the registry
if inst.cfg.Registry == nil {
return nil, fmt.Errorf("can't fetch remote dataset %q without a configured registry", ref)
} else if inst.cfg.Registry.Location == "" {
return nil, fmt.Errorf("can't fetch remote dataset %q without a configured registry", ref)
}

source = inst.cfg.Registry.Location

msg := fmt.Sprintf("pulling dataset from registry: %s ...\n", ref)
inst.streams.Out.Write([]byte(msg))

if err := inst.remoteClient.CloneLogs(ctx, ref, source); err != nil {
return nil, err
}

rref := reporef.RefFromDsref(ref)
if err := inst.remoteClient.AddDataset(ctx, &rref, source); err != nil {
return nil, err
}

return inst.loadLocalDataset(ctx, ref)
}

func (inst *Instance) loadLocalDataset(ctx context.Context, ref dsref.Ref) (*dataset.Dataset, error) {
var (
ds *dataset.Dataset
err error
)

if strings.HasPrefix(ref.Path, "/fsi") {
// Has an FSI Path, load from working directory
if ds, err = fsi.ReadDir(strings.TrimPrefix(ref.Path, "/fsi")); err != nil {
return nil, err
}
} else {
// Load from dsfs
if ds, err = dsfs.LoadDataset(ctx, inst.store, ref.Path); err != nil {
return nil, err
}
}
// Set transient info on the returned dataset
ds.Name = ref.Name
ds.Peername = ref.Username

if err = base.OpenDataset(ctx, inst.repo.Filesystem(), ds); err != nil {
log.Debugf("Get dataset, base.OpenDataset failed, error: %s", err)
return nil, err
}

return ds, nil
}
4 changes: 2 additions & 2 deletions lib/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func (m *RenderMethods) RenderReadme(p *RenderParams, res *string) (err error) {
if p.Dataset != nil {
ds = p.Dataset
} else {
ref, _, err := m.inst.ParseAndResolveRefWithWorkingDir(ctx, p.Ref, "local")
ref, source, err := m.inst.ParseAndResolveRefWithWorkingDir(ctx, p.Ref, "local")
if err != nil {
return err
}

ds, err = m.inst.loadDataset(ctx, ref)
ds, err = m.inst.LoadDataset(ctx, ref, source)
if err != nil {
return fmt.Errorf("loading dataset: %w", err)
}
Expand Down
Loading

0 comments on commit 0283fc0

Please sign in to comment.