Skip to content

Commit

Permalink
File checks when creating/deleting from archive
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardsp committed Jun 26, 2023
1 parent 016fc05 commit a6a25be
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions cmd/changelog-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,24 @@ func get_meta(fname string) (_ azblob.Metadata, err error) {
}

func delete_blob(ctx context.Context, path string) {
blobURL := containerURL.NewBlockBlobURL(path)
if _, err := os.Stat(mountRoot + "/" + path); os.IsNotExist(err) {
// file does not exist on the lustre filesystem now
blobURL := containerURL.NewBlockBlobURL(path)

if use_versioned_blob {
meta := azblob.Metadata{"deleted": "true"}
_, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone)
if use_versioned_blob {
meta := azblob.Metadata{"deleted": "true"}
_, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone)
if err != nil {
fmt.Printf("Failed to create deleted object - %s\n", path)
}
}
_, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
if err != nil {
fmt.Printf("Failed to create deleted object - %s\n", path)
fmt.Printf("Failed to delete object - %s\n", path)
}
}
_, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
if err != nil {
fmt.Printf("Failed to delete object - %s\n", path)
} else {
// file exists on the lustre filesystem so do not delete from the BLOB storage
fmt.Printf("delete_blob: file exists on lustre, not deleting - %s\n", path)
}
}

Expand All @@ -132,11 +138,16 @@ func create_symlink(ctx context.Context, name string) {
if err != nil {
fmt.Printf("Failed to get metadata for slink - %s\n", name)
} else {
fmt.Println(meta)
blobURL := containerURL.NewBlockBlobURL(name)
_, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone)
if err != nil {
fmt.Printf("Failed to create symlink - %s\n", name)
// only create in blob storage if it is a symlink on the filesystem
if _, ok := meta["symlink"]; ok {
blobURL := containerURL.NewBlockBlobURL(name)
_, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone)
if err != nil {
fmt.Printf("Failed to create symlink - %s\n", name)
}
} else {
fmt.Printf("Not a symlink on the filesystem anymore, not creating\n")
fmt.Println(meta)
}
}
}
Expand Down Expand Up @@ -191,11 +202,16 @@ func create_dir(ctx context.Context, name string) {
if err != nil {
fmt.Printf("Failed to get metadata for directory - %s\n", name)
} else {
fmt.Println(meta)
blobURL := containerURL.NewBlockBlobURL(name)
_, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone)
if err != nil {
fmt.Printf("Failed to create directory - %s\n", name)
// only create in blob storage if it is a directory on the filesystem
if _, ok := meta["hdi_isfolder"]; ok {
blobURL := containerURL.NewBlockBlobURL(name)
_, err := blobURL.Upload(ctx, bytes.NewReader(nil), azblob.BlobHTTPHeaders{}, meta, azblob.BlobAccessConditions{}, azblob.AccessTierNone)
if err != nil {
fmt.Printf("Failed to create directory - %s\n", name)
}
} else {
fmt.Printf("Not a directory on the filesystem anymore, not creating\n")
fmt.Println(meta)
}
}
}
Expand Down Expand Up @@ -596,6 +612,10 @@ func pretty_print_changelog_record(rec *llapi.ChangelogRecord) {
fmt.Println(" TypeCode=" + string(rec.TypeCode()))
}

func display_changelog(rec *llapi.ChangelogRecord) {
fmt.Printf("[ %s ] %s [%d] Name='%s', SourceName='%s'\n", time.Now().Format("2006-01-02 15:04:05"), rec.Type(), rec.Index(), rec.Name(), rec.SourceName())
}

func process_changelog(mdtname string, userid string) {
follow := true
cl, err := llapi.ChangelogStart(mdtname, 0, follow)
Expand Down Expand Up @@ -625,28 +645,33 @@ func process_changelog(mdtname string, userid string) {

debug := false

fmt.Printf("%s [%d] Name='%s', SourceName='%s'\n", rec.Type(), rec.Index(), rec.Name(), rec.SourceName())

switch {
case rectype == "MKDIR":
display_changelog(rec)
mkdir(ctx, mdtname, rec)
case rectype == "RMDIR":
display_changelog(rec)
rmdir(ctx, mdtname, rec)
case rectype == "RENME":
display_changelog(rec)
renme(ctx, mdtname, rec)
//case rectype == "XATTR":
// update_metadata(ctx, mdtname, rec)
case rectype == "SATTR":
display_changelog(rec)
update_metadata(ctx, mdtname, rec)
case rectype == "LYOUT":
display_changelog(rec)
update_layout(ctx, mdtname, rec)
case rectype == "SLINK":
display_changelog(rec)
slink(ctx, mdtname, rec)
case rectype == "UNLNK":
display_changelog(rec)
unlnk(ctx, mdtname, rec)
default:
fmt.Printf("Ignoring %s record.\n", rec.Type())
if debug == true {
display_changelog(rec)
pretty_print_changelog_record(rec)
}
}
Expand Down

0 comments on commit a6a25be

Please sign in to comment.