Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cursor & SelectMapper Refactor (WIP) #4180

Merged
merged 9 commits
Sep 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to internal database.
- [#4118](https://github.com/influxdb/influxdb/issues/4118): Return consistent, correct result for SHOW MEASUREMENTS with multiple AND conditions
- [#4191](https://github.com/influxdb/influxdb/pull/4191): Correctly marshal remote mapper responses. Fixes [#4170](https://github.com/influxdb/influxdb/issues/4170)
- [#4180](https://github.com/influxdb/influxdb/pull/4180): Cursor & SelectMapper Refactor

## v0.9.4 [2015-09-14]

Expand Down
83 changes: 72 additions & 11 deletions cluster/shard_mapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"encoding/json"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -40,11 +41,7 @@ func NewShardMapper(timeout time.Duration) *ShardMapper {

// CreateMapper returns a Mapper for the given shard ID.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}

// Create a remote mapper if the local node doesn't own the shard.
if !sh.OwnedBy(s.MetaStore.NodeID()) || s.ForceRemoteMapping {
// Pick a node in a pseudo-random manner.
conn, err := s.dial(sh.Owners[rand.Intn(len(sh.Owners))].NodeID)
Expand All @@ -53,7 +50,13 @@ func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, c
}
conn.SetDeadline(time.Now().Add(s.timeout))

m.SetRemote(NewRemoteMapper(conn, sh.ID, stmt, chunkSize))
return NewRemoteMapper(conn, sh.ID, stmt, chunkSize), nil
}

// If it is local then return the mapper from the store.
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}

return m, nil
Expand Down Expand Up @@ -87,6 +90,8 @@ type RemoteMapper struct {

conn net.Conn
bufferedResponse *MapShardResponse

unmarshallers []tsdb.UnmarshalFunc // Mapping-specific unmarshal functions.
}

// NewRemoteMapper returns a new remote mapper using the given connection.
Expand All @@ -106,6 +111,7 @@ func (r *RemoteMapper) Open() (err error) {
r.conn.Close()
}
}()

// Build Map request.
var request MapShardRequest
request.SetShardID(r.shardID)
Expand Down Expand Up @@ -143,11 +149,18 @@ func (r *RemoteMapper) Open() (err error) {
r.tagsets = r.bufferedResponse.TagSets()
r.fields = r.bufferedResponse.Fields()

return nil
}
// Set up each mapping function for this statement.
if stmt, ok := r.stmt.(*influxql.SelectStatement); ok {
for _, c := range stmt.FunctionCalls() {
fn, err := tsdb.InitializeUnmarshaller(c)
if err != nil {
return err
}
r.unmarshallers = append(r.unmarshallers, fn)
}
}

func (r *RemoteMapper) SetRemote(m tsdb.Mapper) error {
return fmt.Errorf("cannot set remote mapper on a remote mapper")
return nil
}

func (r *RemoteMapper) TagSets() []string {
Expand Down Expand Up @@ -187,7 +200,55 @@ func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) {
return nil, nil
}

return response.Data(), err
moj := &tsdb.MapperOutputJSON{}
if err := json.Unmarshal(response.Data(), moj); err != nil {
return nil, err
}
mvj := []*tsdb.MapperValueJSON{}
if err := json.Unmarshal(moj.Values, &mvj); err != nil {
return nil, err
}

// Prep the non-JSON version of Mapper output.
mo := &tsdb.MapperOutput{
Name: moj.Name,
Tags: moj.Tags,
Fields: moj.Fields,
}

if len(mvj) == 1 && len(mvj[0].AggData) > 0 {
// The MapperValue is carrying aggregate data, so run it through the
// custom unmarshallers for the map functions through which the data
// was mapped.
aggValues := []interface{}{}
for i, b := range mvj[0].AggData {
v, err := r.unmarshallers[i](b)
if err != nil {
return nil, err
}
aggValues = append(aggValues, v)
}
mo.Values = []*tsdb.MapperValue{&tsdb.MapperValue{
Value: aggValues,
Tags: mvj[0].Tags,
}}
} else {
// Must be raw data instead.
for _, v := range mvj {
var rawValue interface{}
if err := json.Unmarshal(v.RawData, &rawValue); err != nil {
return nil, err
}

mo.Values = append(mo.Values, &tsdb.MapperValue{
Time: v.Time,
Value: rawValue,
Tags: v.Tags,
})
}
}

return mo, nil
}

// Close the Mapper
Expand Down
6 changes: 1 addition & 5 deletions cluster/shard_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,10 @@ func TestShardWriter_RemoteMapper_Success(t *testing.T) {
if err != nil {
t.Fatalf("failed to get next chunk from mapper: %s", err.Error())
}
b, ok := chunk.([]byte)
output, ok := chunk.(*tsdb.MapperOutput)
if !ok {
t.Fatal("chunk is not of expected type")
}
output := &tsdb.MapperOutput{}
if err := json.Unmarshal(b, output); err != nil {
t.Fatal(err)
}
if output.Name != "cpu" {
t.Fatalf("received output incorrect, exp: %v, got %v", expOutput, output)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2805,6 +2805,7 @@ func TestServer_Query_Wildcards(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}

if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
Expand Down Expand Up @@ -3181,6 +3182,7 @@ func TestServer_Query_Where_Fields(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}

if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
Expand Down
18 changes: 6 additions & 12 deletions cmd/inspect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,30 +77,24 @@ func main() {

for _, key := range series {
fieldSummary := []string{}

cursor := tx.Cursor(key, tsdb.Forward)
cursor := tx.Cursor(key, m.FieldNames(), shard.FieldCodec(m.Name), true)

// Series doesn't exist in this shard
if cursor == nil {
continue
}

// Seek to the beginning
_, value := cursor.Seek([]byte{})
codec := shard.FieldCodec(m.Name)
if codec != nil {
fields, err := codec.DecodeFieldsWithNames(value)
if err != nil {
fmt.Printf("Failed to decode values: %v", err)
}

_, fields := cursor.SeekTo(0)
if fields, ok := fields.(map[string]interface{}); ok {
for field, value := range fields {
fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value))
}
sort.Strings(fieldSummary)

fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
len(fields), strings.Join(fieldSummary, ","), len(series))
}
fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
len(fields), strings.Join(fieldSummary, ","), len(series))
break
}
tx.Rollback()
Expand Down
43 changes: 43 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,18 @@ type SelectStatement struct {
FillValue interface{}
}

// SourceNames returns a list of source names.
func (s *SelectStatement) SourceNames() []string {
	names := make([]string, 0, len(s.Sources))
	for _, source := range s.Sources {
		// Only measurements carry a usable name; other source kinds are skipped.
		if m, ok := source.(*Measurement); ok {
			names = append(names, m.Name)
		}
	}
	return names
}

// HasDerivative returns true if one of the function calls in the statement is a
// derivative aggregate
func (s *SelectStatement) HasDerivative() bool {
Expand All @@ -743,6 +755,11 @@ func (s *SelectStatement) IsSimpleDerivative() bool {
return false
}

// TimeAscending returns true if the time field is sorted in chronological order.
// With no explicit sort fields, ascending order is the default.
func (s *SelectStatement) TimeAscending() bool {
	if len(s.SortFields) > 0 {
		return s.SortFields[0].Ascending
	}
	return true
}

// Clone returns a deep copy of the statement.
func (s *SelectStatement) Clone() *SelectStatement {
clone := &SelectStatement{
Expand Down Expand Up @@ -1517,6 +1534,25 @@ func (s *SelectStatement) NamesInDimension() []string {
return a
}

// LimitTagSets returns a tag set list with SLIMIT and SOFFSET applied.
// Returns the input unchanged when neither is set, and nil when the offset
// is past the end of the list.
func (s *SelectStatement) LimitTagSets(a []*TagSet) []*TagSet {
	// Ignore if no limit or offset is specified.
	if s.SLimit == 0 && s.SOffset == 0 {
		return a
	}

	// If offset is beyond the number of tag sets then return nil.
	if s.SOffset > len(a) {
		return nil
	}

	// Clamp the limit to the number of remaining tag sets. Use a local
	// variable rather than writing back to s.SLimit: mutating the statement
	// here made the method non-idempotent and silently changed the
	// statement for any later use.
	limit := s.SLimit
	if s.SOffset+limit > len(a) {
		limit = len(a) - s.SOffset
	}
	return a[s.SOffset : s.SOffset+limit]
}

// walkNames will walk the Expr and return the database fields
func walkNames(exp Expr) []string {
switch expr := exp.(type) {
Expand Down Expand Up @@ -2950,6 +2986,13 @@ func evalBinaryExpr(expr *BinaryExpr, m map[string]interface{}) interface{} {
return nil
}

// EvalBool evaluates expr and returns true if result is a boolean true.
// Otherwise returns false.
func EvalBool(expr Expr, m map[string]interface{}) bool {
	if b, ok := Eval(expr, m).(bool); ok {
		return b
	}
	return false
}

// Reduce evaluates expr using the available values in valuer.
// References that don't exist in valuer are ignored.
func Reduce(expr Expr, valuer Valuer) Expr {
Expand Down
Loading