diff --git a/.gitignore b/.gitignore index 81213335e87..b8ed456b804 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ build/ # executables /server +/daemon /main godef diff --git a/build.sh b/build.sh index e94d1172f43..fc95aec0917 100755 --- a/build.sh +++ b/build.sh @@ -14,7 +14,7 @@ go get github.com/fitstar/falcore/filter go get code.google.com/p/log4go go get code.google.com/p/go.crypto/bcrypt go get launchpad.net/gocheck -go get github.com/pmylund/go-cache +go get github.com/influxdb/go-cache patch="off" cflags="-m32" diff --git a/config.json.sample b/config.json.sample index 9e652810815..a9697483d00 100644 --- a/config.json.sample +++ b/config.json.sample @@ -5,5 +5,6 @@ "RaftServerPort": 8090, "SeedServers": [], "DataDir": "/tmp/influxdb/development/db", - "RaftDir": "/tmp/influxdb/development/raft" + "RaftDir": "/tmp/influxdb/development/raft", + "ProtobufPort": 8099 } diff --git a/package.sh b/package.sh index 04678ae4bf5..ef353f760da 100755 --- a/package.sh +++ b/package.sh @@ -34,7 +34,7 @@ function packae_source { git checkout . popd - rm -f server + rm -f influxd git ls-files --others | egrep -v 'github|launchpad|code.google' > /tmp/influxdb.ignored echo "pkg/*" >> /tmp/influxdb.ignored echo "packages/*" >> /tmp/influxdb.ignored @@ -62,7 +62,7 @@ function package_files { package_admin_interface - mv server build/influxdb + mv daemon build/influxdb # cp -R src/admin/site/ build/admin/ mkdir build/admin @@ -117,14 +117,14 @@ function build_packages { function setup_version { echo "Changing version from dev to $influxdb_version" sha1=`git rev-list --max-count=1 HEAD` - sed -i.bak -e "s/version = \"dev\"/version = \"$influxdb_version\"/" -e "s/gitSha\s*=\s*\"HEAD\"/gitSha = \"$sha1\"/" src/server/server.go + sed -i.bak -e "s/version = \"dev\"/version = \"$influxdb_version\"/" -e "s/gitSha\s*=\s*\"HEAD\"/gitSha = \"$sha1\"/" src/daemon/influxd.go sed -i.bak -e "s/REPLACE_VERSION/$influxdb_version/" scripts/post_install.sh } function revert_version { - if [ -e src/server/server.go.bak ]; then - rm src/server/server.go - mv src/server/server.go.bak src/server/server.go + if [ -e src/daemon/influxd.go.bak ]; then + rm src/daemon/influxd.go + mv src/daemon/influxd.go.bak src/daemon/influxd.go fi if [ -e scripts/post_install.sh ]; then diff --git a/src/api/http/api.go b/src/api/http/api.go index be99a80a5a7..98a3778e93e 100644 --- a/src/api/http/api.go +++ b/src/api/http/api.go @@ -107,10 +107,12 @@ func (self *HttpServer) Serve(listener net.Listener) { } func (self *HttpServer) Close() { - log.Info("Closing http server") - self.conn.Close() - log.Info("Waiting for all requests to finish before killing the process") - <-self.shutdown + if self.conn != nil { + log.Info("Closing http server") + self.conn.Close() + log.Info("Waiting for all requests to finish before killing the process") + <-self.shutdown + } } type Writer interface { @@ -366,23 +368,16 @@ func (self *HttpServer) writePoints(w libhttp.ResponseWriter, r *libhttp.Request } type createDatabaseRequest struct { - Name string `json:"name"` -} - -type Database struct { - Name string `json:"name"` + Name string `json:"name"` + ReplicationFactor uint8 `json:"replicationFactor"` } func (self *HttpServer) listDatabases(w libhttp.ResponseWriter, r *libhttp.Request) { self.tryAsClusterAdmin(w, r, func(u common.User) (int, interface{}) { - dbNames, err := self.coordinator.ListDatabases(u) + databases, err := self.coordinator.ListDatabases(u) if err != nil { return errorToStatusCode(err), err.Error() } - databases := 
make([]*Database, 0, len(dbNames)) - for _, db := range dbNames { - databases = append(databases, &Database{db}) - } return libhttp.StatusOK, databases }) } @@ -398,7 +393,7 @@ func (self *HttpServer) createDatabase(w libhttp.ResponseWriter, r *libhttp.Requ if err != nil { return libhttp.StatusBadRequest, err.Error() } - err = self.coordinator.CreateDatabase(user, createRequest.Name) + err = self.coordinator.CreateDatabase(user, createRequest.Name, createRequest.ReplicationFactor) if err != nil { return errorToStatusCode(err), err.Error() } diff --git a/src/api/http/api_test.go b/src/api/http/api_test.go index 39e566e5458..9799983e2cb 100644 --- a/src/api/http/api_test.go +++ b/src/api/http/api_test.go @@ -3,6 +3,7 @@ package http import ( "bytes" "common" + "coordinator" "encoding/base64" "encoding/json" "fmt" @@ -109,13 +110,13 @@ func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *p return nil } -func (self *MockCoordinator) CreateDatabase(_ common.User, db string) error { +func (self *MockCoordinator) CreateDatabase(_ common.User, db string, _ uint8) error { self.db = db return nil } -func (self *MockCoordinator) ListDatabases(_ common.User) ([]string, error) { - return []string{"db1", "db2"}, nil +func (self *MockCoordinator) ListDatabases(_ common.User) ([]*coordinator.Database, error) { + return []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}}, nil } func (self *MockCoordinator) DropDatabase(_ common.User, db string) error { @@ -123,6 +124,14 @@ func (self *MockCoordinator) DropDatabase(_ common.User, db string) error { return nil } +func (self *MockCoordinator) ReplicateWrite(request *protocol.Request) error { + return nil +} + +func (self *MockCoordinator) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) { + return +} + func (self *ApiSuite) formatUrl(path string, args ...interface{}) string { path = fmt.Sprintf(path, args...) port := self.listener.Addr().(*net.TCPAddr).Port @@ -692,10 +701,10 @@ func (self *ApiSuite) TestDatabasesIndex(c *C) { defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) c.Assert(err, IsNil) - users := []*Database{} + users := []*coordinator.Database{} err = json.Unmarshal(body, &users) c.Assert(err, IsNil) - c.Assert(users, DeepEquals, []*Database{&Database{"db1"}, &Database{"db2"}}) + c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", uint8(1)}, &coordinator.Database{"db2", uint8(1)}}) } } @@ -710,8 +719,8 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) { body, err := ioutil.ReadAll(resp.Body) c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, libhttp.StatusOK) - users := []*Database{} + users := []*coordinator.Database{} err = json.Unmarshal(body, &users) c.Assert(err, IsNil) - c.Assert(users, DeepEquals, []*Database{&Database{"db1"}, &Database{"db2"}}) + c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}}) } diff --git a/src/common/helpers.go b/src/common/helpers.go index ae3ca72533a..76bf3747252 100644 --- a/src/common/helpers.go +++ b/src/common/helpers.go @@ -1,6 +1,9 @@ package common import ( + "bytes" + "crypto/sha1" + "encoding/binary" "encoding/json" "fmt" "protocol" @@ -17,3 +20,16 @@ func StringToSeriesArray(seriesString string, args ...interface{}) ([]*protocol. 
func CurrentTime() int64 { return time.Now().UnixNano() / int64(1000) } + +func RingLocation(database *string, timeSeries *string, time *int64) int { + hasher := sha1.New() + hasher.Write([]byte(fmt.Sprintf("%s%s%d", *database, *timeSeries, *time))) + buf := bytes.NewBuffer(hasher.Sum(nil)) + var n int64 + binary.Read(buf, binary.LittleEndian, &n) + nInt := int(n) + if nInt < 0 { + nInt = nInt * -1 + } + return nInt +} diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go index f3c76c9147a..a5ab9a18c9b 100644 --- a/src/configuration/configuration.go +++ b/src/configuration/configuration.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "log" + "os" ) type Configuration struct { @@ -15,6 +16,8 @@ type Configuration struct { SeedServers []string DataDir string RaftDir string + ProtobufPort int + Hostname string } func LoadConfiguration(fileName string) *Configuration { @@ -43,3 +46,24 @@ func (self *Configuration) AdminHttpPortString() string { func (self *Configuration) ApiHttpPortString() string { return fmt.Sprintf(":%d", self.ApiHttpPort) } + +func (self *Configuration) ProtobufPortString() string { + return fmt.Sprintf(":%d", self.ProtobufPort) +} + +func (self *Configuration) HostnameOrDetect() string { + if self.Hostname != "" { + return self.Hostname + } else { + n, err := os.Hostname() + if err == nil { + return n + } else { + return "localhost" + } + } +} + +func (self *Configuration) ProtobufConnectionString() string { + return fmt.Sprintf("%s:%d", self.HostnameOrDetect(), self.ProtobufPort) +} diff --git a/src/coordinator/client.go b/src/coordinator/client.go deleted file mode 100644 index f2f4440fd7c..00000000000 --- a/src/coordinator/client.go +++ /dev/null @@ -1 +0,0 @@ -package coordinator diff --git a/src/coordinator/client_server_test.go b/src/coordinator/client_server_test.go new file mode 100644 index 00000000000..00c27aded35 --- /dev/null +++ b/src/coordinator/client_server_test.go @@ -0,0 +1,99 @@ +package coordinator + +import ( + "datastore" + "encoding/binary" + "fmt" + . 
"launchpad.net/gocheck" + "net" + "os" + "protocol" + "time" +) + +type ClientServerSuite struct{} + +var _ = Suite(&ClientServerSuite{}) + +const DB_DIR = "/tmp/influxdb/datastore_test" + +func newDatastore(c *C) datastore.Datastore { + os.MkdirAll(DB_DIR, 0744) + db, err := datastore.NewLevelDbDatastore(DB_DIR) + c.Assert(err, Equals, nil) + return db +} + +func cleanDb(db datastore.Datastore) { + if db != nil { + db.Close() + } + os.RemoveAll(DB_DIR) +} + +type MockRequestHandler struct { +} + +var writeOk = protocol.Response_WRITE_OK + +func (self *MockRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error { + response := &protocol.Response{RequestId: request.Id, Type: &writeOk} + data, _ := response.Encode() + binary.Write(conn, binary.LittleEndian, uint32(len(data))) + conn.Write(data) + return nil +} + +func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) { + server := startAndVerifyCluster(1, c)[0] + defer clean(server) + db := newDatastore(c) + coord := NewCoordinatorImpl(db, server, server.clusterConfig) + coord.ConnectToProtobufServers(server.config.ProtobufConnectionString()) + requestHandler := &MockRequestHandler{} + protobufServer := NewProtobufServer(":8091", requestHandler) + go protobufServer.ListenAndServe() + c.Assert(protobufServer, Not(IsNil)) + protobufClient := NewProtobufClient("localhost:8091") + responseStream := make(chan *protocol.Response, 1) + + mock := ` + { + "points": [ + { "values": [{"int64_value": 3}]} + ], + "name": "foo", + "fields": ["val"] + }` + fmt.Println("creating series") + series := stringToSeries(mock, c) + t := time.Now().Unix() + s := uint64(1) + series.Points[0].Timestamp = &t + series.Points[0].SequenceNumber = &s + id := uint32(1) + database := "pauldb" + proxyWrite := protocol.Request_PROXY_WRITE + request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, Series: series} + + time.Sleep(time.Second * 1) + err := protobufClient.MakeRequest(request, responseStream) + c.Assert(err, IsNil) + timer := time.NewTimer(time.Second) + select { + case <-timer.C: + c.Error("Timed out waiting for response") + case response := <-responseStream: + c.Assert(*response.Type, Equals, protocol.Response_WRITE_OK) + } +} + +func (self *ClientServerSuite) TestClientReconnectsIfDisconnected(c *C) { +} + +func (self *ClientServerSuite) TestServerExecutesReplayRequestIfWriteIsOutOfSequence(c *C) { +} + +func (self *ClientServerSuite) TestServerKillsOldHandlerWhenClientReconnects(c *C) { + +} diff --git a/src/coordinator/cluster_configuration.go b/src/coordinator/cluster_configuration.go index f9191ac9631..23d6287b226 100644 --- a/src/coordinator/cluster_configuration.go +++ b/src/coordinator/cluster_configuration.go @@ -1,38 +1,75 @@ package coordinator import ( + "common" "errors" "fmt" "sync" + "sync/atomic" ) +/* + This struct stores all the metadata confiugration information about a running cluster. This includes + the servers in the cluster and their state, databases, users, and which continuous queries are running. + + ClusterVersion is a monotonically increasing int that keeps track of different server configurations. + For example, when you spin up a cluster and start writing data, the version will be 1. If you expand the + cluster the version will be bumped. Using this the cluster is able to run two versions simultaneously + while the new servers are being brought online. 
+*/ type ClusterConfiguration struct { - createDatabaseLock sync.RWMutex - databaseNames map[string]bool - usersLock sync.RWMutex - clusterAdmins map[string]*clusterAdmin - dbUsers map[string]map[string]*dbUser - servers []*ClusterServer - serversLock sync.RWMutex - hasRunningServers bool - currentServerId uint32 + createDatabaseLock sync.RWMutex + databaseReplicationFactors map[string]uint8 + usersLock sync.RWMutex + clusterAdmins map[string]*clusterAdmin + dbUsers map[string]map[string]*dbUser + servers []*ClusterServer + serversLock sync.RWMutex + hasRunningServers bool + currentServerId uint32 + localServerId uint32 + ClusterVersion uint32 } -const NUMBER_OF_RING_LOCATIONS = 10000 +type Database struct { + Name string `json:"name"` + ReplicationFactor uint8 `json:"replicationFactor"` +} func NewClusterConfiguration() *ClusterConfiguration { return &ClusterConfiguration{ - databaseNames: make(map[string]bool), - clusterAdmins: make(map[string]*clusterAdmin), - dbUsers: make(map[string]map[string]*dbUser), - servers: make([]*ClusterServer, 0), + databaseReplicationFactors: make(map[string]uint8), + clusterAdmins: make(map[string]*clusterAdmin), + dbUsers: make(map[string]map[string]*dbUser), + servers: make([]*ClusterServer, 0), } } +func (self *ClusterConfiguration) IsSingleServer() bool { + return len(self.servers) < 2 +} + +func (self *ClusterConfiguration) Servers() []*ClusterServer { + return self.servers +} + +func (self *ClusterConfiguration) GetReplicationFactor(database *string) uint8 { + return self.databaseReplicationFactors[*database] +} + func (self *ClusterConfiguration) IsActive() bool { return self.hasRunningServers } +func (self *ClusterConfiguration) SetActive() { + self.serversLock.Lock() + defer self.serversLock.Unlock() + for _, server := range self.servers { + server.State = Running + } + atomic.AddUint32(&self.ClusterVersion, uint32(1)) +} + func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer { for _, server := range self.servers { if server.RaftName == name { @@ -42,9 +79,161 @@ func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServe return nil } +func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer { + for _, server := range self.servers { + if server.Id == *id { + return server + } + } + return nil +} + +type serverToQuery struct { + server *ClusterServer + ringLocationsToQuery uint32 +} + +// This function will return an array of servers to query and the number of ring locations to return per server. +// Queries are issued to every nth server in the cluster where n is the replication factor. We need the local host id +// because we want to issue the query locally and the nth servers from there. +// Optimally, the number of servers in a cluster will be evenly divisible by the replication factors used. For example, +// if you have a cluster with databases with RFs of 1, 2, and 3: optimal cluster sizes would be 6, 12, 18, 24, 30, etc. +// If that's not the case, one or more servers will have to filter out data from other servers on the fly, which could +// be a little more expensive. 
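// Illustrative sketch, not part of this patch: with 6 servers and a replication
// factor of 2, a query started on the server at ring position 3 walks every 2nd
// position beginning at 3 % 2 = 1, i.e. positions 1, 3 and 5. A minimal
// stand-alone version of that index walk (names here are hypothetical):
//
//	func indexesToQuery(localIndex, serverCount, replicationFactor int) []int {
//		indexes := []int{}
//		for i := localIndex % replicationFactor; i < serverCount; i += replicationFactor {
//			indexes = append(indexes, i)
//		}
//		return indexes // indexesToQuery(3, 6, 2) == []int{1, 3, 5}
//	}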
+func (self *ClusterConfiguration) GetServersToMakeQueryTo(localHostId uint32, database *string) (servers []*serverToQuery, replicationFactor uint32) { + replicationFactor = uint32(self.GetReplicationFactor(database)) + replicationFactorInt := int(replicationFactor) + index := 0 + for i, s := range self.servers { + if s.Id == localHostId { + index = i % replicationFactorInt + break + } + } + servers = make([]*serverToQuery, 0, len(self.servers)/replicationFactorInt) + serverCount := len(self.servers) + for index = index; index < serverCount; index += replicationFactorInt { + server := self.servers[index] + servers = append(servers, &serverToQuery{server, replicationFactor}) + } + // need to maybe add a server and set which ones filter their data + if serverCount%replicationFactorInt != 0 { + /* + Here's what this looks like with a few different ring sizes and replication factors. + + server indexes + 0 1 2 3 4 + + 0 2 4 6 - 0 only sends his + 1 3 5 - add 0 and have 1 only send his + + 0 3 6 - have 0 only send his and 4 + 1 4 7 - have 1 only send his and 0 + 2 5 - add 0 and have 2 send only his and 1 + + + server indexes + 0 1 2 3 4 5 6 7 + + 0 3 6 9 - have 0 only send his and 7 + 1 4 7 10 - have 1 only send his and 0 + 2 5 8 - add 0 and have 2 only send his and 1 + + We see that there are the number of cases equal to the replication factor. The last case + always has us adding the first server in the ring. Then we're determining how much data + the local server should be ignoring. + */ + lastIndexAdded := index - replicationFactorInt + if serverCount-lastIndexAdded == replicationFactorInt { + servers[0].ringLocationsToQuery = uint32(replicationFactorInt - 1) + servers = append(servers, &serverToQuery{self.servers[0], replicationFactor}) + } else { + servers[0].ringLocationsToQuery = uint32(replicationFactorInt - 1) + } + } + return servers, replicationFactor +} + +// This method returns a function that can be passed into the datastore's ExecuteQuery method. It will tell the datastore +// if each point is something that should be returned in the query based on its ring location and if the query calls for +// data from different replicas. 
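// Illustrative sketch, not part of this patch: the closure built below appears
// to be evaluated once per point inside the datastore, roughly as
//
//	location := common.RingLocation(database, series, time)
//	ringOwner := servers[location%len(servers)]
//	skip := !isReplicaWeAnswerFor(ringOwner) // isReplicaWeAnswerFor is hypothetical
//
// i.e. a server drops points whose ring owner is not among the replicas it was
// asked to answer for, so overlapping replicas don't return the same data twice.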
+func (self *ClusterConfiguration) GetRingFilterFunction(database string, countOfServersToInclude uint32) func(database, series *string, time *int64) bool { + serversToInclude := make([]*ClusterServer, 0, countOfServersToInclude) + countServers := int(countOfServersToInclude) + for i, s := range self.servers { + if s.Id == self.localServerId { + serversToInclude = append(serversToInclude, s) + for j := i - 1; j >= 0 && len(serversToInclude) < countServers; j-- { + serversToInclude = append(serversToInclude, self.servers[j]) + } + if len(serversToInclude) < countServers { + for j := len(self.servers) - 1; len(serversToInclude) < countServers; j-- { + serversToInclude = append(serversToInclude, self.servers[j]) + } + } + } + } + f := func(database, series *string, time *int64) bool { + location := common.RingLocation(database, series, time) + server := self.servers[location%len(self.servers)] + for _, s := range serversToInclude { + if s.Id == server.Id { + return false + } + } + return true + } + return f +} + +func (self *ClusterConfiguration) GetServerIndexByLocation(location *int) int { + return *location % len(self.servers) +} + +func (self *ClusterConfiguration) GetOwnerIdByLocation(location *int) *uint32 { + return &self.servers[self.GetServerIndexByLocation(location)].Id +} + +func (self *ClusterConfiguration) GetServersByRingLocation(database *string, location *int) []*ClusterServer { + index := *location % len(self.servers) + _, replicas := self.GetServersByIndexAndReplicationFactor(database, &index) + return replicas +} + +// This function returns the server that owns the ring location and a set of servers that are replicas (which include the onwer) +func (self *ClusterConfiguration) GetServersByIndexAndReplicationFactor(database *string, index *int) (*ClusterServer, []*ClusterServer) { + replicationFactor := int(self.GetReplicationFactor(database)) + serverCount := len(self.servers) + owner := self.servers[*index] + if replicationFactor >= serverCount { + return owner, self.servers + } + owners := make([]*ClusterServer, 0, replicationFactor) + ownerCount := 0 + for i := *index; i < serverCount && ownerCount < replicationFactor; i++ { + owners = append(owners, self.servers[i]) + ownerCount++ + } + for i := 0; ownerCount < replicationFactor; i++ { + owners = append(owners, self.servers[i]) + ownerCount++ + } + return owner, owners +} + +func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer { + for _, server := range self.servers { + if server.ProtobufConnectionString == connectionString { + return server + } + } + return nil +} + func (self *ClusterConfiguration) UpdateServerState(serverId uint32, state ServerState) error { self.serversLock.Lock() defer self.serversLock.Unlock() + atomic.AddUint32(&self.ClusterVersion, uint32(1)) for _, server := range self.servers { if server.Id == serverId { if state == Running { @@ -66,25 +255,25 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) { self.servers = append(self.servers, server) } -func (self *ClusterConfiguration) GetDatabases() []string { +func (self *ClusterConfiguration) GetDatabases() []*Database { self.createDatabaseLock.RLock() defer self.createDatabaseLock.RUnlock() - names := make([]string, 0, len(self.databaseNames)) - for name, _ := range self.databaseNames { - names = append(names, name) + dbs := make([]*Database, 0, len(self.databaseReplicationFactors)) + for name, rf := range self.databaseReplicationFactors { + dbs = append(dbs, 
&Database{Name: name, ReplicationFactor: rf}) } - return names + return dbs } -func (self *ClusterConfiguration) CreateDatabase(name string) error { +func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error { self.createDatabaseLock.Lock() defer self.createDatabaseLock.Unlock() - if _, ok := self.databaseNames[name]; ok { + if _, ok := self.databaseReplicationFactors[name]; ok { return fmt.Errorf("database %s exists", name) } - self.databaseNames[name] = true + self.databaseReplicationFactors[name] = replicationFactor return nil } @@ -92,11 +281,11 @@ func (self *ClusterConfiguration) DropDatabase(name string) error { self.createDatabaseLock.Lock() defer self.createDatabaseLock.Unlock() - if _, ok := self.databaseNames[name]; !ok { + if _, ok := self.databaseReplicationFactors[name]; !ok { return fmt.Errorf("Database %s doesn't exist", name) } - delete(self.databaseNames, name) + delete(self.databaseReplicationFactors, name) self.usersLock.Lock() defer self.usersLock.Unlock() @@ -173,3 +362,9 @@ func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin) { } self.clusterAdmins[u.GetName()] = u } + +func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint8 { + self.createDatabaseLock.RLock() + defer self.createDatabaseLock.RUnlock() + return self.databaseReplicationFactors[name] +} diff --git a/src/coordinator/cluster_server.go b/src/coordinator/cluster_server.go index 407868adbaa..8991629f311 100644 --- a/src/coordinator/cluster_server.go +++ b/src/coordinator/cluster_server.go @@ -1,10 +1,16 @@ package coordinator +import ( + log "code.google.com/p/log4go" +) + type ClusterServer struct { - Id uint32 - RaftName string - State ServerState - RaftConnectionString string + Id uint32 + RaftName string + State ServerState + RaftConnectionString string + ProtobufConnectionString string + protobufClient *ProtobufClient } type ServerState int @@ -16,3 +22,8 @@ const ( Running Potential ) + +func (self *ClusterServer) Connect() { + log.Info("ClusterServer: %d connecting to: %s", self.Id, self.ProtobufConnectionString) + self.protobufClient = NewProtobufClient(self.ProtobufConnectionString) +} diff --git a/src/coordinator/command.go b/src/coordinator/command.go index ef863202d62..76a597b6766 100644 --- a/src/coordinator/command.go +++ b/src/coordinator/command.go @@ -23,11 +23,12 @@ func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error) { } type CreateDatabaseCommand struct { - Name string `json:"name"` + Name string `json:"name"` + ReplicationFactor uint8 `json:"replicationFactor"` } -func NewCreateDatabaseCommand(name string) *CreateDatabaseCommand { - return &CreateDatabaseCommand{name} +func NewCreateDatabaseCommand(name string, replicationFactor uint8) *CreateDatabaseCommand { + return &CreateDatabaseCommand{name, replicationFactor} } func (c *CreateDatabaseCommand) CommandName() string { @@ -36,7 +37,7 @@ func (c *CreateDatabaseCommand) CommandName() string { func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error) { config := server.Context().(*ClusterConfiguration) - err := config.CreateDatabase(c.Name) + err := config.CreateDatabase(c.Name, c.ReplicationFactor) return nil, err } @@ -116,3 +117,24 @@ func (c *UpdateServerStateCommand) Apply(server raft.Server) (interface{}, error err := config.UpdateServerState(c.ServerId, c.State) return nil, err } + +type InfluxJoinCommand struct { + Name string `json:"name"` + ConnectionString string `json:"connectionString"` + 
ProtobufConnectionString string `json:"protobufConnectionString"` +} + +// The name of the Join command in the log +func (c *InfluxJoinCommand) CommandName() string { + return "raft:join" +} + +func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error) { + err := server.AddPeer(c.Name, c.ConnectionString) + + return []byte("join"), err +} + +func (c *InfluxJoinCommand) NodeName() string { + return c.Name +} diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 86f36956116..e90cd8b7d3a 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -1,72 +1,508 @@ package coordinator import ( + log "code.google.com/p/log4go" "common" "datastore" + "errors" "fmt" + "math" "parser" "protocol" "sync" + "sync/atomic" ) type CoordinatorImpl struct { - clusterConfiguration *ClusterConfiguration - raftServer ClusterConsensus - datastore datastore.Datastore - currentSequenceNumber uint32 - sequenceNumberLock sync.Mutex + clusterConfiguration *ClusterConfiguration + raftServer ClusterConsensus + datastore datastore.Datastore + localHostId uint32 + requestId uint32 + runningReplays map[string][]*protocol.Request + runningReplaysLock sync.Mutex } +var proxyWrite = protocol.Request_PROXY_WRITE +var replayReplication = protocol.Request_REPLICATION_REPLAY + +// this is the key used for the persistent atomic ints for sequence numbers +const POINT_SEQUENCE_NUMBER_KEY = "p" + +// actual point sequence numbers will have the first part of the number +// be a host id. This ensures that sequence numbers are unique across the cluster +const HOST_ID_OFFSET = uint64(10000) + +var queryRequest = protocol.Request_QUERY +var endStreamResponse = protocol.Response_END_STREAM +var queryResponse = protocol.Response_QUERY + func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl { return &CoordinatorImpl{ clusterConfiguration: clusterConfiguration, raftServer: raftServer, datastore: datastore, + runningReplays: make(map[string][]*protocol.Request), } } +// Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order. 
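// Illustrative sketch, not part of this patch: the fan-out below amounts to one
// buffered response channel per remote server plus one for the local datastore,
// along the lines of
//
//	responseChan := make(chan *protocol.Response, 3)
//	request := &protocol.Request{Type: &queryRequest, Query: &queryString,
//		Id: &id, Database: &db, UserName: &userName}
//	server.protobufClient.MakeRequest(request, responseChan)
//
// after which streamResultsFromChannels drains every channel until each one has
// delivered an END_STREAM response.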
+// TODO: make this work even if there is a downed server in the cluster func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.Query, yield func(*protocol.Series) error) error { - return self.datastore.ExecuteQuery(user, db, query, yield) + if self.clusterConfiguration.IsSingleServer() { + return self.datastore.ExecuteQuery(user, db, query, yield, nil) + } + servers, replicationFactor := self.clusterConfiguration.GetServersToMakeQueryTo(self.localHostId, &db) + queryString := query.GetQueryString() + id := atomic.AddUint32(&self.requestId, uint32(1)) + userName := user.GetName() + responseChannels := make([]chan *protocol.Response, 0, len(servers)+1) + var localServerToQuery *serverToQuery + for _, server := range servers { + if server.server.Id == self.localHostId { + localServerToQuery = server + } else { + request := &protocol.Request{Type: &queryRequest, Query: &queryString, Id: &id, Database: &db, UserName: &userName} + if server.ringLocationsToQuery != replicationFactor { + r := server.ringLocationsToQuery + request.RingLocationsToQuery = &r + } + responseChan := make(chan *protocol.Response, 3) + server.server.protobufClient.MakeRequest(request, responseChan) + responseChannels = append(responseChannels, responseChan) + } + } + + local := make(chan *protocol.Response) + var nextPoint *protocol.Point + chanClosed := false + sendFromLocal := func(series *protocol.Series) error { + pointCount := len(series.Points) + if pointCount == 0 { + if nextPoint != nil { + series.Points = append(series.Points, nextPoint) + } + + if !chanClosed { + local <- &protocol.Response{Type: &endStreamResponse, Series: series} + chanClosed = true + close(local) + } + return nil + } + oldNextPoint := nextPoint + nextPoint = series.Points[pointCount-1] + series.Points[pointCount-1] = nil + if oldNextPoint != nil { + copy(series.Points[1:], series.Points[0:]) + series.Points[0] = oldNextPoint + } else { + series.Points = series.Points[:len(series.Points)-1] + } + + response := &protocol.Response{Series: series, Type: &queryResponse} + if nextPoint != nil { + response.NextPointTime = nextPoint.Timestamp + } + local <- response + return nil + } + responseChannels = append(responseChannels, local) + // TODO: wire up the willreturnsingleseries method and uncomment this line and delete the next one. 
+ // isSingleSeriesQuery := query.WillReturnSingleSeries() + isSingleSeriesQuery := false + + go func() { + var ringFilter func(database, series *string, time *int64) bool + if replicationFactor != localServerToQuery.ringLocationsToQuery { + ringFilter = self.clusterConfiguration.GetRingFilterFunction(db, localServerToQuery.ringLocationsToQuery) + } + self.datastore.ExecuteQuery(user, db, query, sendFromLocal, ringFilter) + }() + self.streamResultsFromChannels(isSingleSeriesQuery, query.Ascending, responseChannels, yield) + return nil +} + +// This function streams results from servers and ensures that series are yielded in the proper order (which is expected by the engine) +func (self *CoordinatorImpl) streamResultsFromChannels(isSingleSeriesQuery, isAscending bool, channels []chan *protocol.Response, yield func(*protocol.Series) error) { + channelCount := len(channels) + closedChannels := 0 + responses := make([]*protocol.Response, 0) + var leftovers []*protocol.Series + + seriesNames := make(map[string]bool) + for closedChannels < channelCount { + for _, ch := range channels { + response := <-ch + if response != nil { + if *response.Type == protocol.Response_END_STREAM { + closedChannels++ + } + seriesNames[*response.Series.Name] = true + if response.Series.Points != nil { + responses = append(responses, response) + } + } + } + leftovers = self.yieldResults(isSingleSeriesQuery, isAscending, leftovers, responses, yield) + responses = make([]*protocol.Response, 0) + } + for _, leftover := range leftovers { + if len(leftover.Points) > 0 { + yield(leftover) + } + } + for n, _ := range seriesNames { + name := n + yield(&protocol.Series{Name: &name, Points: []*protocol.Point{}}) + } +} + +// Response objects have a nextPointTime that tells us what the time of the next series from a given server will be. +// Using that we can make sure to yield results in the correct order. So we can safely yield all results that fall before +// (or after if descending) the lowest (or highest if descending) nextPointTime. 
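// A concrete illustration (not from the original comment): in an ascending query,
// if one server's response reports a nextPointTime of 100 and another's reports
// 150, every buffered point with a timestamp below 100 can be yielded right away,
// while anything at or after 100 is held back as leftover until more data arrives.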
If they're all nil, then we're safe to +// yield everything +func (self *CoordinatorImpl) yieldResults(isSingleSeriesQuery, isAscending bool, leftovers []*protocol.Series, + responses []*protocol.Response, yield func(*protocol.Series) error) []*protocol.Series { + + if isSingleSeriesQuery { + var oldLeftOver *protocol.Series + if len(leftovers) > 0 { + oldLeftOver = leftovers[0] + } + leftover := self.yieldResultsForSeries(isAscending, oldLeftOver, responses, yield) + if leftover != nil { + return []*protocol.Series{leftover} + } + return nil + } + + // a query could yield results from multiple series, handle the cases individually + nameToSeriesResponses := make(map[string][]*protocol.Response) + for _, response := range responses { + seriesResponses := nameToSeriesResponses[*response.Series.Name] + if seriesResponses == nil { + seriesResponses = make([]*protocol.Response, 0) + } + nameToSeriesResponses[*response.Series.Name] = append(seriesResponses, response) + } + leftoverResults := make([]*protocol.Series, 0) + for _, responses := range nameToSeriesResponses { + response := responses[0] + var seriesLeftover *protocol.Series + for _, series := range leftovers { + if *series.Name == *response.Series.Name { + seriesLeftover = series + break + } + } + leftover := self.yieldResultsForSeries(isAscending, seriesLeftover, responses, yield) + if leftover != nil { + leftoverResults = append(leftoverResults, leftover) + } + } + return leftoverResults +} + +// Function yields all results that are safe to do so ensuring order. Returns all results that must wait for more from the servers. +func (self *CoordinatorImpl) yieldResultsForSeries(isAscending bool, leftover *protocol.Series, responses []*protocol.Response, yield func(*protocol.Series) error) *protocol.Series { + result := &protocol.Series{Name: responses[0].Series.Name, Points: make([]*protocol.Point, 0)} + if leftover == nil { + leftover = &protocol.Series{Name: responses[0].Series.Name, Points: make([]*protocol.Point, 0)} + } + + barrierTime := int64(0) + if isAscending { + barrierTime = math.MaxInt64 + } + var shouldYieldComparator func(rawTime *int64) bool + if isAscending { + shouldYieldComparator = func(rawTime *int64) bool { + if rawTime != nil && *rawTime < barrierTime { + return true + } else { + return false + } + } + } else { + shouldYieldComparator = func(rawTime *int64) bool { + if rawTime != nil && *rawTime > barrierTime { + return true + } else { + return false + } + } + } + // find the barrier time + for _, response := range responses { + if shouldYieldComparator(response.NextPointTime) { + barrierTime = *response.NextPointTime + } + } + // yield the points from leftover that are safe + for _, point := range leftover.Points { + if shouldYieldComparator(point.Timestamp) { + result.Points = append(result.Points, point) + } else { + break + } + } + // if they all got added, clear out the leftover + if len(leftover.Points) == len(result.Points) { + leftover.Points = make([]*protocol.Point, 0) + } + + if barrierTime == int64(0) || barrierTime == math.MaxInt64 { + // all the nextPointTimes were nil so we're safe to send everything + for _, response := range responses { + result.Points = append(result.Points, response.Series.Points...) + result.Points = append(result.Points, leftover.Points...) 
+ leftover.Points = []*protocol.Point{} + } + } else { + for _, response := range responses { + if shouldYieldComparator(response.NextPointTime) { + // all points safe to yield + result.Points = append(result.Points, response.Series.Points...) + continue + } + + for i, point := range response.Series.Points { + if shouldYieldComparator(point.Timestamp) { + result.Points = append(result.Points, point) + } else { + // since they're returned in order, we can just append these to + // the leftover and break out. + leftover.Points = append(leftover.Points, response.Series.Points[i:]...) + break + } + } + } + } + + if isAscending { + result.SortPointsTimeAscending() + leftover.SortPointsTimeAscending() + } else { + result.SortPointsTimeDescending() + leftover.SortPointsTimeDescending() + } + + // Don't yield an empty points array, the engine will think it's the end of the stream. + // streamResultsFromChannels will send the empty ones after all channels have returned. + if len(result.Points) > 0 { + yield(result) + } + if len(leftover.Points) > 0 { + return leftover + } + return nil +} + +func (self *CoordinatorImpl) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) { + key := fmt.Sprintf("%d_%d_%d_%d", *replicationFactor, *request.ClusterVersion, *request.OriginatingServerId, *owningServerId) + self.runningReplaysLock.Lock() + requestsWaitingToWrite := self.runningReplays[key] + if requestsWaitingToWrite != nil { + self.runningReplays[key] = append(requestsWaitingToWrite, request) + self.runningReplaysLock.Unlock() + return + } + self.runningReplays[key] = []*protocol.Request{request} + self.runningReplaysLock.Unlock() + + id := atomic.AddUint32(&self.requestId, uint32(1)) + replicationFactor32 := uint32(*replicationFactor) + database := "" + replayRequest := &protocol.Request{ + Id: &id, + Type: &replayReplication, + Database: &database, + ReplicationFactor: &replicationFactor32, + OriginatingServerId: request.OriginatingServerId, + OwnerServerId: owningServerId, + ClusterVersion: request.ClusterVersion, + LastKnownSequenceNumber: lastSeenSequenceNumber} + replayedRequests := make(chan *protocol.Response, 100) + server := self.clusterConfiguration.GetServerById(request.OriginatingServerId) + log.Error("COORD REPLAY: ", request, server, self.localHostId) + err := server.protobufClient.MakeRequest(replayRequest, replayedRequests) + if err != nil { + log.Error("COORD REPLAY ERROR: ", err) + return + } + for { + response := <-replayedRequests + if response == nil || *response.Type == protocol.Response_REPLICATION_REPLAY_END { + self.runningReplaysLock.Lock() + defer self.runningReplaysLock.Unlock() + for _, r := range self.runningReplays[key] { + err := self.datastore.LogRequestAndAssignSequenceNumber(r, replicationFactor, owningServerId) + if err != nil { + log.Error("Error writing waiting requests after replay: ", err) + } + self.datastore.WriteSeriesData(*r.Database, r.Series) + } + delete(self.runningReplays, key) + log.Info("Replay done for originating server %d and owner server %d", *request.OriginatingServerId, *owningServerId) + return + } + request := response.Request + // TODO: make request logging and datastore write atomic + err := self.datastore.LogRequestAndAssignSequenceNumber(request, replicationFactor, owningServerId) + if err != nil { + log.Error("ERROR writing replay: ", err) + } else { + self.datastore.WriteSeriesData(*request.Database, request.Series) + } + } } func (self *CoordinatorImpl) 
WriteSeriesData(user common.User, db string, series *protocol.Series) error { if !user.HasWriteAccess(db) { return common.NewAuthorizationError("Insufficient permission to write to %s", db) } + if len(series.Points) == 0 { + return fmt.Errorf("Can't write series with zero points.") + } + + // break the series object into separate ones based on their ring location + // if times server assigned, all the points will go to the same place + serverAssignedTime := true now := common.CurrentTime() + + // assign sequence numbers + lastNumber, err := self.datastore.AtomicIncrement(POINT_SEQUENCE_NUMBER_KEY, len(series.Points)) + if err != nil { + return err + } + lastNumber = lastNumber - uint64(len(series.Points)-1) for _, p := range series.Points { if p.Timestamp == nil { p.Timestamp = &now - self.sequenceNumberLock.Lock() - self.currentSequenceNumber += 1 - n := self.currentSequenceNumber - self.sequenceNumberLock.Unlock() - p.SequenceNumber = &n - } else if p.SequenceNumber == nil { - self.sequenceNumberLock.Lock() - self.currentSequenceNumber += 1 - n := self.currentSequenceNumber - self.sequenceNumberLock.Unlock() + } else { + serverAssignedTime = false + } + if p.SequenceNumber == nil { + n := self.sequenceNumberWithServerId(lastNumber) + lastNumber++ p.SequenceNumber = &n } } - return self.datastore.WriteSeriesData(db, series) + + // if it's a single server setup, we don't need to bother with getting ring + // locations or logging requests or any of that, so just write to the local db and be done. + if self.clusterConfiguration.IsSingleServer() { + err := self.writeSeriesToLocalStore(&db, series) + return err + } + + if serverAssignedTime { + location := common.RingLocation(&db, series.Name, series.Points[0].Timestamp) + i := self.clusterConfiguration.GetServerIndexByLocation(&location) + return self.handleClusterWrite(&i, &db, series) + } + + // TODO: make this more efficient and not suck so much + // not all the same, so break things up + + seriesToServerIndex := make(map[int]*protocol.Series) + for _, p := range series.Points { + location := common.RingLocation(&db, series.Name, p.Timestamp) + i := self.clusterConfiguration.GetServerIndexByLocation(&location) + s := seriesToServerIndex[i] + if s == nil { + s = &protocol.Series{Name: series.Name, Fields: series.Fields, Points: make([]*protocol.Point, 0)} + seriesToServerIndex[i] = s + } + s.Points = append(s.Points, p) + } + + for serverIndex, s := range seriesToServerIndex { + err := self.handleClusterWrite(&serverIndex, &db, s) + if err != nil { + return err + } + } + return nil } -func (self *CoordinatorImpl) CreateDatabase(user common.User, db string) error { +func (self *CoordinatorImpl) writeSeriesToLocalStore(db *string, series *protocol.Series) error { + return self.datastore.WriteSeriesData(*db, series) +} + +func (self *CoordinatorImpl) handleClusterWrite(serverIndex *int, db *string, series *protocol.Series) error { + owner, servers := self.clusterConfiguration.GetServersByIndexAndReplicationFactor(db, serverIndex) + for _, s := range servers { + if s.Id == self.localHostId { + // TODO: make storing of the data and logging of the request atomic + id := atomic.AddUint32(&self.requestId, uint32(1)) + request := &protocol.Request{Type: &proxyWrite, Database: db, Series: series, Id: &id} + request.OriginatingServerId = &self.localHostId + request.ClusterVersion = &self.clusterConfiguration.ClusterVersion + replicationFactor := self.clusterConfiguration.GetReplicationFactor(db) + err := 
self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id) + if err != nil { + return self.proxyUntilSuccess(servers, db, series) + } + + // ignoring the error for writing to the local store because we still want to send to replicas + err = self.writeSeriesToLocalStore(db, series) + if err != nil { + log.Error("Couldn't write data to local store: ", err, request) + } + self.sendRequestToReplicas(request, servers) + + return nil + } + } + + // it didn't live locally so proxy it + return self.proxyUntilSuccess(servers, db, series) +} + +// This method will attemp to proxy the request until the call to proxy returns nil. If no server succeeds, +// the last err value will be returned. +func (self *CoordinatorImpl) proxyUntilSuccess(servers []*ClusterServer, db *string, series *protocol.Series) (err error) { + for _, s := range servers { + if s.Id != self.localHostId { + err = self.proxyWrite(s, db, series) + if err == nil { + return nil + } + } + } + return +} + +func (self *CoordinatorImpl) proxyWrite(clusterServer *ClusterServer, db *string, series *protocol.Series) error { + id := atomic.AddUint32(&self.requestId, uint32(1)) + request := &protocol.Request{Database: db, Type: &proxyWrite, Series: series, Id: &id} + request.ClusterVersion = &self.clusterConfiguration.ClusterVersion + responseChan := make(chan *protocol.Response, 1) + clusterServer.protobufClient.MakeRequest(request, responseChan) + response := <-responseChan + if *response.Type == protocol.Response_WRITE_OK { + return nil + } else { + return errors.New(response.GetErrorMessage()) + } +} + +func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replicationFactor uint8) error { if !user.IsClusterAdmin() { return common.NewAuthorizationError("Insufficient permission to create database") } - err := self.raftServer.CreateDatabase(db) + err := self.raftServer.CreateDatabase(db, replicationFactor) if err != nil { return err } return nil } -func (self *CoordinatorImpl) ListDatabases(user common.User) ([]string, error) { +func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error) { if !user.IsClusterAdmin() { return nil, common.NewAuthorizationError("Insufficient permission to list databases") } @@ -171,7 +607,7 @@ func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username st return fmt.Errorf("Username cannot be empty") } - self.clusterConfiguration.CreateDatabase(db) // ignore the error since the db may exist + self.clusterConfiguration.CreateDatabase(db, uint8(1)) // ignore the error since the db may exist dbUsers := self.clusterConfiguration.dbUsers[db] if dbUsers != nil && dbUsers[username] != nil { return fmt.Errorf("User %s already exists", username) @@ -238,3 +674,42 @@ func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username stri self.raftServer.SaveDbUser(user) return nil } + +func (self *CoordinatorImpl) ConnectToProtobufServers(localConnectionString string) error { + // We shouldn't hit this. It's possible during initialization if Raft hasn't + // finished spinning up then there won't be any servers in the cluster config. 
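// Illustrative sketch, not part of this patch: the loop below matches each
// cluster server's ProtobufConnectionString against the local string built by
// Configuration.ProtobufConnectionString() ("hostname:8099" with the sample
// config), roughly
//
//	if server.ProtobufConnectionString == localConnectionString {
//		self.localHostId = server.Id // this node; nothing to dial
//	} else {
//		server.Connect() // opens a ProtobufClient to the remote node
//	}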
+ if len(self.clusterConfiguration.Servers()) == 0 { + return errors.New("No Protobuf servers to connect to.") + } + for _, server := range self.clusterConfiguration.Servers() { + if server.ProtobufConnectionString != localConnectionString { + server.Connect() + } else { + self.localHostId = server.Id + self.clusterConfiguration.localServerId = server.Id + } + } + return nil +} + +func (self *CoordinatorImpl) ReplicateWrite(request *protocol.Request) error { + id := atomic.AddUint32(&self.requestId, uint32(1)) + request.Id = &id + location := common.RingLocation(request.Database, request.Series.Name, request.Series.Points[0].Timestamp) + replicas := self.clusterConfiguration.GetServersByRingLocation(request.Database, &location) + self.sendRequestToReplicas(request, replicas) + return nil +} + +func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*ClusterServer) { + request.Type = &replicateWrite + for _, server := range replicas { + if server.Id != self.localHostId { + server.protobufClient.MakeRequest(request, nil) + } + } +} + +func (self *CoordinatorImpl) sequenceNumberWithServerId(n uint64) uint64 { + return n*HOST_ID_OFFSET + uint64(self.localHostId) +} diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index 76952219b2a..2a5f49bd282 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -1,8 +1,8 @@ package coordinator import ( - . "checkers" . "common" + "configuration" "datastore" "encoding/json" "flag" @@ -56,6 +56,20 @@ func (self *DatastoreMock) DropDatabase(database string) error { return nil } +func (self *DatastoreMock) LogRequestAndAssignSequenceNumber(request *protocol.Request, replicationFactor *uint8, ownerServerId *uint32) error { + id := uint64(1) + request.SequenceNumber = &id + return nil +} + +func (self *DatastoreMock) AtomicIncrement(name string, val int) (uint64, error) { + return uint64(val), nil +} + +func (self *DatastoreMock) ReplayRequestsFromSequenceNumber(*uint32, *uint32, *uint32, *uint8, *uint64, func(*[]byte) error) error { + return nil +} + func stringToSeries(seriesString string, c *C) *protocol.Series { series := &protocol.Series{} err := json.Unmarshal([]byte(seriesString), &series) @@ -101,7 +115,8 @@ func newConfigAndServer(c *C) *RaftServer { path, err := ioutil.TempDir(os.TempDir(), "influxdb") c.Assert(err, IsNil) config := NewClusterConfiguration() - server := NewRaftServer(path, "localhost", 0, config) + setupConfig := &configuration.Configuration{Hostname: "localhost", RaftDir: path, RaftServerPort: 0} + server := NewRaftServer(setupConfig, config) return server } @@ -143,7 +158,7 @@ func (self *CoordinatorSuite) TestCanRecover(c *C) { path, port := server.path, server.port - server.CreateDatabase("db1") + server.CreateDatabase("db1", uint8(1)) assertConfigContains(server.port, "db1", true, c) server.Close() time.Sleep(SERVER_STARTUP_TIME) @@ -168,7 +183,7 @@ func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) { servers := startAndVerifyCluster(2, c) defer clean(servers...) 
- err := servers[0].CreateDatabase("db2") + err := servers[0].CreateDatabase("db2", uint8(1)) c.Assert(err, IsNil) time.Sleep(REPLICATION_LAG) assertConfigContains(servers[0].port, "db2", true, c) @@ -182,7 +197,7 @@ func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) { servers := startAndVerifyCluster(2, c) - err := servers[1].CreateDatabase("db3") + err := servers[1].CreateDatabase("db3", uint8(1)) c.Assert(err, Equals, nil) time.Sleep(REPLICATION_LAG) assertConfigContains(servers[0].port, "db3", true, c) @@ -192,7 +207,7 @@ func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) { func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) { server := startAndVerifyCluster(1, c)[0] defer clean(server) - server.CreateDatabase("db4") + server.CreateDatabase("db4", uint8(1)) assertConfigContains(server.port, "db4", true, c) server2 := newConfigAndServer(c) @@ -214,7 +229,7 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) { servers := startAndVerifyCluster(3, c) defer clean(servers...) - err := servers[0].CreateDatabase("db5") + err := servers[0].CreateDatabase("db5", uint8(1)) c.Assert(err, Equals, nil) time.Sleep(REPLICATION_LAG) assertConfigContains(servers[0].port, "db5", true, c) @@ -231,7 +246,7 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) { time.Sleep(3 * time.Second) leader, _ = servers[1].leaderConnectString() c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", servers[0].port)) - err = servers[1].CreateDatabase("db6") + err = servers[1].CreateDatabase("db6", uint8(1)) c.Assert(err, Equals, nil) time.Sleep(REPLICATION_LAG) assertConfigContains(servers[1].port, "db6", true, c) @@ -269,7 +284,7 @@ func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) { // the db should be in the index now dbs, err := coordinator.ListDatabases(root) c.Assert(err, IsNil) - c.Assert(dbs, DeepEquals, []string{"db1"}) + c.Assert(dbs, DeepEquals, []*Database{&Database{"db1", 1}}) // if the db is dropped it should remove the users as well c.Assert(coordinator.DropDatabase(root, "db1"), IsNil) @@ -464,15 +479,15 @@ func (self *CoordinatorSuite) TestUserDataReplication(c *C) { } func (self *CoordinatorSuite) createDatabases(servers []*RaftServer, c *C) { - err := servers[0].CreateDatabase("db1") + err := servers[0].CreateDatabase("db1", 0) c.Assert(err, IsNil) - err = servers[1].CreateDatabase("db2") + err = servers[1].CreateDatabase("db2", 1) c.Assert(err, IsNil) - err = servers[2].CreateDatabase("db3") + err = servers[2].CreateDatabase("db3", 3) c.Assert(err, IsNil) } -func (self *CoordinatorSuite) TestCanCreateDatabaseWithName(c *C) { +func (self *CoordinatorSuite) TestCanCreateDatabaseWithNameAndReplicationFactor(c *C) { if !*noSkipReplicationTests { c.Skip("Not running replication tests. 
goraft has some rough edges") } @@ -486,12 +501,12 @@ func (self *CoordinatorSuite) TestCanCreateDatabaseWithName(c *C) { for i := 0; i < 3; i++ { databases := servers[i].clusterConfig.GetDatabases() - c.Assert(databases, DeepEquals, []string{"db1", "db2", "db3"}) + c.Assert(databases, DeepEquals, []*Database{&Database{"db1", 1}, &Database{"db2", 1}, &Database{"db3", 3}}) } - err := servers[0].CreateDatabase("db3") + err := servers[0].CreateDatabase("db3", 1) c.Assert(err, ErrorMatches, ".*db3 exists.*") - err = servers[2].CreateDatabase("db3") + err = servers[2].CreateDatabase("db3", 1) c.Assert(err, ErrorMatches, ".*db3 exists.*") } @@ -525,44 +540,6 @@ func (self *CoordinatorSuite) TestCanDropDatabaseWithName(c *C) { c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*") } -func (self *CoordinatorSuite) TestWillSetTimestampsAndSequenceNumbersForPointsWithout(c *C) { - datastoreMock := &DatastoreMock{} - coordinator := NewCoordinatorImpl(datastoreMock, nil, nil) - mock := ` - { - "points": [ - { - "values": [ - { - "int64_value": 3 - } - ], - "sequence_number": 1, - "timestamp": 23423 - } - ], - "name": "foo", - "fields": ["value"] - }` - series := stringToSeries(mock, c) - user := &MockUser{} - coordinator.WriteSeriesData(user, "foo", series) - c.Assert(datastoreMock.Series, DeepEquals, series) - mock = `{ - "points": [{"values": [{"int64_value": 3}]}], - "name": "foo", - "fields": ["value"] - }` - series = stringToSeries(mock, c) - beforeTime := CurrentTime() - coordinator.WriteSeriesData(user, "foo", series) - afterTime := CurrentTime() - c.Assert(datastoreMock.Series, Not(DeepEquals), stringToSeries(mock, c)) - c.Assert(*datastoreMock.Series.Points[0].SequenceNumber, Equals, uint32(1)) - t := *datastoreMock.Series.Points[0].Timestamp - c.Assert(t, InRange, beforeTime, afterTime) -} - func (self *CoordinatorSuite) TestCheckReadAccess(c *C) { datastoreMock := &DatastoreMock{} coordinator := NewCoordinatorImpl(datastoreMock, nil, nil) @@ -637,7 +614,7 @@ func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader }() time.Sleep(SERVER_STARTUP_TIME) - err = servers[0].CreateDatabase("db8") + err = servers[0].CreateDatabase("db8", uint8(1)) c.Assert(err, Equals, nil) time.Sleep(REPLICATION_LAG) assertConfigContains(newServer.port, "db8", true, c) diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index 2f8c0ee4fbd..d201a1b6a7c 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -2,6 +2,7 @@ package coordinator import ( "common" + "net" "parser" "protocol" ) @@ -18,8 +19,10 @@ type Coordinator interface { DistributeQuery(user common.User, db string, query *parser.Query, yield func(*protocol.Series) error) error WriteSeriesData(user common.User, db string, series *protocol.Series) error DropDatabase(user common.User, db string) error - CreateDatabase(user common.User, db string) error - ListDatabases(user common.User) ([]string, error) + CreateDatabase(user common.User, db string, replicationFactor uint8) error + ListDatabases(user common.User) ([]*Database, error) + ReplicateWrite(request *protocol.Request) error + ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) } type UserManager interface { @@ -51,7 +54,7 @@ type UserManager interface { } type ClusterConsensus interface { - CreateDatabase(name string) error + CreateDatabase(name string, replicationFactor uint8) error DropDatabase(name string) error SaveClusterAdminUser(u *clusterAdmin) error 
SaveDbUser(user *dbUser) error @@ -75,3 +78,7 @@ type ClusterConsensus interface { // When a cluster is turned on for the first time. CreateRootUser() error } + +type RequestHandler interface { + HandleRequest(request *protocol.Request, conn net.Conn) error +} diff --git a/src/coordinator/protobuf_client.go b/src/coordinator/protobuf_client.go new file mode 100644 index 00000000000..6a13ebd466c --- /dev/null +++ b/src/coordinator/protobuf_client.go @@ -0,0 +1,181 @@ +package coordinator + +import ( + "bytes" + "encoding/binary" + "io" + "log" + "net" + "protocol" + "sync" + "sync/atomic" + "time" +) + +type ProtobufClient struct { + conn net.Conn + hostAndPort string + requestBufferLock sync.RWMutex + requestBuffer map[uint32]*runningRequest + connectionStatus uint32 + reconnectWait sync.WaitGroup +} + +type runningRequest struct { + timeMade time.Time + responseChan chan *protocol.Response +} + +const ( + REQUEST_RETRY_ATTEMPTS = 3 + MAX_RESPONSE_SIZE = 1024 + IS_RECONNECTING = uint32(1) + IS_CONNECTED = uint32(0) + MAX_REQUEST_TIME = time.Second * 1200 + RECONNECT_RETRY_WAIT = time.Millisecond * 100 +) + +func NewProtobufClient(hostAndPort string) *ProtobufClient { + client := &ProtobufClient{hostAndPort: hostAndPort, requestBuffer: make(map[uint32]*runningRequest), connectionStatus: IS_CONNECTED} + go func() { + client.reconnect() + client.readResponses() + }() + go client.peridicallySweepTimedOutRequests() + return client +} + +func (self *ProtobufClient) Close() { + if self.conn != nil { + self.conn.Close() + self.conn = nil + } +} + +// Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server +// with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means +// that an attempt to make a request to a downed server will take 300ms to time out. +func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error { + if responseStream != nil { + self.requestBufferLock.Lock() + + // this should actually never happen. The sweeper should clear out dead requests + // before the uint32 ids roll over. + if oldReq, alreadyHasRequestById := self.requestBuffer[*request.Id]; alreadyHasRequestById { + log.Println("ProtobufClient: error, already has a request with this id, must have timed out: ") + close(oldReq.responseChan) + } + self.requestBuffer[*request.Id] = &runningRequest{time.Now(), responseStream} + self.requestBufferLock.Unlock() + } + + data, err := request.Encode() + if err != nil { + return err + } + + // retry sending this at least a few times + for attempts := 0; attempts < REQUEST_RETRY_ATTEMPTS; attempts++ { + if self.conn == nil { + self.reconnect() + } else { + err = binary.Write(self.conn, binary.LittleEndian, uint32(len(data))) + if err == nil { + _, err = self.conn.Write(data) + if err == nil { + return nil + } + } + log.Println("ProtobufClient: error making request: ", err) + // TODO: do something smarter here based on whatever the error is. + // failed to make the request, reconnect and try again. 
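// Illustrative sketch, not part of this patch: callers pair each request with a
// response channel and guard the read with a timer, as the client/server test does:
//
//	responseChan := make(chan *protocol.Response, 1)
//	if err := client.MakeRequest(request, responseChan); err != nil {
//		// every one of the REQUEST_RETRY_ATTEMPTS (3) failed; with the 100ms
//		// RECONNECT_RETRY_WAIT that is roughly 300ms against a downed server
//	}
//	select {
//	case response := <-responseChan:
//		_ = response // e.g. check *response.Type == protocol.Response_WRITE_OK
//	case <-time.After(time.Second):
//		// no response arrived in time
//	}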
+ self.reconnect() + } + } + + // if we got here it errored out, clear out the request + self.requestBufferLock.Lock() + delete(self.requestBuffer, *request.Id) + self.requestBufferLock.Unlock() + return err +} + +func (self *ProtobufClient) readResponses() { + message := make([]byte, 0, MAX_RESPONSE_SIZE) + buff := bytes.NewBuffer(message) + for { + if self.conn == nil { + self.reconnect() + } else { + var messageSizeU uint32 + var err error + if err = binary.Read(self.conn, binary.LittleEndian, &messageSizeU); err == nil { + messageSize := int64(messageSizeU) + messageReader := io.LimitReader(self.conn, messageSize) + if _, err = io.Copy(buff, messageReader); err == nil { + response, err := protocol.DecodeResponse(buff) + if err != nil { + log.Println("ProtobufClient: error unmarshaling response: ", err) + } else { + self.sendResponse(response) + } + } + } + } + buff.Reset() + } +} + +func (self *ProtobufClient) sendResponse(response *protocol.Response) { + self.requestBufferLock.RLock() + req, ok := self.requestBuffer[*response.RequestId] + self.requestBufferLock.RUnlock() + if ok { + req.responseChan <- response + if *response.Type == protocol.Response_END_STREAM || *response.Type == protocol.Response_WRITE_OK { + close(req.responseChan) + self.requestBufferLock.Lock() + delete(self.requestBuffer, *response.RequestId) + self.requestBufferLock.Unlock() + } + } +} + +func (self *ProtobufClient) reconnect() { + swapped := atomic.CompareAndSwapUint32(&self.connectionStatus, IS_CONNECTED, IS_RECONNECTING) + + // if it's not swapped, some other goroutine is already handling the reconect. Wait for it + if !swapped { + self.reconnectWait.Wait() + return + } + self.reconnectWait.Add(1) + + self.Close() + conn, err := net.Dial("tcp", self.hostAndPort) + if err == nil { + self.conn = conn + log.Println("ProtobufClient: connected to ", self.hostAndPort) + } else { + log.Println("ProtobufClient: failed to connect to ", self.hostAndPort) + time.Sleep(RECONNECT_RETRY_WAIT) + } + self.connectionStatus = IS_CONNECTED + self.reconnectWait.Done() + return +} + +func (self *ProtobufClient) peridicallySweepTimedOutRequests() { + for { + time.Sleep(time.Minute) + self.requestBufferLock.Lock() + maxAge := time.Now().Add(-MAX_REQUEST_TIME) + for k, req := range self.requestBuffer { + if req.timeMade.Before(maxAge) { + delete(self.requestBuffer, k) + log.Println("Request timed out.") + } + } + self.requestBufferLock.Unlock() + } +} diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go new file mode 100644 index 00000000000..1e0ca24d27b --- /dev/null +++ b/src/coordinator/protobuf_request_handler.go @@ -0,0 +1,166 @@ +package coordinator + +import ( + "bytes" + "common" + "datastore" + "encoding/binary" + "errors" + "log" + "net" + "parser" + "protocol" +) + +type ProtobufRequestHandler struct { + db datastore.Datastore + coordinator Coordinator + clusterConfig *ClusterConfiguration + writeOk protocol.Response_Type +} + +var replayReplicationEnd = protocol.Response_REPLICATION_REPLAY_END +var responseReplicationReplay = protocol.Response_REPLICATION_REPLAY + +func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator, clusterConfig *ClusterConfiguration) *ProtobufRequestHandler { + return &ProtobufRequestHandler{db: db, coordinator: coordinator, writeOk: protocol.Response_WRITE_OK, clusterConfig: clusterConfig} +} + +func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error { + if *request.Type == 
protocol.Request_PROXY_WRITE { + response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} + + location := common.RingLocation(request.Database, request.Series.Name, request.Series.Points[0].Timestamp) + ownerId := self.clusterConfig.GetOwnerIdByLocation(&location) + request.OriginatingServerId = &self.clusterConfig.localServerId + // TODO: make request logging and datastore write atomic + replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database) + err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, ownerId) + if err != nil { + return err + } + err = self.db.WriteSeriesData(*request.Database, request.Series) + if err != nil { + return err + } + err = self.WriteResponse(conn, response) + // TODO: add quorum writes? + self.coordinator.ReplicateWrite(request) + return err + } else if *request.Type == protocol.Request_PROXY_DELETE { + + } else if *request.Type == protocol.Request_REPLICATION_WRITE { + // TODO: check the request id and server and make sure it's next (+1 from last one from the server). + // If so, write. If not, request replay. + // TODO: log replication writes so the can be retrieved from other servers + location := common.RingLocation(request.Database, request.Series.Name, request.Series.Points[0].Timestamp) + ownerId := self.clusterConfig.GetOwnerIdByLocation(&location) + replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database) + // TODO: make request logging and datastore write atomic + err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, ownerId) + if err != nil { + switch err := err.(type) { + case datastore.SequenceMissingRequestsError: + go self.coordinator.ReplayReplication(request, &replicationFactor, ownerId, &err.LastKnownRequestSequence) + return nil + default: + return err + } + } + self.db.WriteSeriesData(*request.Database, request.Series) + return nil + } else if *request.Type == protocol.Request_REPLICATION_DELETE { + + } else if *request.Type == protocol.Request_QUERY { + go self.handleQuery(request, conn) + } else if *request.Type == protocol.Request_REPLICATION_REPLAY { + self.handleReplay(request, conn) + } else { + log.Println("unknown request type: ", request) + return errors.New("Unknown request type") + } + return nil +} + +func (self *ProtobufRequestHandler) handleReplay(request *protocol.Request, conn net.Conn) { + sendRequest := func(loggedRequestData *[]byte) error { + var response *protocol.Response + if loggedRequestData != nil { + loggedRequest, err := protocol.DecodeRequest(bytes.NewBuffer(*loggedRequestData)) + if err != nil { + return err + } + response = &protocol.Response{Type: &responseReplicationReplay, Request: loggedRequest, RequestId: request.Id} + } else { + response = &protocol.Response{Type: &replayReplicationEnd, RequestId: request.Id} + } + return self.WriteResponse(conn, response) + } + replicationFactor8 := uint8(*request.ReplicationFactor) + err := self.db.ReplayRequestsFromSequenceNumber( + request.ClusterVersion, + request.OriginatingServerId, + request.OwnerServerId, + &replicationFactor8, + request.LastKnownSequenceNumber, + sendRequest) + if err != nil { + log.Println("REPLAY ERROR: ", err) + } +} + +func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) { + var nextPoint *protocol.Point + assignNextPointTimesAndSend := func(series *protocol.Series) error { + pointCount := len(series.Points) + if pointCount <= 1 { + if nextPoint != nil { + series.Points = append(series.Points, 
nextPoint) + } + response := &protocol.Response{Type: &endStreamResponse, Series: series, RequestId: request.Id} + + self.WriteResponse(conn, response) + return nil + } + oldNextPoint := nextPoint + nextPoint = series.Points[pointCount-1] + series.Points[pointCount-1] = nil + if oldNextPoint != nil { + copy(series.Points[1:], series.Points[0:]) + series.Points[0] = oldNextPoint + } else { + series.Points = series.Points[:len(series.Points)-1] + } + + response := &protocol.Response{Series: series, Type: &queryResponse, RequestId: request.Id} + if nextPoint != nil { + response.NextPointTime = nextPoint.Timestamp + } + err := self.WriteResponse(conn, response) + return err + } + // the query should always parse correctly since it was parsed at the originating server. + query, _ := parser.ParseQuery(*request.Query) + user := self.clusterConfig.GetDbUser(*request.Database, *request.UserName) + + var ringFilter func(database, series *string, time *int64) bool + if request.RingLocationsToQuery != nil { + ringFilter = self.clusterConfig.GetRingFilterFunction(*request.Database, *request.RingLocationsToQuery) + } + self.db.ExecuteQuery(user, *request.Database, query, assignNextPointTimesAndSend, ringFilter) +} + +func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error { + data, err := response.Encode() + if err != nil { + log.Println("ProtobufRequestHandler error encoding response: ", err) + return err + } + err = binary.Write(conn, binary.LittleEndian, uint32(len(data))) + if err != nil { + log.Println("ProtobufRequestHandler error writing response length: ", err) + return err + } + _, err = conn.Write(data) + return err +} diff --git a/src/coordinator/protobuf_server.go b/src/coordinator/protobuf_server.go new file mode 100644 index 00000000000..a5eaa6ba29d --- /dev/null +++ b/src/coordinator/protobuf_server.go @@ -0,0 +1,152 @@ +package coordinator + +import ( + "bytes" + "encoding/binary" + "io" + "log" + "net" + "protocol" + "sync" + "time" +) + +type ProtobufServer struct { + listener net.Listener + port string + requestHandler RequestHandler + connectionMapLock sync.Mutex + connectionMap map[net.Conn]bool +} + +const MAX_REQUEST_SIZE = 1024 + +func NewProtobufServer(port string, requestHandler RequestHandler) *ProtobufServer { + server := &ProtobufServer{port: port, requestHandler: requestHandler, connectionMap: make(map[net.Conn]bool)} + return server +} + +func (self *ProtobufServer) Close() { + self.listener.Close() + self.connectionMapLock.Lock() + defer self.connectionMapLock.Unlock() + for conn, _ := range self.connectionMap { + conn.Close() + } + + // loop while the port is still accepting connections + for { + _, port, _ := net.SplitHostPort(self.port) + conn, err := net.Dial("tcp", "localhost:"+port) + if err != nil { + log.Printf("Received error %s, assuming connection is closed.", err) + break + } + conn.Close() + + log.Println("Waiting while the server port is closing") + time.Sleep(1 * time.Second) + } +} + +func (self *ProtobufServer) ListenAndServe() { + ln, err := net.Listen("tcp", self.port) + if err != nil { + panic(err) + } + self.listener = ln + log.Println("ProtobufServer listening on ", self.port) + for { + conn, err := ln.Accept() + if err != nil { + log.Println("Error with TCP connection. 
Assuming server is closing: ", err, conn) + break + } + self.connectionMapLock.Lock() + self.connectionMap[conn] = true + self.connectionMapLock.Unlock() + go self.handleConnection(conn) + } +} + +func (self *ProtobufServer) handleConnection(conn net.Conn) { + log.Println("ProtobufServer: client connected: ", conn.RemoteAddr().String()) + + message := make([]byte, 0, MAX_REQUEST_SIZE) + buff := bytes.NewBuffer(message) + var messageSizeU uint32 + for { + err := binary.Read(conn, binary.LittleEndian, &messageSizeU) + if err != nil { + log.Println("ProtobufServer: Error reading from connection: ", conn.RemoteAddr().String(), err) + self.connectionMapLock.Lock() + delete(self.connectionMap, conn) + self.connectionMapLock.Unlock() + conn.Close() + return + } + + messageSize := int64(messageSizeU) + if messageSize > MAX_REQUEST_SIZE { + err = self.handleRequestTooLarge(conn, messageSize, buff) + } else { + err = self.handleRequest(conn, messageSize, buff) + } + + if err != nil { + log.Println("Error, closing connection: ", err) + self.connectionMapLock.Lock() + delete(self.connectionMap, conn) + self.connectionMapLock.Unlock() + conn.Close() + return + } + buff.Reset() + } +} + +func (self *ProtobufServer) handleRequest(conn net.Conn, messageSize int64, buff *bytes.Buffer) error { + reader := io.LimitReader(conn, messageSize) + _, err := io.Copy(buff, reader) + if err != nil { + return err + } + request, err := protocol.DecodeRequest(buff) + if err != nil { + return err + } + + return self.requestHandler.HandleRequest(request, conn) +} + +func (self *ProtobufServer) handleRequestTooLarge(conn net.Conn, messageSize int64, buff *bytes.Buffer) error { + log.Println("ProtobufServer: request too large, dumping: ", conn.RemoteAddr().String(), messageSize) + for messageSize > 0 { + reader := io.LimitReader(conn, MAX_REQUEST_SIZE) + _, err := io.Copy(buff, reader) + if err != nil { + return err + } + messageSize -= MAX_REQUEST_SIZE + buff.Reset() + } + return self.sendErrorResponse(conn, protocol.Response_REQUEST_TOO_LARGE, "request too large") +} + +func (self *ProtobufServer) sendErrorResponse(conn net.Conn, code protocol.Response_ErrorCode, message string) error { + response := &protocol.Response{ErrorCode: &code, ErrorMessage: &message} + data, err := response.Encode() + if err != nil { + return err + } + + buff := bytes.NewBuffer(make([]byte, 0, len(data)+4)) + err = binary.Write(buff, binary.LittleEndian, uint32(len(data))) + + if err != nil { + return err + } + + _, err = conn.Write(append(buff.Bytes(), data...)) + return err +} diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index 818761a3959..3e97f9ee996 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -2,6 +2,7 @@ package coordinator import ( "bytes" + "configuration" "encoding/json" "errors" "fmt" @@ -13,6 +14,7 @@ import ( "net" "net/http" "path/filepath" + "protocol" "strings" "sync" "time" @@ -37,12 +39,14 @@ type RaftServer struct { mutex sync.RWMutex listener net.Listener closing bool + config *configuration.Configuration } var registeredCommands bool +var replicateWrite = protocol.Request_REPLICATION_WRITE // Creates a new server. 
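A sketch of how the pieces fit together: anything implementing RequestHandler can sit behind a ProtobufServer. The handler type, the ":8099" address, and the wiring below are illustrative guesses; in the daemon the handler is presumably the ProtobufRequestHandler and the address comes from the new ProtobufPort setting.

package main

import (
	"coordinator"
	"log"
	"net"
	"protocol"
)

// loggingHandler is a stand-in RequestHandler used only for this sketch.
type loggingHandler struct{}

func (h *loggingHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
	log.Println("request", request.GetId(), "type", request.GetType())
	return nil
}

func main() {
	srv := coordinator.NewProtobufServer(":8099", &loggingHandler{})
	// ListenAndServe blocks accepting connections; requests larger than MAX_REQUEST_SIZE are
	// drained and answered with a REQUEST_TOO_LARGE error response. Close() shuts down the
	// listener and all tracked connections and waits until the port stops accepting.
	go srv.ListenAndServe()
	select {}
}
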
-func NewRaftServer(path string, host string, port int, clusterConfig *ClusterConfiguration) *RaftServer { +func NewRaftServer(config *configuration.Configuration, clusterConfig *ClusterConfiguration) *RaftServer { if !registeredCommands { registeredCommands = true raft.RegisterCommand(&AddPotentialServerCommand{}) @@ -52,19 +56,21 @@ func NewRaftServer(path string, host string, port int, clusterConfig *ClusterCon raft.RegisterCommand(&SaveDbUserCommand{}) raft.RegisterCommand(&SaveClusterAdminCommand{}) } + s := &RaftServer{ - host: host, - port: port, - path: path, + host: config.HostnameOrDetect(), + port: config.RaftServerPort, + path: config.RaftDir, clusterConfig: clusterConfig, router: mux.NewRouter(), + config: config, } // Read existing name or generate a new one. - if b, err := ioutil.ReadFile(filepath.Join(path, "name")); err == nil { + if b, err := ioutil.ReadFile(filepath.Join(s.path, "name")); err == nil { s.name = string(b) } else { s.name = fmt.Sprintf("%07x", rand.Int())[0:7] - if err = ioutil.WriteFile(filepath.Join(path, "name"), []byte(s.name), 0644); err != nil { + if err = ioutil.WriteFile(filepath.Join(s.path, "name"), []byte(s.name), 0644); err != nil { panic(err) } } @@ -122,8 +128,11 @@ func (s *RaftServer) doOrProxyCommand(command raft.Command, commandType string) return nil, nil } -func (s *RaftServer) CreateDatabase(name string) error { - command := NewCreateDatabaseCommand(name) +func (s *RaftServer) CreateDatabase(name string, replicationFactor uint8) error { + if replicationFactor == 0 { + replicationFactor = 1 + } + command := NewCreateDatabaseCommand(name, replicationFactor) _, err := s.doOrProxyCommand(command, "create_db") return err } @@ -173,14 +182,6 @@ func (s *RaftServer) connectionString() string { } func (s *RaftServer) startRaft(potentialLeaders []string, retryUntilJoin bool) { - // there's a race condition in goraft that will cause the server to panic - // while shutting down - defer func() { - if err := recover(); err != nil { - fmt.Printf("Raft paniced: %v\n", err) - } - }() - log.Printf("Initializing Raft Server: %s %d", s.path, s.port) // Initialize and start Raft server. @@ -214,15 +215,16 @@ func (s *RaftServer) startRaft(potentialLeaders []string, retryUntilJoin bool) { name := s.raftServer.Name() connectionString := s.connectionString() - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: name, - ConnectionString: connectionString, + _, err := s.raftServer.Do(&InfluxJoinCommand{ + Name: name, + ConnectionString: connectionString, + ProtobufConnectionString: s.config.ProtobufConnectionString(), }) if err != nil { log.Fatal(err) } - command := NewAddPotentialServerCommand(&ClusterServer{RaftName: name, RaftConnectionString: connectionString}) + command := NewAddPotentialServerCommand(&ClusterServer{RaftName: name, RaftConnectionString: connectionString, ProtobufConnectionString: s.config.ProtobufConnectionString()}) s.doOrProxyCommand(command, "add_server") s.CreateRootUser() break @@ -283,9 +285,10 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter // Joins to the leader of an existing cluster. 
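A small sketch of the defaulting behaviour added to RaftServer.CreateDatabase: a replication factor of zero is normalised to one before the create_db command is proposed, and creating an existing name is rejected (as the coordinator tests above assert). The parameter s is assumed to be an already-started *RaftServer; the database name is illustrative.

package example

import (
	"coordinator"
	"log"
)

func createDatabases(s *coordinator.RaftServer) {
	if err := s.CreateDatabase("metrics", 0); err != nil { // 0 becomes a replication factor of 1
		log.Fatal(err)
	}
	if err := s.CreateDatabase("metrics", 3); err != nil {
		log.Println(err) // a second create for the same name fails, e.g. ".*metrics exists.*"
	}
}
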
func (s *RaftServer) Join(leader string) error { - command := &raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: s.connectionString(), + command := &InfluxJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: s.connectionString(), + ProtobufConnectionString: s.config.ProtobufConnectionString(), } connectUrl := leader if !strings.HasPrefix(connectUrl, "http://") { @@ -326,7 +329,7 @@ func (s *RaftServer) retryCommand(command raft.Command, retries int) (ret interf func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { if s.raftServer.State() == raft.Leader { - command := &raft.DefaultJoinCommand{} + command := &InfluxJoinCommand{} if err := json.NewDecoder(req.Body).Decode(&command); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -339,7 +342,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { server := s.clusterConfig.GetServerByRaftName(command.Name) // it's a new server the cluster has never seen, make it a potential if server == nil { - addServer := NewAddPotentialServerCommand(&ClusterServer{RaftName: command.Name, RaftConnectionString: command.ConnectionString}) + addServer := NewAddPotentialServerCommand(&ClusterServer{RaftName: command.Name, RaftConnectionString: command.ConnectionString, ProtobufConnectionString: command.ProtobufConnectionString}) if _, err := s.raftServer.Do(addServer); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -358,7 +361,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { func (s *RaftServer) configHandler(w http.ResponseWriter, req *http.Request) { jsonObject := make(map[string]interface{}) dbs := make([]string, 0) - for db, _ := range s.clusterConfig.databaseNames { + for db, _ := range s.clusterConfig.databaseReplicationFactors { dbs = append(dbs, db) } jsonObject["databases"] = dbs diff --git a/src/coordinator/user.go b/src/coordinator/user.go index 00858f47bf8..28aec0cdd71 100644 --- a/src/coordinator/user.go +++ b/src/coordinator/user.go @@ -2,7 +2,7 @@ package coordinator import ( "code.google.com/p/go.crypto/bcrypt" - "github.com/pmylund/go-cache" + "github.com/influxdb/go-cache" "regexp" ) diff --git a/src/daemon/influxd.go b/src/daemon/influxd.go new file mode 100644 index 00000000000..894daadd1d1 --- /dev/null +++ b/src/daemon/influxd.go @@ -0,0 +1,116 @@ +package main + +import ( + "configuration" + "coordinator" + "flag" + "fmt" + "io/ioutil" + "log" + "os" + "os/signal" + "runtime" + "server" + "strconv" + "syscall" + "time" +) + +const ( + version = "dev" + gitSha = "HEAD" +) + +func waitForSignals(stopped <-chan bool) { + ch := make(chan os.Signal) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) + for { + sig := <-ch + fmt.Printf("Received signal: %s\n", sig.String()) + switch sig { + case syscall.SIGINT, syscall.SIGTERM: + runtime.SetCPUProfileRate(0) + <-stopped + os.Exit(0) + } + } +} + +func startProfiler(filename *string) error { + if filename == nil || *filename == "" { + return nil + } + + cpuProfileFile, err := os.Create(*filename) + if err != nil { + return err + } + runtime.SetCPUProfileRate(500) + stopped := make(chan bool) + + go waitForSignals(stopped) + + go func() { + for { + select { + default: + data := runtime.CPUProfile() + if data == nil { + cpuProfileFile.Close() + stopped <- true + break + } + cpuProfileFile.Write(data) + } + } + }() + return nil +} + +func main() { + fileName := flag.String("config", "config.json.sample", "Config 
file") + wantsVersion := flag.Bool("v", false, "Get version number") + resetRootPassword := flag.Bool("reset-root", false, "Reset root password") + pidFile := flag.String("pidfile", "", "the pid file") + cpuProfiler := flag.String("cpuprofile", "", "filename where cpu profile data will be written") + + runtime.GOMAXPROCS(runtime.NumCPU()) + flag.Parse() + + startProfiler(cpuProfiler) + + if wantsVersion != nil && *wantsVersion { + fmt.Printf("InfluxDB v%s (git: %s)\n", version, gitSha) + return + } + config := configuration.LoadConfiguration(*fileName) + + if pidFile != nil && *pidFile != "" { + pid := strconv.Itoa(os.Getpid()) + if err := ioutil.WriteFile(*pidFile, []byte(pid), 0644); err != nil { + panic(err) + } + } + + log.Println("Starting Influx Server...") + os.MkdirAll(config.RaftDir, 0744) + os.MkdirAll(config.DataDir, 0744) + server, err := server.NewServer(config) + if err != nil { + panic(err) + } + + if *resetRootPassword { + // TODO: make this not suck + // This is ghetto as hell, but it'll work for now. + go func() { + time.Sleep(2 * time.Second) // wait for the raft server to join the cluster + + fmt.Printf("Resetting root's password to %s", coordinator.DEFAULT_ROOT_PWD) + if err := server.RaftServer.CreateRootUser(); err != nil { + panic(err) + } + }() + } + server.ListenAndServe() +} diff --git a/src/datastore/datastore_test.go b/src/datastore/datastore_test.go index f0c1de58f19..10647c9f9af 100644 --- a/src/datastore/datastore_test.go +++ b/src/datastore/datastore_test.go @@ -62,7 +62,7 @@ func executeQuery(user common.User, database, query string, db Datastore, c *C) } return nil } - err := db.ExecuteQuery(user, database, q, yield) + err := db.ExecuteQuery(user, database, q, yield, nil) c.Assert(err, IsNil) return resultSeries } @@ -96,7 +96,7 @@ func (self *DatastoreSuite) TestPropagateErrorsProperly(c *C) { return fmt.Errorf("Whatever") } user := &MockUser{} - err = db.ExecuteQuery(user, "test", q, yield) + err = db.ExecuteQuery(user, "test", q, yield, nil) c.Assert(err, ErrorMatches, "Whatever") } @@ -133,8 +133,13 @@ func (self *DatastoreSuite) TestDeletingData(c *C) { } c.Assert(db.DropDatabase("test"), IsNil) user := &MockUser{} - err = db.ExecuteQuery(user, "test", q, yield) - c.Assert(err, ErrorMatches, ".*Field value doesn't exist.*") + err = db.ExecuteQuery(user, "test", q, yield, nil) + + // we don't have an error any more on query for fields that don't exist. + // This is because of the clustering. Some servers could have some fields + // while others don't. To be expected. + // c.Assert(err, ErrorMatches, ".*Field value doesn't exist.*") + c.Assert(err, IsNil) } func (self *DatastoreSuite) TestCanWriteAndRetrievePointsWithAlias(c *C) { @@ -176,7 +181,7 @@ func (self *DatastoreSuite) TestCanWriteAndRetrievePointsWithAlias(c *C) { return nil } user := &MockUser{} - err = db.ExecuteQuery(user, "test", q, yield) + err = db.ExecuteQuery(user, "test", q, yield, nil) c.Assert(err, IsNil) // we should get the actual data and the end of series data // indicator , i.e. a series with no points @@ -228,21 +233,21 @@ func (self *DatastoreSuite) TestCanWriteAndRetrievePoints(c *C) { return nil } user := &MockUser{} - err = db.ExecuteQuery(user, "test", q, yield) + err = db.ExecuteQuery(user, "test", q, yield, nil) c.Assert(err, IsNil) // we should get the actual data and the end of series data // indicator , i.e. 
a series with no points c.Assert(resultSeries, HasLen, 2) c.Assert(resultSeries[0].Points, HasLen, 2) c.Assert(resultSeries[0].Fields, HasLen, 1) - c.Assert(*resultSeries[0].Points[0].SequenceNumber, Equals, uint32(2)) - c.Assert(*resultSeries[0].Points[1].SequenceNumber, Equals, uint32(1)) + c.Assert(*resultSeries[0].Points[0].SequenceNumber, Equals, uint64(2)) + c.Assert(*resultSeries[0].Points[1].SequenceNumber, Equals, uint64(1)) c.Assert(*resultSeries[0].Points[0].GetTimestampInMicroseconds(), Equals, pointTime*1000000) c.Assert(*resultSeries[0].Points[1].GetTimestampInMicroseconds(), Equals, pointTime*1000000) c.Assert(*resultSeries[0].Points[0].Values[0].Int64Value, Equals, int64(2)) c.Assert(*resultSeries[0].Points[1].Values[0].Int64Value, Equals, int64(3)) c.Assert(resultSeries[1].Points, HasLen, 0) - c.Assert(resultSeries[1].Fields, HasLen, 1) + c.Assert(resultSeries[1].Fields, HasLen, 0) c.Assert(resultSeries, Not(DeepEquals), series) } @@ -319,8 +324,8 @@ func (self *DatastoreSuite) TestCanWriteDataWithDifferentTimesAndSeries(c *C) { results = executeQuery(user, "db1", "select blah from events;", db, c) c.Assert(results[0].Points, HasLen, 2) c.Assert(results[0].Fields, HasLen, 1) - c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint32(1)) - c.Assert(*results[0].Points[1].SequenceNumber, Equals, uint32(3)) + c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint64(1)) + c.Assert(*results[0].Points[1].SequenceNumber, Equals, uint64(3)) c.Assert(*results[0].Points[0].GetTimestampInMicroseconds(), Equals, now*1000000) c.Assert(*results[0].Points[1].GetTimestampInMicroseconds(), Equals, secondAgo*1000000) c.Assert(*results[0].Points[0].Values[0].DoubleValue, Equals, float64(0.1)) @@ -392,8 +397,8 @@ func (self *DatastoreSuite) TestCanQueryBasedOnTime(c *C) { results = executeQuery(user, "db1", "select val from foo;", db, c) c.Assert(results[0].Points, HasLen, 2) c.Assert(results[0].Fields, HasLen, 1) - c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint32(3)) - c.Assert(*results[0].Points[1].SequenceNumber, Equals, uint32(3)) + c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint64(3)) + c.Assert(*results[0].Points[1].SequenceNumber, Equals, uint64(3)) c.Assert(*results[0].Points[0].GetTimestampInMicroseconds(), Equals, now*1000000) c.Assert(*results[0].Points[1].GetTimestampInMicroseconds(), Equals, minutesAgo*1000000) c.Assert(*results[0].Points[0].Values[0].Int64Value, Equals, int64(3)) @@ -420,7 +425,7 @@ func (self *DatastoreSuite) TestCanDoWhereQueryEquals(c *C) { results = executeQuery(user, "db1", "select name from events where name == 'paul';", db, c) c.Assert(results[0].Points, HasLen, 1) c.Assert(results[0].Fields, HasLen, 1) - c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint32(2)) + c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint64(2)) c.Assert(*results[0].Points[0].Values[0].StringValue, Equals, "paul") } @@ -463,9 +468,9 @@ func (self *DatastoreSuite) TestCanDoCountStarQueries(c *C) { results := executeQuery(user, "foobar", "select count(*) from user_things;", db, c) c.Assert(results[0].Points, HasLen, 2) c.Assert(results[0].Fields, HasLen, 1) - c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint32(2)) + c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint64(2)) c.Assert(*results[0].Points[0].Values[0].Int64Value, Equals, int64(3)) - c.Assert(*results[0].Points[1].SequenceNumber, Equals, uint32(1)) + c.Assert(*results[0].Points[1].SequenceNumber, Equals, uint64(1)) 
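The assertions here switch to uint64 because Point.SequenceNumber is widened to 64 bits and, like Timestamp, is now optional; when they are missing they are expected to be assigned upstream in the coordinator (see the TODO note in WriteSeriesData below). A sketch of building a point under the revised schema using goprotobuf's pointer helpers:

package example

import (
	"code.google.com/p/goprotobuf/proto"
	"protocol"
)

func makePoint() *protocol.Point {
	return &protocol.Point{
		Values:         []*protocol.FieldValue{&protocol.FieldValue{Int64Value: proto.Int64(3)}},
		SequenceNumber: proto.Uint64(1),
		// Timestamp deliberately left nil: the field is optional in the revised .proto
	}
}
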
c.Assert(*results[0].Points[1].Values[0].Int64Value, Equals, int64(1)) } @@ -488,7 +493,7 @@ func (self *DatastoreSuite) TestLimitsPointsReturnedBasedOnQuery(c *C) { results := executeQuery(user, "foobar", "select name from user_things limit 1;", db, c) c.Assert(results[0].Points, HasLen, 1) c.Assert(results[0].Fields, HasLen, 1) - c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint32(2)) + c.Assert(*results[0].Points[0].SequenceNumber, Equals, uint64(2)) c.Assert(*results[0].Points[0].Values[0].StringValue, Equals, "paul") } @@ -744,7 +749,7 @@ func (self *DatastoreSuite) TestCanSelectFromARegex(c *C) { } return nil } - err = db.ExecuteQuery(user, "foobar", q, yield) + err = db.ExecuteQuery(user, "foobar", q, yield, nil) c.Assert(err, IsNil) c.Assert(resultSeries, HasLen, 2) c.Assert(resultSeries[0], DeepEquals, otherSeries) @@ -769,7 +774,7 @@ func (self *DatastoreSuite) TestBreaksLargeResultsIntoMultipleBatches(c *C) { for i := 0; i < 50000; i++ { for _, p := range series.Points { sequence += 1 - s := uint32(sequence) + s := uint64(sequence) p.SequenceNumber = &s } writtenPoints += 2 @@ -785,7 +790,7 @@ func (self *DatastoreSuite) TestBreaksLargeResultsIntoMultipleBatches(c *C) { return nil } user := &MockUser{} - err := db.ExecuteQuery(user, "foobar", q, yield) + err := db.ExecuteQuery(user, "foobar", q, yield, nil) c.Assert(err, IsNil) c.Assert(len(resultSeries), InRange, 2, 20) pointCount := 0 @@ -831,7 +836,7 @@ func (self *DatastoreSuite) TestCheckReadAccess(c *C) { } return nil } - err = db.ExecuteQuery(user, "foobar", q, yield) + err = db.ExecuteQuery(user, "foobar", q, yield, nil) c.Assert(err, ErrorMatches, ".*one or more.*") c.Assert(len(resultSeries), Equals, 1) c.Assert(*resultSeries[0].Name, Equals, "user_things") @@ -879,7 +884,7 @@ func (self *DatastoreSuite) TestCheckWriteAccess(c *C) { } return nil } - err = db.ExecuteQuery(user, "foobar", q, yield) + err = db.ExecuteQuery(user, "foobar", q, yield, nil) c.Assert(err, IsNil) c.Assert(resultSeries, HasLen, 1) c.Assert(resultSeries[0], DeepEquals, otherSeries) diff --git a/src/datastore/interface.go b/src/datastore/interface.go index fda4254ea13..58a06a7b65d 100644 --- a/src/datastore/interface.go +++ b/src/datastore/interface.go @@ -9,7 +9,16 @@ import ( ) type Datastore interface { - ExecuteQuery(user common.User, database string, query *parser.Query, yield func(*protocol.Series) error) error + ExecuteQuery(user common.User, database string, + query *parser.Query, yield func(*protocol.Series) error, + ringFilter func(database, series *string, time *int64) bool) error + // Logs the request to a local store and assigns a sequence number that is unique per server id per day + LogRequestAndAssignSequenceNumber(request *protocol.Request, replicationFactor *uint8, ownerServerId *uint32) error + // will replay all requests from a given number. If the number hasn't occured yet today, it replays from yesterday. + // So this log replay is only meant to work for outages that last less than maybe 12 hours. 
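Taken together, the new Datastore methods define the contract a replicated write follows. A condensed sketch of that flow, mirroring the REPLICATION_WRITE branch of the protobuf request handler earlier in this patch (function and variable names here are illustrative):

package example

import (
	"coordinator"
	"datastore"
	"protocol"
)

func applyReplicatedWrite(db datastore.Datastore, coord coordinator.Coordinator,
	request *protocol.Request, replicationFactor *uint8, ownerId *uint32) error {
	if err := db.LogRequestAndAssignSequenceNumber(request, replicationFactor, ownerId); err != nil {
		if gap, ok := err.(datastore.SequenceMissingRequestsError); ok {
			// a hole in the per-owner sequence means writes were missed; ask the owning
			// server to replay everything after the last sequence number this server saw
			go coord.ReplayReplication(request, replicationFactor, ownerId, &gap.LastKnownRequestSequence)
			return nil
		}
		return err
	}
	return db.WriteSeriesData(*request.Database, request.Series)
}
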
+ ReplayRequestsFromSequenceNumber(*uint32, *uint32, *uint32, *uint8, *uint64, func(*[]byte) error) error + // Increment the named integer by the given amount and return the new value + AtomicIncrement(name string, val int) (uint64, error) WriteSeriesData(database string, series *protocol.Series) error DropDatabase(database string) error DeleteRangeOfSeries(database, series string, startTime, endTime time.Time) error diff --git a/src/datastore/leveldb_datastore.go b/src/datastore/leveldb_datastore.go index 03ebae21596..8d4f7752dfd 100644 --- a/src/datastore/leveldb_datastore.go +++ b/src/datastore/leveldb_datastore.go @@ -3,13 +3,16 @@ package datastore import ( "bytes" "code.google.com/p/goprotobuf/proto" + log "code.google.com/p/log4go" "common" "encoding/binary" "errors" "fmt" "github.com/jmhodges/levigo" "math" + "os" "parser" + "path/filepath" "protocol" "regexp" "strings" @@ -18,11 +21,17 @@ import ( ) type LevelDbDatastore struct { - db *levigo.DB - lastIdUsed uint64 - columnIdMutex sync.Mutex - readOptions *levigo.ReadOptions - writeOptions *levigo.WriteOptions + db *levigo.DB + requestLogLock sync.RWMutex + currentRequestLog *requestLogDb + previousRequestLog *requestLogDb + lastIdUsed uint64 + columnIdMutex sync.Mutex + readOptions *levigo.ReadOptions + writeOptions *levigo.WriteOptions + incrementLock sync.Mutex + requestId uint32 + requestLogDir string } type Field struct { @@ -36,6 +45,40 @@ type rawColumnValue struct { value []byte } +type requestLogDb struct { + dir string + db *levigo.DB +} + +func (self *requestLogDb) delete() error { + log.Warn("Deleting request log: ", self.dir) + self.db.Close() + return os.RemoveAll(self.dir) +} + +func getRequestLogDirForDate(baseDir string, t time.Time) string { + logDir := fmt.Sprintf("%d-%0.2d-%0.2d", t.Year(), t.Month, t.Day()) + return filepath.Join(baseDir, logDir) +} + +func NewRequestLogDb(dir string) (*requestLogDb, error) { + err := os.MkdirAll(dir, 0744) + if err != nil { + return nil, err + } + opts := levigo.NewOptions() + opts.SetCache(levigo.NewLRUCache(ONE_MEGABYTE)) + opts.SetCreateIfMissing(true) + opts.SetBlockSize(TWO_FIFTY_SIX_KILOBYTES) + filter := levigo.NewBloomFilter(BLOOM_FILTER_BITS_PER_KEY) + opts.SetFilterPolicy(filter) + db, err := levigo.Open(dir, opts) + if err != nil { + return nil, err + } + return &requestLogDb{dir: dir, db: db}, nil +} + // depending on the query order (whether it's ascending or not) returns // the min (or max in case of descending query) of the current // [timestamp,sequence] and the self's [timestamp,sequence] @@ -69,15 +112,25 @@ func (self *rawColumnValue) updatePointTimeAndSequence(currentTimeRaw, currentSe } const ( - ONE_MEGABYTE = 1024 * 1024 - ONE_GIGABYTE = ONE_MEGABYTE * 1024 - TWO_FIFTY_SIX_KILOBYTES = 256 * 1024 - BLOOM_FILTER_BITS_PER_KEY = 64 - MAX_POINTS_TO_SCAN = 1000000 - MAX_SERIES_SIZE = ONE_MEGABYTE + ONE_MEGABYTE = 1024 * 1024 + ONE_GIGABYTE = ONE_MEGABYTE * 1024 + TWO_FIFTY_SIX_KILOBYTES = 256 * 1024 + BLOOM_FILTER_BITS_PER_KEY = 64 + MAX_POINTS_TO_SCAN = 1000000 + MAX_SERIES_SIZE = ONE_MEGABYTE + REQUEST_SEQUENCE_NUMBER_KEY = "r" + REQUEST_LOG_BASE_DIR = "request_logs" + DATABASE_DIR = "db" + REQUEST_LOG_ROTATION_PERIOD = 24 * time.Hour + HOUR_TO_ROTATE_REQUEST_LOG = 0 + MINUTE_TO_ROTATE_REQUEST_LOG = 1 ) var ( + + // This datastore implements the PersistentAtomicInteger interface. 
All of the persistent + // integers start with this prefix, followed by their name + ATOMIC_INCREMENT_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD} // NEXT_ID_KEY holds the next id. ids are used to "intern" timeseries and column names NEXT_ID_KEY = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} // SERIES_COLUMN_INDEX_PREFIX is the prefix of the series to column names index @@ -85,9 +138,27 @@ var ( // DATABASE_SERIES_INDEX_PREFIX is the prefix of the database to series names index DATABASE_SERIES_INDEX_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} MAX_SEQUENCE = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} + + replicateWrite = protocol.Request_REPLICATION_WRITE ) func NewLevelDbDatastore(dbDir string) (Datastore, error) { + mainDbDir := filepath.Join(dbDir, DATABASE_DIR) + requestLogDir := filepath.Join(dbDir, REQUEST_LOG_BASE_DIR) + + err := os.MkdirAll(mainDbDir, 0744) + if err != nil { + return nil, err + } + previousLog, err := NewRequestLogDb(getRequestLogDirForDate(requestLogDir, time.Now().Add(-time.Hour*24))) + if err != nil { + return nil, err + } + currentLog, err := NewRequestLogDb(getRequestLogDirForDate(requestLogDir, time.Now())) + if err != nil { + return nil, err + } + opts := levigo.NewOptions() opts.SetCache(levigo.NewLRUCache(ONE_GIGABYTE)) opts.SetCreateIfMissing(true) @@ -116,12 +187,87 @@ func NewLevelDbDatastore(dbDir string) (Datastore, error) { wo := levigo.NewWriteOptions() - return &LevelDbDatastore{db: db, lastIdUsed: lastId, readOptions: ro, writeOptions: wo}, nil + leveldbStore := &LevelDbDatastore{ + db: db, + lastIdUsed: lastId, + readOptions: ro, + writeOptions: wo, + requestLogDir: requestLogDir, + currentRequestLog: currentLog, + previousRequestLog: previousLog} + + go leveldbStore.periodicallyRotateRequestLog() + + return leveldbStore, nil +} + +func (self *LevelDbDatastore) periodicallyRotateRequestLog() { + ticker := self.nextLogRotationTicker() + for { + <-ticker.C + self.rotateRequestLog() + ticker = self.nextLogRotationTicker() + } +} + +func (self *LevelDbDatastore) rotateRequestLog() { + log.Warn("Rotating request log...") + self.requestLogLock.Lock() + defer self.requestLogLock.Unlock() + oldLog := self.previousRequestLog + self.previousRequestLog = self.currentRequestLog + var err error + self.currentRequestLog, err = NewRequestLogDb(getRequestLogDirForDate(self.requestLogDir, time.Now())) + if err != nil { + log.Error("Error creating new requst log: ", err) + panic(err) + } + go oldLog.delete() +} + +func (self *LevelDbDatastore) nextLogRotationTicker() *time.Ticker { + nextTick := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), HOUR_TO_ROTATE_REQUEST_LOG, MINUTE_TO_ROTATE_REQUEST_LOG, 0, 0, time.Local) + if !nextTick.After(time.Now()) { + nextTick = nextTick.Add(REQUEST_LOG_ROTATION_PERIOD) + } + diff := nextTick.Sub(time.Now()) + return time.NewTicker(diff) +} + +func (self *LevelDbDatastore) AtomicIncrement(name string, val int) (uint64, error) { + self.incrementLock.Lock() + defer self.incrementLock.Unlock() + numberKey := append(ATOMIC_INCREMENT_PREFIX, []byte(name)...) 
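AtomicIncrement persists each named counter as an 8-byte big-endian value stored under ATOMIC_INCREMENT_PREFIX plus the counter name. A sketch of the intended call pattern for allocating the next request sequence number, following the key format of keyForOwnerAndServerSequenceNumber further down ("r" + clusterVersion_replicationFactor_ownerServerId_originatingServerId); the helper name is illustrative:

package example

import (
	"datastore"
	"fmt"
)

func nextSequenceNumber(store datastore.Datastore, clusterVersion uint32, replicationFactor uint8,
	ownerServerId, originatingServerId uint32) (uint64, error) {
	// one persistent counter per (cluster version, replication factor, owner, originating server)
	name := fmt.Sprintf("r%d_%d_%d_%d", clusterVersion, replicationFactor, ownerServerId, originatingServerId)
	return store.AtomicIncrement(name, 1) // bumps the stored value by one and returns the new sequence number
}
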
+ numberBytes, err := self.db.Get(self.readOptions, numberKey) + if err != nil { + return uint64(0), err + } + currentNumber := self.bytesToCurrentNumber(numberBytes) + currentNumber += uint64(val) + currentNumberBuffer := bytes.NewBuffer(make([]byte, 0, 8)) + binary.Write(currentNumberBuffer, binary.BigEndian, currentNumber) + self.db.Put(self.writeOptions, numberKey, currentNumberBuffer.Bytes()) + + return currentNumber, nil +} + +func (self *LevelDbDatastore) bytesToCurrentNumber(numberBytes []byte) uint64 { + currentNumber := uint64(0) + + if numberBytes != nil { + binary.Read(bytes.NewBuffer(numberBytes), binary.BigEndian, ¤tNumber) + } + return currentNumber } func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol.Series) error { wb := levigo.NewWriteBatch() defer wb.Close() + + if series == nil || len(series.Points) == 0 { + return errors.New("Unable to write no data. Series was nil or had no points.") + } + for fieldIndex, field := range series.Fields { temp := field id, _, err := self.getIdForDbSeriesColumn(&database, series.Name, &temp) @@ -132,11 +278,12 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol. timestampBuffer := bytes.NewBuffer(make([]byte, 0, 8)) sequenceNumberBuffer := bytes.NewBuffer(make([]byte, 0, 8)) binary.Write(timestampBuffer, binary.BigEndian, self.convertTimestampToUint(point.GetTimestampInMicroseconds())) - binary.Write(sequenceNumberBuffer, binary.BigEndian, uint64(*point.SequenceNumber)) + binary.Write(sequenceNumberBuffer, binary.BigEndian, *point.SequenceNumber) pointKey := append(append(id, timestampBuffer.Bytes()...), sequenceNumberBuffer.Bytes()...) // TODO: we should remove the column value if timestamp and sequence number - // were provided + // were provided. + // Paul: since these are assigned in the coordinator, we'll have to figure out how to represent this. if point.Values[fieldIndex] == nil { continue } @@ -148,6 +295,7 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol. 
wb.Put(pointKey, data) } } + return self.db.Write(self.writeOptions, wb) } @@ -192,7 +340,157 @@ func (self *LevelDbDatastore) DropDatabase(database string) error { return self.db.Write(self.writeOptions, wb) } -func (self *LevelDbDatastore) ExecuteQuery(user common.User, database string, query *parser.Query, yield func(*protocol.Series) error) error { +func (self *LevelDbDatastore) ReplayRequestsFromSequenceNumber(clusterVersion, originatingServerId, ownerServerId *uint32, replicationFactor *uint8, lastKnownSequence *uint64, yield func(*[]byte) error) error { + self.requestLogLock.RLock() + defer self.requestLogLock.RUnlock() + + requestLog := self.currentRequestLog + + key := self.requestLogKey(clusterVersion, originatingServerId, ownerServerId, lastKnownSequence, replicationFactor) + data, err := requestLog.db.Get(self.readOptions, key) + if err != nil { + return err + } + if data == nil { + err = self.replayFromLog(key, self.previousRequestLog, yield) + if err != nil { + return err + } + startSequence := uint64(0) + key = self.requestLogKey(clusterVersion, originatingServerId, ownerServerId, &startSequence, replicationFactor) + } + err = self.replayFromLog(key, requestLog, yield) + if err != nil { + return err + } + + yield(nil) + + return nil +} + +func (self *LevelDbDatastore) replayFromLog(seekKey []byte, requestLog *requestLogDb, yield func(*[]byte) error) error { + ro := levigo.NewReadOptions() + defer ro.Close() + ro.SetFillCache(false) + it := requestLog.db.NewIterator(ro) + defer it.Close() + + startingKey := seekKey[:len(seekKey)-8] + sliceTo := len(startingKey) + it.Seek(seekKey) + if it.Valid() { + if bytes.Equal(it.Key(), seekKey) { + it.Next() + } + } + + for it = it; it.Valid(); it.Next() { + k := it.Key() + if !bytes.Equal(k[:sliceTo], startingKey) { + return nil + } + b := it.Value() + err := yield(&b) + if err != nil { + return err + } + } + return nil +} + +func (self *LevelDbDatastore) keyForOwnerAndServerSequenceNumber(clusterVersion *uint32, replicationFactor *uint8, ownerServerId, serverId *uint32) string { + return fmt.Sprintf("%s%d_%d_%d_%d", REQUEST_SEQUENCE_NUMBER_KEY, *clusterVersion, *replicationFactor, *ownerServerId, *serverId) +} + +type SequenceMissingRequestsError struct { + message string + LastKnownRequestSequence uint64 +} + +func (self SequenceMissingRequestsError) Error() string { + return self.message +} + +func (self *LevelDbDatastore) requestLogKey(clusterVersion, originatingServerId, ownerServerId *uint32, sequenceNumber *uint64, replicationFactor *uint8) []byte { + clusterVersionBytes := bytes.NewBuffer(make([]byte, 0, 4)) + binary.Write(clusterVersionBytes, binary.BigEndian, *clusterVersion) + + ownerServerBytes := bytes.NewBuffer(make([]byte, 0, 4)) + binary.Write(ownerServerBytes, binary.BigEndian, *ownerServerId) + + serverBytes := bytes.NewBuffer(make([]byte, 0, 4)) + binary.Write(serverBytes, binary.BigEndian, *originatingServerId) + + sequenceBytes := bytes.NewBuffer(make([]byte, 0, 8)) + binary.Write(sequenceBytes, binary.BigEndian, *sequenceNumber) + + sequenceReplication := bytes.NewBuffer(make([]byte, 0, 1)) + binary.Write(sequenceReplication, binary.BigEndian, *replicationFactor) + + return append( + append( + append( + append(clusterVersionBytes.Bytes(), ownerServerBytes.Bytes()...), + sequenceReplication.Bytes()...), + serverBytes.Bytes()...), + sequenceBytes.Bytes()...) 
+} + +func (self *LevelDbDatastore) LogRequestAndAssignSequenceNumber(request *protocol.Request, replicationFactor *uint8, ownerServerId *uint32) error { + // log to this key structure on a different DB sharded by day: + var numberKey []byte + if request.SequenceNumber == nil { + sequenceNumber, err := self.AtomicIncrement(self.keyForOwnerAndServerSequenceNumber(request.ClusterVersion, replicationFactor, ownerServerId, request.OriginatingServerId), 1) + if err != nil { + return err + } + request.SequenceNumber = &sequenceNumber + } else { + // this is for a replicated write, ensure that it's the next in line for this owner and server + name := self.keyForOwnerAndServerSequenceNumber(request.ClusterVersion, replicationFactor, ownerServerId, request.OriginatingServerId) + numberKey = append(ATOMIC_INCREMENT_PREFIX, []byte(name)...) + numberBytes, err := self.db.Get(self.readOptions, numberKey) + if err != nil { + return err + } + previousSequenceNumber := self.bytesToCurrentNumber(numberBytes) + if previousSequenceNumber+uint64(1) != *request.SequenceNumber { + return SequenceMissingRequestsError{"Missing requests between last seen and this one.", previousSequenceNumber} + } + } + + self.requestLogLock.RLock() + requestLog := self.currentRequestLog + self.requestLogLock.RUnlock() + + // proxied writes should be logged as replicated ones. That's what is expected if they're replayed later + if *request.Type == protocol.Request_PROXY_WRITE { + request.Type = &replicateWrite + } + + data, err := request.Encode() + if err != nil { + return err + } + + key := self.requestLogKey(request.ClusterVersion, request.OriginatingServerId, ownerServerId, request.SequenceNumber, replicationFactor) + err = requestLog.db.Put(self.writeOptions, key, data) + if err != nil { + return err + } + if numberKey != nil { + currentNumberBuffer := bytes.NewBuffer(make([]byte, 0, 8)) + binary.Write(currentNumberBuffer, binary.BigEndian, *request.SequenceNumber) + self.db.Put(self.writeOptions, numberKey, currentNumberBuffer.Bytes()) + } + return nil +} + +func (self *LevelDbDatastore) ExecuteQuery(user common.User, database string, + query *parser.Query, yield func(*protocol.Series) error, + ringFilter func(database, series *string, time *int64) bool) error { + seriesAndColumns := query.GetReferencedColumns() hasAccess := true for series, columns := range seriesAndColumns { @@ -203,7 +501,7 @@ func (self *LevelDbDatastore) ExecuteQuery(user common.User, database string, qu hasAccess = false continue } - err := self.executeQueryForSeries(database, name, columns, query, yield) + err := self.executeQueryForSeries(database, name, columns, query, yield, ringFilter) if err != nil { return err } @@ -213,7 +511,7 @@ func (self *LevelDbDatastore) ExecuteQuery(user common.User, database string, qu hasAccess = false continue } - err := self.executeQueryForSeries(database, series.Name, columns, query, yield) + err := self.executeQueryForSeries(database, series.Name, columns, query, yield, ringFilter) if err != nil { return err } @@ -228,6 +526,10 @@ func (self *LevelDbDatastore) ExecuteQuery(user common.User, database string, qu func (self *LevelDbDatastore) Close() { self.db.Close() self.db = nil + self.currentRequestLog.db.Close() + self.currentRequestLog = nil + self.previousRequestLog.db.Close() + self.previousRequestLog = nil self.readOptions.Close() self.readOptions = nil self.writeOptions.Close() @@ -238,7 +540,13 @@ func (self *LevelDbDatastore) deleteRangeOfSeries(database, series string, start columns := 
self.getColumnNamesForSeries(database, series) fields, err := self.getFieldsForSeries(database, series, columns) if err != nil { - return err + // because a db is distributed across the cluster, it's possible we don't have the series indexed here. ignore + switch err := err.(type) { + case FieldLookupError: + return nil + default: + return err + } } ro := levigo.NewReadOptions() defer ro.Close() @@ -344,12 +652,22 @@ func isPointInRange(fieldId, startTime, endTime, point []byte) bool { return bytes.Equal(id, fieldId) && bytes.Compare(time, startTime) > -1 && bytes.Compare(time, endTime) < 1 } -func (self *LevelDbDatastore) executeQueryForSeries(database, series string, columns []string, query *parser.Query, yield func(*protocol.Series) error) error { +func (self *LevelDbDatastore) executeQueryForSeries(database, series string, columns []string, + query *parser.Query, yield func(*protocol.Series) error, + ringFilter func(database, series *string, time *int64) bool) error { + startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(query.GetStartTime()), common.TimeToMicroseconds(query.GetEndTime())) + emptyResult := &protocol.Series{Name: &series, Points: nil} fields, err := self.getFieldsForSeries(database, series, columns) if err != nil { - return err + // because a db is distributed across the cluster, it's possible we don't have the series indexed here. ignore + switch err := err.(type) { + case FieldLookupError: + return yield(emptyResult) + default: + return err + } } fieldCount := len(fields) fieldNames, iterators := self.getIterators(fields, startTimeBytes, endTimeBytes, query.Ascending) @@ -386,11 +704,11 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col continue } - time := key[8:16] value := it.Value() sequenceNumber := key[16:] - rawValue := &rawColumnValue{time: time, sequence: sequenceNumber, value: value} + rawTime := key[8:16] + rawValue := &rawColumnValue{time: rawTime, sequence: sequenceNumber, value: value} rawColumnValues[i] = rawValue } @@ -428,6 +746,7 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col } else { iterator.Prev() } + fv := &protocol.FieldValue{} err := proto.Unmarshal(rawColumnValues[i].value, fv) if err != nil { @@ -435,14 +754,13 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col } resultByteCount += len(rawColumnValues[i].value) point.Values[i] = fv + var sequence uint64 + binary.Read(bytes.NewBuffer(rawColumnValues[i].sequence), binary.BigEndian, &sequence) var t uint64 binary.Read(bytes.NewBuffer(rawColumnValues[i].time), binary.BigEndian, &t) time := self.convertUintTimestampToInt64(&t) - var sequence uint64 - binary.Read(bytes.NewBuffer(rawColumnValues[i].sequence), binary.BigEndian, &sequence) - seq32 := uint32(sequence) point.SetTimestampInMicroseconds(time) - point.SequenceNumber = &seq32 + point.SequenceNumber = &sequence rawColumnValues[i] = nil } @@ -452,6 +770,11 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col } limit -= 1 + + if ringFilter != nil && ringFilter(&database, &series, point.Timestamp) { + continue + } + result.Points = append(result.Points, point) // add byte count for the timestamp and the sequence @@ -474,7 +797,6 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col if _, err := self.sendBatch(query, result, yield); err != nil { return err } - emptyResult := &protocol.Series{Name: &series, Fields: fieldNames, Points: nil} _, 
err = self.sendBatch(query, emptyResult, yield) return err } @@ -570,6 +892,14 @@ func (self *LevelDbDatastore) getColumnNamesForSeries(db, series string) []strin return names } +type FieldLookupError struct { + message string +} + +func (self FieldLookupError) Error() string { + return self.message +} + func (self *LevelDbDatastore) getFieldsForSeries(db, series string, columns []string) ([]*Field, error) { isCountQuery := false if len(columns) > 0 && columns[0] == "*" { @@ -579,7 +909,7 @@ func (self *LevelDbDatastore) getFieldsForSeries(db, series string, columns []st columns = self.getColumnNamesForSeries(db, series) } if len(columns) == 0 { - return nil, errors.New("Couldn't look up columns for series: " + series) + return nil, FieldLookupError{"Coulnd't look up columns for series: " + series} } fields := make([]*Field, len(columns), len(columns)) @@ -590,7 +920,7 @@ func (self *LevelDbDatastore) getFieldsForSeries(db, series string, columns []st return nil, errId } if !alreadyPresent { - return nil, errors.New("Field " + name + " doesn't exist in series " + series) + return nil, FieldLookupError{"Field " + name + " doesn't exist in series " + series} } fields[i] = &Field{Name: name, Id: id} } diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go index 3bfb77afabc..f0ee9978278 100644 --- a/src/engine/engine_test.go +++ b/src/engine/engine_test.go @@ -3,6 +3,7 @@ package engine import ( . "checkers" "common" + "coordinator" "encoding/json" "fmt" . "launchpad.net/gocheck" @@ -45,7 +46,7 @@ func (self *MockCoordinator) WriteSeriesData(user common.User, database string, return nil } -func (self *MockCoordinator) CreateDatabase(user common.User, db string) error { +func (self *MockCoordinator) CreateDatabase(user common.User, db string, rf uint8) error { return nil } @@ -53,10 +54,18 @@ func (self *MockCoordinator) DropDatabase(user common.User, db string) error { return nil } -func (self *MockCoordinator) ListDatabases(user common.User) ([]string, error) { +func (self *MockCoordinator) ListDatabases(user common.User) ([]*coordinator.Database, error) { return nil, nil } +func (self *MockCoordinator) ReplicateWrite(request *protocol.Request) error { + return nil +} + +func (self *MockCoordinator) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) { + return +} + func createEngine(c *C, seriesString string) EngineI { series, err := common.StringToSeriesArray(seriesString) c.Assert(err, IsNil) diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go index c3bf357897f..92cae44dfb1 100644 --- a/src/integration/benchmark_test.go +++ b/src/integration/benchmark_test.go @@ -105,7 +105,7 @@ func (self *Server) start() error { } root := filepath.Join(dir, "..", "..") - filename := filepath.Join(root, "server") + filename := filepath.Join(root, "daemon") p, err := os.StartProcess(filename, []string{filename, "-cpuprofile", "/tmp/cpuprofile"}, &os.ProcAttr{ Dir: root, Env: os.Environ(), @@ -115,7 +115,7 @@ func (self *Server) start() error { return err } self.p = p - time.Sleep(2 * time.Second) + time.Sleep(4 * time.Second) return nil } diff --git a/src/protocol/protocol.pb.go b/src/protocol/protocol.pb.go index b19f5596d72..5c960f2f113 100644 --- a/src/protocol/protocol.pb.go +++ b/src/protocol/protocol.pb.go @@ -16,20 +16,29 @@ var _ = math.Inf type Request_Type int32 const ( - Request_QUERY Request_Type = 1 - Request_WRITE Request_Type = 2 - Request_GET_SERVERS Request_Type = 3 + 
Request_QUERY Request_Type = 1 + Request_REPLICATION_WRITE Request_Type = 2 + Request_PROXY_WRITE Request_Type = 3 + Request_REPLICATION_DELETE Request_Type = 4 + Request_PROXY_DELETE Request_Type = 5 + Request_REPLICATION_REPLAY Request_Type = 6 ) var Request_Type_name = map[int32]string{ 1: "QUERY", - 2: "WRITE", - 3: "GET_SERVERS", + 2: "REPLICATION_WRITE", + 3: "PROXY_WRITE", + 4: "REPLICATION_DELETE", + 5: "PROXY_DELETE", + 6: "REPLICATION_REPLAY", } var Request_Type_value = map[string]int32{ - "QUERY": 1, - "WRITE": 2, - "GET_SERVERS": 3, + "QUERY": 1, + "REPLICATION_WRITE": 2, + "PROXY_WRITE": 3, + "REPLICATION_DELETE": 4, + "PROXY_DELETE": 5, + "REPLICATION_REPLAY": 6, } func (x Request_Type) Enum() *Request_Type { @@ -40,6 +49,9 @@ func (x Request_Type) Enum() *Request_Type { func (x Request_Type) String() string { return proto.EnumName(Request_Type_name, int32(x)) } +func (x Request_Type) MarshalJSON() ([]byte, error) { + return json.Marshal(x.String()) +} func (x *Request_Type) UnmarshalJSON(data []byte) error { value, err := proto.UnmarshalJSONEnum(Request_Type_value, data, "Request_Type") if err != nil { @@ -52,20 +64,26 @@ func (x *Request_Type) UnmarshalJSON(data []byte) error { type Response_Type int32 const ( - Response_QUERY Response_Type = 1 - Response_WRITE_OK Response_Type = 2 - Response_END_STREAM Response_Type = 3 + Response_QUERY Response_Type = 1 + Response_WRITE_OK Response_Type = 2 + Response_END_STREAM Response_Type = 3 + Response_REPLICATION_REPLAY Response_Type = 4 + Response_REPLICATION_REPLAY_END Response_Type = 5 ) var Response_Type_name = map[int32]string{ 1: "QUERY", 2: "WRITE_OK", 3: "END_STREAM", + 4: "REPLICATION_REPLAY", + 5: "REPLICATION_REPLAY_END", } var Response_Type_value = map[string]int32{ - "QUERY": 1, - "WRITE_OK": 2, - "END_STREAM": 3, + "QUERY": 1, + "WRITE_OK": 2, + "END_STREAM": 3, + "REPLICATION_REPLAY": 4, + "REPLICATION_REPLAY_END": 5, } func (x Response_Type) Enum() *Response_Type { @@ -76,6 +94,9 @@ func (x Response_Type) Enum() *Response_Type { func (x Response_Type) String() string { return proto.EnumName(Response_Type_name, int32(x)) } +func (x Response_Type) MarshalJSON() ([]byte, error) { + return json.Marshal(x.String()) +} func (x *Response_Type) UnmarshalJSON(data []byte) error { value, err := proto.UnmarshalJSONEnum(Response_Type_value, data, "Response_Type") if err != nil { @@ -85,6 +106,39 @@ func (x *Response_Type) UnmarshalJSON(data []byte) error { return nil } +type Response_ErrorCode int32 + +const ( + Response_REQUEST_TOO_LARGE Response_ErrorCode = 1 +) + +var Response_ErrorCode_name = map[int32]string{ + 1: "REQUEST_TOO_LARGE", +} +var Response_ErrorCode_value = map[string]int32{ + "REQUEST_TOO_LARGE": 1, +} + +func (x Response_ErrorCode) Enum() *Response_ErrorCode { + p := new(Response_ErrorCode) + *p = x + return p +} +func (x Response_ErrorCode) String() string { + return proto.EnumName(Response_ErrorCode_name, int32(x)) +} +func (x Response_ErrorCode) MarshalJSON() ([]byte, error) { + return json.Marshal(x.String()) +} +func (x *Response_ErrorCode) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Response_ErrorCode_value, data, "Response_ErrorCode") + if err != nil { + return err + } + *x = Response_ErrorCode(value) + return nil +} + type FieldValue struct { StringValue *string `protobuf:"bytes,1,opt,name=string_value" json:"string_value,omitempty"` DoubleValue *float64 `protobuf:"fixed64,3,opt,name=double_value" json:"double_value,omitempty"` @@ -127,8 +181,8 @@ func (m *FieldValue) 
GetInt64Value() int64 { type Point struct { Values []*FieldValue `protobuf:"bytes,1,rep,name=values" json:"values,omitempty"` - Timestamp *int64 `protobuf:"varint,2,req,name=timestamp" json:"timestamp,omitempty"` - SequenceNumber *uint32 `protobuf:"varint,3,req,name=sequence_number" json:"sequence_number,omitempty"` + Timestamp *int64 `protobuf:"varint,2,opt,name=timestamp" json:"timestamp,omitempty"` + SequenceNumber *uint64 `protobuf:"varint,3,opt,name=sequence_number" json:"sequence_number,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -150,7 +204,7 @@ func (m *Point) GetTimestamp() int64 { return 0 } -func (m *Point) GetSequenceNumber() uint32 { +func (m *Point) GetSequenceNumber() uint64 { if m != nil && m.SequenceNumber != nil { return *m.SequenceNumber } @@ -190,16 +244,27 @@ func (m *Series) GetFields() []string { } type Request struct { - Id *int32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` - Type *Request_Type `protobuf:"varint,2,req,name=type,enum=protocol.Request_Type" json:"type,omitempty"` - XXX_unrecognized []byte `json:"-"` + Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` + Type *Request_Type `protobuf:"varint,2,req,name=type,enum=protocol.Request_Type" json:"type,omitempty"` + Database *string `protobuf:"bytes,3,req,name=database" json:"database,omitempty"` + Series *Series `protobuf:"bytes,4,opt,name=series" json:"series,omitempty"` + SequenceNumber *uint64 `protobuf:"varint,5,opt,name=sequence_number" json:"sequence_number,omitempty"` + OriginatingServerId *uint32 `protobuf:"varint,6,opt,name=originating_server_id" json:"originating_server_id,omitempty"` + ClusterVersion *uint32 `protobuf:"varint,10,opt,name=cluster_version" json:"cluster_version,omitempty"` + Query *string `protobuf:"bytes,7,opt,name=query" json:"query,omitempty"` + UserName *string `protobuf:"bytes,8,opt,name=user_name" json:"user_name,omitempty"` + RingLocationsToQuery *uint32 `protobuf:"varint,9,opt,name=ring_locations_to_query" json:"ring_locations_to_query,omitempty"` + ReplicationFactor *uint32 `protobuf:"varint,16,opt,name=replication_factor" json:"replication_factor,omitempty"` + OwnerServerId *uint32 `protobuf:"varint,17,opt,name=owner_server_id" json:"owner_server_id,omitempty"` + LastKnownSequenceNumber *uint64 `protobuf:"varint,18,opt,name=last_known_sequence_number" json:"last_known_sequence_number,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} -func (m *Request) GetId() int32 { +func (m *Request) GetId() uint32 { if m != nil && m.Id != nil { return *m.Id } @@ -213,20 +278,108 @@ func (m *Request) GetType() Request_Type { return Request_QUERY } +func (m *Request) GetDatabase() string { + if m != nil && m.Database != nil { + return *m.Database + } + return "" +} + +func (m *Request) GetSeries() *Series { + if m != nil { + return m.Series + } + return nil +} + +func (m *Request) GetSequenceNumber() uint64 { + if m != nil && m.SequenceNumber != nil { + return *m.SequenceNumber + } + return 0 +} + +func (m *Request) GetOriginatingServerId() uint32 { + if m != nil && m.OriginatingServerId != nil { + return *m.OriginatingServerId + } + return 0 +} + +func (m *Request) GetClusterVersion() uint32 { + if m != nil && m.ClusterVersion != nil { + return *m.ClusterVersion + } + return 0 +} + +func (m *Request) GetQuery() string { + if m != nil && m.Query != nil { + return *m.Query + } + return "" +} + +func (m 
*Request) GetUserName() string { + if m != nil && m.UserName != nil { + return *m.UserName + } + return "" +} + +func (m *Request) GetRingLocationsToQuery() uint32 { + if m != nil && m.RingLocationsToQuery != nil { + return *m.RingLocationsToQuery + } + return 0 +} + +func (m *Request) GetReplicationFactor() uint32 { + if m != nil && m.ReplicationFactor != nil { + return *m.ReplicationFactor + } + return 0 +} + +func (m *Request) GetOwnerServerId() uint32 { + if m != nil && m.OwnerServerId != nil { + return *m.OwnerServerId + } + return 0 +} + +func (m *Request) GetLastKnownSequenceNumber() uint64 { + if m != nil && m.LastKnownSequenceNumber != nil { + return *m.LastKnownSequenceNumber + } + return 0 +} + type Response struct { - Id *int32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` - Series *Series `protobuf:"bytes,2,opt,name=series" json:"series,omitempty"` - Servers []string `protobuf:"bytes,3,rep,name=servers" json:"servers,omitempty"` - XXX_unrecognized []byte `json:"-"` + Type *Response_Type `protobuf:"varint,1,req,name=type,enum=protocol.Response_Type" json:"type,omitempty"` + RequestId *uint32 `protobuf:"varint,2,req,name=request_id" json:"request_id,omitempty"` + Series *Series `protobuf:"bytes,3,opt,name=series" json:"series,omitempty"` + ErrorCode *Response_ErrorCode `protobuf:"varint,4,opt,name=error_code,enum=protocol.Response_ErrorCode" json:"error_code,omitempty"` + ErrorMessage *string `protobuf:"bytes,5,opt,name=error_message" json:"error_message,omitempty"` + NextPointTime *int64 `protobuf:"varint,6,opt,name=nextPointTime" json:"nextPointTime,omitempty"` + Request *Request `protobuf:"bytes,7,opt,name=request" json:"request,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Response) Reset() { *m = Response{} } func (m *Response) String() string { return proto.CompactTextString(m) } func (*Response) ProtoMessage() {} -func (m *Response) GetId() int32 { - if m != nil && m.Id != nil { - return *m.Id +func (m *Response) GetType() Response_Type { + if m != nil && m.Type != nil { + return *m.Type + } + return Response_QUERY +} + +func (m *Response) GetRequestId() uint32 { + if m != nil && m.RequestId != nil { + return *m.RequestId } return 0 } @@ -238,9 +391,30 @@ func (m *Response) GetSeries() *Series { return nil } -func (m *Response) GetServers() []string { +func (m *Response) GetErrorCode() Response_ErrorCode { + if m != nil && m.ErrorCode != nil { + return *m.ErrorCode + } + return Response_REQUEST_TOO_LARGE +} + +func (m *Response) GetErrorMessage() string { + if m != nil && m.ErrorMessage != nil { + return *m.ErrorMessage + } + return "" +} + +func (m *Response) GetNextPointTime() int64 { + if m != nil && m.NextPointTime != nil { + return *m.NextPointTime + } + return 0 +} + +func (m *Response) GetRequest() *Request { if m != nil { - return m.Servers + return m.Request } return nil } @@ -248,4 +422,5 @@ func (m *Response) GetServers() []string { func init() { proto.RegisterEnum("protocol.Request_Type", Request_Type_name, Request_Type_value) proto.RegisterEnum("protocol.Response_Type", Response_Type_name, Response_Type_value) + proto.RegisterEnum("protocol.Response_ErrorCode", Response_ErrorCode_name, Response_ErrorCode_value) } diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto index c828116b8ea..271c1899529 100644 --- a/src/protocol/protocol.proto +++ b/src/protocol/protocol.proto @@ -9,8 +9,8 @@ message FieldValue { message Point { repeated FieldValue values = 1; - required int64 timestamp = 2; - required uint32 sequence_number = 
3; + optional int64 timestamp = 2; + optional uint64 sequence_number = 3; } message Series { @@ -22,11 +22,37 @@ message Series { message Request { enum Type { QUERY = 1; - WRITE = 2; - GET_SERVERS = 3; + REPLICATION_WRITE = 2; + PROXY_WRITE = 3; + REPLICATION_DELETE = 4; + PROXY_DELETE = 5; + REPLICATION_REPLAY = 6; } - required int32 id = 1; + required uint32 id = 1; required Type type = 2; + required string database = 3; + optional Series series = 4; + // only write and delete requests get sequenceNumbers assigned. These are used to + // ensure that the receiving server is up to date + optional uint64 sequence_number = 5; + // the originatingServerId is only used for writes and deletes. It is the id of the + // server that first committed the write to its local datastore. It is used for + // the other servers in the hash ring to ensure they remain consistent. + optional uint32 originating_server_id = 6; + optional uint32 cluster_version = 10; + optional string query = 7; + optional string user_name = 8; + // ringLocationsToQuery tells the server what data it should be returning. + // for example, if the number is 1, it will only return data that is owned by + // this server on the hash ring. If 2, it will return this server and data replicated + // from the server directly before it on the ring. 3, etc. + // If this field is left out, we assume that we'll be returning all data the server has + // for the query. + optional uint32 ring_locations_to_query = 9; + // optional fields for replication replay requests. should include originating server id + optional uint32 replication_factor = 16; + optional uint32 owner_server_id = 17; + optional uint64 last_known_sequence_number = 18; } message Response { @@ -34,8 +60,17 @@ message Response { QUERY = 1; WRITE_OK = 2; END_STREAM = 3; + REPLICATION_REPLAY = 4; + REPLICATION_REPLAY_END = 5; } - required int32 id = 1; - optional Series series = 2; - repeated string servers = 3; + enum ErrorCode { + REQUEST_TOO_LARGE = 1; + } + required Type type = 1; + required uint32 request_id = 2; + optional Series series = 3; + optional ErrorCode error_code = 4; + optional string error_message = 5; + optional int64 nextPointTime = 6; + optional Request request = 7; } diff --git a/src/protocol/protocol_extensions.go b/src/protocol/protocol_extensions.go index 05b0782e571..80e65a4ccef 100644 --- a/src/protocol/protocol_extensions.go +++ b/src/protocol/protocol_extensions.go @@ -1,16 +1,18 @@ package protocol import ( + "bytes" "code.google.com/p/goprotobuf/proto" + "sort" ) -func UnmarshalPoint(data []byte) (point *Point, err error) { +func DecodePoint(buff *bytes.Buffer) (point *Point, err error) { point = &Point{} - err = proto.Unmarshal(data, point) + err = proto.Unmarshal(buff.Bytes(), point) return } -func MarshalPoint(point *Point) (data []byte, err error) { +func (point *Point) Encode() (data []byte, err error) { return proto.Marshal(point) } @@ -51,3 +53,54 @@ func (self *Point) GetFieldValue(idx int) interface{} { } return v.GetValue() } + +func DecodeRequest(buff *bytes.Buffer) (request *Request, err error) { + request = &Request{} + err = proto.Unmarshal(buff.Bytes(), request) + return +} + +func (self *Request) Encode() (data []byte, err error) { + return proto.Marshal(self) +} + +func DecodeResponse(buff *bytes.Buffer) (response *Response, err error) { + response = &Response{} + err = proto.Unmarshal(buff.Bytes(), response) + return +} + +func (self *Response) Encode() (data []byte, err error) { + return proto.Marshal(self) +} + +type 
PointsCollection []*Point + +func (s PointsCollection) Len() int { return len(s) } +func (s PointsCollection) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +type ByPointTimeDesc struct{ PointsCollection } +type ByPointTimeAsc struct{ PointsCollection } + +func (s ByPointTimeAsc) Less(i, j int) bool { + if s.PointsCollection[i] != nil && s.PointsCollection[j] != nil { + return *s.PointsCollection[i].Timestamp < *s.PointsCollection[j].Timestamp + } + return false +} +func (s ByPointTimeDesc) Less(i, j int) bool { + if s.PointsCollection[i] != nil && s.PointsCollection[j] != nil { + return *s.PointsCollection[i].Timestamp > *s.PointsCollection[j].Timestamp + } + return false +} + +func (self *Series) SortPointsTimeAscending() { + sort.Sort(ByPointTimeAsc{self.Points}) +} + +func (self *Series) SortPointsTimeDescending() { + if self.Points != nil { + sort.Sort(ByPointTimeDesc{self.Points}) + } +} diff --git a/src/protocol/protocol_test.go b/src/protocol/protocol_test.go index 701c07b8af1..20845368003 100644 --- a/src/protocol/protocol_test.go +++ b/src/protocol/protocol_test.go @@ -1,6 +1,7 @@ package protocol import ( + "bytes" . "launchpad.net/gocheck" "testing" "time" @@ -15,7 +16,7 @@ type ProtocolSuite struct{} var _ = Suite(&ProtocolSuite{}) -func (self *ProtocolSuite) TestCanMarshalAndUnmarshal(c *C) { +func (self *ProtocolSuite) TestCanEncodeAndDecode(c *C) { p := &Point{} v := &FieldValue{} @@ -24,14 +25,14 @@ func (self *ProtocolSuite) TestCanMarshalAndUnmarshal(c *C) { p.Values = []*FieldValue{v} t := time.Now().Unix() p.Timestamp = &t - s := uint32(23432423) + s := uint64(23432423) p.SequenceNumber = &s - d, err := MarshalPoint(p) + d, err := p.Encode() c.Assert(err, Equals, nil) c.Assert(len(d), Equals, 22) - point, err2 := UnmarshalPoint(d) + point, err2 := DecodePoint(bytes.NewBuffer(d)) c.Assert(err2, Equals, nil) c.Assert(point.Values[0].GetDoubleValue(), Equals, f) } diff --git a/src/server/mock_user_test.go b/src/server/mock_user_test.go new file mode 100644 index 00000000000..16ae0f5f51f --- /dev/null +++ b/src/server/mock_user_test.go @@ -0,0 +1,26 @@ +package server + +type MockUser struct { +} + +func (self *MockUser) GetName() string { + return "mockuser" +} +func (self *MockUser) IsDeleted() bool { + return false +} +func (self *MockUser) IsClusterAdmin() bool { + return false +} +func (self *MockUser) IsDbAdmin(db string) bool { + return false +} +func (self *MockUser) GetDb() string { + return "" +} +func (self *MockUser) HasWriteAccess(name string) bool { + return true +} +func (self *MockUser) HasReadAccess(name string) bool { + return true +} diff --git a/src/server/server.go b/src/server/server.go index 9e2b2da5dff..3d0419fcc98 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -1,4 +1,4 @@ -package main +package server import ( "admin" @@ -7,129 +7,84 @@ import ( "coordinator" "datastore" "engine" - "flag" - "fmt" - "io/ioutil" "log" - "os" - "os/signal" - "runtime" - "strconv" - "syscall" "time" ) -const ( - version = "dev" - gitSha = "HEAD" -) - -func waitForSignals(stopped <-chan bool) { - ch := make(chan os.Signal) - signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) - for { - sig := <-ch - fmt.Printf("Received signal: %s\n", sig.String()) - switch sig { - case syscall.SIGINT, syscall.SIGTERM: - runtime.SetCPUProfileRate(0) - <-stopped - os.Exit(0) - } - } +type Server struct { + RaftServer *coordinator.RaftServer + Db datastore.Datastore + ProtobufServer *coordinator.ProtobufServer + ClusterConfig *coordinator.ClusterConfiguration + HttpApi 
*http.HttpServer + AdminServer *admin.HttpServer + Coordinator coordinator.Coordinator + Config *configuration.Configuration + RequestHandler *coordinator.ProtobufRequestHandler + stopped bool } -func startProfiler(filename *string) error { - if filename == nil || *filename == "" { - return nil - } - - cpuProfileFile, err := os.Create(*filename) +func NewServer(config *configuration.Configuration) (*Server, error) { + log.Println("Opening database at ", config.DataDir) + db, err := datastore.NewLevelDbDatastore(config.DataDir) if err != nil { - return err + return nil, err } - runtime.SetCPUProfileRate(500) - stopped := make(chan bool) - - go waitForSignals(stopped) - go func() { - for { - select { - default: - data := runtime.CPUProfile() - if data == nil { - cpuProfileFile.Close() - stopped <- true - break - } - cpuProfileFile.Write(data) - } - } - }() - return nil -} - -func main() { - fileName := flag.String("config", "config.json.sample", "Config file") - wantsVersion := flag.Bool("v", false, "Get version number") - resetRootPassword := flag.Bool("reset-root", false, "Reset root password") - pidFile := flag.String("pidfile", "", "the pid file") - cpuProfiler := flag.String("cpuprofile", "", "filename where cpu profile data will be written") - - runtime.GOMAXPROCS(runtime.NumCPU()) - flag.Parse() - - startProfiler(cpuProfiler) - - if wantsVersion != nil && *wantsVersion { - fmt.Printf("InfluxDB v%s (git: %s)\n", version, gitSha) - return - } - config := configuration.LoadConfiguration(*fileName) + clusterConfig := coordinator.NewClusterConfiguration() + raftServer := coordinator.NewRaftServer(config, clusterConfig) + coord := coordinator.NewCoordinatorImpl(db, raftServer, clusterConfig) + requestHandler := coordinator.NewProtobufRequestHandler(db, coord, clusterConfig) + protobufServer := coordinator.NewProtobufServer(config.ProtobufPortString(), requestHandler) - if pidFile != nil && *pidFile != "" { - pid := strconv.Itoa(os.Getpid()) - if err := ioutil.WriteFile(*pidFile, []byte(pid), 0644); err != nil { - panic(err) - } + eng, err := engine.NewQueryEngine(coord) + if err != nil { + return nil, err } - log.Println("Starting Influx Server...") - clusterConfig := coordinator.NewClusterConfiguration() - os.MkdirAll(config.RaftDir, 0744) + httpApi := http.NewHttpServer(config.ApiHttpPortString(), eng, coord, coord) + adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString()) - raftServer := coordinator.NewRaftServer(config.RaftDir, "localhost", config.RaftServerPort, clusterConfig) - go func() { - raftServer.ListenAndServe(config.SeedServers, false) - }() + return &Server{ + RaftServer: raftServer, + Db: db, + ProtobufServer: protobufServer, + ClusterConfig: clusterConfig, + HttpApi: httpApi, + Coordinator: coord, + AdminServer: adminServer, + Config: config, + RequestHandler: requestHandler}, nil +} - if *resetRootPassword { - time.Sleep(2 * time.Second) // wait for the raft server to join the cluster +func (self *Server) ListenAndServe() error { + go self.ProtobufServer.ListenAndServe() - fmt.Printf("Resetting root's password to %s", coordinator.DEFAULT_ROOT_PWD) - if err := raftServer.CreateRootUser(); err != nil { - panic(err) - } + retryUntilJoinedCluster := false + if len(self.Config.SeedServers) > 0 { + retryUntilJoinedCluster = true } - os.MkdirAll(config.DataDir, 0744) - log.Println("Opening database at ", config.DataDir) - db, err := datastore.NewLevelDbDatastore(config.DataDir) + go self.RaftServer.ListenAndServe(self.Config.SeedServers, 
retryUntilJoinedCluster) + time.Sleep(time.Second * 3) + err := self.Coordinator.(*coordinator.CoordinatorImpl).ConnectToProtobufServers(self.Config.ProtobufConnectionString()) if err != nil { - panic(err) + return err } - coord := coordinator.NewCoordinatorImpl(db, raftServer, clusterConfig) - eng, err := engine.NewQueryEngine(coord) - if err != nil { - panic(err) + log.Println("Starting admin interface on port", self.Config.AdminHttpPort) + go self.AdminServer.ListenAndServe() + log.Println("Starting Http Api server on port", self.Config.ApiHttpPort) + self.HttpApi.ListenAndServe() + return nil +} + +func (self *Server) Stop() { + if self.stopped { + return } - log.Println() - adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString()) - log.Println("Starting admin interface on port", config.AdminHttpPort) - go func() { - adminServer.ListenAndServe() - }() - log.Println("Starting Http Api server on port", config.ApiHttpPort) - server := http.NewHttpServer(config.ApiHttpPortString(), eng, coord, coord) - server.ListenAndServe() + self.stopped = true + self.RaftServer.Close() + self.Db.Close() + self.HttpApi.Close() + self.ProtobufServer.Close() + // TODO: close admin server and protobuf client connections } diff --git a/src/server/server_test.go b/src/server/server_test.go new file mode 100644 index 00000000000..aea73020d03 --- /dev/null +++ b/src/server/server_test.go @@ -0,0 +1,366 @@ +package server + +import ( + "bytes" + "common" + "configuration" + "datastore" + "encoding/json" + "fmt" + "io/ioutil" + . "launchpad.net/gocheck" + "net" + "net/http" + "net/url" + "os" + "parser" + "protocol" + "runtime" + "testing" + "time" +) + +// Hook up gocheck into the gotest runner. +func Test(t *testing.T) { + TestingT(t) +} + +type ServerSuite struct { + servers []*Server +} + +var _ = Suite(&ServerSuite{}) + +func init() { + runtime.GOMAXPROCS(runtime.NumCPU() * 2) +} + +func (self *ServerSuite) SetUpSuite(c *C) { + self.servers = startCluster(3, c) + time.Sleep(time.Second * 4) + time.Sleep(time.Second) + err := self.servers[0].RaftServer.CreateDatabase("test_rep", uint8(2)) + c.Assert(err, IsNil) + time.Sleep(time.Millisecond * 10) + _, err = self.postToServer(self.servers[0], "/db/test_rep/users?u=root&p=root", `{"username": "paul", "password": "pass"}`, c) + c.Assert(err, IsNil) +} + +func (self *ServerSuite) TearDownSuite(c *C) { + for _, s := range self.servers { + s.Stop() + os.RemoveAll(s.Config.DataDir) + os.RemoveAll(s.Config.RaftDir) + } +} + +func getAvailablePorts(count int, c *C) []int { + listeners := make([]net.Listener, count, count) + ports := make([]int, count, count) + for i, _ := range listeners { + l, err := net.Listen("tcp4", ":0") + c.Assert(err, IsNil) + port := l.Addr().(*net.TCPAddr).Port + ports[i] = port + listeners[i] = l + } + for _, l := range listeners { + l.Close() + } + return ports +} + +func getDir(prefix string, c *C) string { + path, err := ioutil.TempDir(os.TempDir(), prefix) + c.Assert(err, IsNil) + return path +} + +func startCluster(count int, c *C) []*Server { + ports := getAvailablePorts(count*4, c) + seedServerPort := ports[0] + servers := make([]*Server, count, count) + for i, _ := range servers { + var seedServers []string + if i == 0 { + seedServers = []string{} + } else { + seedServers = []string{fmt.Sprintf("http://localhost:%d", seedServerPort)} + } + + portOffset := i * 4 + config := &configuration.Configuration{ + RaftServerPort: ports[portOffset], + AdminHttpPort: ports[portOffset+1], + ApiHttpPort: 
ports[portOffset+2], + ProtobufPort: ports[portOffset+3], + AdminAssetsDir: "./", + DataDir: getDir("influxdb_db", c), + RaftDir: getDir("influxdb_raft", c), + SeedServers: seedServers, + Hostname: "localhost", + } + server, err := NewServer(config) + if err != nil { + c.Error(err) + } + go func() { + err := server.ListenAndServe() + if err != nil { + c.Error(err) + } + }() + time.Sleep(time.Millisecond * 50) + servers[i] = server + } + return servers +} + +func (self *ServerSuite) postToServer(server *Server, url, data string, c *C) (*http.Response, error) { + fullUrl := fmt.Sprintf("http://localhost:%d%s", server.Config.ApiHttpPort, url) + resp, err := http.Post(fullUrl, "application/json", bytes.NewBufferString(data)) + c.Assert(err, IsNil) + return resp, err +} + +func executeQuery(user common.User, database, query string, db datastore.Datastore, c *C) []*protocol.Series { + q, errQ := parser.ParseQuery(query) + c.Assert(errQ, IsNil) + resultSeries := []*protocol.Series{} + yield := func(series *protocol.Series) error { + // ignore time series which have no data, this includes + // end of series indicator + if len(series.Points) > 0 { + resultSeries = append(resultSeries, series) + } + return nil + } + err := db.ExecuteQuery(user, database, q, yield, nil) + c.Assert(err, IsNil) + return resultSeries +} + +func (self *ServerSuite) TestDataReplication(c *C) { + servers := self.servers + + data := ` + [{ + "points": [ + ["val1", 2] + ], + "name": "foo", + "columns": ["val_1", "val_2"] + }]` + resp, _ := self.postToServer(servers[0], "/db/test_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(time.Millisecond * 10) + + countWithPoint := 0 + user := &MockUser{} + for _, server := range servers { + results := executeQuery(user, "test_rep", "select * from foo;", server.Db, c) + pointCount := 0 + for _, series := range results { + if *series.Name == "foo" { + if len(series.Points) > 0 { + pointCount += 1 + } + } else { + c.Error(fmt.Sprintf("Got a series in the query we didn't expect: %s", *series.Name)) + } + } + if pointCount > 1 { + c.Error("Got too many points for the series from one db") + } else if pointCount > 0 { + countWithPoint += 1 + } + } + c.Assert(countWithPoint, Equals, 2) +} + +func (self *ServerSuite) TestCrossClusterQueries(c *C) { + data := `[{ + "name": "cluster_query", + "columns": ["val1"], + "points": [[1], [2], [3], [4]] + }]` + resp, _ := self.postToServer(self.servers[0], "/db/test_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + + time.Sleep(time.Second) + data = `[{ + "name": "cluster_query", + "columns": ["val1"], + "points": [[5], [6], [7]] + }]` + resp, _ = self.postToServer(self.servers[0], "/db/test_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(time.Millisecond * 100) + + for _, s := range self.servers { + query := "select count(val1) from cluster_query;" + encodedQuery := url.QueryEscape(query) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/db/test_rep/series?u=paul&p=pass&q=%s", s.Config.ApiHttpPort, encodedQuery)) + c.Assert(err, IsNil) + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + resp.Body.Close() + var results []map[string]interface{} + err = json.Unmarshal(body, &results) + c.Assert(err, IsNil) + c.Assert(results, HasLen, 1) + point := results[0]["points"].([]interface{})[0].([]interface{}) + val := point[len(point)-1].(float64) + 
c.Assert(val, Equals, float64(7)) + } + + data = `[{ + "name": "cluster_query", + "columns": ["val1"], + "points": [[8], [9]] + }]` + resp, _ = self.postToServer(self.servers[0], "/db/test_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(time.Millisecond * 100) + + data = `[{ + "name": "cluster_query", + "columns": ["val1"], + "points": [[10], [11]] + }]` + resp, _ = self.postToServer(self.servers[0], "/db/test_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(time.Millisecond * 100) + + for _, s := range self.servers { + query := "select count(val1) from cluster_query;" + encodedQuery := url.QueryEscape(query) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/db/test_rep/series?u=paul&p=pass&q=%s", s.Config.ApiHttpPort, encodedQuery)) + c.Assert(err, IsNil) + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + resp.Body.Close() + var results []map[string]interface{} + err = json.Unmarshal(body, &results) + c.Assert(err, IsNil) + c.Assert(results, HasLen, 1) + point := results[0]["points"].([]interface{})[0].([]interface{}) + val := point[len(point)-1].(float64) + c.Assert(val, Equals, float64(11)) + } +} + +func (self *ServerSuite) TestFailureAndReplicationReplays(c *C) { + servers := self.servers + + err := servers[0].RaftServer.CreateDatabase("full_rep", uint8(3)) + c.Assert(err, IsNil) + time.Sleep(time.Millisecond * 10) + _, err = self.postToServer(self.servers[0], "/db/full_rep/users?u=root&p=root", `{"username": "paul", "password": "pass"}`, c) + c.Assert(err, IsNil) + + // write data and confirm that it went to all three servers + data := ` + [{ + "points": [ + [1] + ], + "name": "test_failure_replays", + "columns": ["val"] + }]` + resp, _ := self.postToServer(servers[0], "/db/full_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(time.Millisecond * 10) + + countWithPoint := 0 + user := &MockUser{} + for _, server := range servers { + results := executeQuery(user, "full_rep", "select sum(val) from test_failure_replays;", server.Db, c) + pointCount := 0 + for _, series := range results { + if *series.Name == "test_failure_replays" { + if len(series.Points) > 0 { + pointCount += 1 + } + } else { + c.Error(fmt.Sprintf("Got a series in the query we didn't expect: %s", *series.Name)) + } + } + if pointCount > 0 { + countWithPoint += 1 + } + } + c.Assert(countWithPoint, Equals, 3) + + // kill a server, write data + killedConfig := servers[1].Config + + servers[1].Stop() + time.Sleep(time.Second) + // TODO: make the admin server actually close so we don't have to go to a new port + killedConfig.AdminHttpPort = 8110 + + data = ` + [{ + "points": [[2]], + "name": "test_failure_replays", + "columns": ["val"] + }] + ` + resp, _ = self.postToServer(servers[0], "/db/full_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(time.Millisecond * 10) + + // now bring the server back up and make sure that it only has the old data. 
Replays get triggered on write + server, err := NewServer(killedConfig) + if err != nil { + c.Error(err) + } + go func() { + err := server.ListenAndServe() + if err != nil { + c.Error(err) + } + }() + time.Sleep(time.Second * 4) + servers[1] = server + + getSum := func(db datastore.Datastore) int64 { + results := executeQuery(user, "full_rep", "select * from test_failure_replays;", db, c) + sum := int64(0) + for _, series := range results { + if *series.Name == "test_failure_replays" { + for _, point := range series.Points { + sum += *point.Values[0].Int64Value + } + } + } + return sum + } + c.Assert(getSum(servers[0].Db), Equals, int64(3)) + c.Assert(getSum(servers[1].Db), Equals, int64(1)) + c.Assert(getSum(servers[2].Db), Equals, int64(3)) + + // TODO: fix this. I do this 1k times because there's no way right now to force a replay + // on a server other than having a write with the originating server id and owner server id + // the same as the write that occurred while the server was down. Doing this means it + // will almost certainly trigger one (i.e. a request will randomly hash to the org/owner server) + data = ` + [{ + "points": [[1]], + "name": "test_failure_replays", + "columns": ["val"] + }] + ` + for i := 0; i < 1000; i++ { + resp, _ = self.postToServer(servers[0], "/db/full_rep/series?u=paul&p=pass", data, c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + } + + time.Sleep(time.Millisecond * 10) + c.Assert(getSum(servers[0].Db), Equals, int64(1003)) + c.Assert(getSum(servers[1].Db), Equals, int64(1003)) + c.Assert(getSum(servers[2].Db), Equals, int64(1003)) +}
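
The reworked Request message carries everything a proxied or replicated write needs (database, series, sequence number, originating server id), and protocol_extensions.go pairs the generated types with Encode/DecodeRequest helpers. A minimal round-trip sketch, assuming the protocol package and goprotobuf are on the GOPATH as in this tree; the database name, series name, and values are illustrative only:

package main

import (
	"bytes"
	"fmt"
	"protocol"

	"code.google.com/p/goprotobuf/proto"
)

func main() {
	// A single point for an illustrative "foo" series.
	point := &protocol.Point{
		Values:         []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: proto.Float64(23.2)}},
		Timestamp:      proto.Int64(1386118800),
		SequenceNumber: proto.Uint64(1),
	}
	series := &protocol.Series{
		Name:   proto.String("foo"),
		Fields: []string{"value"},
		Points: []*protocol.Point{point},
	}

	// A proxied write destined for the server that owns this point on the ring.
	request := &protocol.Request{
		Id:                  proto.Uint32(1),
		Type:                protocol.Request_PROXY_WRITE.Enum(),
		Database:            proto.String("test_rep"),
		Series:              series,
		OriginatingServerId: proto.Uint32(1),
	}

	// Encode to the wire format, then decode it back with the new helper.
	data, err := request.Encode()
	if err != nil {
		panic(err)
	}
	decoded, err := protocol.DecodeRequest(bytes.NewBuffer(data))
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.GetType(), decoded.GetDatabase(), decoded.GetSeries().GetName())
}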
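
protocol_extensions.go also adds PointsCollection with ascending and descending sorts keyed on the point timestamp. A small usage sketch (timestamps and values are arbitrary):

package main

import (
	"fmt"
	"protocol"

	"code.google.com/p/goprotobuf/proto"
)

func point(ts int64) *protocol.Point {
	return &protocol.Point{
		Values:    []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: proto.Float64(1)}},
		Timestamp: proto.Int64(ts),
	}
}

func main() {
	series := &protocol.Series{
		Name:   proto.String("foo"),
		Fields: []string{"value"},
		Points: []*protocol.Point{point(3), point(1), point(2)},
	}

	// Orders points oldest-first; SortPointsTimeDescending gives newest-first.
	series.SortPointsTimeAscending()
	for _, p := range series.Points {
		fmt.Println(p.GetTimestamp()) // 1, 2, 3
	}
}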
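
With main moved out of src/server/server.go into a Server type, the daemon entry point (src/daemon/influxd.go, referenced in package.sh) only has to load the configuration and drive NewServer, ListenAndServe, and Stop. A hypothetical, minimal sketch; the flags from the old main (-v, -reset-root, -pidfile, -cpuprofile) presumably move there as well and are omitted here:

package main

import (
	"configuration"
	"flag"
	"log"
	"os"
	"server"
)

func main() {
	fileName := flag.String("config", "config.json.sample", "Config file")
	flag.Parse()

	config := configuration.LoadConfiguration(*fileName)
	os.MkdirAll(config.DataDir, 0744)
	os.MkdirAll(config.RaftDir, 0744)

	srv, err := server.NewServer(config)
	if err != nil {
		log.Fatal(err)
	}
	defer srv.Stop()

	// Blocks serving the HTTP API; the protobuf, raft, and admin
	// listeners are started on separate goroutines by ListenAndServe.
	if err := srv.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}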