diff --git a/cmd/changelog-reader/main.go b/cmd/changelog-reader/main.go index 50e23d6..51675e0 100644 --- a/cmd/changelog-reader/main.go +++ b/cmd/changelog-reader/main.go @@ -112,18 +112,24 @@ func get_meta(fname string) (_ azblob.Metadata, err error) { } func delete_blob(ctx context.Context, path string) { - blobURL := containerURL.NewBlockBlobURL(path) + if _, err := os.Stat(mountRoot + "/" + path); os.IsNotExist(err) { + // file does not exist on the lustre filesystem now + blobURL := containerURL.NewBlockBlobURL(path) - if use_versioned_blob { - meta := azblob.Metadata{"deleted": "true"} - _, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone) + if use_versioned_blob { + meta := azblob.Metadata{"deleted": "true"} + _, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone) + if err != nil { + fmt.Printf("Failed to create deleted object - %s\n", path) + } + } + _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}) if err != nil { - fmt.Printf("Failed to create deleted object - %s\n", path) + fmt.Printf("Failed to delete object - %s\n", path) } - } - _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}) - if err != nil { - fmt.Printf("Failed to delete object - %s\n", path) + } else { + // file exists on the lustre filesystem so do not delete from the BLOB storage + fmt.Printf("delete_blob: file exists on lustre, not deleting - %s\n", path) } } @@ -132,11 +138,16 @@ func create_symlink(ctx context.Context, name string) { if err != nil { fmt.Printf("Failed to get metadata for slink - %s\n", name) } else { - fmt.Println(meta) - blobURL := containerURL.NewBlockBlobURL(name) - _, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone) - if err != nil { - fmt.Printf("Failed to create symlink - %s\n", name) + // only create in blob storage if it is a symlink on the filesystem + if _, ok := meta["symlink"]; ok { + blobURL := containerURL.NewBlockBlobURL(name) + _, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone) + if err != nil { + fmt.Printf("Failed to create symlink - %s\n", name) + } + } else { + fmt.Printf("Not a symlink on the filesystem anymore, not creating\n") + fmt.Println(meta) } } } @@ -191,11 +202,16 @@ func create_dir(ctx context.Context, name string) { if err != nil { fmt.Printf("Failed to get metadata for directory - %s\n", name) } else { - fmt.Println(meta) - blobURL := containerURL.NewBlockBlobURL(name) - _, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone) - if err != nil { - fmt.Printf("Failed to create directory - %s\n", name) + // only create in blob storage if it is a directory on the filesystem + if _, ok := meta["hdi_isfolder"]; ok { + blobURL := containerURL.NewBlockBlobURL(name) + _, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone) + if err != nil { + fmt.Printf("Failed to create directory - %s\n", name) + } + } else { + fmt.Printf("Not a directory on the filesystem anymore, not creating\n") + fmt.Println(meta) } } } @@ -596,6 +612,10 @@ func pretty_print_changelog_record(rec *llapi.ChangelogRecord) { fmt.Println(" TypeCode=" + string(rec.TypeCode())) } +func display_changelog(rec *llapi.ChangelogRecord) { + fmt.Printf("[ %s ] %s [%d] Name='%s', SourceName='%s'\n", time.Now().Format("2006-01-02 15:04:05"), rec.Type(), rec.Index(), rec.Name(), rec.SourceName()) +} + func process_changelog(mdtname string, userid string) { follow := true cl, err := llapi.ChangelogStart(mdtname, 0, follow) @@ -625,28 +645,33 @@ func process_changelog(mdtname string, userid string) { debug := false - fmt.Printf("%s [%d] Name='%s', SourceName='%s'\n", rec.Type(), rec.Index(), rec.Name(), rec.SourceName()) - switch { case rectype == "MKDIR": + display_changelog(rec) mkdir(ctx, mdtname, rec) case rectype == "RMDIR": + display_changelog(rec) rmdir(ctx, mdtname, rec) case rectype == "RENME": + display_changelog(rec) renme(ctx, mdtname, rec) //case rectype == "XATTR": // update_metadata(ctx, mdtname, rec) case rectype == "SATTR": + display_changelog(rec) update_metadata(ctx, mdtname, rec) case rectype == "LYOUT": + display_changelog(rec) update_layout(ctx, mdtname, rec) case rectype == "SLINK": + display_changelog(rec) slink(ctx, mdtname, rec) case rectype == "UNLNK": + display_changelog(rec) unlnk(ctx, mdtname, rec) default: - fmt.Printf("Ignoring %s record.\n", rec.Type()) if debug == true { + display_changelog(rec) pretty_print_changelog_record(rec) } }