Distributed Queries #2202

Merged — 21 commits, Apr 11, 2015

Commits
5e82ca5
Update server and handler to work with streamed responses
pauldix Mar 26, 2015
2a3ed63
Add chunked responses and streaming of raw queries.
pauldix Mar 28, 2015
728f5de
uncomment raw ordering test
pauldix Mar 28, 2015
1139950
WIP: Initial implementation of remote mapper for distributed queries.
pauldix Mar 29, 2015
6e8ea9a
Fix errors on limits and chunked raw queries.
pauldix Apr 4, 2015
d41b85a
Remove the interval setting from NextInterval to make remote mappers …
pauldix Apr 5, 2015
4a0c468
Fix the group by multiple dimensions test to be correct.
pauldix Apr 5, 2015
f5dfb14
Fix wildcard group by query with time test to be correct.
pauldix Apr 5, 2015
7661546
Finish up distributed queries.
pauldix Apr 6, 2015
b353119
Add change for distributed query engine
pauldix Apr 6, 2015
8a25683
Fix opentsdb integration tests after rebase
pauldix Apr 7, 2015
bf1a8aa
Use uint64 for Series IDs
otoolep Apr 8, 2015
37d4f2a
Fixes based on feedback.
pauldix Apr 8, 2015
9282a8a
Fix compilation errors after parser merge
otoolep Apr 9, 2015
559e1d4
Use different base port range for DQ testing
otoolep Apr 9, 2015
350795d
'reflect' is not used
otoolep Apr 9, 2015
2c554f4
Hook up "run_mapper" in top-level handler
otoolep Apr 10, 2015
925de06
Seems like partial replication reads take longer
otoolep Apr 10, 2015
61d7d0e
Limit max remote response to 1MB
otoolep Apr 10, 2015
5882f0b
Update CHANGELOG
otoolep Apr 10, 2015
5890025
Make it clearer in tests where numbers come from
otoolep Apr 10, 2015
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
## v0.9.0-rc23 [Unreleased]

### Features
- [#2202](https://github.com/influxdb/influxdb/pull/2202): Initial implementation of Distributed Queries
- [#2202](https://github.com/influxdb/influxdb/pull/2202): 64-bit Series IDs. INCOMPATIBLE WITH PREVIOUS DATASTORES.

### Bugfixes
- [#2225](https://github.com/influxdb/influxdb/pull/2225): Make keywords completely case insensitive
- [#2228](https://github.com/influxdb/influxdb/pull/2228): Accept keyword default unquoted in ALTER RETENTION POLICY statement
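As a quick orientation for the feature entries above, here is a minimal sketch (not code from this PR) of issuing the same count query the integration tests below use, against a single node of the cluster over the HTTP API. The db/q parameters on /query are the standard API; the chunked=true parameter and the default 8086 port are assumptions tied to the "Add chunked responses and streaming of raw queries" commit rather than anything shown in this diff.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Query any node; with distributed queries the receiving node fans the
	// mapper work out to the data nodes that own the relevant shards.
	params := url.Values{}
	params.Set("db", "mydb")
	params.Set("q", "SELECT count(value) FROM cpu")
	// "chunked" is assumed from the chunked-response commits in this PR;
	// the exact parameter name is not shown in this diff.
	params.Set("chunked", "true")

	resp, err := http.Get("http://localhost:8086/query?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
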
1 change: 1 addition & 0 deletions cmd/influxd/handler.go
@@ -58,6 +58,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

if strings.HasPrefix(r.URL.Path, "/data_nodes") ||
strings.HasPrefix(r.URL.Path, "/process_continuous_queries") ||
strings.HasPrefix(r.URL.Path, "/run_mapper") ||
strings.HasPrefix(r.URL.Path, "/metastore") {
h.serveMetadata(w, r)
return
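The one-line handler change above routes /run_mapper requests, which carry remote mapper work from other nodes, to the metadata handler alongside /metastore and /data_nodes. For illustration only, here is a self-contained sketch of the same prefix-dispatch pattern with hypothetical stand-in handlers (the real handler.go wires these paths to serveMetadata and the data API):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

// dispatcher mimics the prefix routing in cmd/influxd/handler.go: cluster-internal
// paths go to one handler, everything else to the regular API handler.
type dispatcher struct {
	cluster http.Handler // /metastore, /data_nodes, /run_mapper, ...
	api     http.Handler // client-facing query/write API
}

func (d *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if strings.HasPrefix(r.URL.Path, "/data_nodes") ||
		strings.HasPrefix(r.URL.Path, "/process_continuous_queries") ||
		strings.HasPrefix(r.URL.Path, "/run_mapper") ||
		strings.HasPrefix(r.URL.Path, "/metastore") {
		d.cluster.ServeHTTP(w, r)
		return
	}
	d.api.ServeHTTP(w, r)
}

func main() {
	d := &dispatcher{
		cluster: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, "cluster handler:", r.URL.Path)
		}),
		api: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, "api handler:", r.URL.Path)
		}),
	}
	http.ListenAndServe(":8086", d)
}
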
66 changes: 44 additions & 22 deletions cmd/influxd/server_integration_test.go
@@ -156,9 +156,9 @@ func createDatabase(t *testing.T, testName string, nodes Cluster, database strin

// createRetentionPolicy creates a retention policy and verifies that the creation was successful.
// The policy is created with the given replication factor.
func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string) {
func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) {
t.Logf("Creating retention policy %s for database %s", retention, database)
command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes))
command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, replicationFactor)
query(t, nodes[:1], "", command, `{"results":[{}]}`, "")
}

@@ -281,12 +281,16 @@ var limitAndOffset = func(t *testing.T, node *TestNode, database, retention stri
}
}

func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, database, retention string) {
func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) {
// skip this test if they're just looking to run some of the data tests
if os.Getenv("TEST_PREFIX") != "" {
return
}
t.Logf("Running %s:rawDataReturnsInOrder against %d-node cluster", testName, len(nodes))

// Start by ensuring database and retention policy exist.
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retention)
createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor)
numPoints := 500
var expected string

@@ -297,7 +301,7 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
}

expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1)
got, ok := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 60*time.Second)
got, ok := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second)
if !ok {
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s", testName, expected, got)
}
@@ -347,15 +351,15 @@ func runTests_Errors(t *testing.T, nodes Cluster) {
}

// runTests tests write and query of data. Setting testNumbers allows only a subset of tests to be run.
func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) {
func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) {
t.Logf("Running tests against %d-node cluster", len(nodes))

yesterday := time.Now().Add(-1 * time.Hour * 24).UTC()
now := time.Now().UTC()

// Start by ensuring database and retention policy exist.
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retention)
createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor)

// The tests. Within these tests %DB% and %RP% will be replaced with the database and retention passed into
// this function.
@@ -558,7 +562,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
{"name": "load", "timestamp": "2000-01-01T00:00:10Z", "tags": {"region": "us-east", "host": "serverB"}, "fields": {"value": 30}},
{"name": "load", "timestamp": "2000-01-01T00:00:00Z", "tags": {"region": "us-west", "host": "serverC"}, "fields": {"value": 100}}
]}`,
query: `SELECT sum(value) FROM load GROUP BY time(10s), region, host`,
query: `SELECT sum(value) FROM load GROUP BY region, host`,
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"load","tags":{"host":"serverA","region":"us-east"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",20]]},{"name":"load","tags":{"host":"serverB","region":"us-east"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",30]]},{"name":"load","tags":{"host":"serverC","region":"us-west"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",100]]}]}]}`,
},
@@ -625,9 +629,9 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
},
{
name: "wildcard GROUP BY queries with time",
query: `SELECT mean(value) FROM cpu GROUP BY *,time(1m)`,
query: `SELECT mean(value) FROM cpu WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:01:00Z' GROUP BY *,time(1m)`,
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",15]]},{"name":"cpu","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",30]]}]}]}`,
expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",15]]},{"name":"cpu","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",30]]}]}]}`,
},

// WHERE tag queries
@@ -1307,7 +1311,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
t.Logf(`resetting for test "%s"`, name)
deleteDatabase(t, testName, nodes, database)
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retention)
createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor)
}

if tt.write != "" {
@@ -1354,8 +1358,8 @@ func TestSingleServer(t *testing.T) {
nodes := createCombinedNodeCluster(t, testName, dir, 1, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp")
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp")
runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
}

func Test3NodeServer(t *testing.T) {
@@ -1372,9 +1376,27 @@ func Test3NodeServer(t *testing.T) {
nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp")
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp")
runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))

}

// Test3NodeClusterPartiallyReplicated ensures that all queries work when there are more nodes in the cluster than the replication factor.
func Test3NodeClusterPartiallyReplicated(t *testing.T) {
testName := "3-node server integration"
if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
dir := tempfile()
defer func() {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
}

func TestClientLibrary(t *testing.T) {
@@ -1478,7 +1500,7 @@ func TestClientLibrary(t *testing.T) {
test.rp = "myrp"
}
createDatabase(t, testName, nodes, test.db)
createRetentionPolicy(t, testName, nodes, test.db, test.rp)
createRetentionPolicy(t, testName, nodes, test.db, test.rp, len(nodes))
t.Logf("testing %s - %s\n", testName, test.name)
for _, w := range test.writes {
writeResult, err := c.Write(w.bp)
@@ -1533,7 +1555,7 @@ func Test_ServerSingleGraphiteIntegration(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
@@ -1584,7 +1606,7 @@ func Test_ServerSingleGraphiteIntegration_FractionalTime(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
@@ -1636,7 +1658,7 @@ func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
@@ -1747,7 +1769,7 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
@@ -1798,7 +1820,7 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
@@ -1852,7 +1874,7 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
2 changes: 1 addition & 1 deletion commands.go
@@ -202,7 +202,7 @@ func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement,

type dropSeriesCommand struct {
Database string `json:"database"`
SeriesByMeasurement map[string][]uint32 `json:"seriesIds"`
SeriesByMeasurement map[string][]uint64 `json:"seriesIds"`
}

// createContinuousQueryCommand is the raft command for creating a continuous query on a database
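The uint32-to-uint64 change above is what makes the new series IDs incompatible with previous datastores, as the CHANGELOG warns: any stored key that embeds a series ID changes width. A minimal sketch of the width difference, assuming fixed-size big-endian key encoding purely for illustration (the actual on-disk encoding is not part of this diff):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	const seriesID uint64 = 42

	// Old layout: series IDs fit in 4 bytes.
	oldKey := make([]byte, 4)
	binary.BigEndian.PutUint32(oldKey, uint32(seriesID))

	// New layout: series IDs occupy 8 bytes, so keys written by older
	// versions cannot be read back with the new layout.
	newKey := make([]byte, 8)
	binary.BigEndian.PutUint64(newKey, seriesID)

	fmt.Printf("old key: % x (len %d)\n", oldKey, len(oldKey))
	fmt.Printf("new key: % x (len %d)\n", newKey, len(newKey))
}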