Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

This is the beginning of the clustering stuff. Client/server communication, etc. #20

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b85c337
Make datastore assign point times and sequence numbers if absent. Add…
pauldix Nov 7, 2013
f2e7749
Move server to influxd to make way for server struct.
pauldix Nov 8, 2013
26c1044
Add protobuf server port to config. Create server object to wrap ever…
pauldix Nov 9, 2013
11554a2
Add replication factor to databases with default of 1. Move Database …
pauldix Nov 10, 2013
023219d
Initialize admin server in server.
pauldix Nov 10, 2013
b255ffe
Add sharding and replication to coordinator and replication to the re…
pauldix Nov 14, 2013
915356f
Make datastore test expect 0 fields on empty results
pauldix Nov 14, 2013
55faca5
Merge branch 'master' into add-clustering
pauldix Nov 14, 2013
75995f1
Fix bug where proxies would happen even if a successful write occurs …
pauldix Nov 15, 2013
70c75ef
* Fix IsSingleServer to return true when it's actually single.
pauldix Nov 16, 2013
ec1ef67
Merge branch 'master' into add-clustering
pauldix Nov 16, 2013
ee6e614
Fix proxied writes to try all possible replicas
pauldix Nov 17, 2013
a7e5ca7
Use CompareAndSwap in ProtobufClient instead of locks
pauldix Nov 17, 2013
5ec7698
Move go-cache dependency to InfluxDB fork so we can control the version
pauldix Nov 17, 2013
a648134
Add sweeper to clear out timed out requests. Fixes for clearing reque…
pauldix Nov 17, 2013
7e6ee7a
Fix calls to CompareAndSwap in ProtobufClient
pauldix Nov 17, 2013
c586397
Some cleanup with renaming and removing extraneous call to CompareAnd…
pauldix Nov 17, 2013
597f7a5
Add method comments to make Johnny S. happy ;)
pauldix Nov 17, 2013
fa5ee70
Fix bug where if local write fails, it would end up proxying the writ…
pauldix Nov 17, 2013
46a93cc
use daemon instead of server
jvshahid Nov 17, 2013
041dc79
make the integration test wait for the server to start
jvshahid Nov 17, 2013
28863f2
Fix coordinator to assign sequence_number and time even when in singl…
pauldix Nov 17, 2013
c7c88d8
WIP: Wire up distributed queries
pauldix Nov 18, 2013
eecc9be
Merge branch 'master' into add-clustering
pauldix Nov 20, 2013
4c53668
Update datastore to take optional ring filter. Fix replication and di…
pauldix Nov 21, 2013
17ffb61
Update write requests to have a cluster version. Add sequenceNumber, …
pauldix Nov 24, 2013
7e4378f
Add request logging that rolls over every day and deletes any logs ol…
pauldix Nov 26, 2013
065ccfc
Merge branch 'master' into add-clustering
pauldix Nov 30, 2013
28787ea
WIP: Implement replication log replays for servers that get out of sync.
pauldix Dec 2, 2013
ad8afe9
Fix api, coordinator, and client server test to work with new replay …
pauldix Dec 2, 2013
3f7c2fa
Fix bugs in replication replays and test. Fix other tests to work wit…
pauldix Dec 3, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
"RaftServerPort": 8090,
"SeedServers": [],
"DataDir": "/tmp/influxdb/development/db",
"RaftDir": "/tmp/influxdb/development/raft"
"RaftDir": "/tmp/influxdb/development/raft",
"ProtobufPort": 8099
}
12 changes: 6 additions & 6 deletions package.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function package_admin_interface {
}

function packae_source {
rm -f server
rm -f influxd
git ls-files --others | egrep -v 'github|launchpad|code.google' > /tmp/influxdb.ignored
echo "pkg/*" >> /tmp/influxdb.ignored
echo "packages/*" >> /tmp/influxdb.ignored
Expand Down Expand Up @@ -55,7 +55,7 @@ function package_files {

package_admin_interface

mv server build/influxdb
mv influxd build/influxdb

# cp -R src/admin/site/ build/admin/
mkdir build/admin
Expand Down Expand Up @@ -110,14 +110,14 @@ function build_packages {
function setup_version {
echo "Changing version from dev to $influxdb_version"
sha1=`git rev-list --max-count=1 HEAD`
sed -i.bak -e "s/version = \"dev\"/version = \"$influxdb_version\"/" -e "s/gitSha\s*=\s*\"HEAD\"/gitSha = \"$sha1\"/" src/server/server.go
sed -i.bak -e "s/version = \"dev\"/version = \"$influxdb_version\"/" -e "s/gitSha\s*=\s*\"HEAD\"/gitSha = \"$sha1\"/" src/daemon/influxd.go
sed -i.bak -e "s/REPLACE_VERSION/$influxdb_version/" scripts/post_install.sh
}

function revert_version {
if [ -e src/server/server.go.bak ]; then
rm src/server/server.go
mv src/server/server.go.bak src/server/server.go
if [ -e src/daemon/influxd.go.bak ]; then
rm src/daemon/influxd.go
mv src/daemon/influxd.go.bak src/daemon/influxd.go
fi

if [ -e scripts/post_install.sh ]; then
Expand Down
15 changes: 4 additions & 11 deletions src/api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,23 +344,16 @@ func (self *HttpServer) writePoints(w libhttp.ResponseWriter, r *libhttp.Request
}

type createDatabaseRequest struct {
Name string `json:"name"`
}

type Database struct {
Name string `json:"name"`
Name string `json:"name"`
ReplicationFactor uint8 `json:"replicationFactor"`
}

func (self *HttpServer) listDatabases(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(u common.User) (int, interface{}) {
dbNames, err := self.coordinator.ListDatabases(u)
databases, err := self.coordinator.ListDatabases(u)
if err != nil {
return errorToStatusCode(err), err.Error()
}
databases := make([]*Database, 0, len(dbNames))
for _, db := range dbNames {
databases = append(databases, &Database{db})
}
return libhttp.StatusOK, databases
})
}
Expand All @@ -376,7 +369,7 @@ func (self *HttpServer) createDatabase(w libhttp.ResponseWriter, r *libhttp.Requ
if err != nil {
return libhttp.StatusBadRequest, err.Error()
}
err = self.coordinator.CreateDatabase(user, createRequest.Name)
err = self.coordinator.CreateDatabase(user, createRequest.Name, createRequest.ReplicationFactor)
if err != nil {
return errorToStatusCode(err), err.Error()
}
Expand Down
15 changes: 8 additions & 7 deletions src/api/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"bytes"
"common"
"coordinator"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -109,13 +110,13 @@ func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *p
return nil
}

func (self *MockCoordinator) CreateDatabase(_ common.User, db string) error {
func (self *MockCoordinator) CreateDatabase(_ common.User, db string, _ uint8) error {
self.db = db
return nil
}

func (self *MockCoordinator) ListDatabases(_ common.User) ([]string, error) {
return []string{"db1", "db2"}, nil
func (self *MockCoordinator) ListDatabases(_ common.User) ([]*coordinator.Database, error) {
return []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}}, nil
}

func (self *MockCoordinator) DropDatabase(_ common.User, db string) error {
Expand Down Expand Up @@ -602,10 +603,10 @@ func (self *ApiSuite) TestDatabasesIndex(c *C) {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
users := []*Database{}
users := []*coordinator.Database{}
err = json.Unmarshal(body, &users)
c.Assert(err, IsNil)
c.Assert(users, DeepEquals, []*Database{&Database{"db1"}, &Database{"db2"}})
c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", uint8(1)}, &coordinator.Database{"db2", uint8(1)}})
}
}

Expand All @@ -620,8 +621,8 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) {
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
users := []*Database{}
users := []*coordinator.Database{}
err = json.Unmarshal(body, &users)
c.Assert(err, IsNil)
c.Assert(users, DeepEquals, []*Database{&Database{"db1"}, &Database{"db2"}})
c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}})
}
17 changes: 17 additions & 0 deletions src/common/helpers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package common

import (
"bytes"
"crypto/sha1"
"encoding/binary"
"encoding/json"
"fmt"
"protocol"
"time"
)
Expand All @@ -15,3 +19,16 @@ func StringToSeriesArray(seriesString string) ([]*protocol.Series, error) {
func CurrentTime() int64 {
return time.Now().UnixNano() / int64(1000)
}

func RingLocation(database *string, timeSeries *string, time *int64) int {
hasher := sha1.New()
hasher.Write([]byte(fmt.Sprintf("%s%s%d", *database, *timeSeries, *time)))
buf := bytes.NewBuffer(hasher.Sum(nil))
var n int64
binary.Read(buf, binary.LittleEndian, &n)
nInt := int(n)
if nInt < 0 {
nInt = nInt * -1
}
return nInt
}
24 changes: 24 additions & 0 deletions src/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"log"
"os"
)

type Configuration struct {
Expand All @@ -15,6 +16,8 @@ type Configuration struct {
SeedServers []string
DataDir string
RaftDir string
ProtobufPort int
Hostname string
}

func LoadConfiguration(fileName string) *Configuration {
Expand Down Expand Up @@ -43,3 +46,24 @@ func (self *Configuration) AdminHttpPortString() string {
func (self *Configuration) ApiHttpPortString() string {
return fmt.Sprintf(":%d", self.ApiHttpPort)
}

func (self *Configuration) ProtobufPortString() string {
return fmt.Sprintf(":%d", self.ProtobufPort)
}

func (self *Configuration) HostnameOrDetect() string {
if self.Hostname != "" {
return self.Hostname
} else {
n, err := os.Hostname()
if err == nil {
return n
} else {
return "localhost"
}
}
}

func (self *Configuration) ProtobufConnectionString() string {
return fmt.Sprintf("%s:%d", self.HostnameOrDetect(), self.ProtobufPort)
}
1 change: 0 additions & 1 deletion src/coordinator/client.go

This file was deleted.

79 changes: 79 additions & 0 deletions src/coordinator/client_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package coordinator

import (
"datastore"
"fmt"
. "launchpad.net/gocheck"
"os"
"protocol"
"time"
)

type ClientServerSuite struct{}

var _ = Suite(&ClientServerSuite{})

const DB_DIR = "/tmp/influxdb/datastore_test"

func newDatastore(c *C) datastore.Datastore {
os.MkdirAll(DB_DIR, 0744)
db, err := datastore.NewLevelDbDatastore(DB_DIR)
c.Assert(err, Equals, nil)
return db
}

func cleanDb(db datastore.Datastore) {
if db != nil {
db.Close()
}
os.RemoveAll(DB_DIR)
}

func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) {
server := startAndVerifyCluster(1, c)[0]
defer clean(server)
db := newDatastore(c)
requestHandler := NewProtobufRequestHandler(db, server)
protobufServer := NewProtobufServer(":8091", requestHandler)
go protobufServer.ListenAndServe()
c.Assert(protobufServer, Not(IsNil))
protobufClient := NewProtobufClient("localhost:8091")
responseStream := make(chan *protocol.Response, 1)

mock := `
{
"points": [
{ "values": [{"int64_value": 3}]},
{ "values": [{"int64_value": 23}]}
],
"name": "foo",
"fields": ["val"]
}`
fmt.Println("creating series")
series := stringToSeries(mock, c)
id := uint32(1)
database := "pauldb"
proxyWrite := protocol.Request_PROXY_WRITE
request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, Series: series}

time.Sleep(time.Second * 1)
err := protobufClient.MakeRequest(request, responseStream)
c.Assert(err, IsNil)
timer := time.NewTimer(time.Second)
select {
case <-timer.C:
c.Error("Timed out waiting for response")
case response := <-responseStream:
c.Assert(*response.Type, Equals, protocol.Response_WRITE_OK)
}
}

func (self *ClientServerSuite) TestClientReconnectsIfDisconnected(c *C) {
}

func (self *ClientServerSuite) TestServerExecutesReplayRequestIfWriteIsOutOfSequence(c *C) {
}

func (self *ClientServerSuite) TestServerKillsOldHandlerWhenClientReconnects(c *C) {

}
Loading