Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redis Graph module (GRAPH.QUERY, GRAPH.SLOWLOG) #157

Merged
merged 13 commits into from
Feb 4, 2021
Binary file modified data/dump.rdb
Binary file not shown.
8 changes: 8 additions & 0 deletions pkg/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ func query(ctx context.Context, query backend.DataQuery, client redisClient) bac
case "rg.pyexecute":
return queryRgPyexecute(qm, client)

/**
* Redis Graph
*/
case "graph.query":
return queryGraphQuery(qm, client)
case "graph.slowlog":
return queryGraphSlowlog(qm, client)

/**
* Default
*/
Expand Down
2 changes: 2 additions & 0 deletions pkg/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestQuery(t *testing.T) {
{queryModel{Command: "rg.pyexecute"}},
{queryModel{Command: "rg.xrange"}},
{queryModel{Command: "rg.xrevrange"}},
{queryModel{Command: "graph.query"}},
{queryModel{Command: "graph.slowlog"}},
}

// Run Tests
Expand Down
201 changes: 201 additions & 0 deletions pkg/redis-graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package main

import (
"strconv"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)

/**
* Represents node
*/
type nodeEntry struct {
id string
title string
subTitle string
mainStat string
arc int64
}

/**
* Represents edge
*/
type edgeEntry struct {
id string
source string
target string
mainStat string
}

/**
* GRAPH.QUERY <Graph name> {query}
*
* Executes the given query against a specified graph.
* @see https://oss.redislabs.com/redisgraph/commands/#graphquery
*/
func queryGraphQuery(qm queryModel, client redisClient) backend.DataResponse {
response := backend.DataResponse{}

var result []interface{}

// Run command
err := client.RunFlatCmd(&result, "GRAPH.QUERY", qm.Key, qm.Cypher)

// Check error
if err != nil {
return errorHandler(response, err)
}

// New Frame for nodes
frameWithNodes := data.NewFrame("nodes")
frameWithNodes.Meta = &data.FrameMeta{
PreferredVisualization: "nodeGraph",
}
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("id", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("title", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("subTitle", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("mainStat", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("arc__", nil, []int64{}))

// New Frame for edges
frameWithEdges := data.NewFrame("edges")
frameWithEdges.Meta = &data.FrameMeta{
PreferredVisualization: "nodeGraph",
}
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("id", nil, []string{}))
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("source", nil, []string{}))
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("target", nil, []string{}))
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("mainStat", nil, []string{}))

// Adding frames to response
response.Frames = append(response.Frames, frameWithNodes)
response.Frames = append(response.Frames, frameWithEdges)

existingNodes := map[string]bool{}

for _, entries := range result[1].([]interface{}) {
nodes, edges := findAllNodesAndEdges(entries)
for _, node := range nodes {
// Add each nodeEntry only once
if _, ok := existingNodes[node.id]; !ok {
frameWithNodes.AppendRow(node.id, node.title, node.subTitle, node.mainStat, node.arc)
existingNodes[node.id] = true
}
}
for _, edge := range edges {
frameWithEdges.AppendRow(edge.id, edge.source, edge.target, edge.mainStat)
}
}
return response
}

/**
* Parse array of entries and find
* either Nodes https://oss.redislabs.com/redisgraph/result_structure/#nodes
* or Relations https://oss.redislabs.com/redisgraph/result_structure/#relations
* and create corresponding nodeEntry or edgeEntry
**/
func findAllNodesAndEdges(input interface{}) ([]nodeEntry, []edgeEntry) {
nodes := []nodeEntry{}
edges := []edgeEntry{}

if entries, ok := input.([]interface{}); ok {
for _, entry := range entries {
entryFields := entry.([]interface{})

// Node https://oss.redislabs.com/redisgraph/result_structure/#nodes
if len(entryFields) == 3 {
node := nodeEntry{arc: 1}
idArray := entryFields[0].([]interface{})
node.id = strconv.FormatInt(idArray[1].(int64), 10)

// Assume first label will be a title if exists
labelsArray := entryFields[1].([]interface{})
labels := labelsArray[1].([]interface{})
if len(labels) > 0 {
node.title = string(labels[0].([]byte))
}

// Assume first property will be a mainStat if exists
propertiesArray := entryFields[2].([]interface{})
properties := propertiesArray[1].([]interface{})
if len(properties) > 0 {
propertyArray := properties[0].([]interface{})
switch propValue := propertyArray[1].(type) {
case []byte:
node.mainStat = string(propValue)
case int64:
node.mainStat = strconv.FormatInt(propValue, 10)
}
}

nodes = append(nodes, node)
}

// Relation https://oss.redislabs.com/redisgraph/result_structure/#relations
if len(entryFields) == 5 {
edge := edgeEntry{}
idArray := entryFields[0].([]interface{})
edge.id = strconv.FormatInt(idArray[1].(int64), 10)

// Main Stat
typeArray := entryFields[1].([]interface{})
edge.mainStat = string(typeArray[1].([]byte))

// Source
srcArray := entryFields[2].([]interface{})
edge.source = strconv.FormatInt(srcArray[1].(int64), 10)

// Target
destArray := entryFields[3].([]interface{})
edge.target = strconv.FormatInt(destArray[1].(int64), 10)

edges = append(edges, edge)
}
}
}
return nodes, edges
}

/**
* GRAPH.SLOWLOG <Graph name>
*
* Returns a list containing up to 10 of the slowest queries issued against the given graph ID.
* @see https://oss.redislabs.com/redisgraph/commands/#graphslowlog
*/
func queryGraphSlowlog(qm queryModel, client redisClient) backend.DataResponse {
response := backend.DataResponse{}

var result [][]string

// Run command
err := client.RunFlatCmd(&result, "GRAPH.SLOWLOG", qm.Key)

// Check error
if err != nil {
return errorHandler(response, err)
}

// New Frame
frame := data.NewFrame("GRAPH.SLOWLOG")
frame.Fields = append(frame.Fields, data.NewField("timestamp", nil, []time.Time{}))
frame.Fields = append(frame.Fields, data.NewField("command", nil, []string{}))
frame.Fields = append(frame.Fields, data.NewField("query", nil, []string{}))
frame.Fields = append(frame.Fields, data.NewField("duration", nil, []float64{}))
response.Frames = append(response.Frames, frame)

// Set Field Config
frame.Fields[3].Config = &data.FieldConfig{Unit: "µs"}

// Entries
for _, entry := range result {
timestamp, _ := strconv.ParseInt(entry[0], 10, 64)
duration, _ := strconv.ParseFloat(entry[3], 64)
frame.AppendRow(time.Unix(timestamp, 0), entry[1], entry[2], duration)
}

// Return
return response
}
79 changes: 79 additions & 0 deletions pkg/redis-graph_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// +build integration

package main

import (
"fmt"
"testing"

"github.com/mediocregopher/radix/v3"
"github.com/stretchr/testify/require"
)

/**
* GRAPH.QUERY
*/
func TestGraphQueryIntegration(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphQuery(queryModel{Command: "graph.query", Key: "GOT_DEMO", Cypher: "MATCH (w:writer)-[r:wrote]->(b:book) return w,r,b"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 5)
require.Equal(t, "id", resp.Frames[0].Fields[0].Name)
require.Equal(t, "title", resp.Frames[0].Fields[1].Name)
require.Equal(t, "subTitle", resp.Frames[0].Fields[2].Name)
require.Equal(t, "mainStat", resp.Frames[0].Fields[3].Name)
require.Equal(t, "arc__", resp.Frames[0].Fields[4].Name)
require.Equal(t, 15, resp.Frames[0].Fields[0].Len())
require.Len(t, resp.Frames[1].Fields, 4)
require.Equal(t, "id", resp.Frames[1].Fields[0].Name)
require.Equal(t, "source", resp.Frames[1].Fields[1].Name)
require.Equal(t, "target", resp.Frames[1].Fields[2].Name)
require.Equal(t, "mainStat", resp.Frames[1].Fields[3].Name)
require.Equal(t, 14, resp.Frames[1].Fields[0].Len())
}

func TestGraphQueryIntegrationWithoutRelations(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphQuery(queryModel{Command: "graph.query", Key: "GOT_DEMO", Cypher: "MATCH (w:writer)-[wrote]->(b:book) return w,b"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 5)
require.Equal(t, 15, resp.Frames[0].Fields[0].Len())
require.Len(t, resp.Frames[1].Fields, 4)
require.Equal(t, 0, resp.Frames[1].Fields[0].Len())
}

func TestGraphQueryIntegrationWithoutNodes(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphQuery(queryModel{Command: "graph.query", Key: "GOT_DEMO", Cypher: "MATCH (w:writer)-[r:wrote]->(b:book) return r"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 5)
require.Equal(t, 0, resp.Frames[0].Fields[0].Len())
require.Len(t, resp.Frames[1].Fields, 4)
require.Equal(t, 14, resp.Frames[1].Fields[0].Len())
}

/**
* GRAPH.SLOWLOG
*/
func TestGraphSlowlogIntegration(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphSlowlog(queryModel{Command: "graph.slowlog", Key: "GOT_DEMO"}, &client)
require.Len(t, resp.Frames, 1)
require.Len(t, resp.Frames[0].Fields, 4)
}
Loading