Skip to content

Commit

Permalink
add surface mutid to label index
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed May 6, 2024
1 parent b07e713 commit 8b74961
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 71 deletions.
24 changes: 20 additions & 4 deletions datatype/common/labels/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,9 @@ func (idx *Index) FitToBounds(bounds *dvid.OptionalBounds) error {
return nil
}

// Add adds the given Index to the receiver.
func (idx *Index) Add(idx2 *Index) error {
// Add adds the given Index to the receiver. Any blocks modified will result in
// the surface_mutid being set since the surface in the block would have changed.
func (idx *Index) Add(idx2 *Index, mutInfo dvid.MutInfo) error {
if idx == nil {
return fmt.Errorf("can't use Index.Add with nil receiver Index")
}
Expand All @@ -422,18 +423,32 @@ func (idx *Index) Add(idx2 *Index) error {
for sv2, c2 := range svc2.Counts {
svc.Counts[sv2] = c2
}
svc.SurfaceMutid = mutInfo.MutID
}
}
idx.LastMutId = mutInfo.MutID
idx.LastModTime = mutInfo.Time
idx.LastModUser = mutInfo.User
idx.LastModApp = mutInfo.App
return nil
}

// Cleave the given supervoxels from an index and returns a new index, modifying both receiver
// and creating new cleaved index.
func (idx *Index) Cleave(cleaveLabel uint64, toCleave []uint64) (cleavedSize, remainSize uint64, cidx *Index) {
func (idx *Index) Cleave(cleaveLabel uint64, toCleave []uint64, mutInfo dvid.MutInfo) (cleavedSize, remainSize uint64, cidx *Index) {
idx.LastMutId = mutInfo.MutID
idx.LastModUser = mutInfo.User
idx.LastModTime = mutInfo.Time
idx.LastModApp = mutInfo.App

cleaveSet := NewSet(toCleave...)
cidx = new(Index)
cidx.Label = cleaveLabel
cidx.Blocks = make(map[uint64]*proto.SVCount)
cidx.LastMutId = mutInfo.MutID
cidx.LastModUser = mutInfo.User
cidx.LastModTime = mutInfo.Time
cidx.LastModApp = mutInfo.App

for zyx, svc := range idx.Blocks {
if svc != nil && svc.Counts != nil {
Expand All @@ -444,12 +459,13 @@ func (idx *Index) Cleave(cleaveLabel uint64, toCleave []uint64) (cleavedSize, re
cleavedSize += uint64(sz)
cleavedCounts[supervoxel] = sz
delete(svc.Counts, supervoxel)
svc.SurfaceMutid = mutInfo.MutID
} else {
remainSize += uint64(sz)
}
}
if len(cleavedCounts) > 0 {
cidx.Blocks[zyx] = &proto.SVCount{Counts: cleavedCounts}
cidx.Blocks[zyx] = &proto.SVCount{Counts: cleavedCounts, SurfaceMutid: mutInfo.MutID}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions datatype/common/labels/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestIndexOps(t *testing.T) {
t.Errorf("expected 635228 total voxels, got %d voxels in index supervoxel counts\n", totalVoxels)
}

cleavedSize, remainSize, cleaveIdx := idx.Cleave(200, []uint64{1001, 26029, 3829})
cleavedSize, remainSize, cleaveIdx := idx.Cleave(200, []uint64{1001, 26029, 3829}, dvid.MutInfo{})
expectedCleavedSize := uint64(6899 + 63560 + 10000)
if cleavedSize != expectedCleavedSize {
t.Errorf("expected cleaved size to be %d, got %d\n", expectedCleavedSize, cleavedSize)
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestIndexOps(t *testing.T) {
if !reflect.DeepEqual(cleaveCounts, cleaveIdx.GetSupervoxelCounts()) {
t.Errorf("after cleave, remain index has incorrect counts:\nExpected %v\nGot %v\n", cleaveCounts, cleaveIdx.GetSupervoxelCounts())
}
if err := idx.Add(cleaveIdx); err != nil {
if err := idx.Add(cleaveIdx, dvid.MutInfo{}); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(origCounts, idx.GetSupervoxelCounts()) {
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestCleaveIndex(t *testing.T) {
}
idx.Blocks[block4] = svc

cleavedSize, remainSize, cleaveIdx := idx.Cleave(200, []uint64{87, 382, 1001, 3829, 673, 8763})
cleavedSize, remainSize, cleaveIdx := idx.Cleave(200, []uint64{87, 382, 1001, 3829, 673, 8763}, dvid.MutInfo{})
expectedTotalSize := uint64(73396)
if cleavedSize+remainSize != expectedTotalSize {
t.Errorf("cleaved %d + remain %d voxels != %d total voxels\n", cleavedSize, remainSize, expectedTotalSize)
Expand Down
84 changes: 49 additions & 35 deletions datatype/common/proto/labelops.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datatype/common/proto/labelops.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ message AffinityTable {

message SVCount {
map<uint64, uint32> counts = 1;
uint64 surface_mutid = 2; // mutation id for last surface changes that affect mesh
}

message LabelIndex {
Expand Down
22 changes: 9 additions & 13 deletions datatype/labelmap/labelidx.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,10 +656,6 @@ func (d *Data) cleaveIndex(v dvid.VersionID, op labels.CleaveOp, info dvid.ModIn
err = fmt.Errorf("cannot cleave non-existent label %d", op.Target)
return
}
idx.LastMutId = op.MutID
idx.LastModUser = info.User
idx.LastModTime = info.Time
idx.LastModApp = info.App

if err := d.addMutcache(v, op.MutID, idx); err != nil {
dvid.Criticalf("unable to add cleaved mutid %d index %d: %v\n", op.MutID, op.Target, err)
Expand All @@ -681,11 +677,11 @@ func (d *Data) cleaveIndex(v dvid.VersionID, op labels.CleaveOp, info dvid.ModIn
// create a new label index to contain the cleaved supervoxels.
// we don't have to worry about mutex here because it's a new index.
var cidx *labels.Index
cleavedSize, remainSize, cidx = idx.Cleave(op.CleavedLabel, op.CleavedSupervoxels)
cidx.LastMutId = op.MutID
cidx.LastModUser = info.User
cidx.LastModTime = info.Time
cidx.LastModApp = info.App
mutInfo := dvid.MutInfo{
MutID: op.MutID,
ModInfo: info,
}
cleavedSize, remainSize, cidx = idx.Cleave(op.CleavedLabel, op.CleavedSupervoxels, mutInfo)
if err = putCachedLabelIndex(d, v, cidx); err != nil {
return
}
Expand Down Expand Up @@ -721,7 +717,7 @@ func ChangeLabelIndex(d dvid.Data, v dvid.VersionID, label uint64, delta labels.
}

// getMergedIndex gets index data for all labels in a set with possible bounds.
func (d *Data) getMergedIndex(v dvid.VersionID, mutID uint64, lbls labels.Set, bounds dvid.Bounds) (*labels.Index, error) {
func (d *Data) getMergedIndex(v dvid.VersionID, lbls labels.Set, mutInfo dvid.MutInfo, bounds dvid.Bounds) (*labels.Index, error) {
if len(lbls) == 0 {
return nil, nil
}
Expand All @@ -731,15 +727,15 @@ func (d *Data) getMergedIndex(v dvid.VersionID, mutID uint64, lbls labels.Set, b
if err != nil {
return nil, err
}
if err := d.addMutcache(v, mutID, idx2); err != nil {
dvid.Criticalf("unable to add merge mutid %d index %d: %v\n", mutID, label, err)
if err := d.addMutcache(v, mutInfo.MutID, idx2); err != nil {
dvid.Criticalf("unable to add merge mutid %d index %d: %v\n", mutInfo.MutID, label, err)
}
if bounds.Block != nil && bounds.Block.IsSet() {
if err := idx2.FitToBounds(bounds.Block); err != nil {
return nil, err
}
}
if err := idx.Add(idx2); err != nil {
if err := idx.Add(idx2, mutInfo); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion datatype/labelmap/labelidx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestIngest(t *testing.T) {
idx3 := body3.getIndex(t)
checkGetIndices(t, child1, idx1, idx2, idx3)

if err := idx1.Add(idx2); err != nil {
if err := idx1.Add(idx2, dvid.MutInfo{}); err != nil {
t.Fatal(err)
}
idx1.Label = 7
Expand Down
26 changes: 12 additions & 14 deletions datatype/labelmap/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (d *Data) MergeLabels(v dvid.VersionID, op labels.MergeOp, info dvid.ModInf
timedLog := dvid.NewTimeLog()
mutID = d.NewMutationID()
op.MutID = mutID
mutInfo := dvid.MutInfo{
MutID: mutID,
ModInfo: info,
}

// send kafka merge event to instance-uuid topic
lbls := make([]uint64, 0, len(op.Merged))
Expand Down Expand Up @@ -96,7 +100,7 @@ func (d *Data) MergeLabels(v dvid.VersionID, op labels.MergeOp, info dvid.ModInf
dvid.Criticalf("unable to add merge mutid %d target index %d: %v\n", mutID, op.Target, err)
}
delta.TargetVoxels = targetIdx.NumVoxels()
if mergeIdx, err = d.getMergedIndex(v, mutID, op.Merged, dvid.Bounds{}); err != nil {
if mergeIdx, err = d.getMergedIndex(v, op.Merged, mutInfo, dvid.Bounds{}); err != nil {
err = fmt.Errorf("can't get block indices of merge labels %s: %v", op.Merged, err)
return
}
Expand Down Expand Up @@ -127,19 +131,13 @@ func (d *Data) MergeLabels(v dvid.VersionID, op labels.MergeOp, info dvid.ModInf
return
}

if mergeIdx != nil && len(mergeIdx.Blocks) != 0 {
err = targetIdx.Add(mergeIdx)
if err != nil {
return
}
targetIdx.LastMutId = mutID
targetIdx.LastModUser = info.User
targetIdx.LastModTime = info.Time
targetIdx.LastModApp = info.App
dvid.Infof("putting targetIdx with user %s\n", targetIdx.LastModUser)
if err = PutLabelIndex(d, v, op.Target, targetIdx); err != nil {
return
}
// Write the final merged index and also record surface_mutid since surface changed.
if err = targetIdx.Add(mergeIdx, mutInfo); err != nil {
return
}
dvid.Infof("putting targetIdx with user %s\n", targetIdx.LastModUser)
if err = PutLabelIndex(d, v, op.Target, targetIdx); err != nil {
return
}
for merged := range delta.Merged {
DeleteLabelIndex(d, v, merged)
Expand Down
9 changes: 8 additions & 1 deletion dvid/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ func MinInt32(a, b int32) int32 {
return b
}

// MutInfo gives mutation id and the associated ModInfo
type MutInfo struct {
MutID uint64
ModInfo
}

// ModInfo gives a user, app and time for a modification
type ModInfo struct {
User string
Expand Down Expand Up @@ -134,7 +140,8 @@ func RandomBytes(numBytes int32) []byte {
// option or # cores) and/or the megabytes (MB) of memory needed for each goroutine.
// A minimum of 1 goroutine is returned.
// TODO: Actually use the required memory provided in argument. For now,
// only returns percentage of maximum # of cores.
//
// only returns percentage of maximum # of cores.
func EstimateGoroutines(percentCPUs float64, goroutineMB int32) int {
goroutines := percentCPUs * float64(NumCPU)
if goroutines < 1.0 {
Expand Down

0 comments on commit 8b74961

Please sign in to comment.