Skip to content

Commit

Permalink
Add Redis Graph module (GRAPH.QUERY, GRAPH.SLOWLOG) (#157)
Browse files Browse the repository at this point in the history
* Update docker to use 7.4.0

* Add Graph.Query

* Add Redis Graph

* Add Test data for Redis Graph

* Add RedisGraph to 7.4 dashboard

* Implementing graph

* Implement slowlog.

* Change default query on dashboard

* Revert docker configuration

* Update dashboard

* Format

Co-authored-by: Andrei Shamanau <andrei.shamanau@softeq.com>
  • Loading branch information
mikhail-vl and ashamanau authored Feb 4, 2021
1 parent 3e20888 commit 97d9409
Show file tree
Hide file tree
Showing 10 changed files with 752 additions and 1 deletion.
Binary file modified data/dump.rdb
Binary file not shown.
8 changes: 8 additions & 0 deletions pkg/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ func query(ctx context.Context, query backend.DataQuery, client redisClient) bac
case "rg.pyexecute":
return queryRgPyexecute(qm, client)

/**
* Redis Graph
*/
case "graph.query":
return queryGraphQuery(qm, client)
case "graph.slowlog":
return queryGraphSlowlog(qm, client)

/**
* Default
*/
Expand Down
2 changes: 2 additions & 0 deletions pkg/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestQuery(t *testing.T) {
{queryModel{Command: "rg.pyexecute"}},
{queryModel{Command: "rg.xrange"}},
{queryModel{Command: "rg.xrevrange"}},
{queryModel{Command: "graph.query"}},
{queryModel{Command: "graph.slowlog"}},
}

// Run Tests
Expand Down
201 changes: 201 additions & 0 deletions pkg/redis-graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package main

import (
"strconv"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)

/**
* Represents node
*/
type nodeEntry struct {
id string
title string
subTitle string
mainStat string
arc int64
}

/**
* Represents edge
*/
type edgeEntry struct {
id string
source string
target string
mainStat string
}

/**
* GRAPH.QUERY <Graph name> {query}
*
* Executes the given query against a specified graph.
* @see https://oss.redislabs.com/redisgraph/commands/#graphquery
*/
func queryGraphQuery(qm queryModel, client redisClient) backend.DataResponse {
response := backend.DataResponse{}

var result []interface{}

// Run command
err := client.RunFlatCmd(&result, "GRAPH.QUERY", qm.Key, qm.Cypher)

// Check error
if err != nil {
return errorHandler(response, err)
}

// New Frame for nodes
frameWithNodes := data.NewFrame("nodes")
frameWithNodes.Meta = &data.FrameMeta{
PreferredVisualization: "nodeGraph",
}
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("id", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("title", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("subTitle", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("mainStat", nil, []string{}))
frameWithNodes.Fields = append(frameWithNodes.Fields, data.NewField("arc__", nil, []int64{}))

// New Frame for edges
frameWithEdges := data.NewFrame("edges")
frameWithEdges.Meta = &data.FrameMeta{
PreferredVisualization: "nodeGraph",
}
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("id", nil, []string{}))
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("source", nil, []string{}))
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("target", nil, []string{}))
frameWithEdges.Fields = append(frameWithEdges.Fields, data.NewField("mainStat", nil, []string{}))

// Adding frames to response
response.Frames = append(response.Frames, frameWithNodes)
response.Frames = append(response.Frames, frameWithEdges)

existingNodes := map[string]bool{}

for _, entries := range result[1].([]interface{}) {
nodes, edges := findAllNodesAndEdges(entries)
for _, node := range nodes {
// Add each nodeEntry only once
if _, ok := existingNodes[node.id]; !ok {
frameWithNodes.AppendRow(node.id, node.title, node.subTitle, node.mainStat, node.arc)
existingNodes[node.id] = true
}
}
for _, edge := range edges {
frameWithEdges.AppendRow(edge.id, edge.source, edge.target, edge.mainStat)
}
}
return response
}

/**
* Parse array of entries and find
* either Nodes https://oss.redislabs.com/redisgraph/result_structure/#nodes
* or Relations https://oss.redislabs.com/redisgraph/result_structure/#relations
* and create corresponding nodeEntry or edgeEntry
**/
func findAllNodesAndEdges(input interface{}) ([]nodeEntry, []edgeEntry) {
nodes := []nodeEntry{}
edges := []edgeEntry{}

if entries, ok := input.([]interface{}); ok {
for _, entry := range entries {
entryFields := entry.([]interface{})

// Node https://oss.redislabs.com/redisgraph/result_structure/#nodes
if len(entryFields) == 3 {
node := nodeEntry{arc: 1}
idArray := entryFields[0].([]interface{})
node.id = strconv.FormatInt(idArray[1].(int64), 10)

// Assume first label will be a title if exists
labelsArray := entryFields[1].([]interface{})
labels := labelsArray[1].([]interface{})
if len(labels) > 0 {
node.title = string(labels[0].([]byte))
}

// Assume first property will be a mainStat if exists
propertiesArray := entryFields[2].([]interface{})
properties := propertiesArray[1].([]interface{})
if len(properties) > 0 {
propertyArray := properties[0].([]interface{})
switch propValue := propertyArray[1].(type) {
case []byte:
node.mainStat = string(propValue)
case int64:
node.mainStat = strconv.FormatInt(propValue, 10)
}
}

nodes = append(nodes, node)
}

// Relation https://oss.redislabs.com/redisgraph/result_structure/#relations
if len(entryFields) == 5 {
edge := edgeEntry{}
idArray := entryFields[0].([]interface{})
edge.id = strconv.FormatInt(idArray[1].(int64), 10)

// Main Stat
typeArray := entryFields[1].([]interface{})
edge.mainStat = string(typeArray[1].([]byte))

// Source
srcArray := entryFields[2].([]interface{})
edge.source = strconv.FormatInt(srcArray[1].(int64), 10)

// Target
destArray := entryFields[3].([]interface{})
edge.target = strconv.FormatInt(destArray[1].(int64), 10)

edges = append(edges, edge)
}
}
}
return nodes, edges
}

/**
* GRAPH.SLOWLOG <Graph name>
*
* Returns a list containing up to 10 of the slowest queries issued against the given graph ID.
* @see https://oss.redislabs.com/redisgraph/commands/#graphslowlog
*/
func queryGraphSlowlog(qm queryModel, client redisClient) backend.DataResponse {
response := backend.DataResponse{}

var result [][]string

// Run command
err := client.RunFlatCmd(&result, "GRAPH.SLOWLOG", qm.Key)

// Check error
if err != nil {
return errorHandler(response, err)
}

// New Frame
frame := data.NewFrame("GRAPH.SLOWLOG")
frame.Fields = append(frame.Fields, data.NewField("timestamp", nil, []time.Time{}))
frame.Fields = append(frame.Fields, data.NewField("command", nil, []string{}))
frame.Fields = append(frame.Fields, data.NewField("query", nil, []string{}))
frame.Fields = append(frame.Fields, data.NewField("duration", nil, []float64{}))
response.Frames = append(response.Frames, frame)

// Set Field Config
frame.Fields[3].Config = &data.FieldConfig{Unit: "µs"}

// Entries
for _, entry := range result {
timestamp, _ := strconv.ParseInt(entry[0], 10, 64)
duration, _ := strconv.ParseFloat(entry[3], 64)
frame.AppendRow(time.Unix(timestamp, 0), entry[1], entry[2], duration)
}

// Return
return response
}
79 changes: 79 additions & 0 deletions pkg/redis-graph_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// +build integration

package main

import (
"fmt"
"testing"

"github.com/mediocregopher/radix/v3"
"github.com/stretchr/testify/require"
)

/**
* GRAPH.QUERY
*/
func TestGraphQueryIntegration(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphQuery(queryModel{Command: "graph.query", Key: "GOT_DEMO", Cypher: "MATCH (w:writer)-[r:wrote]->(b:book) return w,r,b"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 5)
require.Equal(t, "id", resp.Frames[0].Fields[0].Name)
require.Equal(t, "title", resp.Frames[0].Fields[1].Name)
require.Equal(t, "subTitle", resp.Frames[0].Fields[2].Name)
require.Equal(t, "mainStat", resp.Frames[0].Fields[3].Name)
require.Equal(t, "arc__", resp.Frames[0].Fields[4].Name)
require.Equal(t, 15, resp.Frames[0].Fields[0].Len())
require.Len(t, resp.Frames[1].Fields, 4)
require.Equal(t, "id", resp.Frames[1].Fields[0].Name)
require.Equal(t, "source", resp.Frames[1].Fields[1].Name)
require.Equal(t, "target", resp.Frames[1].Fields[2].Name)
require.Equal(t, "mainStat", resp.Frames[1].Fields[3].Name)
require.Equal(t, 14, resp.Frames[1].Fields[0].Len())
}

func TestGraphQueryIntegrationWithoutRelations(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphQuery(queryModel{Command: "graph.query", Key: "GOT_DEMO", Cypher: "MATCH (w:writer)-[wrote]->(b:book) return w,b"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 5)
require.Equal(t, 15, resp.Frames[0].Fields[0].Len())
require.Len(t, resp.Frames[1].Fields, 4)
require.Equal(t, 0, resp.Frames[1].Fields[0].Len())
}

func TestGraphQueryIntegrationWithoutNodes(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphQuery(queryModel{Command: "graph.query", Key: "GOT_DEMO", Cypher: "MATCH (w:writer)-[r:wrote]->(b:book) return r"}, &client)
require.Len(t, resp.Frames, 2)
require.Len(t, resp.Frames[0].Fields, 5)
require.Equal(t, 0, resp.Frames[0].Fields[0].Len())
require.Len(t, resp.Frames[1].Fields, 4)
require.Equal(t, 14, resp.Frames[1].Fields[0].Len())
}

/**
* GRAPH.SLOWLOG
*/
func TestGraphSlowlogIntegration(t *testing.T) {
// Client
radixClient, _ := radix.NewPool("tcp", fmt.Sprintf("127.0.0.1:%d", integrationTestPort), 10)
client := radixV3Impl{radixClient: radixClient}

// Response
resp := queryGraphSlowlog(queryModel{Command: "graph.slowlog", Key: "GOT_DEMO"}, &client)
require.Len(t, resp.Frames, 1)
require.Len(t, resp.Frames[0].Fields, 4)
}
Loading

0 comments on commit 97d9409

Please sign in to comment.