diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index fe0f6ce..cafaba7 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -116,6 +116,7 @@ func (ds *MQTTDatasource) SubscribeStream(_ context.Context, req *backend.Subscr func (ds *MQTTDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { ds.Client.Subscribe(req.Path) defer ds.Client.Unsubscribe(req.Path) + fields := make(map[string]*data.Field) for { select { @@ -126,7 +127,7 @@ func (ds *MQTTDatasource) RunStream(ctx context.Context, req *backend.RunStreamR if message.Topic != req.Path { continue } - err := ds.SendMessage(message, req, sender) + err := ds.SendMessage(fields, message, req, sender) if err != nil { log.DefaultLogger.Error(fmt.Sprintf("unable to send message: %s", err.Error())) } @@ -162,7 +163,7 @@ func (ds *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse { return response } - frame := ToFrame(qm.Topic, messages) + frame := ToFrame(make(map[string]*data.Field), qm.Topic, messages) if qm.Topic != "" { frame.SetMeta(&data.FrameMeta{ @@ -173,8 +174,7 @@ func (ds *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse { response.Frames = append(response.Frames, frame) return response } - -func (ds *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunStreamRequest, sender *backend.StreamSender) error { +func (ds *MQTTDatasource) SendMessage(fields map[string]*data.Field, msg mqtt.StreamMessage, req *backend.RunStreamRequest, sender *backend.StreamSender) error { if !ds.Client.IsSubscribed(req.Path) { return nil } @@ -184,7 +184,7 @@ func (ds *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunSt Value: msg.Value, } - frame := ToFrame(msg.Topic, []mqtt.Message{message}) + frame := ToFrame(fields, msg.Topic, []mqtt.Message{message}) log.DefaultLogger.Debug(fmt.Sprintf("Sending message to client for topic %s", msg.Topic)) return sender.SendFrame(frame, data.IncludeAll) diff --git a/pkg/plugin/message.go b/pkg/plugin/message.go index 4743078..e1adbfa 100644 --- a/pkg/plugin/message.go +++ b/pkg/plugin/message.go @@ -2,7 +2,6 @@ package plugin import ( "encoding/json" - "fmt" "sort" "strconv" "strings" @@ -11,12 +10,12 @@ import ( "github.com/grafana/mqtt-datasource/pkg/mqtt" ) -func ToFrame(topic string, messages []mqtt.Message) *data.Frame { +func ToFrame(fields map[string]*data.Field, topic string, messages []mqtt.Message) *data.Frame { count := len(messages) if count > 0 { first := messages[0].Value if strings.HasPrefix(first, "{") { - return jsonMessagesToFrame(topic, messages) + return jsonMessagesToFrame(fields, topic, messages) } } @@ -36,61 +35,74 @@ func ToFrame(topic string, messages []mqtt.Message) *data.Frame { return data.NewFrame(topic, timeField, valueField) } -func jsonMessagesToFrame(topic string, messages []mqtt.Message) *data.Frame { - count := len(messages) - if count == 0 { - return nil +// unnestMap recursively unwraps a given arbitrary map, thereby appending its concrete values under the resulting path. +func unnestMap(v interface{}, newmap map[string]interface{}, key string) map[string]interface{} { + switch v := v.(type) { + case map[string]interface{}: // Object + for k, val := range v { + unnestMap(val, newmap, key+"."+k) + } + case []interface{}: // Array + for newkey := 0; newkey < len(v); newkey++ { + newmap[key+"["+strconv.Itoa(newkey)+"]"] = v[newkey] + } + default: // Number (float64), String (string), Boolean (bool), Null (nil) + newmap[key] = v } + return newmap - var body map[string]float64 - err := json.Unmarshal([]byte(messages[0].Value), &body) - if err != nil { - frame := data.NewFrame(topic) - frame.AppendNotices(data.Notice{Severity: data.NoticeSeverityError, - Text: fmt.Sprintf("error unmarshalling json message: %s", err.Error()), - }) - return frame +} + +// changeMapStructure transforms a nested arbitrary map to a simple key value layout. +func changeMapStructure(nestedMap map[string]interface{}) map[string]interface{} { + newmap := make(map[string]interface{}) + for k, v := range nestedMap { + newmap = unnestMap(v, newmap, k) } + return newmap +} +func jsonMessagesToFrame(fields map[string]*data.Field, topic string, messages []mqtt.Message) *data.Frame { + count := len(messages) + if count == 0 { + return nil + } timeField := data.NewFieldFromFieldType(data.FieldTypeTime, count) timeField.Name = "Time" - timeField.SetConcrete(0, messages[0].Timestamp) - - // Create a field for each key and set the first value - keys := make([]string, 0, len(body)) - fields := make(map[string]*data.Field, len(body)) - for key, val := range body { - field := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, count) - field.Name = key - field.SetConcrete(0, val) - fields[key] = field - keys = append(keys, key) - } - sort.Strings(keys) // keys stable field order. - - // Add rows 1...n for row, m := range messages { - if row == 0 { - continue - } - - err := json.Unmarshal([]byte(m.Value), &body) - if err != nil { - continue // bad row? - } - timeField.SetConcrete(row, m.Timestamp) - for key, val := range body { - field, ok := fields[key] - if ok { - field.SetConcrete(row, val) + var body map[string]interface{} + if err := json.Unmarshal([]byte(m.Value), &body); err != nil { + return nil + } + for key, val := range changeMapStructure(body) { + var t data.FieldType + field, exists := fields[key] + switch val.(type) { + case float64: + t = data.FieldTypeNullableFloat64 + case string: + t = data.FieldTypeNullableString + case bool: + t = data.FieldTypeNullableBool + default: + delete(fields, key) + continue } + if !exists || field.Type() != t { + field = data.NewFieldFromFieldType(t, count) + field.Name = key + fields[key] = field + } + field.SetConcrete(row, val) } } - frame := data.NewFrame(topic, timeField) - for _, key := range keys { - frame.Fields = append(frame.Fields, fields[key]) + for _, f := range fields { + frame.Fields = append(frame.Fields, f) } + sort.Slice(frame.Fields, func(i, j int) bool { + return frame.Fields[i].Name < frame.Fields[j].Name + }) return frame } diff --git a/pkg/plugin/message_test.go b/pkg/plugin/message_test.go index 0332951..f3f359e 100644 --- a/pkg/plugin/message_test.go +++ b/pkg/plugin/message_test.go @@ -5,13 +5,14 @@ import ( "testing" "time" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/mqtt-datasource/pkg/mqtt" "github.com/grafana/mqtt-datasource/pkg/plugin" "github.com/stretchr/testify/require" ) func TestSimpleValueMessage(t *testing.T) { - frame := plugin.ToFrame("test/data", []mqtt.Message{ + frame := plugin.ToFrame(make(map[string]*data.Field), "test/data", []mqtt.Message{ { Timestamp: time.Unix(1, 0), Value: "1", @@ -24,31 +25,45 @@ func TestSimpleValueMessage(t *testing.T) { func TestJSONValuesMessage(t *testing.T) { timestamp := time.Unix(1, 0) - values := []float64{ - -0.5182926829268293, - -0.3582317073170732, + values := []interface{}{ + "-0.5182926829268293", + "Hi", 0.1753048780487805, - 0.20599365234375, + false, -0.050048828125, - 1.03582763671875, + true, } - msg := fmt.Sprintf(`{"ax": %v, "ay": %v, "az": %v, "gx": %v, "gy": %v, "gz": %v}`, - values[0], values[1], values[2], values[3], values[4], values[5]) - frame := plugin.ToFrame("test/data", []mqtt.Message{ + cases := []struct { + msg string + keys []string + }{ { - Timestamp: timestamp, - Value: msg, - }, - }) - numFields := len(values) + 1 - require.NotNil(t, frame) - require.Equal(t, numFields, len(frame.Fields)) - v, ok := frame.Fields[0].ConcreteAt(0) - require.Equal(t, true, ok) - require.Equal(t, v, timestamp) - for idx, val := range values { - v, err := frame.Fields[idx+1].FloatAt(0) - require.NoError(t, err) - require.Equal(t, val, v) + msg: fmt.Sprintf(`{"ax": %#v, "ay": %#v, "az": %#v, "gx": %#v, "gy": %#v, "gz": %#v}`, + values[0], values[1], values[2], values[3], values[4], values[5]), + keys: []string{"ax", "ay", "az", "gx", "gy", "gz"}}, + + { + msg: fmt.Sprintf(`{"ax": %#v, "ay":{"az": { "gx": %#v}, "gy": %#v}, "gz":{"b":{"bx": %#v,"by": %#v, "bz": %#v}}}`, + values[0], values[1], values[2], values[3], values[4], values[5]), + keys: []string{"ax", "ay.az.gx", "ay.gy", "gz.b.bx", "gz.b.by", "gz.b.bz"}}, + } + for _, c := range cases { + frame := plugin.ToFrame(make(map[string]*data.Field), "test/data", []mqtt.Message{ + { + Timestamp: timestamp, + Value: c.msg, + }, + }) + numFields := len(values) + 1 + require.NotNil(t, frame) + require.Equal(t, numFields, len(frame.Fields)) + v, ok := frame.Fields[0].ConcreteAt(0) + require.Equal(t, true, ok) + require.Equal(t, v, timestamp) + for idx, val := range values { + require.Equal(t, frame.Fields[idx+1].Name, c.keys[idx]) + v, _ := frame.Fields[idx+1].ConcreteAt(0) + require.Equal(t, val, v) + } } }