Skip to content

Commit

Permalink
add migration logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 17, 2017
1 parent 3850c33 commit 8b0148d
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 132 deletions.
38 changes: 35 additions & 3 deletions services/alert/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,29 @@ var (
type HandlerSpecDAO interface {
// Retrieve a handler
Get(topic, id string) (HandlerSpec, error)
GetTx(tx storage.ReadOnlyTx, topic, id string) (HandlerSpec, error)

// Create a handler.
// ErrHandlerSpecExists is returned if a handler already exists with the same ID.
Create(h HandlerSpec) error
CreateTx(tx storage.Tx, h HandlerSpec) error

// Replace an existing handler.
// ErrNoHandlerSpecExists is returned if the handler does not exist.
Replace(h HandlerSpec) error
ReplaceTx(tx storage.Tx, h HandlerSpec) error

// Delete a handler.
// It is not an error to delete an non-existent handler.
Delete(topic, id string) error
DeleteTx(tx storage.Tx, topic, id string) error

// List handlers matching a pattern.
// The pattern is shell/glob matching see https://golang.org/pkg/path/#Match
// Offset and limit are pagination bounds. Offset is inclusive starting at index 0.
// More results may exist while the number of returned items is equal to limit.
List(topic, pattern string, offset, limit int) ([]HandlerSpec, error)
ListTx(tx storage.ReadOnlyTx, topic, pattern string, offset, limit int) ([]HandlerSpec, error)
}

//--------------------------------------------------------------------
Expand Down Expand Up @@ -113,8 +118,12 @@ type handlerSpecKV struct {
store *storage.IndexedStore
}

const (
handlerPrefix = "handlers"
)

func newHandlerSpecKV(store storage.Interface) (*handlerSpecKV, error) {
c := storage.DefaultIndexedStoreConfig("handlers", func() storage.BinaryObject {
c := storage.DefaultIndexedStoreConfig(handlerPrefix, func() storage.BinaryObject {
return new(HandlerSpec)
})
istore, err := storage.NewIndexedStore(store, c)
Expand All @@ -136,7 +145,13 @@ func (kv *handlerSpecKV) error(err error) error {
}

func (kv *handlerSpecKV) Get(topic, id string) (HandlerSpec, error) {
o, err := kv.store.Get(fullID(topic, id))
return kv.getHelper(kv.store.Get(fullID(topic, id)))
}
func (kv *handlerSpecKV) GetTx(tx storage.ReadOnlyTx, topic, id string) (HandlerSpec, error) {
return kv.getHelper(kv.store.GetTx(tx, fullID(topic, id)))
}

func (kv *handlerSpecKV) getHelper(o storage.BinaryObject, err error) (HandlerSpec, error) {
if err != nil {
return HandlerSpec{}, kv.error(err)
}
Expand All @@ -150,20 +165,37 @@ func (kv *handlerSpecKV) Get(topic, id string) (HandlerSpec, error) {
func (kv *handlerSpecKV) Create(h HandlerSpec) error {
return kv.store.Create(&h)
}
func (kv *handlerSpecKV) CreateTx(tx storage.Tx, h HandlerSpec) error {
return kv.store.CreateTx(tx, &h)
}

func (kv *handlerSpecKV) Replace(h HandlerSpec) error {
return kv.store.Replace(&h)
}
func (kv *handlerSpecKV) ReplaceTx(tx storage.Tx, h HandlerSpec) error {
return kv.store.ReplaceTx(tx, &h)
}

func (kv *handlerSpecKV) Delete(topic, id string) error {
return kv.store.Delete(fullID(topic, id))
}
func (kv *handlerSpecKV) DeleteTx(tx storage.Tx, topic, id string) error {
return kv.store.DeleteTx(tx, fullID(topic, id))
}

func (kv *handlerSpecKV) List(topic, pattern string, offset, limit int) ([]HandlerSpec, error) {
if pattern == "" {
pattern = "*"
}
objects, err := kv.store.List(storage.DefaultIDIndex, fullID(topic, pattern), offset, limit)
return kv.listHelper(kv.store.List(storage.DefaultIDIndex, fullID(topic, pattern), offset, limit))
}
func (kv *handlerSpecKV) ListTx(tx storage.ReadOnlyTx, topic, pattern string, offset, limit int) ([]HandlerSpec, error) {
if pattern == "" {
pattern = "*"
}
return kv.listHelper(kv.store.ListTx(tx, storage.DefaultIDIndex, fullID(topic, pattern), offset, limit))
}
func (kv *handlerSpecKV) listHelper(objects []storage.BinaryObject, err error) ([]HandlerSpec, error) {
if err != nil {
return nil, err
}
Expand Down
121 changes: 121 additions & 0 deletions services/alert/service.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package alert

import (
"encoding/json"
"fmt"
"log"
"path"
"regexp"
"sync"

"github.com/influxdata/kapacitor/alert"
Expand Down Expand Up @@ -128,6 +130,11 @@ func (s *Service) Open() error {
}
s.topicsDAO = topicsDAO

// Migrate v1.2 handlers
if err := s.migrateHandlerSpecs(store); err != nil {
return err
}

// Load saved handlers
if err := s.loadSavedHandlerSpecs(); err != nil {
return err
Expand All @@ -153,6 +160,120 @@ func (s *Service) Close() error {
return s.APIServer.Close()
}

func (s *Service) migrateHandlerSpecs(store storage.Interface) error {
s.logger.Println("D! migrating old v1.2 handler specs")

// v1.2 HandlerActionSpec
type oldHandlerActionSpec struct {
Kind string `json:"kind"`
Options map[string]interface{} `json:"options"`
}

// v1.2 HandlerSpec
type oldHandlerSpec struct {
ID string `json:"id"`
Topics []string `json:"topics"`
Actions []oldHandlerActionSpec `json:"actions"`
}
oldDataPrefix := "/" + handlerPrefix + "/data"
oldKeyPattern := regexp.MustCompile(fmt.Sprintf(`^%s/[-\._\p{L}0-9]+$`, oldDataPrefix))

// Process to migrate to new handler specs:
// 1. Gather all old handlers
// 2. Define new handlers that are equivalent
// 3. Check that there are no ID conflicts
// 4. Save the new handlers
// 5. Delete the old specs
//
// All steps are performed in a single transaction,
// so it can be rolledback in case of an error.
return store.Update(func(tx storage.Tx) error {
var newHandlers []HandlerSpec
kvs, err := tx.List(oldDataPrefix)
if err != nil {
return err
}
s.logger.Printf("D! found %d handler rows", len(kvs))

var oldKeys []string
for _, kv := range kvs {
if !oldKeyPattern.MatchString(kv.Key) {
s.logger.Println("D! found new handler skipping:", kv.Key)
continue
}
oldKeys = append(oldKeys, kv.Key)
var old oldHandlerSpec
err := storage.VersionJSONDecode(kv.Value, func(version int, dec *json.Decoder) error {
if version != 1 {
return fmt.Errorf("old handler specs should all be version 1, got version %d", version)
}
return dec.Decode(&old)
})
if err != nil {
return errors.Wrapf(err, "failed to read old handler spec data for %s", kv.Key)
}

s.logger.Println("D! migrating old handler spec", old.ID)

// Create new handlers from the old
hasStateChangesOnly := false
aggregatePrefix := ""
for i, action := range old.Actions {
new := HandlerSpec{
ID: fmt.Sprintf("%s-%s-%d", old.ID, action.Kind, i),
Kind: action.Kind,
Options: action.Options,
}
if hasStateChangesOnly {
new.Match = "changed() == TRUE"
}
switch action.Kind {
case "stateChangesOnly":
hasStateChangesOnly = true
// No need to add a handler for this one
case "aggregate":
newPrefix := aggregatePrefix + "agg_topic-" + old.ID + "-"
for _, topic := range old.Topics {
new.Topic = aggregatePrefix + topic
new.Options["topic"] = newPrefix + topic
newHandlers = append(newHandlers, new)
}
aggregatePrefix = newPrefix
default:
for _, topic := range old.Topics {
new.Topic = aggregatePrefix + topic
newHandlers = append(newHandlers, new)
}
}
}
}

s.logger.Printf("D! creating %d new handlers in place of old handlers", len(newHandlers))

// Check that all new handlers are unique
for _, handler := range newHandlers {
if _, err := s.specsDAO.GetTx(tx, handler.Topic, handler.ID); err != ErrNoHandlerSpecExists {
return fmt.Errorf("handler %q for topic %q already exists", handler.ID, handler.Topic)
}
}

// Create new handlers
for _, handler := range newHandlers {
if err := s.specsDAO.CreateTx(tx, handler); err != nil {
return errors.Wrap(err, "failed to create new handler during migration")
}
}

// Delete old handlers
for _, key := range oldKeys {
if err := tx.Delete(key); err != nil {
return err
}
}
return nil
})
}

func (s *Service) loadSavedHandlerSpecs() error {
offset := 0
limit := 100
Expand Down
Loading

0 comments on commit 8b0148d

Please sign in to comment.