Skip to content

Commit

Permalink
wip -- draft bundle diff
Browse files Browse the repository at this point in the history
Signed-off-by: Ransom Williams <rwilliams@oneconcern.com>
  • Loading branch information
ransomw1c committed Jun 24, 2019
1 parent 72c11af commit 824c5bb
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 14 deletions.
110 changes: 110 additions & 0 deletions cmd/datamon/cmd/bundle_diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright © 2018 One Concern

package cmd

import (
"bytes"
"context"
"log"
"text/template"

"github.com/oneconcern/datamon/pkg/core"
"github.com/oneconcern/datamon/pkg/storage/gcs"
"github.com/oneconcern/datamon/pkg/storage/localfs"

"github.com/spf13/afero"
"github.com/spf13/cobra"
)

var bundleDiffCmd = &cobra.Command{
Use: "diff",
Short: "Diff a downloaded bundle with a remote bundle.",
Long: "Diff a downloaded bundle with a remote bundle. " +
"--destination is a location previously passed to the `bundle download` command.",
Run: func(cmd *cobra.Command, args []string) {
const listLineTemplateString = `{{.Type}} , {{.Name}} , {{with .Additional}}{{.Size}} , {{.Hash}}{{end}} , {{with .Existing}}{{.Size}} , {{.Hash}}{{end}}`
listLineTemplate := template.Must(template.New("list line").Parse(listLineTemplateString))

sourceStore, err := gcs.New(params.repo.MetadataBucket, config.Credential)
if err != nil {
logFatalln(err)
}
blobStore, err := gcs.New(params.repo.BlobBucket, config.Credential)
if err != nil {
logFatalln(err)
}
path, err := sanitizePath(params.bundle.DataPath)
if err != nil {
logFatalln("Failed path validation: " + err.Error())
}
fs := afero.NewBasePathFs(afero.NewOsFs(), path+"/")
destinationStore := localfs.New(fs)

err = setLatestOrLabelledBundle(sourceStore)
if err != nil {
logFatalln(err)
}

localBundle := core.New(core.NewBDescriptor(),
core.ConsumableStore(destinationStore),
)
remoteBundle := core.New(core.NewBDescriptor(),
core.Repo(params.repo.RepoName),
core.MetaStore(sourceStore),
core.BlobStore(blobStore),
core.BundleID(params.bundle.ID),
)

if err = core.PublishMetadata(context.Background(), localBundle); err != nil {
logFatalln(err)
}

if err = core.PublishMetadata(context.Background(), remoteBundle); err != nil {
logFatalln(err)
}

diff, err := core.Diff(localBundle, remoteBundle)
if err != nil {
logFatalln(err)
}

if len(diff.Entries) == 0 {
log.Println("empty diff")
} else {
for _, de := range diff.Entries {
var buf bytes.Buffer
err := listLineTemplate.Execute(&buf, de)
if err != nil {
log.Println("executing template:", err)
}
log.Println(buf.String())
}
}
},
}

func init() {

// Source
requiredFlags := []string{addRepoNameOptionFlag(bundleDiffCmd)}

// Destination
requiredFlags = append(requiredFlags, addDataPathFlag(bundleDiffCmd))

// Bundle to download
addBundleFlag(bundleDiffCmd)
// Blob bucket
addBlobBucket(bundleDiffCmd)
addBucketNameFlag(bundleDiffCmd)

addLabelNameFlag(bundleDiffCmd)

for _, flag := range requiredFlags {
err := bundleDiffCmd.MarkFlagRequired(flag)
if err != nil {
logFatalln(err)
}
}

bundleCmd.AddCommand(bundleDiffCmd)
}
142 changes: 138 additions & 4 deletions cmd/datamon/cmd/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,17 +640,14 @@ func testDownloadBundle(t *testing.T, files []uploadTree, bcnt int) {
"--repo", repo1,
"--concurrency-factor", concurrencyFactor,
}, "upload bundle at "+dirPathStr(t, files[0]), false)

ll, err := listBundles(t, repo1)
require.NoError(t, err, "error out of listBundles() test helper")
require.Equal(t, bcnt, len(ll), "bundle count in test repo")
//
destFS := afero.NewBasePathFs(afero.NewOsFs(), consumedData)
dpc := "bundle-dl-" + ll[len(ll)-1].hash
dp, err := filepath.Abs(filepath.Join(consumedData, dpc))
if err != nil {
t.Errorf("couldn't build file path: %v", err)
}
require.NoError(t, err, "couldn't build file path")
exists, err := afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.False(t, exists, "no filesystem entry at destination path '"+dpc+"' before bundle upload")
Expand Down Expand Up @@ -688,6 +685,143 @@ func TestDownloadBundles(t *testing.T) {
}
}

type diffEntrySide struct {
hash string
size string
}

type diffEntry struct {
rawLine string
typeLetter string
name string
remote diffEntrySide
local diffEntrySide
}

func TestDiffBundle(t *testing.T) {
cleanup := setupTests(t)
defer cleanup()
files := testUploadTrees[1]
label := internal.RandStringBytesMaskImprSrc(8)
runCmd(t, []string{"repo",
"create",
"--description", "testing",
"--repo", repo1,
"--name", "tests",
"--email", "datamon@oneconcern.com",
}, "create second test repo", false)
//
runCmd(t, []string{"bundle",
"upload",
"--path", dirPathStr(t, files[0]),
"--message", "diff orig",
"--label", label,
"--repo", repo1,
"--concurrency-factor", "20",
}, "upload bundle at "+dirPathStr(t, files[0]), false)
//
destFS := afero.NewBasePathFs(afero.NewOsFs(), consumedData)
dpc := "bundle-diff"
dp, err := filepath.Abs(filepath.Join(consumedData, dpc))
require.NoError(t, err, "couldn't build file path")
exists, err := afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.False(t, exists, "no filesystem entry at destination path '"+dpc+"' before bundle upload")
runCmd(t, []string{"bundle",
"download",
"--repo", repo1,
"--destination", dp,
"--label", label,
}, "download bundle uploaded from "+dirPathStr(t, files[0]), false)
exists, err = afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.True(t, exists, "filesystem entry at at destination path '"+dpc+"' after bundle upload")
//
srcFS := afero.NewBasePathFs(afero.NewOsFs(), dirPathStr(t, files[0]))
delFile := files[0]
difFile := files[1]
addFileName := "add"
f, err := srcFS.OpenFile(addFileName, os.O_CREATE|os.O_WRONLY|os.O_SYNC|0600, 0600)
require.NoError(t, err, "open file to add")
_, err = f.WriteString("additional")
require.NoError(t, err, "write to additional file")
require.NoError(t, f.Close(), "close add file")
require.NoError(t, srcFS.Remove(pathInBundle(delFile)), "remove file from downloaded bundle")
require.NoError(t, srcFS.Remove(pathInBundle(difFile)), "remove file to change")
f, err = srcFS.OpenFile(pathInBundle(difFile), os.O_CREATE|os.O_WRONLY|os.O_SYNC|0600, 0600)
require.NoError(t, err, "open file to change")
_, err = f.WriteString(internal.RandStringBytesMaskImprSrc(10))
require.NoError(t, err, "write to changed file")
require.NoError(t, f.Close(), "close changed file")
//
runCmd(t, []string{"bundle",
"upload",
"--path", dirPathStr(t, files[0]),
"--message", "diff update",
"--label", label,
"--repo", repo1,
"--concurrency-factor", "20",
}, "upload bundle at "+dirPathStr(t, files[0]), false)
// todo: zero out params in runCmd(), not cleanup()
// params = paramsT{} << this breaks getting bundle id from label
params.bundle.ID = ""
r, w, err := os.Pipe()
if err != nil {
panic(err)
}
log.SetOutput(w)
runCmd(t, []string{"bundle",
"diff",
"--repo", repo1,
"--destination", dp,
"--label", label,
}, "download bundle uploaded from "+dirPathStr(t, files[0]), false)
log.SetOutput(os.Stdout)
w.Close()
//
lb, err := ioutil.ReadAll(r)
require.NoError(t, err, "i/o error reading patched log from pipe")
des := make([]diffEntry, 0)
for _, line := range getDataLogLines(t, string(lb), []string{`Using config file`}) {
sl := strings.Split(line, ",")
de := diffEntry{
rawLine: line,
typeLetter: strings.TrimSpace(sl[0]),
name: strings.TrimSpace(sl[1]),
local: diffEntrySide{
size: strings.TrimSpace(sl[2]),
hash: strings.TrimSpace(sl[3]),
},
remote: diffEntrySide{
size: strings.TrimSpace(sl[4]),
hash: strings.TrimSpace(sl[5]),
},
}
des = append(des, de)
}
require.Equal(t, len(des), 3, "found expected number of diff entries")
desm := make(map[string]diffEntry)
for _, de := range des {
desm[de.name] = de
}
require.Equal(t, len(desm), 3, "diff entries have unique names")
de, ok := desm[addFileName]
require.True(t, ok, "found add file entry")
require.Equal(t, de.typeLetter, "A", "found correct type on add file entry")
require.Equal(t, de.remote.hash, "", "no remote hash on add file entry")
require.NotEqual(t, de.local.hash, "", "local hash on add file entry")
de, ok = desm[pathInBundle(difFile)]
require.True(t, ok, "found update file entry")
require.Equal(t, de.typeLetter, "U", "found correct type on update file entry")
require.NotEqual(t, de.remote.hash, "", "remote hash on update file entry")
require.NotEqual(t, de.local.hash, "", "local hash on update file entry")
de, ok = desm[pathInBundle(delFile)]
require.True(t, ok, "found delete file entry")
require.Equal(t, de.typeLetter, "D", "found correct type on delete file entry")
require.NotEqual(t, de.remote.hash, "", "remote hash on delete file entry")
require.Equal(t, de.local.hash, "", "no local hash on delete file entry")
}

func TestDownloadBundleByLabel(t *testing.T) {
cleanup := setupTests(t)
defer cleanup()
Expand Down
10 changes: 10 additions & 0 deletions pkg/core/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ func Publish(ctx context.Context, bundle *Bundle) error {

// PublishMetadata from the archive to the consumable store
func PublishMetadata(ctx context.Context, bundle *Bundle) error {
if bundle.BundleID == "" && bundle.ConsumableStore != nil {
if err := setBundleIDFromConsumableStore(ctx, bundle); err != nil {
return err
}
}

err := unpackBundleDescriptor(ctx, bundle)
if err != nil {
return err
Expand Down Expand Up @@ -296,3 +302,7 @@ func PublishFile(ctx context.Context, bundle *Bundle, file string) error {
func (b *Bundle) Exists(ctx context.Context) (bool, error) {
return b.MetaStore.Has(ctx, model.GetArchivePathToBundle(b.RepoID, b.BundleID))
}

func Diff(bundleExisting *Bundle, bundleAdditional *Bundle) (BundleDiff, error) {
return diffBundles(bundleExisting, bundleAdditional)
}
81 changes: 81 additions & 0 deletions pkg/core/bundle_diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright © 2018 One Concern

package core

import (
"github.com/oneconcern/datamon/pkg/model"
)

const (
DiffEntryTypeAdd = iota
DiffEntryTypeDel
DiffEntryTypeDif
)

type DiffEntryType uint

func (det DiffEntryType) String() string {
diffEntryStrings := map[DiffEntryType]string{
DiffEntryTypeAdd: "A",
DiffEntryTypeDel: "D",
DiffEntryTypeDif: "U",
}
return diffEntryStrings[det]
}

type DiffEntry struct {
Type DiffEntryType
// could use a method rather than storing Name in order to curb memory use
Name string
Existing model.BundleEntry
Additional model.BundleEntry
}

type BundleDiff struct {
Entries []DiffEntry
}

func diffBundles(bundleExisting *Bundle, bundleAdditional *Bundle) (BundleDiff, error) {
diffEntries := make([]DiffEntry, 0)
bundleEntriesExisting := make(map[string]model.BundleEntry, len(bundleExisting.BundleEntries))
for _, bundleEntry := range bundleExisting.BundleEntries {
bundleEntriesExisting[bundleEntry.NameWithPath] = bundleEntry
}
bundleEntriesAdditional := make(map[string]model.BundleEntry, len(bundleAdditional.BundleEntries))
for _, bundleEntry := range bundleAdditional.BundleEntries {
bundleEntriesAdditional[bundleEntry.NameWithPath] = bundleEntry
}

for nameWithPath, bundleEntryExisting := range bundleEntriesExisting {
bundleEntryAdditional, ok := bundleEntriesAdditional[nameWithPath]
if ok {
if bundleEntryAdditional.Hash != bundleEntryExisting.Hash {
diffEntries = append(diffEntries, DiffEntry{
Type: DiffEntryTypeDif,
Name: nameWithPath,
Existing: bundleEntryExisting,
Additional: bundleEntryAdditional,
})
}
} else {
diffEntries = append(diffEntries, DiffEntry{
Type: DiffEntryTypeDel,
Name: nameWithPath,
Existing: bundleEntryExisting,
})
}
}
for nameWithPath, bundleEntryAdditional := range bundleEntriesAdditional {
_, ok := bundleEntriesExisting[nameWithPath]
if !ok {
diffEntries = append(diffEntries, DiffEntry{
Type: DiffEntryTypeAdd,
Name: nameWithPath,
Additional: bundleEntryAdditional,
})
}
}
return BundleDiff{
Entries: diffEntries,
}, nil
}
Loading

0 comments on commit 824c5bb

Please sign in to comment.