Skip to content

Commit

Permalink
Merge pull request #4180 from benbjohnson/refactor-select-mapper
Browse files Browse the repository at this point in the history
Cursor & SelectMapper Refactor (WIP)
  • Loading branch information
benbjohnson committed Sep 22, 2015
2 parents 410eb4e + 8e27cf1 commit 522f9ea
Show file tree
Hide file tree
Showing 23 changed files with 1,536 additions and 1,647 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to internal database.
- [#4118](https://github.com/influxdb/influxdb/issues/4118): Return consistent, correct result for SHOW MEASUREMENTS with multiple AND conditions
- [#4191](https://github.com/influxdb/influxdb/pull/4191): Correctly marshal remote mapper responses. Fixes [#4170](https://github.com/influxdb/influxdb/issues/4170)
- [#4180](https://github.com/influxdb/influxdb/pull/4180): Cursor & SelectMapper Refactor

## v0.9.4 [2015-09-14]

Expand Down
83 changes: 72 additions & 11 deletions cluster/shard_mapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"encoding/json"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -40,11 +41,7 @@ func NewShardMapper(timeout time.Duration) *ShardMapper {

// CreateMapper returns a Mapper for the given shard ID.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}

// Create a remote mapper if the local node doesn't own the shard.
if !sh.OwnedBy(s.MetaStore.NodeID()) || s.ForceRemoteMapping {
// Pick a node in a pseudo-random manner.
conn, err := s.dial(sh.Owners[rand.Intn(len(sh.Owners))].NodeID)
Expand All @@ -53,7 +50,13 @@ func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, c
}
conn.SetDeadline(time.Now().Add(s.timeout))

m.SetRemote(NewRemoteMapper(conn, sh.ID, stmt, chunkSize))
return NewRemoteMapper(conn, sh.ID, stmt, chunkSize), nil
}

// If it is local then return the mapper from the store.
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}

return m, nil
Expand Down Expand Up @@ -87,6 +90,8 @@ type RemoteMapper struct {

conn net.Conn
bufferedResponse *MapShardResponse

unmarshallers []tsdb.UnmarshalFunc // Mapping-specific unmarshal functions.
}

// NewRemoteMapper returns a new remote mapper using the given connection.
Expand All @@ -106,6 +111,7 @@ func (r *RemoteMapper) Open() (err error) {
r.conn.Close()
}
}()

// Build Map request.
var request MapShardRequest
request.SetShardID(r.shardID)
Expand Down Expand Up @@ -143,11 +149,18 @@ func (r *RemoteMapper) Open() (err error) {
r.tagsets = r.bufferedResponse.TagSets()
r.fields = r.bufferedResponse.Fields()

return nil
}
// Set up each mapping function for this statement.
if stmt, ok := r.stmt.(*influxql.SelectStatement); ok {
for _, c := range stmt.FunctionCalls() {
fn, err := tsdb.InitializeUnmarshaller(c)
if err != nil {
return err
}
r.unmarshallers = append(r.unmarshallers, fn)
}
}

func (r *RemoteMapper) SetRemote(m tsdb.Mapper) error {
return fmt.Errorf("cannot set remote mapper on a remote mapper")
return nil
}

func (r *RemoteMapper) TagSets() []string {
Expand Down Expand Up @@ -187,7 +200,55 @@ func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) {
return nil, nil
}

return response.Data(), err
moj := &tsdb.MapperOutputJSON{}
if err := json.Unmarshal(response.Data(), moj); err != nil {
return nil, err
}
mvj := []*tsdb.MapperValueJSON{}
if err := json.Unmarshal(moj.Values, &mvj); err != nil {
return nil, err
}

// Prep the non-JSON version of Mapper output.
mo := &tsdb.MapperOutput{
Name: moj.Name,
Tags: moj.Tags,
Fields: moj.Fields,
}

if len(mvj) == 1 && len(mvj[0].AggData) > 0 {
// The MapperValue is carrying aggregate data, so run it through the
// custom unmarshallers for the map functions through which the data
// was mapped.
aggValues := []interface{}{}
for i, b := range mvj[0].AggData {
v, err := r.unmarshallers[i](b)
if err != nil {
return nil, err
}
aggValues = append(aggValues, v)
}
mo.Values = []*tsdb.MapperValue{&tsdb.MapperValue{
Value: aggValues,
Tags: mvj[0].Tags,
}}
} else {
// Must be raw data instead.
for _, v := range mvj {
var rawValue interface{}
if err := json.Unmarshal(v.RawData, &rawValue); err != nil {
return nil, err
}

mo.Values = append(mo.Values, &tsdb.MapperValue{
Time: v.Time,
Value: rawValue,
Tags: v.Tags,
})
}
}

return mo, nil
}

// Close the Mapper
Expand Down
6 changes: 1 addition & 5 deletions cluster/shard_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,10 @@ func TestShardWriter_RemoteMapper_Success(t *testing.T) {
if err != nil {
t.Fatalf("failed to get next chunk from mapper: %s", err.Error())
}
b, ok := chunk.([]byte)
output, ok := chunk.(*tsdb.MapperOutput)
if !ok {
t.Fatal("chunk is not of expected type")
}
output := &tsdb.MapperOutput{}
if err := json.Unmarshal(b, output); err != nil {
t.Fatal(err)
}
if output.Name != "cpu" {
t.Fatalf("received output incorrect, exp: %v, got %v", expOutput, output)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2805,6 +2805,7 @@ func TestServer_Query_Wildcards(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}

if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
Expand Down Expand Up @@ -3181,6 +3182,7 @@ func TestServer_Query_Where_Fields(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}

if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
Expand Down
18 changes: 6 additions & 12 deletions cmd/inspect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,30 +77,24 @@ func main() {

for _, key := range series {
fieldSummary := []string{}

cursor := tx.Cursor(key, tsdb.Forward)
cursor := tx.Cursor(key, m.FieldNames(), shard.FieldCodec(m.Name), true)

// Series doesn't exist in this shard
if cursor == nil {
continue
}

// Seek to the beginning
_, value := cursor.Seek([]byte{})
codec := shard.FieldCodec(m.Name)
if codec != nil {
fields, err := codec.DecodeFieldsWithNames(value)
if err != nil {
fmt.Printf("Failed to decode values: %v", err)
}

_, fields := cursor.SeekTo(0)
if fields, ok := fields.(map[string]interface{}); ok {
for field, value := range fields {
fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value))
}
sort.Strings(fieldSummary)

fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
len(fields), strings.Join(fieldSummary, ","), len(series))
}
fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
len(fields), strings.Join(fieldSummary, ","), len(series))
break
}
tx.Rollback()
Expand Down
43 changes: 43 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,18 @@ type SelectStatement struct {
FillValue interface{}
}

// SourceNames returns a list of source names.
func (s *SelectStatement) SourceNames() []string {
a := make([]string, 0, len(s.Sources))
for _, src := range s.Sources {
switch src := src.(type) {
case *Measurement:
a = append(a, src.Name)
}
}
return a
}

// HasDerivative returns true if one of the function calls in the statement is a
// derivative aggregate
func (s *SelectStatement) HasDerivative() bool {
Expand All @@ -743,6 +755,11 @@ func (s *SelectStatement) IsSimpleDerivative() bool {
return false
}

// TimeAscending returns true if the time field is sorted in chronological order.
func (s *SelectStatement) TimeAscending() bool {
return len(s.SortFields) == 0 || s.SortFields[0].Ascending
}

// Clone returns a deep copy of the statement.
func (s *SelectStatement) Clone() *SelectStatement {
clone := &SelectStatement{
Expand Down Expand Up @@ -1517,6 +1534,25 @@ func (s *SelectStatement) NamesInDimension() []string {
return a
}

// LimitTagSets returns a tag set list with SLIMIT and SOFFSET applied.
func (s *SelectStatement) LimitTagSets(a []*TagSet) []*TagSet {
// Ignore if no limit or offset is specified.
if s.SLimit == 0 && s.SOffset == 0 {
return a
}

// If offset is beyond the number of tag sets then return nil.
if s.SOffset > len(a) {
return nil
}

// Clamp limit to the max number of tag sets.
if s.SOffset+s.SLimit > len(a) {
s.SLimit = len(a) - s.SOffset
}
return a[s.SOffset : s.SOffset+s.SLimit]
}

// walkNames will walk the Expr and return the database fields
func walkNames(exp Expr) []string {
switch expr := exp.(type) {
Expand Down Expand Up @@ -2950,6 +2986,13 @@ func evalBinaryExpr(expr *BinaryExpr, m map[string]interface{}) interface{} {
return nil
}

// EvalBool evaluates expr and returns true if result is a boolean true.
// Otherwise returns false.
func EvalBool(expr Expr, m map[string]interface{}) bool {
v, _ := Eval(expr, m).(bool)
return v
}

// Reduce evaluates expr using the available values in valuer.
// References that don't exist in valuer are ignored.
func Reduce(expr Expr, valuer Valuer) Expr {
Expand Down
Loading

0 comments on commit 522f9ea

Please sign in to comment.