diff --git a/CHANGELOG.md b/CHANGELOG.md index b52b211a7e1..ad0cec30015 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## v0.9.0-rc23 [Unreleased] +### Features +- [#2202](https://github.com/influxdb/influxdb/pull/2202): Initial implementation of Distributed Queries +- [#2202](https://github.com/influxdb/influxdb/pull/2202): 64-bit Series IDs. INCOMPATIBLE WITH PREVIOUS DATASTORES. + ### Bugfixes - [#2225](https://github.com/influxdb/influxdb/pull/2225): Make keywords completely case insensitive - [#2228](https://github.com/influxdb/influxdb/pull/2228): Accept keyword default unquoted in ALTER RETENTION POLICY statement diff --git a/cmd/influxd/handler.go b/cmd/influxd/handler.go index 7568a4beba0..3bf18848cbd 100644 --- a/cmd/influxd/handler.go +++ b/cmd/influxd/handler.go @@ -58,6 +58,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if strings.HasPrefix(r.URL.Path, "/data_nodes") || strings.HasPrefix(r.URL.Path, "/process_continuous_queries") || + strings.HasPrefix(r.URL.Path, "/run_mapper") || strings.HasPrefix(r.URL.Path, "/metastore") { h.serveMetadata(w, r) return diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 64cd7f74b8d..94c164f9ba8 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -156,9 +156,9 @@ func createDatabase(t *testing.T, testName string, nodes Cluster, database strin // createRetentionPolicy creates a retetention policy and verifies that the creation was successful. // Replication factor is set to equal the number nodes in the cluster. 
-func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string) { +func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) { t.Logf("Creating retention policy %s for database %s", retention, database) - command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes)) + command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, replicationFactor) query(t, nodes[:1], "", command, `{"results":[{}]}`, "") } @@ -281,12 +281,16 @@ var limitAndOffset = func(t *testing.T, node *TestNode, database, retention stri } } -func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, database, retention string) { +func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) { + // skip this test if they're just looking to run some of the data tests + if os.Getenv("TEST_PREFIX") != "" { + return + } t.Logf("Running %s:rawDataReturnsInOrder against %d-node cluster", testName, len(nodes)) // Start by ensuring database and retention policy exist. 
createDatabase(t, testName, nodes, database) - createRetentionPolicy(t, testName, nodes, database, retention) + createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor) numPoints := 500 var expected string @@ -297,7 +301,7 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, } expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1) - got, ok := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 60*time.Second) + got, ok := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second) if !ok { t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s", testName, expected, got) } @@ -347,7 +351,7 @@ func runTests_Errors(t *testing.T, nodes Cluster) { } // runTests tests write and query of data. Setting testNumbers allows only a subset of tests to be run. -func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) { +func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) { t.Logf("Running tests against %d-node cluster", len(nodes)) yesterday := time.Now().Add(-1 * time.Hour * 24).UTC() @@ -355,7 +359,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent // Start by ensuring database and retention policy exist. createDatabase(t, testName, nodes, database) - createRetentionPolicy(t, testName, nodes, database, retention) + createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor) // The tests. Within these tests %DB% and %RP% will be replaced with the database and retention passed into // this function. 
@@ -558,7 +562,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent {"name": "load", "timestamp": "2000-01-01T00:00:10Z", "tags": {"region": "us-east", "host": "serverB"}, "fields": {"value": 30}}, {"name": "load", "timestamp": "2000-01-01T00:00:00Z", "tags": {"region": "us-west", "host": "serverC"}, "fields": {"value": 100}} ]}`, - query: `SELECT sum(value) FROM load GROUP BY time(10s), region, host`, + query: `SELECT sum(value) FROM load GROUP BY region, host`, queryDb: "%DB%", expected: `{"results":[{"series":[{"name":"load","tags":{"host":"serverA","region":"us-east"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",20]]},{"name":"load","tags":{"host":"serverB","region":"us-east"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",30]]},{"name":"load","tags":{"host":"serverC","region":"us-west"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",100]]}]}]}`, }, @@ -625,9 +629,9 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent }, { name: "wildcard GROUP BY queries with time", - query: `SELECT mean(value) FROM cpu GROUP BY *,time(1m)`, + query: `SELECT mean(value) FROM cpu WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:01:00Z' GROUP BY *,time(1m)`, queryDb: "%DB%", - expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",15]]},{"name":"cpu","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",30]]}]}]}`, + expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",15]]},{"name":"cpu","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",30]]}]}]}`, }, // WHERE tag queries @@ -1307,7 +1311,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent t.Logf(`reseting for test "%s"`, name) deleteDatabase(t, 
testName, nodes, database) createDatabase(t, testName, nodes, database) - createRetentionPolicy(t, testName, nodes, database, retention) + createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor) } if tt.write != "" { @@ -1354,8 +1358,8 @@ func TestSingleServer(t *testing.T) { nodes := createCombinedNodeCluster(t, testName, dir, 1, nil) defer nodes.Close() - runTestsData(t, testName, nodes, "mydb", "myrp") - runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp") + runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)) + runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)) } func Test3NodeServer(t *testing.T) { @@ -1372,9 +1376,27 @@ func Test3NodeServer(t *testing.T) { nodes := createCombinedNodeCluster(t, testName, dir, 3, nil) defer nodes.Close() - runTestsData(t, testName, nodes, "mydb", "myrp") - runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp") + runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)) + runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)) + +} + +// ensure that all queries work if there are more nodes in a cluster than the replication factor +func Test3NodeClusterPartiallyReplicated(t *testing.T) { + testName := "3-node server integration" + if testing.Short() { + t.Skip(fmt.Sprintf("skipping '%s'", testName)) + } + dir := tempfile() + defer func() { + os.RemoveAll(dir) + }() + + nodes := createCombinedNodeCluster(t, testName, dir, 3, nil) + defer nodes.Close() + runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)-1) + runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1) } func TestClientLibrary(t *testing.T) { @@ -1478,7 +1500,7 @@ func TestClientLibrary(t *testing.T) { test.rp = "myrp" } createDatabase(t, testName, nodes, test.db) - createRetentionPolicy(t, testName, nodes, test.db, test.rp) + createRetentionPolicy(t, testName, nodes, test.db, test.rp, len(nodes)) t.Logf("testing %s - %s\n", 
testName, test.name) for _, w := range test.writes { writeResult, err := c.Write(w.bp) @@ -1533,7 +1555,7 @@ func Test_ServerSingleGraphiteIntegration(t *testing.T) { defer nodes.Close() createDatabase(t, testName, nodes, "graphite") - createRetentionPolicy(t, testName, nodes, "graphite", "raw") + createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress)) @@ -1584,7 +1606,7 @@ func Test_ServerSingleGraphiteIntegration_FractionalTime(t *testing.T) { defer nodes.Close() createDatabase(t, testName, nodes, "graphite") - createRetentionPolicy(t, testName, nodes, "graphite", "raw") + createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress)) @@ -1636,7 +1658,7 @@ func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) { defer nodes.Close() createDatabase(t, testName, nodes, "graphite") - createRetentionPolicy(t, testName, nodes, "graphite", "raw") + createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress)) @@ -1747,7 +1769,7 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) { defer nodes.Close() createDatabase(t, testName, nodes, "opentsdb") - createRetentionPolicy(t, testName, nodes, "opentsdb", "raw") + createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) @@ -1798,7 +1820,7 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) { defer nodes.Close() createDatabase(t, testName, nodes, "opentsdb") - createRetentionPolicy(t, testName, nodes, "opentsdb", "raw") + createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes)) 
// Connect to the graphite endpoint we just spun up conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) @@ -1852,7 +1874,7 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) { defer nodes.Close() createDatabase(t, testName, nodes, "opentsdb") - createRetentionPolicy(t, testName, nodes, "opentsdb", "raw") + createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes)) // Connect to the graphite endpoint we just spun up conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) diff --git a/commands.go b/commands.go index ec82175d324..ede928ce088 100644 --- a/commands.go +++ b/commands.go @@ -202,7 +202,7 @@ func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, type dropSeriesCommand struct { Database string `json:"database"` - SeriesByMeasurement map[string][]uint32 `json:"seriesIds"` + SeriesByMeasurement map[string][]uint64 `json:"seriesIds"` } // createContinuousQueryCommand is the raft command for creating a continuous query on a database diff --git a/database.go b/database.go index 2f8b874c4e9..09864abf46a 100644 --- a/database.go +++ b/database.go @@ -31,7 +31,7 @@ type database struct { // in memory indexing structures measurements map[string]*Measurement // measurement name to object and index - series map[uint32]*Series // map series id to the Series object + series map[uint64]*Series // map series id to the Series object names []string // sorted list of the measurement names } @@ -41,7 +41,7 @@ func newDatabase() *database { policies: make(map[string]*RetentionPolicy), continuousQueries: make([]*ContinuousQuery, 0), measurements: make(map[string]*Measurement), - series: make(map[uint32]*Series), + series: make(map[uint64]*Series), names: make([]string, 0), } } @@ -56,7 +56,7 @@ func (db *database) shardGroupByTimestamp(policy string, timestamp time.Time) (* } // Series takes a series ID and returns a series. 
-func (db *database) Series(id uint32) *Series { +func (db *database) Series(id uint64) *Series { return db.series[id] } @@ -119,7 +119,7 @@ type Measurement struct { // in-memory index fields series map[string]*Series // sorted tagset string to the series object - seriesByID map[uint32]*Series // lookup table for series by their id + seriesByID map[uint64]*Series // lookup table for series by their id measurement *Measurement seriesByTagKeyValue map[string]map[string]seriesIDs // map from tag key to value to sorted set of series ids seriesIDs seriesIDs // sorted list of series IDs in this measurement @@ -132,7 +132,7 @@ func NewMeasurement(name string) *Measurement { Fields: make([]*Field, 0), series: make(map[string]*Series), - seriesByID: make(map[uint32]*Series), + seriesByID: make(map[uint64]*Series), seriesByTagKeyValue: make(map[string]map[string]seriesIDs), seriesIDs: make(seriesIDs, 0), } @@ -227,7 +227,7 @@ func (m *Measurement) addSeries(s *Series) bool { } // dropSeries will remove a series from the measurementIndex. 
Returns true if already removed -func (m *Measurement) dropSeries(seriesID uint32) bool { +func (m *Measurement) dropSeries(seriesID uint64) bool { if _, ok := m.seriesByID[seriesID]; !ok { return true } @@ -237,7 +237,7 @@ func (m *Measurement) dropSeries(seriesID uint32) bool { delete(m.series, tagset) delete(m.seriesByID, seriesID) - var ids []uint32 + var ids []uint64 for _, id := range m.seriesIDs { if id != seriesID { ids = append(ids, id) @@ -250,7 +250,7 @@ func (m *Measurement) dropSeries(seriesID uint32) bool { for k, v := range m.seriesByTagKeyValue { values := v for kk, vv := range values { - var ids []uint32 + var ids []uint64 for _, id := range vv { if id != seriesID { ids = append(ids, id) @@ -281,8 +281,8 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series { // filters walks the where clause of a select statement and returns a map with all series ids // matching the where clause and any filter expression that should be applied to each -func (m *Measurement) filters(stmt *influxql.SelectStatement) map[uint32]influxql.Expr { - seriesIdsToExpr := make(map[uint32]influxql.Expr) +func (m *Measurement) filters(stmt *influxql.SelectStatement) map[uint64]influxql.Expr { + seriesIdsToExpr := make(map[uint64]influxql.Expr) if stmt.Condition == nil || stmt.OnlyTimeDimensions() { for _, id := range m.seriesIDs { seriesIdsToExpr[id] = nil @@ -418,7 +418,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influ // value should be included in the resulting set, and an expression if the return is a field expression. // The map that it takes maps each series id to the field expression that should be used to evaluate it when iterating over its cursor. 
// Series that have no field expressions won't be in the map -func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint32]influxql.Expr) (seriesIDs, bool, influxql.Expr) { +func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint64]influxql.Expr) (seriesIDs, bool, influxql.Expr) { switch n := expr.(type) { case *influxql.BinaryExpr: switch n.Op { @@ -573,7 +573,7 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (seriesIDs, error } // Get series IDs that match the WHERE clause. - filters := map[uint32]influxql.Expr{} + filters := map[uint64]influxql.Expr{} ids, _, _ := m.walkWhereForSeriesIds(expr, filters) return ids, nil @@ -955,7 +955,7 @@ func (f *FieldCodec) FieldByName(name string) *Field { // Series belong to a Measurement and represent unique time series in a database type Series struct { - ID uint32 + ID uint64 Tags map[string]string measurement *Measurement @@ -973,7 +973,7 @@ func (s *Series) match(tags map[string]string) bool { // seriesIDs is a convenience type for sorting, checking equality, and doing // union and intersection of collections of series ids. 
-type seriesIDs []uint32 +type seriesIDs []uint64 func (a seriesIDs) Len() int { return len(a) } func (a seriesIDs) Less(i, j int) bool { return a[i] < a[j] } @@ -1008,7 +1008,7 @@ func (a seriesIDs) intersect(other seriesIDs) seriesIDs { // That is, don't run comparisons against lower values that we've already passed var i, j int - ids := make([]uint32, 0, len(l)) + ids := make([]uint64, 0, len(l)) for i < len(l) && j < len(r) { if l[i] == r[j] { ids = append(ids, l[i]) @@ -1029,7 +1029,7 @@ func (a seriesIDs) intersect(other seriesIDs) seriesIDs { func (a seriesIDs) union(other seriesIDs) seriesIDs { l := a r := other - ids := make([]uint32, 0, len(l)+len(r)) + ids := make([]uint64, 0, len(l)+len(r)) var i, j int for i < len(l) && j < len(r) { if l[i] == r[j] { @@ -1062,7 +1062,7 @@ func (a seriesIDs) reject(other seriesIDs) seriesIDs { r := other var i, j int - ids := make([]uint32, 0, len(l)) + ids := make([]uint64, 0, len(l)) for i < len(l) && j < len(r) { if l[i] == r[j] { i++ @@ -1150,7 +1150,7 @@ func (db *database) dropMeasurement(name string) error { delete(db.measurements, name) // collect the series ids to remove - var ids []uint32 + var ids []uint64 // remove series from in memory map for id, series := range db.series { @@ -1171,7 +1171,7 @@ func (db *database) dropMeasurement(name string) error { } // dropSeries will delete all data with the seriesID -func (rp *RetentionPolicy) dropSeries(seriesIDs ...uint32) error { +func (rp *RetentionPolicy) dropSeries(seriesIDs ...uint64) error { for _, g := range rp.shardGroups { err := g.dropSeries(seriesIDs...) 
if err != nil { @@ -1257,7 +1257,7 @@ func (db *database) addSeriesToIndex(measurementName string, s *Series) bool { } // dropSeries removes the series from the in memory references -func (db *database) dropSeries(seriesByMeasurement map[string][]uint32) error { +func (db *database) dropSeries(seriesByMeasurement map[string][]uint64) error { for measurement, ids := range seriesByMeasurement { for _, id := range ids { // if the series is already gone, return @@ -1304,7 +1304,7 @@ func (db *database) MeasurementAndSeries(name string, tags map[string]string) (* } // SeriesByID returns the Series that has the given id. -func (db *database) SeriesByID(id uint32) *Series { +func (db *database) SeriesByID(id uint64) *Series { return db.series[id] } @@ -1314,7 +1314,7 @@ func (db *database) MeasurementNames() []string { } // DropSeries will clear the index of all references to a series. -func (db *database) DropSeries(id uint32) { +func (db *database) DropSeries(id uint64) { panic("not implemented") } diff --git a/httpd/handler.go b/httpd/handler.go index d11066b3205..faa262b87e6 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -88,6 +88,10 @@ func NewClusterHandler(s *influxdb.Server, requireAuthentication bool, version s "wait", // Wait. 
"GET", "/wait/:index", true, true, h.serveWait, }, + route{ + "run_mapper", + "POST", "/run_mapper", true, true, h.serveRunMapper, + }, }) return h } @@ -202,6 +206,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ if chunked { if cs, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil { chunkSize = int(cs) + } else { + chunkSize = DefaultChunkSize } } @@ -700,6 +706,99 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R w.WriteHeader(http.StatusAccepted) } +func (h *Handler) serveRunMapper(w http.ResponseWriter, r *http.Request) { + // we always return a 200, even if there's an error because we always include an error object + // that can be passed on + w.Header().Add("content-type", "application/json") + w.WriteHeader(200) + + // Read in the mapper info from the request body + var m influxdb.RemoteMapper + + if err := json.NewDecoder(r.Body).Decode(&m); err != nil { + mapError(w, err) + return + } + + // create a local mapper and chunk out the results to the other server + lm, err := h.server.StartLocalMapper(&m) + if err != nil { + mapError(w, err) + return + } + if err := lm.Open(); err != nil { + mapError(w, err) + return + } + defer lm.Close() + call, err := m.CallExpr() + if err != nil { + mapError(w, err) + return + } + + if err := lm.Begin(call, m.TMin, m.ChunkSize); err != nil { + mapError(w, err) + return + } + + // see if this is an aggregate query or not + isRaw := true + if call != nil { + isRaw = false + } + + // write results to the client until the next interval is empty + for { + v, err := lm.NextInterval() + if err != nil { + mapError(w, err) + return + } + + // see if we're done. only bail if v is nil and we're empty. v could be nil for + // group by intervals that don't have data. We should keep iterating to get to the next interval. 
+ if v == nil && lm.IsEmpty(m.TMax) { + break + } + + // marshal and write out + d, err := json.Marshal(&v) + if err != nil { + mapError(w, err) + return + } + b, err := json.Marshal(&influxdb.MapResponse{Data: d}) + if err != nil { + mapError(w, err) + return + } + w.Write(b) + w.(http.Flusher).Flush() + + // if this is an aggregate query, we should only call next interval as many times as the chunk size + if !isRaw { + m.ChunkSize-- + if m.ChunkSize == 0 { + break + } + } + + // bail out if we're empty + if lm.IsEmpty(m.TMax) { + break + } + } + + d, err := json.Marshal(&influxdb.MapResponse{Completed: true}) + if err != nil { + mapError(w, err) + } else { + w.Write(d) + w.(http.Flusher).Flush() + } +} + type dataNodeJSON struct { ID uint64 `json:"id"` URL string `json:"url"` @@ -723,6 +822,12 @@ func isFieldNotFoundError(err error) bool { return (strings.HasPrefix(err.Error(), "field not found")) } +// mapError writes an error result after trying to start a mapper +func mapError(w http.ResponseWriter, err error) { + b, _ := json.Marshal(&influxdb.MapResponse{Err: err.Error()}) + w.Write(b) +} + // httpError writes an error to the client in a standard format. func httpError(w http.ResponseWriter, error string, pretty bool, code int) { w.Header().Add("content-type", "application/json") diff --git a/influxdb.go b/influxdb.go index 1b5e7cb4cdf..982fd6bb49f 100644 --- a/influxdb.go +++ b/influxdb.go @@ -140,6 +140,9 @@ var ( // ErrContinuousQueryNotFound is returned when dropping a nonexistent continuous query. ErrContinuousQueryNotFound = errors.New("continuous query not found") + + // ErrShardNotLocal is thrown when a server attempts to run a mapper against a shard it doesn't have a copy of. 
+ ErrShardNotLocal = errors.New("shard not local") ) func ErrDatabaseNotFound(name string) error { return Errorf("database not found: %s", name) } diff --git a/influxql/ast.go b/influxql/ast.go index 0514c1cb7f5..3db30ac87f9 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1196,7 +1196,7 @@ func (s *ShowSeriesStatement) RequiredPrivileges() ExecutionPrivileges { // DropSeriesStatement represents a command for removing a series from the database. type DropSeriesStatement struct { // The Id of the series being dropped (optional) - SeriesID uint32 + SeriesID uint64 // Data source that fields are extracted from (optional) Source Source diff --git a/influxql/engine.go b/influxql/engine.go index 2a2edff8e48..4f096f728fa 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -21,6 +21,9 @@ const ( // Since time is always selected, the column count when selecting only a single other value will be 2 SelectColumnCountWithOneValue = 2 + + // IgnoredChunkSize is what gets passed into Mapper.Begin for aggregate queries as they don't chunk points out + IgnoredChunkSize = 0 ) // Tx represents a transaction. 
@@ -66,6 +69,13 @@ func (m *MapReduceJob) Key() []byte { } func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { + if err := m.Open(); err != nil { + out <- &Row{Err: err} + m.Close() + return + } + defer m.Close() + // if it's a raw query we handle processing differently if m.stmt.IsRawQuery { m.processRawQuery(out, filterEmptyResults) @@ -199,7 +209,7 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { // initialize the mappers for _, mm := range m.Mappers { - if err := mm.Begin(nil, m.TMin); err != nil { + if err := mm.Begin(nil, m.TMin, m.chunkSize); err != nil { out <- &Row{Err: err} return } @@ -224,8 +234,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { continue } - mm.SetLimit(m.chunkSize) - res, err := mm.NextInterval(m.TMax) + res, err := mm.NextInterval() if err != nil { out <- &Row{Err: err} return @@ -246,7 +255,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { } // find the min of the last point in each mapper - t := o[len(o)-1].timestamp + t := o[len(o)-1].Timestamp if t < min { min = t } @@ -258,7 +267,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { // find the index of the point up to the min ind := len(o) for i, mo := range o { - if mo.timestamp > min { + if mo.Timestamp > min { ind = i break } @@ -303,6 +312,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { } valuesSent += len(values) } + valuesToReturn = append(valuesToReturn, values...) // hit the chunk size? 
Send out what has been accumulated, but keep @@ -574,13 +584,13 @@ func (m *MapReduceJob) processRawResults(values []*rawQueryMapOutput) *Row { vals := make([]interface{}, len(selectNames)) if singleValue { - vals[0] = time.Unix(0, v.timestamp).UTC() - vals[1] = v.values.(interface{}) + vals[0] = time.Unix(0, v.Timestamp).UTC() + vals[1] = v.Values.(interface{}) } else { - fields := v.values.(map[string]interface{}) + fields := v.Values.(map[string]interface{}) // time is always the first value - vals[0] = time.Unix(0, v.timestamp).UTC() + vals[0] = time.Unix(0, v.Timestamp).UTC() // populate the other values for i := 1; i < len(selectNames); i++ { @@ -599,24 +609,18 @@ func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultVa // intialize the mappers for _, mm := range m.Mappers { - if err := mm.Begin(c, m.TMin); err != nil { + // for aggregate queries, we use the chunk size to determine how many times NextInterval should be called. + // This is the number of buckets that we need to fill. + if err := mm.Begin(c, m.TMin, len(resultValues)); err != nil { return err } } - // the first interval in a query with a group by may be smaller than the others. 
This happens when they have a - // where time > clause that is in the middle of the bucket that the group by time creates - firstInterval := (m.TMin/m.interval*m.interval + m.interval) - m.TMin - // populate the result values for each interval of time for i, _ := range resultValues { // collect the results from each mapper for j, mm := range m.Mappers { - interval := m.interval - if i == 0 { - interval = firstInterval - } - res, err := mm.NextInterval(interval) + res, err := mm.NextInterval() if err != nil { return err } @@ -645,29 +649,24 @@ type Mapper interface { // Close will close the mapper (either the bolt transaction or the request) Close() - // Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time - Begin(*Call, int64) error + // Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time. + // For raw data queries it will yield to the mapper no more than limit number of points. + Begin(aggregate *Call, startingTime int64, limit int) error // NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a // forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read. - // We pass the interval in here so that it can be varied over the period of the query. This is useful for queries that - // must respect natural time boundaries like months or queries that span daylight savings time borders. Note that if - // a limit is set on the mapper, the interval passed here should represent the MaxTime in a nano epoch. - NextInterval(interval int64) (interface{}, error) - - // Set limit will limit the number of data points yielded on the next interval. If a limit is set, the interval - // passed into NextInterval will be used as the MaxTime to scan until. 
- SetLimit(limit int) + // Interval periods can be different based on time boundaries (months, daylight savings, etc) of the query. + NextInterval() (interface{}, error) } type TagSet struct { Tags map[string]string Filters []Expr - SeriesIDs []uint32 + SeriesIDs []uint64 Key []byte } -func (t *TagSet) AddFilter(id uint32, filter Expr) { +func (t *TagSet) AddFilter(id uint64, filter Expr) { t.SeriesIDs = append(t.SeriesIDs, id) t.Filters = append(t.Filters, filter) } @@ -745,20 +744,12 @@ type Executor struct { } // Execute begins execution of the query and returns a channel to receive rows. -func (e *Executor) Execute() (<-chan *Row, error) { - // Open transaction. - for _, j := range e.jobs { - if err := j.Open(); err != nil { - e.close() - return nil, err - } - } - +func (e *Executor) Execute() <-chan *Row { // Create output channel and stream data in a separate goroutine. out := make(chan *Row, 0) go e.execute(out) - return out, nil + return out } func (e *Executor) close() { diff --git a/influxql/functions.go b/influxql/functions.go index 57c5367195f..33cbde18372 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -7,23 +7,30 @@ package influxql // When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapReduceFuncs function import ( + "encoding/json" "fmt" "math" "sort" "strings" ) -// Iterator represents a forward-only iterator over a set of points. These are used by the MapFunctions in this file +// Iterator represents a forward-only iterator over a set of points. +// These are used by the MapFunctions in this file type Iterator interface { - Next() (seriesID uint32, timestamp int64, value interface{}) + Next() (seriesID uint64, timestamp int64, value interface{}) } -// MapFunc represents a function used for mapping over a sequential series of data. The iterator represents a single group by interval +// MapFunc represents a function used for mapping over a sequential series of data. 
+// The iterator represents a single group by interval type MapFunc func(Iterator) interface{} // ReduceFunc represents a function used for reducing mapper output. type ReduceFunc func([]interface{}) interface{} +// UnmarshalFunc represents a function that can take bytes from a mapper from remote +// server and marshal it into an interface the reducer can use +type UnmarshalFunc func([]byte) (interface{}, error) + // InitializeMapFunc takes an aggregate call from the query and returns the MapFunc func InitializeMapFunc(c *Call) (MapFunc, error) { // see if it's a query for raw data @@ -110,6 +117,52 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) { } } +func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) { + // if c is nil it's a raw data query + if c == nil { + return func(b []byte) (interface{}, error) { + warn("MARSHAL OUTPUT: ", string(b)) + a := make([]*rawQueryMapOutput, 0) + err := json.Unmarshal(b, &a) + return a, err + }, nil + } + + // Retrieve marshal function by name + switch strings.ToLower(c.Name) { + case "mean": + return func(b []byte) (interface{}, error) { + var o meanMapOutput + err := json.Unmarshal(b, &o) + return &o, err + }, nil + case "spread": + return func(b []byte) (interface{}, error) { + var o spreadMapOutput + err := json.Unmarshal(b, &o) + return &o, err + }, nil + case "first": + return func(b []byte) (interface{}, error) { + var o firstLastMapOutput + err := json.Unmarshal(b, &o) + return &o, err + }, nil + case "last": + return func(b []byte) (interface{}, error) { + var o firstLastMapOutput + err := json.Unmarshal(b, &o) + return &o, err + }, nil + default: + return func(b []byte) (interface{}, error) { + var val interface{} + err := json.Unmarshal(b, &val) + return val, err + }, nil + } +} + // MapCount computes the number of values in an iterator. 
func MapCount(itr Iterator) interface{} { n := 0 @@ -519,12 +572,12 @@ func MapRawQuery(itr Iterator) interface{} { } type rawQueryMapOutput struct { - timestamp int64 - values interface{} + Timestamp int64 + Values interface{} } type rawOutputs []*rawQueryMapOutput func (a rawOutputs) Len() int { return len(a) } -func (a rawOutputs) Less(i, j int) bool { return a[i].timestamp < a[j].timestamp } +func (a rawOutputs) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp } func (a rawOutputs) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/influxql/parser.go b/influxql/parser.go index d4f961da006..3acf1d48e34 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -388,6 +388,22 @@ func (p *Parser) parseUInt32() (uint32, error) { return uint32(n), nil } +// parseUInt64 parses a string and returns a 64-bit unsigned integer literal. +func (p *Parser) parseUInt64() (uint64, error) { + tok, pos, lit := p.scanIgnoreWhitespace() + if tok != NUMBER { + return 0, newParseError(tokstr(tok, lit), []string{"number"}, pos) + } + + // Convert string to unsigned 64-bit integer + n, err := strconv.ParseUint(lit, 10, 64) + if err != nil { + return 0, &ParseError{Message: err.Error(), Pos: pos} + } + + return uint64(n), nil +} + // parseDuration parses a string and returns a duration literal. // This function assumes the DURATION token has already been consumed. 
func (p *Parser) parseDuration() (time.Duration, error) { @@ -1023,7 +1039,7 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { // If they didn't provide a FROM or a WHERE, they need to provide the SeriesID if stmt.Condition == nil && stmt.Source == nil { - id, err := p.parseUInt32() + id, err := p.parseUInt64() if err != nil { return nil, err } diff --git a/metastore.go b/metastore.go index 66867a24ff3..0b66fb2b927 100644 --- a/metastore.go +++ b/metastore.go @@ -238,8 +238,8 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* id, _ := t.NextSequence() // store the tag map for the series - s := &Series{ID: uint32(id), Tags: tags} - idBytes := u32tob(uint32(id)) + s := &Series{ID: uint64(id), Tags: tags} + idBytes := u64tob(uint64(id)) if err := b.Put(idBytes, mustMarshalJSON(s)); err != nil { return nil, err @@ -248,12 +248,12 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* } // dropSeries removes all seriesIDS for a given database/measurement -func (tx *metatx) dropSeries(database string, seriesByMeasurement map[string][]uint32) error { +func (tx *metatx) dropSeries(database string, seriesByMeasurement map[string][]uint64) error { for measurement, ids := range seriesByMeasurement { b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(measurement)) if b != nil { for _, id := range ids { - if err := b.Delete(u32tob(id)); err != nil { + if err := b.Delete(u64tob(id)); err != nil { return err } } diff --git a/remote_mapper.go b/remote_mapper.go new file mode 100644 index 00000000000..ee03e8a71bf --- /dev/null +++ b/remote_mapper.go @@ -0,0 +1,196 @@ +package influxdb + +import ( + "bytes" + "encoding/json" + "errors" + "net/http" + + "github.com/influxdb/influxdb/influxql" +) + +const ( + MAX_MAP_RESPONSE_SIZE = 1024 * 1024 +) + +// RemoteMapper implements the influxql.Mapper interface. 
The engine uses the remote mapper +// to pull map results from shards that only exist on other servers in the cluster. +type RemoteMapper struct { + dataNodes []*DataNode + resp *http.Response + results chan interface{} + unmarshal influxql.UnmarshalFunc + complete bool + + Call string `json:",omitempty"` + Database string `json:",omitempty"` + MeasurementName string `json:",omitempty"` + TMin int64 `json:",omitempty"` + TMax int64 `json:",omitempty"` + SeriesIDs []uint64 `json:",omitempty"` + ShardID uint64 `json:",omitempty"` + Filters []string `json:",omitempty"` + WhereFields []*Field `json:",omitempty"` + SelectFields []*Field `json:",omitempty"` + SelectTags []string `json:",omitempty"` + Limit int `json:",omitempty"` + Offset int `json:",omitempty"` + Interval int64 `json:",omitempty"` + ChunkSize int `json:",omitempty"` +} + +// Responses get streamed back to the remote mapper from the remote machine that runs a local mapper +type MapResponse struct { + Err string `json:",omitempty"` + Data []byte + Completed bool `json:",omitempty"` +} + +// Open is a no op, real work is done starting with Begin +func (m *RemoteMapper) Open() error { return nil } + +// Close the response body +func (m *RemoteMapper) Close() { + if m.resp != nil && m.resp.Body != nil { + m.resp.Body.Close() + } +} + +// Begin sends a request to the remote server to start streaming map results +func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error { + // get the function for unmarshaling results + f, err := influxql.InitializeUnmarshaller(c) + if err != nil { + return err + } + m.unmarshal = f + + if c != nil { + m.Call = c.String() + } + m.ChunkSize = chunkSize + m.TMin = startingTime + + // send the request to map to the remote server + b, err := json.Marshal(m) + if err != nil { + return err + } + + // request to start streaming results + resp, err := http.Post(m.dataNodes[0].URL.String()+"/run_mapper", "application/json", bytes.NewReader(b)) + if err != 
nil { + return err + } + m.resp = resp + + return nil +} + +// NextInterval is part of the mapper interface. In this case we read the next chunk from the remote mapper +func (m *RemoteMapper) NextInterval() (interface{}, error) { + // just return nil if the mapper has completed its run + if m.complete { + return nil, nil + } + + // read the chunk + chunk := make([]byte, MAX_MAP_RESPONSE_SIZE, MAX_MAP_RESPONSE_SIZE) + n, err := m.resp.Body.Read(chunk) + if err != nil { + return nil, err + } + if n == 0 { + return nil, nil + } + + // marshal the response + mr := &MapResponse{} + err = json.Unmarshal(chunk[:n], mr) + if err != nil { + return nil, err + } + if mr.Err != "" { + return nil, errors.New(mr.Err) + } + + // if it's a complete message, we've emptied this mapper of all data + if mr.Completed { + m.complete = true + return nil, nil + } + + // marshal the data that came from the MapFN + v, err := m.unmarshal(mr.Data) + if err != nil { + return nil, err + } + + return v, nil +} + +// CallExpr will parse the Call string into an expression or return nil +func (m *RemoteMapper) CallExpr() (*influxql.Call, error) { + if m.Call == "" { + return nil, nil + } + + c, err := influxql.ParseExpr(m.Call) + if err != nil { + return nil, err + } + call, ok := c.(*influxql.Call) + + if !ok { + return nil, errors.New("unable to marshal aggregate call") + } + return call, nil +} + +// FilterExprs will parse the filter strings and return any expressions. This array +// will be the same size as the SeriesIDs array with each element having a filter (which could be nil) +func (m *RemoteMapper) FilterExprs() []influxql.Expr { + exprs := make([]influxql.Expr, len(m.SeriesIDs), len(m.SeriesIDs)) + + // if filters is empty, they're all nil. if filters has one element, all filters + // should be set to that. 
Otherwise marshal each filter + if len(m.Filters) == 1 { + f, _ := influxql.ParseExpr(m.Filters[0]) + for i, _ := range exprs { + exprs[i] = f + } + } else if len(m.Filters) > 1 { + for i, s := range m.Filters { + f, _ := influxql.ParseExpr(s) + exprs[i] = f + } + } + + return exprs +} + +// SetFilters will convert the given array of filters into filters that can be marshaled and sent to the remote system +func (m *RemoteMapper) SetFilters(filters []influxql.Expr) { + l := filters[0] + allFiltersTheSame := true + for _, f := range filters { + if l != f { + allFiltersTheSame = false + break + } + } + + // we don't need anything if they're all the same and nil + if l == nil && allFiltersTheSame { + return + } else if allFiltersTheSame { // just set one filter element since they're all the same + m.Filters = []string{l.String()} + return + } + + // marshal all of them since there are different ones + m.Filters = make([]string, len(filters), len(filters)) + for i, f := range filters { + m.Filters[i] = f.String() + } +} diff --git a/server.go b/server.go index 05ca4513ce5..9dfd84d10cf 100644 --- a/server.go +++ b/server.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log" + "math" "net/http" "net/url" "os" @@ -818,6 +819,17 @@ func (s *Server) DataNode(id uint64) *DataNode { return s.dataNodes[id] } +// DataNodesByID returns the data nodes matching the passed ids +func (s *Server) DataNodesByID(ids []uint64) []*DataNode { + s.mu.RLock() + defer s.mu.RUnlock() + var a []*DataNode + for _, id := range ids { + a = append(a, s.dataNodes[id]) + } + return a +} + // DataNodeByURL returns a data node by url. func (s *Server) DataNodeByURL(u *url.URL) *DataNode { s.mu.RLock() @@ -1694,7 +1706,7 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { } // DropSeries deletes from an existing series. 
-func (s *Server) DropSeries(database string, seriesByMeasurement map[string][]uint32) error { +func (s *Server) DropSeries(database string, seriesByMeasurement map[string][]uint64) error { c := dropSeriesCommand{Database: database, SeriesByMeasurement: seriesByMeasurement} _, err := s.broadcast(dropSeriesMessageType, c) return err @@ -2236,10 +2248,7 @@ func (s *Server) executeSelectStatement(statementID int, stmt *influxql.SelectSt } // Execute plan. - ch, err := e.Execute() - if err != nil { - return err - } + ch := e.Execute() // Stream results from the channel. We should send an empty result if nothing comes through. resultSent := false @@ -2456,13 +2465,13 @@ func (s *Server) executeDropMeasurementStatement(stmt *influxql.DropMeasurementS func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result { s.mu.RLock() - seriesByMeasurement := make(map[string][]uint32) + seriesByMeasurement := make(map[string][]uint64) // Handle the simple `DROP SERIES ` case. if stmt.Source == nil && stmt.Condition == nil { for _, db := range s.databases { for _, m := range db.measurements { if m.seriesByID[stmt.SeriesID] != nil { - seriesByMeasurement[m.Name] = []uint32{stmt.SeriesID} + seriesByMeasurement[m.Name] = []uint64{stmt.SeriesID} } } } @@ -2491,7 +2500,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, var ids seriesIDs if stmt.Condition != nil { // Get series IDs that match the WHERE clause. - filters := map[uint32]influxql.Expr{} + filters := map[uint64]influxql.Expr{} ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) // TODO: check return of walkWhereForSeriesIds for fields @@ -2534,7 +2543,7 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, if stmt.Condition != nil { // Get series IDs that match the WHERE clause. 
- filters := map[uint32]influxql.Expr{} + filters := map[uint64]influxql.Expr{} ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) // If no series matched, then go to the next measurement. @@ -2749,7 +2758,7 @@ func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesState if stmt.Condition != nil { // Get series IDs that match the WHERE clause. - filters := map[uint32]influxql.Expr{} + filters := map[uint64]influxql.Expr{} ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) // If no series matched, then go to the next measurement. @@ -3082,6 +3091,65 @@ func (s *Server) measurement(database, name string) (*Measurement, error) { // Begin returns an unopened transaction associated with the server. func (s *Server) Begin() (influxql.Tx, error) { return newTx(s), nil } +// StartLocalMapper will create a local mapper for the passed in remote mapper +func (s *Server) StartLocalMapper(rm *RemoteMapper) (*LocalMapper, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + // get everything we need to run the local mapper + shard := s.shards[rm.ShardID] + if shard == nil { + return nil, ErrShardNotFound + } + + // this should never be the case, but we have to be sure + if shard.store == nil { + return nil, ErrShardNotLocal + } + + db := s.databases[rm.Database] + if db == nil { + return nil, ErrDatabaseNotFound(rm.Database) + } + + m := db.measurements[rm.MeasurementName] + if m == nil { + return nil, ErrMeasurementNotFound(rm.MeasurementName) + } + + // create a job, it's only used as a container for a few variables + job := &influxql.MapReduceJob{ + MeasurementName: rm.MeasurementName, + TMin: rm.TMin, + TMax: rm.TMax, + } + + // limits and offsets can't be evaluated at the local mapper so we need to read + // limit + offset points to be sure that the reducer will be able to correctly put things together + limit := uint64(rm.Limit) + uint64(rm.Offset) + // if limit is zero, just set to the max number since we use limit == 0 later to 
determine if the mapper is empty + if limit == 0 { + limit = math.MaxUint64 + } + + // now create and start the local mapper + lm := &LocalMapper{ + seriesIDs: rm.SeriesIDs, + job: job, + db: shard.store, + decoder: NewFieldCodec(m), + filters: rm.FilterExprs(), + whereFields: rm.WhereFields, + selectFields: rm.SelectFields, + selectTags: rm.SelectTags, + interval: rm.Interval, + tmax: rm.TMax, + limit: limit, + } + + return lm, nil +} + // NormalizeStatement adds a default database and policy to the measurements in statement. func (s *Server) NormalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error) { s.mu.RLock() @@ -3791,10 +3859,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } // Execute plan. - ch, err := e.Execute() - if err != nil { - return err - } + ch := e.Execute() // Read all rows from channel and write them in for row := range ch { diff --git a/shard.go b/shard.go index 6d4bfeebd31..fd82bd78e7b 100644 --- a/shard.go +++ b/shard.go @@ -30,7 +30,7 @@ func (g *ShardGroup) close() { } // ShardBySeriesID returns the shard that a series is assigned to in the group. -func (g *ShardGroup) ShardBySeriesID(seriesID uint32) *Shard { +func (g *ShardGroup) ShardBySeriesID(seriesID uint64) *Shard { return g.Shards[int(seriesID)%len(g.Shards)] } @@ -45,7 +45,7 @@ func (g *ShardGroup) Contains(min, max time.Time) bool { } // dropSeries will delete all data with the seriesID -func (g *ShardGroup) dropSeries(seriesIDs ...uint32) error { +func (g *ShardGroup) dropSeries(seriesIDs ...uint64) error { for _, s := range g.Shards { err := s.dropSeries(seriesIDs...) if err != nil { @@ -176,10 +176,10 @@ func (s *Shard) HasDataNodeID(id uint64) bool { } // readSeries reads encoded series data from a shard. 
-func (s *Shard) readSeries(seriesID uint32, timestamp int64) (values []byte, err error) { +func (s *Shard) readSeries(seriesID uint64, timestamp int64) (values []byte, err error) { err = s.store.View(func(tx *bolt.Tx) error { // Find series bucket. - b := tx.Bucket(u32tob(seriesID)) + b := tx.Bucket(u64tob(seriesID)) if b == nil { return nil } @@ -207,7 +207,7 @@ func (s *Shard) writeSeries(index uint64, batch []byte) error { data := batch[:payloadLength] // Create a bucket for the series. - b, err := tx.CreateBucketIfNotExists(u32tob(seriesID)) + b, err := tx.CreateBucketIfNotExists(u64tob(seriesID)) if err != nil { return err } @@ -235,13 +235,13 @@ func (s *Shard) writeSeries(index uint64, batch []byte) error { }) } -func (s *Shard) dropSeries(seriesIDs ...uint32) error { +func (s *Shard) dropSeries(seriesIDs ...uint64) error { if s.store == nil { return nil } return s.store.Update(func(tx *bolt.Tx) error { for _, seriesID := range seriesIDs { - err := tx.DeleteBucket(u32tob(seriesID)) + err := tx.DeleteBucket(u64tob(seriesID)) if err != bolt.ErrBucketNotFound { return err } @@ -298,22 +298,22 @@ func (s *Shard) processor(conn MessagingConn, closing <-chan struct{}) { type Shards []*Shard // pointHeaderSize represents the size of a point header, in bytes. -const pointHeaderSize = 4 + 4 + 8 // seriesID + payload length + timestamp +const pointHeaderSize = 8 + 4 + 8 // seriesID + payload length + timestamp // marshalPointHeader encodes a series id, payload length, timestamp, & flagset into a byte slice. 
-func marshalPointHeader(seriesID uint32, payloadLength uint32, timestamp int64) []byte { +func marshalPointHeader(seriesID uint64, payloadLength uint32, timestamp int64) []byte { b := make([]byte, pointHeaderSize) - binary.BigEndian.PutUint32(b[0:4], seriesID) - binary.BigEndian.PutUint32(b[4:8], payloadLength) - binary.BigEndian.PutUint64(b[8:16], uint64(timestamp)) + binary.BigEndian.PutUint64(b[0:8], seriesID) + binary.BigEndian.PutUint32(b[8:12], payloadLength) + binary.BigEndian.PutUint64(b[12:20], uint64(timestamp)) return b } // unmarshalPointHeader decodes a byte slice into a series id, timestamp & flagset. -func unmarshalPointHeader(b []byte) (seriesID uint32, payloadLength uint32, timestamp int64) { - seriesID = binary.BigEndian.Uint32(b[0:4]) - payloadLength = binary.BigEndian.Uint32(b[4:8]) - timestamp = int64(binary.BigEndian.Uint64(b[8:16])) +func unmarshalPointHeader(b []byte) (seriesID uint64, payloadLength uint32, timestamp int64) { + seriesID = binary.BigEndian.Uint64(b[0:8]) + payloadLength = binary.BigEndian.Uint32(b[8:12]) + timestamp = int64(binary.BigEndian.Uint64(b[12:20])) return } diff --git a/tx.go b/tx.go index 4a37aa6f742..65a752e7915 100644 --- a/tx.go +++ b/tx.go @@ -113,6 +113,14 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri return nil, nil } + // get the group by interval, if there is one + var interval int64 + if d, err := stmt.GroupByInterval(); err != nil { + return nil, err + } else { + interval = d.Nanoseconds() + } + // get the sorted unique tag sets for this query. 
tagSets := m.tagSets(stmt, tagKeys) @@ -137,15 +145,49 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri } shard := sg.Shards[0] - mapper := &LocalMapper{ - seriesIDs: t.SeriesIDs, - db: shard.store, - job: job, - decoder: NewFieldCodec(m), - filters: t.Filters, - whereFields: whereFields, - selectFields: selectFields, - selectTags: selectTags, + + var mapper influxql.Mapper + + // create either a remote or local mapper for this shard + if shard.store == nil { + nodes := tx.server.DataNodesByID(shard.DataNodeIDs) + if len(nodes) == 0 { + return nil, ErrShardNotFound + } + + mapper = &RemoteMapper{ + dataNodes: nodes, + Database: mm.Database, + MeasurementName: m.Name, + TMin: tmin.UnixNano(), + TMax: tmax.UnixNano(), + SeriesIDs: t.SeriesIDs, + ShardID: shard.ID, + WhereFields: whereFields, + SelectFields: selectFields, + SelectTags: selectTags, + Limit: stmt.Limit, + Offset: stmt.Offset, + Interval: interval, + } + mapper.(*RemoteMapper).SetFilters(t.Filters) + } else { + mapper = &LocalMapper{ + seriesIDs: t.SeriesIDs, + db: shard.store, + job: job, + decoder: NewFieldCodec(m), + filters: t.Filters, + whereFields: whereFields, + selectFields: selectFields, + selectTags: selectTags, + tmax: tmax.UnixNano(), + interval: interval, + // multiple mappers may need to be merged together to get the results + // for a raw query. 
So each mapper will have to read at least the + // limit plus the offset in data points to ensure we've hit our mark + limit: uint64(stmt.Limit) + uint64(stmt.Offset), + } } mappers = append(mappers, mapper) @@ -202,28 +244,30 @@ func (tx *tx) fieldNames(fields []*influxql.Field) []string { // LocalMapper implements the influxql.Mapper interface for running map tasks over a shard that is local to this server type LocalMapper struct { - cursorsEmpty bool // boolean that lets us know if the cursors are empty - decoder fieldDecoder // decoder for the raw data bytes - tagSet *influxql.TagSet // filters to be applied to each series - filters []influxql.Expr // filters for each series - cursors []*bolt.Cursor // bolt cursors for each series id - seriesIDs []uint32 // seriesIDs to be read from this shard - db *bolt.DB // bolt store for the shard accessed by this mapper - txn *bolt.Tx // read transactions by shard id - job *influxql.MapReduceJob // the MRJob this mapper belongs to - mapFunc influxql.MapFunc // the map func - fieldID uint8 // the field ID associated with the mapFunc curently being run - fieldName string // the field name associated with the mapFunc currently being run - keyBuffer []int64 // the current timestamp key for each cursor - valueBuffer [][]byte // the current value for each cursor - tmin int64 // the min of the current group by interval being iterated over - tmax int64 // the max of the current group by interval being iterated over - additionalNames []string // additional field or tag names that might be requested from the map function - whereFields []*Field // field names that occur in the where clause - selectFields []*Field // field names that occur in the select clause - selectTags []string // tag keys that occur in the select clause - isRaw bool // if the query is a non-aggregate query - limit int // used for raw queries to limit the amount of data read in before pushing out to client + cursorsEmpty bool // boolean that lets us know if the 
cursors are empty + decoder fieldDecoder // decoder for the raw data bytes + filters []influxql.Expr // filters for each series + cursors []*bolt.Cursor // bolt cursors for each series id + seriesIDs []uint64 // seriesIDs to be read from this shard + db *bolt.DB // bolt store for the shard accessed by this mapper + txn *bolt.Tx // read transactions by shard id + job *influxql.MapReduceJob // the MRJob this mapper belongs to + mapFunc influxql.MapFunc // the map func + fieldID uint8 // the field ID associated with the mapFunc curently being run + fieldName string // the field name associated with the mapFunc currently being run + keyBuffer []int64 // the current timestamp key for each cursor + valueBuffer [][]byte // the current value for each cursor + tmin int64 // the min of the current group by interval being iterated over + tmax int64 // the max of the current group by interval being iterated over + additionalNames []string // additional field or tag names that might be requested from the map function + whereFields []*Field // field names that occur in the where clause + selectFields []*Field // field names that occur in the select clause + selectTags []string // tag keys that occur in the select clause + isRaw bool // if the query is a non-aggregate query + interval int64 // the group by interval of the query, if any + limit uint64 // used for raw queries for LIMIT + perIntervalLimit int // used for raw queries to determine how far into a chunk we are + chunkSize int // used for raw queries to determine how much data to read before flushing to client } // Open opens the LocalMapper. 
@@ -239,7 +283,7 @@ func (l *LocalMapper) Open() error { l.cursors = make([]*bolt.Cursor, len(l.seriesIDs)) for i, id := range l.seriesIDs { - b := l.txn.Bucket(u32tob(id)) + b := l.txn.Bucket(u64tob(id)) if b == nil { continue } @@ -256,7 +300,7 @@ func (l *LocalMapper) Close() { } // Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time -func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64) error { +func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error { // set up the buffers. These ensure that we return data in time order mapFunc, err := influxql.InitializeMapFunc(c) if err != nil { @@ -265,6 +309,7 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64) error { l.mapFunc = mapFunc l.keyBuffer = make([]int64, len(l.cursors)) l.valueBuffer = make([][]byte, len(l.cursors)) + l.chunkSize = chunkSize l.tmin = startingTime // determine if this is a raw data query with a single field, multiple fields, or an aggregate @@ -274,6 +319,11 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64) error { if len(l.selectFields) == 1 { fieldName = l.selectFields[0].Name } + + // if they haven't set a limit, just set it to the max int size + if l.limit == 0 { + l.limit = math.MaxUint64 + } } else { lit, ok := c.Args[0].(*influxql.VarRef) if !ok { @@ -317,17 +367,26 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64) error { // NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a // forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read. 
// If this is a raw query, interval should be the max time to hit in the query -func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { +func (l *LocalMapper) NextInterval() (interface{}, error) { if l.cursorsEmpty || l.tmin > l.job.TMax { return nil, nil } + // after we call to the mapper, this will be the tmin for the next interval. + nextMin := l.tmin + l.interval + // Set the upper bound of the interval. if l.isRaw { - l.tmax = interval - } else if interval > 0 { - // Make sure the bottom of the interval lands on a natural boundary. - l.tmax = l.tmin + interval - 1 + l.perIntervalLimit = l.chunkSize + } else if l.interval > 0 { + // Set tmax to ensure that the interval lands on the boundary of the interval + if l.tmin%l.interval != 0 { + // the first interval in a query with a group by may be smaller than the others. This happens when they have a + // where time > clause that is in the middle of the bucket that the group by time creates. That will be the + // case on the first interval when the tmin % the interval isn't equal to zero + nextMin = l.tmin/l.interval*l.interval + l.interval + } + l.tmax = nextMin - 1 } // Execute the map function. This local mapper acts as the iterator @@ -344,23 +403,19 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { // Move the interval forward if it's not a raw query. For raw queries we use the limit to advance intervals. if !l.isRaw { - l.tmin += interval + l.tmin = nextMin } return val, nil } -// SetLimit will tell the mapper to only yield that number of points (or to the max time) to Next -func (l *LocalMapper) SetLimit(limit int) { - l.limit = limit -} - // Next returns the next matching timestamped value for the LocalMapper. 
-func (l *LocalMapper) Next() (seriesID uint32, timestamp int64, value interface{}) { +func (l *LocalMapper) Next() (seriesID uint64, timestamp int64, value interface{}) { for { - // if it's a raw query and we've hit the limit of the number of points to read in, bail - if l.isRaw && l.limit == 0 { - return uint32(0), int64(0), nil + // if it's a raw query and we've hit the limit of the number of points to read in + // for either this chunk or for the absolute query, bail + if l.isRaw && (l.limit == 0 || l.perIntervalLimit == 0) { + return uint64(0), int64(0), nil } // find the minimum timestamp @@ -432,12 +487,30 @@ func (l *LocalMapper) Next() (seriesID uint32, timestamp int64, value interface{ // if it's a raw query, we always limit the amount we read in if l.isRaw { l.limit-- + l.perIntervalLimit-- } return seriesID, timestamp, value } } +// IsEmpty returns true if either all cursors are nil or all cursors are past the passed in max time +func (l *LocalMapper) IsEmpty(tmax int64) bool { + if l.cursorsEmpty || l.limit == 0 { + return true + } + + // look at the next time for each cursor + for _, t := range l.keyBuffer { + // if the time is less than the max, we haven't emptied this mapper yet + if t != 0 && t <= tmax { + return false + } + } + + return true +} + // matchesFilter returns true if the value matches the where clause func matchesWhere(f influxql.Expr, fields map[string]interface{}) bool { if ok, _ := influxql.Eval(f, fields).(bool); !ok {