Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

746 migration #809

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ protocol/protocol.pb.go
build/

# executables
/server
/influxdb
/benchmark-tool
/main
Expand Down Expand Up @@ -58,3 +57,6 @@ config.toml
/admin
/admin/
/data/

# test data files
integration/migration_data/
64 changes: 46 additions & 18 deletions api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,56 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

log "code.google.com/p/log4go"
"github.com/bmizerany/pat"
"github.com/influxdb/influxdb/cluster"
. "github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/configuration"
"github.com/influxdb/influxdb/coordinator"
"github.com/influxdb/influxdb/migration"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
)

type HttpServer struct {
conn net.Listener
sslConn net.Listener
httpPort string
httpSslPort string
httpSslCert string
adminAssetsDir string
coordinator coordinator.Coordinator
userManager UserManager
shutdown chan bool
clusterConfig *cluster.ClusterConfiguration
raftServer *coordinator.RaftServer
readTimeout time.Duration
}

func NewHttpServer(httpPort string, readTimeout time.Duration, adminAssetsDir string, theCoordinator coordinator.Coordinator, userManager UserManager, clusterConfig *cluster.ClusterConfiguration, raftServer *coordinator.RaftServer) *HttpServer {
conn net.Listener
sslConn net.Listener
httpPort string
httpSslPort string
httpSslCert string
adminAssetsDir string
coordinator coordinator.Coordinator
userManager UserManager
shutdown chan bool
clusterConfig *cluster.ClusterConfiguration
raftServer *coordinator.RaftServer
readTimeout time.Duration
migrationRunning uint32
config *configuration.Configuration
}

func NewHttpServer(config *configuration.Configuration, theCoordinator coordinator.Coordinator, userManager UserManager, clusterConfig *cluster.ClusterConfiguration, raftServer *coordinator.RaftServer) *HttpServer {
self := &HttpServer{}
self.httpPort = httpPort
self.adminAssetsDir = adminAssetsDir
self.httpPort = config.ApiHttpPortString()
self.adminAssetsDir = config.AdminAssetsDir
self.coordinator = theCoordinator
self.userManager = userManager
self.shutdown = make(chan bool, 2)
self.clusterConfig = clusterConfig
self.raftServer = raftServer
self.readTimeout = readTimeout
self.readTimeout = config.ApiReadTimeout
self.config = config
return self
}

const (
INVALID_CREDENTIALS_MSG = "Invalid database/username/password"
JSON_PRETTY_PRINT_INDENT = " "
MIGRATION_RUNNING = uint32(1)
MIGRATION_NOT_RUNNING = uint32(0)
)

func isPretty(r *libhttp.Request) bool {
Expand Down Expand Up @@ -156,6 +164,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace)
self.registerEndpoint(p, "post", "/cluster/database_configs/:db", self.configureDatabase)

// migrates leveldb data from 0.7 to 0.8 format.
self.registerEndpoint(p, "post", "/cluster/migrate_data", self.migrateData)

// return whether the cluster is in sync or not
self.registerEndpoint(p, "get", "/sync", self.isInSync)

Expand Down Expand Up @@ -1213,3 +1224,20 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R
return libhttp.StatusCreated, nil
})
}

// migrateData kicks off a background migration of leveldb data from the
// 0.7 format to the 0.8 format. Only cluster admins may trigger it, and
// at most one migration can be in flight at a time.
func (self *HttpServer) migrateData(w libhttp.ResponseWriter, r *libhttp.Request) {
	self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
		// Atomically claim the "migration running" flag; refuse the
		// request if another migration already holds it.
		if !atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_NOT_RUNNING, MIGRATION_RUNNING) {
			return libhttp.StatusForbidden, fmt.Errorf("A migration is already running")
		}

		// Run the migration in the background; the HTTP request returns
		// immediately with 202 Accepted.
		go func() {
			log.Info("Starting Migration")
			// Release the flag when the migration finishes so another one
			// can be started later.
			defer atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_RUNNING, MIGRATION_NOT_RUNNING)
			migrator := migration.NewDataMigrator(
				self.coordinator.(*coordinator.CoordinatorImpl),
				self.clusterConfig,
				self.config,
				self.config.DataDir,
				"shard_db",
				self.clusterConfig.MetaStore)
			migrator.Migrate()
		}()

		return libhttp.StatusAccepted, nil
	})
}
2 changes: 0 additions & 2 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,10 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser
return err
}
defer seriesWriter.Close()
fmt.Println("DROP series")
err := self.raftServer.DropSeries(db, series)
if err != nil {
return err
}
fmt.Println("DROP returning nil")
return nil
}

Expand Down
157 changes: 2 additions & 155 deletions datastore/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"math"
"regexp"
"strings"
"time"

"code.google.com/p/goprotobuf/proto"
Expand Down Expand Up @@ -95,7 +94,7 @@ func (self *Shard) Write(database string, series []*protocol.Series) error {

func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
if querySpec.IsListSeriesQuery() {
return self.executeListSeriesQuery(querySpec, processor)
return fmt.Errorf("List series queries should never come to the shard")
} else if querySpec.IsDeleteFromSeriesQuery() {
return self.executeDeleteQuery(querySpec, processor)
}
Expand Down Expand Up @@ -279,7 +278,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
Points: seriesOutgoing.Points,
}
if !processor.YieldSeries(series) {
log.Info("Stopping processing")
log.Debug("Stopping processing")
shouldContinue = false
}
}
Expand All @@ -304,47 +303,6 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
return nil
}

// yieldSeriesNamesForDb iterates the series-name index for db and calls
// yield once per series name. Iteration stops early (without error) when
// yield returns false. Index keys look like "<prefix><db>~<series>".
func (self *Shard) yieldSeriesNamesForDb(db string, yield func(string) bool) error {
	dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX)
	pred := func(key []byte) bool {
		return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX)
	}

	// Build the seek key in a fresh buffer. Appending directly to the shared
	// package-level prefix slice could scribble on its backing array if that
	// slice ever has spare capacity (and races with any concurrent user of
	// the prefix).
	firstKey := make([]byte, 0, dbNameStart+len(db)+1)
	firstKey = append(firstKey, DATABASE_SERIES_INDEX_PREFIX...)
	firstKey = append(firstKey, db...)
	firstKey = append(firstKey, '~')

	itr := self.db.Iterator()
	defer itr.Close()

	for itr.Seek(firstKey); itr.Valid(); itr.Next() {
		key := itr.Key()
		if !pred(key) {
			break
		}
		dbSeries := string(key[dbNameStart:])
		parts := strings.Split(dbSeries, "~")
		if len(parts) > 1 {
			// Seeking can land on a neighboring database's entries; stop
			// as soon as the db component no longer matches.
			if parts[0] != db {
				break
			}
			if !yield(parts[1]) {
				return nil
			}
		}
	}
	if err := itr.Error(); err != nil {
		return err
	}
	return nil
}

// executeListSeriesQuery streams every series name in the query's database
// to the processor as a bare point (name only, no fields or values).
func (self *Shard) executeListSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
	db := querySpec.Database()
	return self.yieldSeriesNamesForDb(db, func(seriesName string) bool {
		// Copy into a local before taking its address, since YieldPoint
		// receives a pointer to the string.
		nameCopy := seriesName
		return processor.YieldPoint(&nameCopy, nil, nil)
	})
}

func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
query := querySpec.DeleteQuery()
series := query.GetFromClause()
Expand Down Expand Up @@ -571,114 +529,3 @@ func (self *Shard) getFieldsForSeries(db, series string, columns []string) ([]*m
}
return fields, nil
}

/* DEPRECATED methods do not use*/

// getSeriesForDbAndRegexDEPRECATED returns the names of all series in
// database whose name matches regex, using the metastore's series list.
//
// TODO: remove this on version 0.9 after people have had a chance to do
// migrations.
func (self *Shard) getSeriesForDbAndRegexDEPRECATED(database string, regex *regexp.Regexp) []string {
	matching := []string{}
	for _, seriesName := range self.metaStore.GetSeriesForDatabase(database) {
		if regex.MatchString(seriesName) {
			matching = append(matching, seriesName)
		}
	}
	return matching
}

// getSeriesForDatabaseDEPRECATED collects every series name for database
// from the shard-local index, or nil if the index cannot be read.
//
// TODO: remove this on version 0.9 after people have had a chance to do
// migrations.
func (self *Shard) getSeriesForDatabaseDEPRECATED(database string) (series []string) {
	collect := func(name string) bool {
		series = append(series, name)
		return true
	}
	if err := self.yieldSeriesNamesForDb(database, collect); err != nil {
		log.Error("Cannot get series names for db %s: %s", database, err)
		return nil
	}
	return series
}

// getFieldsForSeriesDEPRECATED resolves the requested column names for
// db/series into metastore fields carrying their shard-local ids. "*"
// expands to all columns of the series; an empty column list marks a count
// query, which only needs one column to scan.
//
// TODO: remove this function. I'm keeping it around for the moment since
// it'll probably have to be used in the DB upgrade/migration that moves
// metadata from the shard to Raft.
func (self *Shard) getFieldsForSeriesDEPRECATED(db, series string, columns []string) ([]*metastore.Field, error) {
	isCountQuery := len(columns) == 0
	if isCountQuery || columns[0] == "*" {
		columns = self.getColumnNamesForSeriesDEPRECATED(db, series)
	}
	if len(columns) == 0 {
		return nil, FieldLookupError{"Couldn't look up columns for series: " + series}
	}

	fields := make([]*metastore.Field, len(columns))
	for i, columnName := range columns {
		rawId, lookupErr := self.getIdForDbSeriesColumnDEPRECATED(&db, &series, &columnName)
		if lookupErr != nil {
			return nil, lookupErr
		}
		if rawId == nil {
			return nil, FieldLookupError{"Field " + columnName + " doesn't exist in series " + series}
		}
		// Ids are stored as uvarint-encoded bytes.
		id, err := binary.ReadUvarint(bytes.NewBuffer(rawId))
		if err != nil {
			return nil, err
		}
		fields[i] = &metastore.Field{Name: columnName, Id: id}
	}

	// A count query just wants the single column that will be the most
	// efficient to scan through; historically that is the first resolved
	// field.
	if isCountQuery {
		return []*metastore.Field{fields[0]}, nil
	}
	return fields, nil
}

// getColumnNamesForSeriesDEPRECATED scans the column index for db/series
// and returns all column names found, or nil if the iterator reports an
// error. Index keys look like "<prefix><db>~<series>~<column>".
//
// TODO: remove this function. I'm keeping it around for the moment since
// it'll probably have to be used in the DB upgrade/migration that moves
// metadata from the shard to Raft.
func (self *Shard) getColumnNamesForSeriesDEPRECATED(db, series string) []string {
	dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX)
	// Build the seek key in a fresh buffer. Appending directly to the shared
	// package-level prefix slice could scribble on its backing array if that
	// slice ever has spare capacity.
	suffix := db + "~" + series + "~"
	seekKey := make([]byte, 0, dbNameStart+len(suffix))
	seekKey = append(seekKey, SERIES_COLUMN_INDEX_PREFIX...)
	seekKey = append(seekKey, suffix...)

	pred := func(key []byte) bool {
		return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX)
	}
	it := self.db.Iterator()
	defer it.Close()

	names := make([]string, 0)
	for it.Seek(seekKey); it.Valid(); it.Next() {
		key := it.Key()
		if !pred(key) {
			break
		}
		dbSeriesColumn := string(key[dbNameStart:])
		parts := strings.Split(dbSeriesColumn, "~")
		if len(parts) > 2 {
			// Stop once we have moved past this db/series pair.
			if parts[0] != db || parts[1] != series {
				break
			}
			names = append(names, parts[2])
		}
	}
	if err := it.Error(); err != nil {
		log.Error("Error while getting columns for series %s: %s", series, err)
		return nil
	}
	return names
}

// getIdForDbSeriesColumnDEPRECATED looks up the stored id bytes for the
// "<db>~<series>~<column>" key in the column index. A nil result with a
// nil error means the field does not exist.
//
// TODO: remove this after a version that doesn't support migration from
// old non-raft metastore.
func (self *Shard) getIdForDbSeriesColumnDEPRECATED(db, series, column *string) (ret []byte, err error) {
	s := fmt.Sprintf("%s~%s~%s", *db, *series, *column)
	// Copy the prefix into a fresh buffer before appending; appending to the
	// shared package-level prefix slice could clobber its backing array if
	// that slice ever has spare capacity.
	key := make([]byte, 0, len(SERIES_COLUMN_INDEX_PREFIX)+len(s))
	key = append(key, SERIES_COLUMN_INDEX_PREFIX...)
	key = append(key, s...)
	if ret, err = self.db.Get(key); err != nil {
		return nil, err
	}
	return ret, nil
}
4 changes: 4 additions & 0 deletions datastore/shard_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ type FieldLookupError struct {
message string
}

// NewFieldLookupError returns a FieldLookupError carrying the given message.
func NewFieldLookupError(message string) *FieldLookupError {
	err := &FieldLookupError{message: message}
	return err
}

// Error implements the error interface, returning the lookup failure message.
func (self FieldLookupError) Error() string {
	return self.message
}
Expand Down
Loading