Skip to content

Commit

Permalink
feat(repo.Repo.Graph): repos now have a graph method
Browse files Browse the repository at this point in the history
this is going to take some work, but at least now we have the start
of what we need to do repository graph analysis.
  • Loading branch information
b5 committed Nov 16, 2017
1 parent a285644 commit bc1f377
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 208 deletions.
29 changes: 23 additions & 6 deletions repo/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package fs_repo
import (
"encoding/json"
"fmt"
"github.com/qri-io/cafs"
"github.com/qri-io/doggos"
"io/ioutil"
"os"

"github.com/qri-io/analytics"
"github.com/qri-io/cafs"
"github.com/qri-io/dataset/dsgraph"
"github.com/qri-io/doggos"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
"github.com/qri-io/qri/repo/search"
Expand All @@ -17,6 +18,7 @@ import (
type Repo struct {
store cafs.Filestore
basepath
graph map[string]*dsgraph.Node

Datasets
Namestore
Expand All @@ -38,7 +40,7 @@ func NewRepo(store cafs.Filestore, base, id string) (repo.Repo, error) {
return nil, err
}

repo := &Repo{
r := &Repo{
store: store,
basepath: bp,

Expand All @@ -53,17 +55,32 @@ func NewRepo(store cafs.Filestore, base, id string) (repo.Repo, error) {
}

if index, err := search.LoadIndex(bp.filepath(FileSearchIndex)); err == nil {
repo.index = index
repo.Namestore.index = index
r.index = index
r.Namestore.index = index
}

return repo, nil
go func() {
r.graph, _ = repo.RepoGraph(r)
}()

return r, nil
}

func (r Repo) Store() cafs.Filestore {
return r.store
}

func (r *Repo) Graph() (map[string]*dsgraph.Node, error) {
if r.graph == nil {
nodes, err := repo.RepoGraph(r)
if err != nil {
return nil, err
}
r.graph = nodes
}
return r.graph, nil
}

func (r *Repo) Profile() (*profile.Profile, error) {
p := &profile.Profile{}
data, err := ioutil.ReadFile(r.filepath(FileProfile))
Expand Down
192 changes: 192 additions & 0 deletions repo/graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package repo

import (
"sync"

"github.com/ipfs/go-datastore"
"github.com/qri-io/dataset/dsfs"
"github.com/qri-io/dataset/dsgraph"
)

var walkParallelism = 4

// RepoGraph generates a map of all paths on this repository pointing
// to dsgraph.Node structs with all links configured. This is potentially
// expensive to calculate. Best to do some caching.
func RepoGraph(r Repo) (map[string]*dsgraph.Node, error) {
nodes := NodeList{Nodes: map[string]*dsgraph.Node{}}
root := nodes.node(dsgraph.NtNamespace, "root")
mu := sync.Mutex{}
err := WalkRepoDatasets(r, func(prev *dsgraph.Node) func(int, *DatasetRef, error) (bool, error) {
return func(depth int, ref *DatasetRef, e error) (kontinue bool, err error) {
if e != nil {
return false, e
}
mu.Lock()
ds := nodes.nodesFromDatasetRef(r, ref)
prev.AddLinks(dsgraph.Link{From: prev, To: ds})
prev = ds
mu.Unlock()
return true, nil
}
}(root))
return nodes.Nodes, err
}

// QueriesMap returns a mapped subset of a list of nodes in the form:
// QueryHash : DatasetHash
func QueriesMap(nodes map[string]*dsgraph.Node) (qs map[string]datastore.Key) {
qs = map[string]datastore.Key{}
for path, node := range nodes {
if node.Type == dsgraph.NtDataset && len(node.Links) > 0 {
for _, l := range node.Links {
if l.To.Type == dsgraph.NtQuery {
qs[l.To.Path] = datastore.NewKey(path)
}
}
}
}
return
}

// DataNodes returns a map[path]bool of all raw data nodes
func DataNodes(nodes map[string]*dsgraph.Node) (ds map[string]bool) {
ds = map[string]bool{}
for path, node := range nodes {
if node.Type == dsgraph.NtData {
ds[path] = true
}
}
return
}

type NodeList struct {
Nodes map[string]*dsgraph.Node
}

func (nl NodeList) node(t dsgraph.NodeType, path string) *dsgraph.Node {
if nl.Nodes[path] != nil {
return nl.Nodes[path]
}
nl.Nodes[path] = &dsgraph.Node{Type: t, Path: path}
return nl.Nodes[path]
}

func (nl NodeList) nodesFromDatasetRef(r Repo, ref *DatasetRef) *dsgraph.Node {
root := nl.node(dsgraph.NtDataset, ref.Path.String())
ds := ref.Dataset
if ds == nil {
return root
}

root.AddLinks(dsgraph.Link{
From: root,
To: nl.node(dsgraph.NtData, ds.Data.String()),
})

if ds.Previous.Path().String() != "/" {
root.AddLinks(dsgraph.Link{
From: root,
To: nl.node(dsgraph.NtDataset, ds.Previous.Path().String()),
})
}
// if ds.Commit.Path().String() != "" {
// commit := &dsgraph.Node{Type: dsgraph.NtCommit, Path: ds.Commit.Path()}
// root.AddLinks(dsgraph.Link{From: root, To: data})
// }
if ds.AbstractStructure != nil && ds.AbstractStructure.Path().String() != "" {
root.AddLinks(dsgraph.Link{
From: root,
To: nl.node(dsgraph.NtAbstStructure, ds.AbstractStructure.Path().String()),
})
}
if ds.Query != nil && ds.Query.Path().String() != "" {
if q, err := dsfs.LoadQuery(r.Store(), ds.Query.Path()); err == nil {
query := nl.node(dsgraph.NtQuery, ds.Query.Path().String())
if q.Abstract.Path().String() != "" {
query.AddLinks(dsgraph.Link{
From: query,
To: nl.node(dsgraph.NtAbstQuery, q.Abstract.Path().String()),
})
}
for _, ref := range q.Resources {
query.AddLinks(dsgraph.Link{
From: query,
To: nl.node(dsgraph.NtDataset, ref.Path().String()),
})
}
root.AddLinks(dsgraph.Link{From: root, To: query})
}
}

return root
}

// WalkDatasets visits every dataset in the history of a user's namespace
// Yes, this potentially a very expensive function to call, use sparingly.
func WalkRepoDatasets(r Repo, visit func(depth int, ref *DatasetRef, err error) (bool, error)) error {
pll := walkParallelism
store := r.Store()
count, err := r.NameCount()
if err != nil {
return err
} else if count == 0 {
return ErrRepoEmpty
}

if count < pll {
pll = count
}

doSection := func(idx, pageSize int, done chan error) error {
refs, err := r.Namespace(pageSize, idx*pageSize)
if err != nil {
done <- err
return err
}

for _, ref := range refs {
ref.Dataset, err = dsfs.LoadDatasetRefs(store, ref.Path)
kontinue, err := visit(0, ref, err)
if err != nil {
done <- err
return err
}
if !kontinue {
break
}

depth := 1
for ref.Dataset != nil && ref.Dataset.Previous.String() != "" && ref.Dataset.Previous.String() != "/" {
ref.Path = ref.Dataset.Previous
ref.Dataset, err = dsfs.LoadDatasetRefs(store, ref.Path)
kontinue, err = visit(depth, ref, err)
if err != nil {
done <- err
return err
}
if !kontinue {
break
}
depth++
}
}
done <- nil
return nil
}

pageSize := count / pll
done := make(chan error, pll)
for i := 0; i < pll; i++ {
go doSection(i, pageSize, done)
}

for i := 0; i < pll; i++ {
err := <-done
if err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit bc1f377

Please sign in to comment.