Distributed Query/Clustering Fixes #2353

Merged · Apr 21, 2015 · 7 commits

CHANGELOG.md: 5 additions, 0 deletions

@@ -3,6 +3,7 @@
### Features
- [#2301](https://github.com/influxdb/influxdb/pull/2301): Distributed query load balancing and failover
- [#2336](https://github.com/influxdb/influxdb/pull/2336): Handle distributed queries when shards != data nodes
- [#2353](https://github.com/influxdb/influxdb/pull/2353): Distributed Query/Clustering Fixes

### Bugfixes
- [#2297](https://github.com/influxdb/influxdb/pull/2297): create /var/run during startup. Thanks @neonstalwart.
@@ -18,6 +19,10 @@
- [#2338](https://github.com/influxdb/influxdb/pull/2338): Fix panic if tag key isn't double quoted when it should have been
- [#2340](https://github.com/influxdb/influxdb/pull/2340): Fix SHOW DIAGNOSTICS panic if any shard was non-local.
- [#2351](https://github.com/influxdb/influxdb/pull/2351): Fix data race by rlocking shard during diagnostics.
- [#2348](https://github.com/influxdb/influxdb/pull/2348): Data node fail to join cluster in 0.9.0rc25
- [#2343](https://github.com/influxdb/influxdb/pull/2343): Node falls behind Metastore updates
- [#2334](https://github.com/influxdb/influxdb/pull/2334): Test Partial replication is very problematic
- [#2272](https://github.com/influxdb/influxdb/pull/2272): clustering: influxdb 0.9.0-rc23 panics when doing a GET with merge_metrics in a

## v0.9.0-rc25 [2015-04-15]

client/influxdb.go: 1 addition, 1 deletion

@@ -45,7 +45,7 @@ func NewClient(c Config) (*Client, error) {
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{},
httpClient: http.DefaultClient,
userAgent: c.UserAgent,
}
if client.userAgent == "" {
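
The swap from a freshly allocated `&http.Client{}` to the shared `http.DefaultClient` is a one-liner, so a standalone sketch (not part of the diff) of what it does and does not change may help: a zero-value client already falls back to `http.DefaultTransport` for connection pooling, so the visible difference is that the package-level client value itself is now shared, including any settings other code in the process applies to it.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	// A zero-value client and the package-level default client both leave
	// Transport nil, so both fall back to http.DefaultTransport (and its
	// connection pool) when making requests.
	perClient := &http.Client{}
	shared := http.DefaultClient
	fmt.Println(perClient.Transport == nil, shared.Transport == nil) // true true

	// The trade-off of sharing http.DefaultClient: process-wide tweaks to it
	// (for example a timeout set elsewhere) now apply to this client as well.
	http.DefaultClient.Timeout = 30 * time.Second
	fmt.Println(shared.Timeout) // 30s
}
```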

cmd/influxd/run.go: 15 additions, 17 deletions

@@ -642,30 +642,18 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
// Give brokers time to elect a leader if entire cluster is being restarted.
time.Sleep(1 * time.Second)

if s.ID() == 0 && s.Index() == 0 {
if len(joinURLs) > 0 {
joinServer(s, *cmd.node.ClusterURL(), joinURLs)
return s
}

if err := s.Initialize(*cmd.node.ClusterURL()); err != nil {
log.Fatalf("server initialization error(0): %s", err)
}

u := cmd.node.ClusterURL()
log.Printf("initialized data node: %s\n", u.String())
return s
if s.ID() == 0 {
Contributor: So why wasn't this condition correct? Why was it wrong to also require Index() to be 0?

Contributor: From what I understand, after you open your own meta store you read your ID from it; if your ID isn't 0, your index certainly shouldn't be either, because you have already been part of a cluster.

Contributor: If I understand the issue correctly, the bug was that sometimes s.ID() was 0 but s.Index() was not?

Contributor (author): Yes. Index was non-zero.

joinOrInitializeServer(s, *cmd.node.ClusterURL(), joinURLs)
} else {
log.Printf("data node already member of cluster. Using existing state and ignoring join URLs")
}

return s
}

// joins a server to an existing cluster.
func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// TODO: Use separate broker and data join urls.

// joinOrInitializeServer joins a new server to an existing cluster or initializes it as the first
// member of the cluster
func joinOrInitializeServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
Contributor: Now that I see it I don't fully understand the comments in this function. "Create data node on an existing data node." -- what does this mean?

Contributor: Comment is incorrect, will fix.

// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(&u, &joinURL); err == influxdb.ErrDataNodeNotFound {
@@ -676,12 +664,22 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
log.Printf("initialized data node: %s\n", (&u).String())
return
} else if err != nil {
// does not return so that the next joinURL can be tried
log.Printf("join: failed to connect data node: %s: %s", (&u).String(), err)
} else {
log.Printf("join: connected data node to %s", u)
return
}
}

if len(joinURLs) == 0 {
Contributor: Not strictly part of this, but are we missing a return at line 666 above? If so, can you add it to this PR?

Contributor: We did introduce that in this PR, and yes, it should have a return. Will fix.

Contributor: OK. Perhaps it's something about the race testing framework?

Contributor (author): No, it should not return there, because the loop tries each joinURL in turn. If one fails, the next one is tried; if they all fail, the log.Fatalf at the end of the function (line 681) runs and fatals out.

if err := s.Initialize(u); err != nil {
log.Fatalf("server initialization error(2): %s", err)
}
log.Printf("initialized data node: %s\n", (&u).String())
return
}

log.Fatalf("join: failed to connect data node to any specified server")
}

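To make the control flow the thread is debating easier to follow, here is a condensed, hedged sketch of the shape joinOrInitializeServer takes after this PR. The joiner interface, the function name, and the stub in main are stand-ins invented for the sketch (the real code works on *influxdb.Server), and the ErrDataNodeNotFound branch of the real function, where a node initializes itself against an existing broker, is omitted for brevity.

```go
package main

import (
	"errors"
	"log"
	"net/url"
)

// joiner abstracts the two *influxdb.Server methods the flow needs so the
// sketch stands alone; the real code calls the server directly.
type joiner interface {
	Join(self, peer *url.URL) error
	Initialize(self url.URL) error
}

// joinOrInitialize mirrors the shape of joinOrInitializeServer after this PR:
// try every join URL in order, fall back to self-initialization only when no
// join URLs were supplied, and fail if every join attempt errored.
func joinOrInitialize(s joiner, self url.URL, joinURLs []url.URL) error {
	for _, peer := range joinURLs {
		if err := s.Join(&self, &peer); err != nil {
			// Do not return: the next join URL may still work.
			log.Printf("join: failed to connect data node %s via %s: %s", self.String(), peer.String(), err)
			continue
		}
		log.Printf("join: connected data node to %s", peer.String())
		return nil
	}

	if len(joinURLs) == 0 {
		// First node of a new cluster: initialize local state instead of joining.
		return s.Initialize(self)
	}

	return errors.New("join: failed to connect data node to any specified server")
}

// stubJoiner lets the sketch run without a real cluster.
type stubJoiner struct{}

func (stubJoiner) Join(_, _ *url.URL) error   { return errors.New("unreachable") }
func (stubJoiner) Initialize(_ url.URL) error { return nil }

func main() {
	self := url.URL{Scheme: "http", Host: "localhost:8086"}
	// No join URLs: the node initializes itself as the first cluster member.
	if err := joinOrInitialize(stubJoiner{}, self, nil); err != nil {
		log.Fatal(err)
	}
	log.Println("initialized data node:", self.String())
}
```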

cmd/influxd/server_integration_test.go: 6 additions, 5 deletions

@@ -1458,9 +1458,10 @@ func Test3NodeServerFailover(t *testing.T) {
}

// ensure that all queries work if there are more nodes in a cluster than the replication factor
func Test3NodeClusterPartiallyReplicated(t *testing.T) {
// and there is more than one shard
func Test5NodeClusterPartiallyReplicated(t *testing.T) {
t.Parallel()
testName := "3-node server integration partial replication"
testName := "5-node server integration partial replication"
if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
@@ -1469,11 +1470,11 @@ func Test3NodeClusterPartiallyReplicated(t *testing.T) {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
nodes := createCombinedNodeCluster(t, testName, dir, 5, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
runTestsData(t, testName, nodes, "mydb", "myrp", 2)
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", 2)
}

func TestClientLibrary(t *testing.T) {

influxql/engine.go: 4 additions, 1 deletion

@@ -276,8 +276,11 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
// add up to the index to the values
values = append(values, o[:ind]...)

// clear out previously sent mapper output data
mapperOutputs[j] = mapperOutputs[j][ind:]

// if we emptied out all the values, set this output to nil so that the mapper will get run again on the next loop
if ind == len(o) {
if len(mapperOutputs[j]) == 0 {
mapperOutputs[j] = nil
}
}
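
The subtle part of this hunk is that the consumed prefix must be sliced off mapperOutputs[j] before checking whether the buffer is empty; the old code compared ind against the stale slice and never dropped what had already been sent. A toy, self-contained sketch (not the engine code) of that drain-and-refill pattern, where plain integers stand in for the real timestamped values and the per-pass cutoff is faked:

```go
package main

import "fmt"

func main() {
	// Each "mapper output" is a sorted buffer; nil means "run the mapper again".
	outputs := [][]int{
		{1, 3, 5},
		{2, 4},
	}

	var emitted []int
	for pass := 0; pass < 3; pass++ {
		for j, o := range outputs {
			if o == nil {
				continue // the real loop would re-run the mapper for more data here
			}
			// Pretend only the first value is below this pass's time cutoff.
			ind := 1
			emitted = append(emitted, o[:ind]...)

			// Clear out previously sent mapper output data (the fix in this PR).
			outputs[j] = outputs[j][ind:]
			if len(outputs[j]) == 0 {
				outputs[j] = nil
			}
		}
	}
	fmt.Println(emitted) // [1 2 3 4 5]: nothing is emitted twice
}
```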

messaging/client.go: 26 additions, 6 deletions

@@ -34,11 +34,11 @@
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node
path string // config file path
conns map[uint64]*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node

opened bool

@@ -61,6 +61,7 @@ func NewClient(dataURL url.URL) *Client {
ReconnectTimeout: DefaultReconnectTimeout,
PingInterval: DefaultPingInterval,
dataURL: dataURL,
conns: map[uint64]*Conn{},
}
return c
}
@@ -353,12 +354,31 @@ func (c *Client) Conn(topicID uint64) *Conn {
conn := NewConn(topicID, &c.dataURL)
conn.SetURL(c.url)

if _, ok := c.conns[topicID]; ok {
panic(fmt.Sprintf("connection for topic %d already exists", topicID))
}
// Add to list of client connections.
c.conns = append(c.conns, conn)
c.conns[topicID] = conn

return conn
}

// CloseConn closes the connection to the broker for a given topic
func (c *Client) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

if conn, ok := c.conns[topicID]; ok && conn != nil {
if err := conn.Close(); err != nil {
return err
}

delete(c.conns, topicID)
}

return nil
}
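
Moving conns from a slice to a map keyed by topic ID is what makes the new CloseConn possible: the drop-database path in server.go can now tear down exactly the connection that belongs to a shard's topic. Below is a self-contained sketch of that bookkeeping, with a trivial conn type standing in for *messaging.Conn; it is an illustration of the pattern, not the client itself.

```go
package main

import (
	"fmt"
	"sync"
)

// conn stands in for *messaging.Conn; only Close matters for the sketch.
type conn struct{ topicID uint64 }

func (c *conn) Close() error { return nil }

// registry mirrors the bookkeeping the client now does: one connection per
// topic ID, duplicate registration is a programming error, close is a no-op
// for unknown topics.
type registry struct {
	mu    sync.Mutex
	conns map[uint64]*conn
}

func newRegistry() *registry { return &registry{conns: map[uint64]*conn{}} }

func (r *registry) Conn(topicID uint64) *conn {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.conns[topicID]; ok {
		panic(fmt.Sprintf("connection for topic %d already exists", topicID))
	}
	c := &conn{topicID: topicID}
	r.conns[topicID] = c
	return c
}

func (r *registry) CloseConn(topicID uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if c, ok := r.conns[topicID]; ok && c != nil {
		if err := c.Close(); err != nil {
			return err
		}
		delete(r.conns, topicID)
	}
	return nil // unknown topic IDs are a no-op, matching the new CloseConn
}

func main() {
	r := newRegistry()
	r.Conn(42)
	fmt.Println(r.CloseConn(42)) // <nil>
	fmt.Println(r.CloseConn(42)) // <nil>: closing twice is harmless
	r.Conn(42)                   // fine again after CloseConn removed the entry
}
```

The panic on duplicate registration pairs with the map: once each topic has exactly one connection, registering a second one is a programming error rather than something to tolerate.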

// pinger periodically pings the broker to check that it is alive.
func (c *Client) pinger(closing chan struct{}) {
defer c.wg.Done()

server.go: 32 additions, 0 deletions

@@ -1015,8 +1015,38 @@ func (s *Server) applyDropDatabase(m *messaging.Message) (err error) {
// Remove from metastore.
err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { return tx.dropDatabase(c.Name) })

db := s.databases[c.Name]
for _, rp := range db.policies {
for _, sg := range rp.shardGroups {
for _, sh := range sg.Shards {

// if we have this shard locally, close and remove it
if sh.store != nil {
// close topic readers/heartbeaters/etc. connections
err := s.client.CloseConn(sh.ID)
if err != nil {
panic(err)
Contributor: Would log.Fatal be better here? Not sure, just a suggestion.

Contributor (author): These shouldn't ever happen, so I used panic so we can get a line number and more context if they did. Not sure how best to handle this, though, because we've dropped the database from the metastore but we're failing to clean up resources.

Contributor: Cool. Panic works for me.

}

err = sh.close()
if err != nil {
panic(err)
}

err = os.Remove(s.shardPath(sh.ID))
if err != nil {
panic(err)
}
}

delete(s.shards, sh.ID)
}
}
}

// Delete the database entry.
delete(s.databases, c.Name)

return
}
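
For context on the panic-versus-log question in the thread above, here is a hedged sketch of the alternative that was floated but not merged: best-effort cleanup that logs each failure and keeps going, on the grounds that the database is already gone from the metastore by this point. The three callbacks are hypothetical stand-ins for s.client.CloseConn, sh.close and s.shardPath; the merged code keeps the panics.

```go
package main

import (
	"log"
	"os"
)

// cleanupShard is NOT what the merged PR does; it sketches the log-and-continue
// alternative discussed in review, preserving the same teardown order:
// close the topic connection, close the shard, then remove its file.
func cleanupShard(id uint64, closeConn func(uint64) error, closeShard func() error, shardPath func(uint64) string) {
	if err := closeConn(id); err != nil {
		log.Printf("drop database: close topic conn for shard %d: %s", id, err)
	}
	if err := closeShard(); err != nil {
		log.Printf("drop database: close shard %d: %s", id, err)
	}
	if err := os.Remove(shardPath(id)); err != nil {
		log.Printf("drop database: remove shard file %d: %s", id, err)
	}
}

func main() {
	// Exercise the helper with no-op stand-ins; a throwaway file makes the
	// remove step succeed.
	f, err := os.CreateTemp("", "shard-")
	if err != nil {
		log.Fatal(err)
	}
	f.Close()

	cleanupShard(1,
		func(uint64) error { return nil },
		func() error { return nil },
		func(uint64) string { return f.Name() },
	)
}
```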

@@ -3157,6 +3187,7 @@ func (s *Server) StartLocalMapper(rm *RemoteMapper) (*LocalMapper, error) {
selectFields: rm.SelectFields,
selectTags: rm.SelectTags,
interval: rm.Interval,
tmin: rm.TMin,
tmax: rm.TMax,
limit: limit,
}
@@ -3517,6 +3548,7 @@ type MessagingClient interface {

// Conn returns an open, streaming connection to a topic.
Conn(topicID uint64) MessagingConn
CloseConn(topicID uint64) error
}

type messagingClient struct {

test/messaging.go: 18 additions, 0 deletions

@@ -90,6 +90,24 @@ func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn {
return c.ConnFunc(topicID)
}

func (c *MessagingClient) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

conns := []*MessagingConn{}
for _, conn := range c.conns {
if conn.topicID == topicID {
if err := conn.Close(); err != nil {
return err
}
continue
}
conns = append(conns, conn)
}
c.conns = conns
return nil
}

// DefaultConnFunc returns a connection for a specific topic.
func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn {
c.mu.Lock()

tx.go: 4 additions, 3 deletions

@@ -149,7 +149,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
shards[shard] = append(shards[shard], sid)
}

for shard, _ := range shards {
for shard, sids := range shards {
var mapper influxql.Mapper

// create either a remote or local mapper for this shard
@@ -167,7 +167,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
MeasurementName: m.Name,
TMin: tmin.UnixNano(),
TMax: tmax.UnixNano(),
SeriesIDs: t.SeriesIDs,
SeriesIDs: sids,
ShardID: shard.ID,
WhereFields: whereFields,
SelectFields: selectFields,
@@ -179,14 +179,15 @@
mapper.(*RemoteMapper).SetFilters(t.Filters)
} else {
mapper = &LocalMapper{
seriesIDs: t.SeriesIDs,
seriesIDs: sids,
db: shard.store,
job: job,
decoder: NewFieldCodec(m),
filters: t.Filters,
whereFields: whereFields,
selectFields: selectFields,
selectTags: selectTags,
tmin: tmin.UnixNano(),
tmax: tmax.UnixNano(),
interval: interval,
// multiple mappers may need to be merged together to get the results
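
The substance of the tx.go hunks is easy to miss in the diff: the loop header now binds sids, the per-shard slice of series IDs built a few lines earlier, and both the remote and local mappers receive sids instead of t.SeriesIDs, the full set for the measurement. Below is a small, self-contained sketch of that grouping step; the shard type and the locate function are simplified stand-ins for how the real planner assigns series to shard groups.

```go
package main

import "fmt"

// shard is a hypothetical, simplified stand-in for the planner's shard type.
type shard struct{ ID uint64 }

// groupSeriesByShard mimics the map the real code builds
// (shards[shard] = append(shards[shard], sid)) so that each mapper is created
// with only the series IDs that actually live on its shard.
func groupSeriesByShard(seriesIDs []uint64, locate func(uint64) *shard) map[*shard][]uint64 {
	shards := map[*shard][]uint64{}
	for _, sid := range seriesIDs {
		sh := locate(sid)
		shards[sh] = append(shards[sh], sid)
	}
	return shards
}

func main() {
	a, b := &shard{ID: 1}, &shard{ID: 2}
	// Pretend odd series IDs live on shard 1 and even ones on shard 2.
	locate := func(sid uint64) *shard {
		if sid%2 == 1 {
			return a
		}
		return b
	}

	shards := groupSeriesByShard([]uint64{1, 2, 3, 4, 5}, locate)
	for sh, sids := range shards {
		// This per-shard slice is what the PR now passes as SeriesIDs/seriesIDs,
		// rather than the full list for the measurement.
		fmt.Println(sh.ID, sids)
	}
}
```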