Skip to content

Commit

Permalink
[patch] bugfix agent duplicated data update execution (#885)
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango authored Dec 15, 2020
1 parent 9aeab03 commit 8d62c74
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
10 changes: 6 additions & 4 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,12 @@ func (g *gRPCClient) Connect(ctx context.Context, addr string, dopts ...DialOpti
log.Warnf("failed to reconnect unhealthy pool addr= %s\terror= %s", addr, err.Error())
g.conns.Delete(addr)
atomic.AddUint64(&g.clientCount, ^uint64(0))
err = conn.Disconnect()
if err != nil {
log.Warnf("failed to disconnect unhealthy pool addr= %s\terror= %s", addr, err.Error())
g.conns.Delete(addr)
if conn != nil {
err = conn.Disconnect()
if err != nil {
log.Warnf("failed to disconnect unhealthy pool addr= %s\terror= %s", addr, err.Error())
g.conns.Delete(addr)
}
}
} else if conn == nil {
g.conns.Delete(addr)
Expand Down
30 changes: 28 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ func (n *ngt) InsertMultiple(vecs map[string][]float32) (err error) {

func (n *ngt) Update(uuid string, vec []float32) (err error) {
now := time.Now().UnixNano()
if !n.readyForUpdate(uuid, vec) {
return nil
}
err = n.delete(uuid, now)
if err != nil {
return err
Expand All @@ -436,8 +439,12 @@ func (n *ngt) Update(uuid string, vec []float32) (err error) {

func (n *ngt) UpdateMultiple(vecs map[string][]float32) (err error) {
uuids := make([]string, 0, len(vecs))
for uuid := range vecs {
uuids = append(uuids, uuid)
for uuid, vec := range vecs {
if n.readyForUpdate(uuid, vec) {
uuids = append(uuids, uuid)
} else {
delete(vecs, uuid)
}
}
err = n.DeleteMultiple(uuids...)
if err != nil {
Expand Down Expand Up @@ -738,6 +745,25 @@ func (n *ngt) Exists(uuid string) (oid uint32, ok bool) {
return oid, ok
}

func (n *ngt) readyForUpdate(uuid string, vec []float32) (ready bool) {
if len(uuid) == 0 || len(vec) == 0 {
return false
}
ovec, err := n.GetObject(uuid)
if err != nil || len(vec) != len(ovec) {
// if error (GetObject cannot find vector) or vector length is not equal let's try update
return true
}
for i, v := range vec {
if v != ovec[i] {
// if difference exists return true for update
return true
}
}
// if no difference exists (same vector already exists) return false for skip update
return false
}

func (n *ngt) insertCache(uuid string) (*vcache, bool) {
iv, ok := n.ivc.Load(uuid)
if ok {
Expand Down

0 comments on commit 8d62c74

Please sign in to comment.