Skip to content

Commit

Permalink
fix: more work on settling Transform refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Dec 12, 2017
1 parent f3dea07 commit 6010e9f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 141 deletions.
101 changes: 41 additions & 60 deletions core/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ func (d *QueryRequests) List(p *ListParams, res *[]*repo.DatasetRef) error {
return d.cli.Call("QueryRequests.List", p, res)
}

results, err := d.repo.GetQueryLogs(p.Limit, p.Offset)
items, err := d.repo.ListQueryLogs(p.Limit, p.Offset)
if err != nil {
return fmt.Errorf("error getting query logs: %s", err.Error())
}

for _, ref := range results {
if ds, err := dsfs.LoadDataset(d.repo.Store(), ref.Path); err == nil {
ref.Dataset = ds
results := make([]*repo.DatasetRef, len(items))
for i, item := range items {
results[i].Path = item.DatasetPath
if ds, err := dsfs.LoadDataset(d.repo.Store(), item.DatasetPath); err == nil {
results[i].Dataset = ds
}
}

Expand Down Expand Up @@ -95,17 +97,16 @@ func (r *QueryRequests) Run(p *RunParams, res *repo.DatasetRef) error {
}

var (
store = r.repo.Store()
transform *dataset.Transform
results []byte
err error
ds = p.Dataset
store = r.repo.Store()
abst *dataset.Transform
results []byte
err error
ds = p.Dataset
)

if ds == nil {
return fmt.Errorf("dataset is required")
}
// fmt.Println("running query: %s", p.Dataset.QueryString)

ds.Timestamp = time.Now()

Expand All @@ -117,16 +118,6 @@ func (r *QueryRequests) Run(p *RunParams, res *repo.DatasetRef) error {
}
}

// if ds.QueryString == "" {
// ds.QueryString = q.Abstract.Statement
// }

// TODO - make format output the parsed statement as well
// to avoid triple-parsing
// sqlstr, _, remap, err := sql.Format(ds.QueryString)
// if err != nil {
// return err
// }
names, err := sql.StatementTableNames(q.Data)
if err != nil {
return fmt.Errorf("error getting statement table names: %s", err.Error())
Expand All @@ -148,67 +139,53 @@ func (r *QueryRequests) Run(p *RunParams, res *repo.DatasetRef) error {
}
}

// func PreparedQueryPath(fs cafs.Filestore, q *dataset.Query, opts *ExecOpt) (datastore.Key, error) {
// q2 := &dataset.Query{}
// q2.Assign(q)
// prep, err := Prepare(q2, opts)
// if err != nil {
// return datastore.NewKey(""), err
// }
// return dsfs.SaveQuery(fs, prep.q, false)
// }

q2 := &dataset.Transform{}
q2.Assign(q)
_, abst, err := sql.Format(q, func(o *sql.ExecOpt) {
// _, abst, err = sql.Format(q, func(o *sql.ExecOpt) {
// o.Format = dataset.CSVDataFormat
// })
// if err != nil {
// return fmt.Errorf("formatting error: %s", err.Error())
// }
// qpath, err := dsfs.SaveAbstractTransform(store, abst, false)
fmt.Println("queries", q.Data, q2.Data)
qrpath, err := sql.QueryRecordPath(store, q2, func(o *sql.ExecOpt) {
o.Format = dataset.CSVDataFormat
})
if err != nil {
return fmt.Errorf("formatting error: %s", err.Error())
}
qpath, err := dsfs.SaveAbstractTransform(store, abst, false)
if err != nil {
return fmt.Errorf("error calculating query hash: %s", err.Error())
}

// fmt.Println(qpath.String())
// atb, _ := abst.MarshalJSON()
// fmt.Println(string(atb))

// qpath, err := sql.PreparedQueryPath(r.repo.Store(), q, &sql.ExecOpt{Format: dataset.CSVDataFormat})
// if err != nil {
// return fmt.Errorf("error calculating query hash: %s", err.Error())
// }

if dsp, err := repo.DatasetForQuery(r.repo, qpath); err != nil && err != repo.ErrNotFound {
if qi, err := r.repo.QueryLogItem(&repo.QueryLogItem{Key: qrpath}); err != nil && err != repo.ErrNotFound {
return fmt.Errorf("error checking for existing query: %s", err.Error())
} else if err != repo.ErrNotFound {
if ds, err := dsfs.LoadDataset(store, dsp); err == nil {
ref := &repo.DatasetRef{Name: p.SaveName, Path: dsp, Dataset: ds}
if err := r.repo.LogQuery(ref); err != nil {
return fmt.Errorf("error logging query to repo: %s", err.Error())
if ds, err := dsfs.LoadDataset(store, qi.DatasetPath); err == nil {
// ref := &repo.QueryLogItem{Name: p.SaveName, Query: q.Data, Key: dsp, Dataset: dsp}
// if err := r.repo.LogQuery(ref); err != nil {
// return fmt.Errorf("error logging query to repo: %s", err.Error())
// }
*res = repo.DatasetRef{
Path: qi.DatasetPath,
Dataset: ds,
}
*res = *ref
return nil
}
}

// TODO - detect data format from passed-in results structure
transform, results, err = sql.Exec(store, q, func(o *sql.ExecOpt) {
abst, results, err = sql.Exec(store, q, func(o *sql.ExecOpt) {
o.Format = dataset.CSVDataFormat
})
if err != nil {
return fmt.Errorf("error executing query: %s", err.Error())
}

// tb, _ := transform.MarshalJSON()
// fmt.Println(string(tb))

// TODO - move this into setting on the dataset outparam
ds.Structure = transform.Structure
ds.Structure = q.Structure
ds.Length = len(results)
ds.Transform = q
ds.AbstractTransform = transform
ds.AbstractTransform = abst
fmt.Printf("abst: %#v\n", abst)

datakey, err := store.Put(memfs.NewMemfileBytes("data."+ds.Structure.Format.String(), results), false)
if err != nil {
Expand All @@ -232,15 +209,19 @@ func (r *QueryRequests) Run(p *RunParams, res *repo.DatasetRef) error {
if err := dsfs.DerefDatasetStructure(store, ds); err != nil {
return fmt.Errorf("error dereferencing dataset structure: %s", err.Error())
}
// fmt.Println("result query:", ds.AbstractTransform.Path())
if err := dsfs.DerefDatasetTransform(store, ds); err != nil {
return fmt.Errorf("error dereferencing dataset query: %s", err.Error())
}
// fmt.Println(ds.AbstractTransform.Path().String())

ref := &repo.DatasetRef{Name: p.SaveName, Path: dspath, Dataset: ds}

if err := r.repo.LogQuery(ref); err != nil {
item := &repo.QueryLogItem{
Query: ds.QueryString,
Name: p.SaveName,
Key: qrpath,
DatasetPath: dspath,
Time: time.Now(),
}
if err := r.repo.LogQuery(item); err != nil {
return fmt.Errorf("error logging query to repo: %s", err.Error())
}

Expand Down
94 changes: 24 additions & 70 deletions repo/fs/query_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"os"
"sort"

"github.com/qri-io/cafs"
"github.com/qri-io/qri/repo"
Expand All @@ -20,16 +21,34 @@ func NewQueryLog(base string, file File, store cafs.Filestore) QueryLog {
return QueryLog{basepath: basepath(base), file: file, store: store}
}

func (ql QueryLog) LogQuery(ref *repo.DatasetRef) error {
// LogQuery appends item to the on-disk query log, keeping entries in
// chronological (oldest-first) order, then persists the log file.
func (ql QueryLog) LogQuery(item *repo.QueryLogItem) error {
	log, err := ql.logs()
	if err != nil {
		return err
	}
	log = append(log, item)
	// keep the log sorted by timestamp so offset/limit paging in
	// ListQueryLogs is stable across writes
	sort.Slice(log, func(i, j int) bool { return log[i].Time.Before(log[j].Time) })
	return ql.saveFile(log, ql.file)
}

func (ql QueryLog) GetQueryLogs(limit, offset int) ([]*repo.DatasetRef, error) {
// QueryLogItem returns the first logged query matching any field that is
// actually set on q: dataset path, query string, timestamp, or key.
// Returns repo.ErrNotFound when no entry matches.
func (ql QueryLog) QueryLogItem(q *repo.QueryLogItem) (*repo.QueryLogItem, error) {
	log, err := ql.logs()
	if err != nil {
		return nil, err
	}

	// Compare only fields the caller populated. Matching zero values as
	// well would make a probe like &QueryLogItem{Key: k} spuriously match
	// any item whose Query string is also empty.
	// NOTE(review): a zero datastore.Key renders as "", while a key built
	// from an empty string is cleaned to "/" — treat both as unset.
	dp := q.DatasetPath.String()
	key := q.Key.String()
	for _, item := range log {
		if (dp != "" && dp != "/" && item.DatasetPath.Equal(q.DatasetPath)) ||
			(q.Query != "" && item.Query == q.Query) ||
			(!q.Time.IsZero() && item.Time.Equal(q.Time)) ||
			(key != "" && key != "/" && item.Key.Equal(q.Key)) {
			return item, nil
		}
	}
	return nil, repo.ErrNotFound
}

func (ql QueryLog) ListQueryLogs(limit, offset int) ([]*repo.QueryLogItem, error) {
logs, err := ql.logs()
if err != nil {
return nil, err
Expand All @@ -46,73 +65,8 @@ func (ql QueryLog) GetQueryLogs(limit, offset int) ([]*repo.DatasetRef, error) {
return logs[offset:stop], nil
}

// func (r QueryLog) PutDataset(path datastore.Key, ds *dataset.Dataset) error {
// d, err := r.logs()
// if err != nil {
// return err
// }
// d[path.String()] = ds
// return r.saveFile(d, r.file)
// }

// func (r QueryLog) PutQueryLog(logs []*repo.repo.DatasetRef) error {
// ds, err := r.logs()
// if err != nil {
// return err
// }
// for _, dr := range logs {
// ps := dr.Path.String()
// if ps != "" && dr.Dataset != nil {
// ds[ps] = dr.Dataset
// }
// }
// return r.saveFile(ds, r.file)
// }

// func (r QueryLog) GetDataset(path datastore.Key) (*dataset.Dataset, error) {
// ds, err := r.logs()
// if err != nil {
// return nil, err
// }
// ps := path.String()
// for p, d := range ds {
// if ps == p {
// return d, nil
// }
// }
// if r.store != nil {
// return dsfs.LoadDataset(r.store, path)
// }

// return nil, datastore.ErrNotFound
// }

// func (r QueryLog) DeleteDataset(path datastore.Key) error {
// ds, err := r.logs()
// if err != nil {
// return err
// }
// delete(ds, path.String())
// return r.saveFile(ds, r.file)
// }

// func (r QueryLog) Query(q query.Query) (query.Results, error) {
// ds, err := r.logs()
// if err != nil {
// return nil, err
// }

// re := make([]query.Entry, 0, len(ds))
// for path, d := range ds {
// re = append(re, query.Entry{Key: path, Value: d})
// }
// res := query.ResultsWithEntries(q, re)
// res = query.NaiveQueryApply(q, res)
// return res, nil
// }

func (r *QueryLog) logs() ([]*repo.DatasetRef, error) {
ds := []*repo.DatasetRef{}
func (r *QueryLog) logs() ([]*repo.QueryLogItem, error) {
ds := []*repo.QueryLogItem{}
data, err := ioutil.ReadFile(r.filepath(r.file))
if err != nil {
if os.IsNotExist(err) {
Expand Down
12 changes: 7 additions & 5 deletions repo/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,20 @@ func WalkRepoDatasets(r Repo, visit func(depth int, ref *DatasetRef, err error)

// TODO - make properly parallel
go func() {
refs, err := r.GetQueryLogs(1000, 0)
items, err := r.ListQueryLogs(1000, 0)
if err != nil {
done <- err
}
for _, ref := range refs {
ref.Dataset, err = dsfs.LoadDatasetRefs(store, ref.Path)
for _, item := range items {
ref := &DatasetRef{Path: item.DatasetPath}

ref.Dataset, err = dsfs.LoadDatasetRefs(store, item.DatasetPath)
// TODO - remove this once loading is more consistent.
if err != nil {
ref.Dataset, err = dsfs.LoadDatasetRefs(store, ref.Path)
ref.Dataset, err = dsfs.LoadDatasetRefs(store, item.DatasetPath)
}
if err != nil {
ref.Dataset, err = dsfs.LoadDatasetRefs(store, ref.Path)
ref.Dataset, err = dsfs.LoadDatasetRefs(store, item.DatasetPath)
}

kontinue, err := visit(0, ref, err)
Expand Down
26 changes: 22 additions & 4 deletions repo/mem_logs.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
package repo

type MemQueryLog []*DatasetRef
import (
"sort"
)

func (ql *MemQueryLog) LogQuery(ref *DatasetRef) error {
*ql = append(*ql, &DatasetRef{Name: ref.Name, Path: ref.Path})
type MemQueryLog []*QueryLogItem

// LogQuery records item in the in-memory query log, keeping entries in
// chronological (oldest-first) order.
func (ql *MemQueryLog) LogQuery(item *QueryLogItem) error {
	*ql = append(*ql, item)
	entries := *ql
	sort.Slice(entries, func(a, b int) bool {
		return entries[a].Time.Before(entries[b].Time)
	})
	return nil
}

func (ql MemQueryLog) GetQueryLogs(limit, offset int) ([]*DatasetRef, error) {
// QueryLogItem returns the first logged query matching any field that is
// actually set on q: dataset path, query string, timestamp, or key.
// Returns ErrNotFound when no entry matches.
func (ql *MemQueryLog) QueryLogItem(q *QueryLogItem) (*QueryLogItem, error) {
	// Compare only fields the caller populated. Matching zero values as
	// well would make a probe like &QueryLogItem{Key: k} spuriously match
	// any item whose Query string is also empty.
	// NOTE(review): a zero datastore.Key renders as "", while a key built
	// from an empty string is cleaned to "/" — treat both as unset.
	dp := q.DatasetPath.String()
	key := q.Key.String()
	for _, item := range *ql {
		if (dp != "" && dp != "/" && item.DatasetPath.Equal(q.DatasetPath)) ||
			(q.Query != "" && item.Query == q.Query) ||
			(!q.Time.IsZero() && item.Time.Equal(q.Time)) ||
			(key != "" && key != "/" && item.Key.Equal(q.Key)) {
			return item, nil
		}
	}
	return nil, ErrNotFound
}

func (ql MemQueryLog) ListQueryLogs(limit, offset int) ([]*QueryLogItem, error) {
if offset > len(ql) {
offset = len(ql)
}
Expand Down
14 changes: 12 additions & 2 deletions repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package repo

import (
"fmt"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -105,10 +106,19 @@ type Datasets interface {
Query(query.Query) (query.Results, error)
}

// QueryLogItem is a record of a single executed query.
type QueryLogItem struct {
	// Query is the query string that was executed
	Query string
	// Name is the save-name supplied when the query ran — presumably a
	// dataset name; confirm against callers
	Name string
	// Key is the hash key of the stored query record
	Key datastore.Key
	// DatasetPath is the path to the dataset the query produced
	DatasetPath datastore.Key
	// Time is when the query was logged
	Time time.Time
}

// QueryLog keeps logs
type QueryLog interface {
LogQuery(*DatasetRef) error
GetQueryLogs(limit, offset int) ([]*DatasetRef, error)
LogQuery(*QueryLogItem) error
ListQueryLogs(limit, offset int) ([]*QueryLogItem, error)
QueryLogItem(q *QueryLogItem) (*QueryLogItem, error)
}

// SearchParams encapsulates parameters provided to Searchable.Search
Expand Down

0 comments on commit 6010e9f

Please sign in to comment.