Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Merge pull request #50 from RSE-Cambridge/8janfixes
Browse files Browse the repository at this point in the history
8janfixes
  • Loading branch information
JohnGarbutt authored Jan 9, 2019
2 parents 1a00af1 + f802972 commit 6fba633
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 219 deletions.
7 changes: 0 additions & 7 deletions cmd/dac-func-test/etcdkeystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,6 @@ func testKeepAlive(keystore keystoreregistry.Keystore) {
func TestEtcdKeystore(keystore keystoreregistry.Keystore) {
log.Println("Testing etcdkeystore...")

keystore.WatchPrefix("ke",
func(old *keystoreregistry.KeyValueVersion, new *keystoreregistry.KeyValueVersion) {
log.Println("Watch spotted an update:")
log.Println(" new:", new)
log.Println(" old:", old)
})

testAddValues(keystore)
testGet(keystore)
testUpdate(keystore)
Expand Down
116 changes: 6 additions & 110 deletions internal/pkg/etcdregistry/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,105 +205,6 @@ func (client *etcKeystore) Get(key string) (keystoreregistry.KeyValueVersion, er
return *getKeyValueVersion(response.Kvs[0]), nil
}

func (client *etcKeystore) WatchPrefix(prefix string,
onUpdate func(old *keystoreregistry.KeyValueVersion, new *keystoreregistry.KeyValueVersion)) {
rch := client.Client.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV())
go func() {
for wresp := range rch {
for _, ev := range wresp.Events {
new := getKeyValueVersion(ev.Kv)
if new != nil && new.CreateRevision == 0 {
// show deleted by returning nil
new = nil
}
old := getKeyValueVersion(ev.PrevKv)
onUpdate(old, new)
}
}
}()
}

func (client *etcKeystore) WatchKey(ctxt context.Context, key string,
onUpdate func(old *keystoreregistry.KeyValueVersion, new *keystoreregistry.KeyValueVersion)) {
rch := client.Client.Watch(ctxt, key, clientv3.WithPrevKV())
go func() {
for watchResponse := range rch {
for _, ev := range watchResponse.Events {
new := getKeyValueVersion(ev.Kv)
if new != nil && new.CreateRevision == 0 {
// show deleted by returning nil
new = nil
}
old := getKeyValueVersion(ev.PrevKv)

onUpdate(old, new) // TODO if returns something cancel context? duno?
}
}
// TODO... chanel to receiver instead? // TODO... what about watchResponse.Cancelled or Err()?
onUpdate(nil, nil) // signal we are done
}()
}

// TODO: this needs fixing up
func (client *etcKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64,
check func(update keystoreregistry.KeyValueUpdate) bool) (bool, error) {

// check key is present and find revision of the last update
initialValue, err := client.Get(key)
if err != nil {
return false, err
}
if fromRevision < initialValue.CreateRevision {
return false, errors.New("incorrect fromRevision")
}

// no deadline set, so add default timeout of 10 mins
var cancelFunc context.CancelFunc
_, ok := ctxt.Deadline()
if !ok {
ctxt, cancelFunc = context.WithTimeout(ctxt, time.Minute*10)
}

// open channel with etcd, starting with the last revision of the key from above
rch := client.Client.Watch(ctxt, key, clientv3.WithPrefix(), clientv3.WithRev(fromRevision))
if rch == nil {
cancelFunc()
return false, errors.New("no watcher returned from etcd")
}

conditionMet := false
go func() {
for watchResponse := range rch {
// TODO: this should instead use Watch from above!
for _, ev := range watchResponse.Events {
update := keystoreregistry.KeyValueUpdate{
New: getKeyValueVersion(ev.Kv),
Old: getKeyValueVersion(ev.PrevKv),
}

// show deleted by returning nil for new
isKeyDeleted := false
if ev.Type == clientv3.EventTypeDelete {
update.New = nil
isKeyDeleted = true
}

conditionMet := check(update)

// stop watching if the condition passed or key was deleted
if conditionMet || isKeyDeleted {
cancelFunc()
return
}
}
}
// Assuming we get here when the context is cancelled or hits its timeout
// i.e. there are no more events, so we close the channel
}()

return conditionMet, nil
}

func (client *etcKeystore) KeepAliveKey(key string) error {
kvc := clientv3.NewKV(client.Client)

Expand Down Expand Up @@ -337,20 +238,15 @@ func (client *etcKeystore) KeepAliveKey(key string) error {

counter := 9
go func() {
for {
ka := <-ch
if ka == nil {
log.Panicf("Unable to refresh key: %s", key)
break
for range ch {
if counter >= 9 {
counter = 0
log.Println("Still refreshing key:", key)
} else {
if counter >= 9 {
counter = 0
log.Println("Still refreshing key:", key)
} else {
counter++
}
counter++
}
}
log.Panicf("Unable to refresh key: %s", key)
}()
return nil
}
Expand Down
13 changes: 0 additions & 13 deletions internal/pkg/keystoreregistry/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,6 @@ type Keystore interface {
// Get all keys for a given prefix.
Get(key string) (KeyValueVersion, error)

// Get callback on all changes related to the given prefix.
//
// When a key is created for the first time, old is an empty value,
// and new.CreateRevision == new.ModRevision
// This starts watching from the current version, rather than replaying old events
// Returns the revision that the watch is starting on
WatchPrefix(prefix string, onUpdate func(old *KeyValueVersion, new *KeyValueVersion))

// Watch given key
//
// When callback returns true, stop watch the key
WatchKey(ctxt context.Context, key string, onUpdate func(old *KeyValueVersion, new *KeyValueVersion))

// Get a channel containing all KeyValueUpdate events
//
// Use the context to control if you watch forever, or if you choose to cancel when a key
Expand Down
82 changes: 29 additions & 53 deletions internal/pkg/keystoreregistry/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/RSE-Cambridge/data-acc/internal/pkg/registry"
"log"
"math/rand"
"sync"
"time"
)

Expand Down Expand Up @@ -352,8 +351,11 @@ func (volRegistry *volumeRegistry) GetVolumeChanges(ctx context.Context, volume

func (volRegistry *volumeRegistry) WaitForState(volumeName registry.VolumeName, state registry.VolumeState) error {
log.Println("Start waiting for volume", volumeName, "to reach state", state)
err := volRegistry.WaitForCondition(volumeName, func(old *registry.Volume, new *registry.Volume) bool {
return new.State == state || new.State == registry.Error
err := volRegistry.WaitForCondition(volumeName, func(event *registry.VolumeChange) bool {
if event.New == nil {
log.Panicf("unable to process event %+v", event)
}
return event.New.State == state || event.New.State == registry.Error
})
log.Println("Stopped waiting for volume", volumeName, "to reach state", state, err)
if err != nil {
Expand All @@ -368,62 +370,36 @@ func (volRegistry *volumeRegistry) WaitForState(volumeName registry.VolumeName,
return err
}

func (volRegistry *volumeRegistry) WaitForCondition(volumeName registry.VolumeName,
condition func(old *registry.Volume, new *registry.Volume) bool) error {

var waitGroup sync.WaitGroup
waitGroup.Add(1)
ctxt, cancelFunc := context.WithTimeout(context.Background(), time.Minute*10)
// TODO should we always need to call cancel? or is timeout enough?

err := fmt.Errorf("error waiting for volume %s to meet supplied condition", volumeName)

var finished bool
volRegistry.keystore.WatchKey(ctxt, getVolumeKey(string(volumeName)),
func(old *KeyValueVersion, new *KeyValueVersion) {
if old == nil && new == nil {
// TODO: attempt to signal error on timeout, should move to channel!!
cancelFunc()
if !finished {
// at the end we always get called with nil, nil
// but sometimes we will have already found the condition
waitGroup.Done()
}
return
}
oldVolume := &registry.Volume{}
newVolume := &registry.Volume{}
if old != nil {
volumeFromKeyValue(*old, oldVolume)
}
if new != nil {
volumeFromKeyValue(*new, newVolume)
}
// TODO: maybe have environment variable to tune this wait time?
var defaultTimeout = time.Minute * 10

if !finished && condition(oldVolume, newVolume) {
finished = true
log.Printf("condition met with new volume: %s", newVolume)
err = nil
cancelFunc()
waitGroup.Done()
}
})
func (volRegistry *volumeRegistry) WaitForCondition(volumeName registry.VolumeName,
condition func(event *registry.VolumeChange) bool) error {

// check we have not already hit the condition
volume, err := volRegistry.Volume(volumeName)
if err != nil {
// NOTE this forces the volume to existing before you wait, seems OK
return err
}
log.Printf("About to wait for condition on volume: %s", volume)
if condition(&volume, &volume) {
log.Println("Condition already met, bail early.")
cancelFunc()
return nil

ctxt, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)
events := volRegistry.GetVolumeChanges(ctxt, volume)
defer cancelFunc()

log.Printf("About to wait for condition on volume: %+v", volume)

for event := range events {
if event.Err != nil {
return event.Err
}
if event.IsDelete {
return fmt.Errorf("stopped waiting as volume %s is deleted", volume.Name)
}

conditionMet := condition(&event)
if conditionMet {
return nil
}
}
log.Println("Condition not met, starting to wait.")

// TODO do we get stuck in a forever loop here when we hit the timeout above?
waitGroup.Wait()
return err
return fmt.Errorf("stopped waiting for volume %s to meet supplied condition", volume.Name)
}
14 changes: 7 additions & 7 deletions internal/pkg/lifecycle/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,16 @@ func (vlm *volumeLifecycleManager) Mount(hosts []string, jobName string) error {

// TODO: should share code with Unmount!!
var volumeInErrorState bool
err := vlm.volumeRegistry.WaitForCondition(vlm.volume.Name, func(old *registry.Volume, new *registry.Volume) bool {
if new.State == registry.Error {
err := vlm.volumeRegistry.WaitForCondition(vlm.volume.Name, func(event *registry.VolumeChange) bool {
if event.New.State == registry.Error {
volumeInErrorState = true
return true
}
allAttached := false
for _, host := range hosts {

var isAttached bool
for _, attachment := range new.Attachments {
for _, attachment := range event.New.Attachments {
if attachment.Job == jobName && attachment.Hostname == host {
if attachment.State == registry.Attached {
isAttached = true
Expand Down Expand Up @@ -195,14 +195,14 @@ func (vlm *volumeLifecycleManager) Unmount(hosts []string, jobName string) error

// TODO: must share way more code and do more tests on this logic!!
var volumeInErrorState error
err := vlm.volumeRegistry.WaitForCondition(vlm.volume.Name, func(olqqd *registry.Volume, new *registry.Volume) bool {
if new.State == registry.Error {
volumeInErrorState = fmt.Errorf("volume %s now in error state", new.Name)
err := vlm.volumeRegistry.WaitForCondition(vlm.volume.Name, func(event *registry.VolumeChange) bool {
if event.New.State == registry.Error {
volumeInErrorState = fmt.Errorf("volume %s now in error state", event.New.Name)
return true
}
allDettached := false
for _, host := range hosts {
newAttachment, ok := new.FindAttachment(host, jobName)
newAttachment, ok := event.New.FindAttachment(host, jobName)
if !ok {
// TODO: debug log or something?
volumeInErrorState = fmt.Errorf("unable to find attachment for host: %s", host)
Expand Down
36 changes: 17 additions & 19 deletions internal/pkg/lifecycle/volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,29 @@ func TestVolumeLifecycleManager_Mount(t *testing.T) {
{Hostname: "host1", State: registry.RequestAttach, Job: "job1"},
{Hostname: "host2", State: registry.RequestAttach, Job: "job1"},
})
fakeWait := func(volumeName registry.VolumeName, condition func(old *registry.Volume, new *registry.Volume) bool) error {
old := &registry.Volume{}
new := &registry.Volume{}
assert.False(t, condition(old, new))
new.Attachments = []registry.Attachment{
fakeWait := func(volumeName registry.VolumeName, condition func(event *registry.VolumeChange) bool) error {
event := &registry.VolumeChange{New: &registry.Volume{}}
assert.False(t, condition(event))
event.New.Attachments = []registry.Attachment{
{Hostname: "host1", Job: "job2", State: registry.Detached},
{Hostname: "host1", Job: "job1", State: registry.Attached},
{Hostname: "host2", Job: "job1", State: registry.Attached},
}
assert.True(t, condition(old, new))
assert.True(t, condition(event))

new.Attachments = []registry.Attachment{
event.New.Attachments = []registry.Attachment{
{Hostname: "host1", Job: "job2", State: registry.AttachmentError},
{Hostname: "host1", Job: "job1", State: registry.Detached},
{Hostname: "host2", Job: "job1", State: registry.Attached},
}
assert.False(t, condition(old, new))
assert.False(t, condition(event))

new.Attachments = []registry.Attachment{
event.New.Attachments = []registry.Attachment{
{Hostname: "host1", Job: "job2", State: registry.Attached},
{Hostname: "host1", Job: "job1", State: registry.AttachmentError},
{Hostname: "host2", Job: "job1", State: registry.Attached},
}
assert.True(t, condition(old, new))
assert.True(t, condition(event))
return nil
}
mockVolReg.EXPECT().WaitForCondition(volume.Name, gomock.Any()).DoAndReturn(fakeWait)
Expand Down Expand Up @@ -73,29 +72,28 @@ func TestVolumeLifecycleManager_Unmount(t *testing.T) {
{Hostname: "host1", State: registry.RequestDetach, Job: "job1"},
{Hostname: "host2", State: registry.RequestDetach, Job: "job1"},
})
fakeWait := func(volumeName registry.VolumeName, condition func(old *registry.Volume, new *registry.Volume) bool) error {
old := &registry.Volume{}
new := &registry.Volume{}
new.Attachments = []registry.Attachment{
fakeWait := func(volumeName registry.VolumeName, condition func(event *registry.VolumeChange) bool) error {
event := &registry.VolumeChange{New: &registry.Volume{}}
event.New.Attachments = []registry.Attachment{
{Hostname: "host1", Job: "job2"},
{Hostname: "host1", Job: "job1", State: registry.Detached},
{Hostname: "host2", Job: "job1", State: registry.Detached},
}
assert.True(t, condition(old, new))
assert.True(t, condition(event))

new.Attachments = []registry.Attachment{
event.New.Attachments = []registry.Attachment{
{Hostname: "host1", Job: "job2", State: registry.AttachmentError},
{Hostname: "host1", Job: "job1", State: registry.Detached},
{Hostname: "host2", Job: "job1", State: registry.Attached},
}
assert.False(t, condition(old, new))
assert.False(t, condition(event))

new.Attachments = []registry.Attachment{
event.New.Attachments = []registry.Attachment{
{Hostname: "host1", Job: "job2"},
{Hostname: "host1", Job: "job1", State: registry.AttachmentError},
{Hostname: "host2", Job: "job1", State: registry.Detached},
}
assert.True(t, condition(old, new))
assert.True(t, condition(event))
return nil
}
mockVolReg.EXPECT().WaitForCondition(volume.Name, gomock.Any()).DoAndReturn(fakeWait)
Expand Down
Loading

0 comments on commit 6fba633

Please sign in to comment.