Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state) - Query filter fields in the response item #3485

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
34 changes: 34 additions & 0 deletions .github/infrastructure/docker-compose-mongodb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version: '2'

services:
mongodb:
image : mongo:4.2
container_name: mongodb
hostname: mongodb
restart: always
environment:
- PUID=1000
- PGID=1000
- MONGO_INITDB_DATABASE=daprStore
- MONGO_REPLICA_SET_NAME=test-rs
ports:
- 27017:27017
volumes:
- mongo:/data/db
entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "test-rs" ]

mongosetup:
image: mongo:4.2
links:
- mongodb
depends_on:
- mongodb
volumes:
- ./../scripts/components-scripts/conformance-state.mongodb-setup.sh:/scripts/mongo_setup.sh
restart: "no"
entrypoint: [ "bash", "/scripts/mongo_setup.sh" ]

volumes:
mongo:


9 changes: 9 additions & 0 deletions .github/infrastructure/docker-compose-redis7stack.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: '2'
services:
redis:
image: redis/redis-stack:7.2.0-v11
ports:
- "6381:6379"
- "8001:8001"
environment:
- REDIS_REPLICATION_MODE=master
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
sleep 5
echo SETUP.sh time now: `date +"%T" `
mongo --host mongodb:27017 <<EOF
var cfg = {
"_id": "test-rs",
"version": 1,
"members": [
{
"_id": 0,
"host": "localhost:27017"
}
]
};
rs.initiate(cfg, { force: true });
while (! db.isMaster().ismaster ) { sleep(1000) };
db.getMongo().setReadPref('primary');
rs.status();
EOF

5 changes: 5 additions & 0 deletions .github/scripts/test-info.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,11 @@ const components = {
conformanceSetup: 'docker-compose.sh redis7 redis',
sourcePkg: ['state/redis', 'common/component/redis'],
},
'state.redis.v7stack': {
conformance: true,
conformanceSetup: 'docker-compose.sh redis7stack redis',
sourcePkg: ['state/redis', 'common/component/redis'],
},
'state.rethinkdb': {
conformance: true,
conformanceSetup: 'docker-compose.sh rethinkdb',
Expand Down
86 changes: 77 additions & 9 deletions common/component/postgresql/v1/postgresql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ package postgresql

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

daprmetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state/utils"

pginterfaces "github.com/dapr/components-contrib/common/component/postgresql/interfaces"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/query"
Expand Down Expand Up @@ -66,6 +70,13 @@ func (p *PostgreSQLQuery) Query(parentCtx context.Context, req *state.QueryReque
etagColumn: p.etagColumn,
}
qbuilder := query.NewQueryBuilder(q)
selectedAttributes, ok := daprmetadata.TryGetQuerySelectedAttributes(req.Metadata)
if ok {
var err error
if q.querySelectedAttributes, err = utils.ParseQuerySelectedAttributes(selectedAttributes); err != nil {
return nil, fmt.Errorf("postgresql store: error parsing selected attributes: %w", err)
}
}
if err := qbuilder.BuildQuery(&req.Query); err != nil {
return &state.QueryResponse{}, err
}
Expand All @@ -81,12 +92,13 @@ func (p *PostgreSQLQuery) Query(parentCtx context.Context, req *state.QueryReque
}

type Query struct {
query string
params []interface{}
limit int
skip *int64
tableName string
etagColumn string
query string
params []interface{}
limit int
skip *int64
tableName string
etagColumn string
querySelectedAttributes []utils.Attribute
}

func (q *Query) VisitEQ(f *query.EQ) (string, error) {
Expand Down Expand Up @@ -222,8 +234,19 @@ func (q *Query) VisitOR(f *query.OR) (string, error) {
}

func (q *Query) Finalize(filters string, qq *query.Query) error {
q.query = fmt.Sprintf("SELECT key, value, %s as etag FROM "+q.tableName, q.etagColumn)
if q.querySelectedAttributes != nil {
var columns string
for idx, item := range q.querySelectedAttributes {
if idx != 0 {
columns += ", "
}
columns += translateFieldToFilter(item.Path) + " as " + item.Name
}

q.query = fmt.Sprintf("SELECT key, %s, %s as etag FROM "+q.tableName, columns, q.etagColumn)
} else {
q.query = fmt.Sprintf("SELECT key, value, %s as etag FROM "+q.tableName, q.etagColumn)
}
if filters != "" {
q.query += " WHERE " + filters
}
Expand Down Expand Up @@ -267,14 +290,59 @@ func (q *Query) execute(ctx context.Context, db pginterfaces.DBQuerier) ([]state
defer rows.Close()

ret := []state.QueryItem{}

for rows.Next() {
var (
key string
data []byte
etag uint32
)
if err = rows.Scan(&key, &data, &etag); err != nil {
return nil, "", err
if q.querySelectedAttributes != nil {
var values []interface{}
if values, err = rows.Values(); err != nil {
return nil, "", err
}
var ok bool
key = values[0].(string)
etag, ok = values[len(values)-1].(uint32)
if !ok {
// Cockroachdb the type etag is int64 instead of uint32, necessary the casting
etag = uint32(values[len(values)-1].(int64))
}
result := make(map[string]interface{})
for idx, item := range q.querySelectedAttributes {
value := values[idx+1]
switch item.Type {
case utils.Bool:
result[item.Name], err = strconv.ParseBool(value.(string))
case utils.Numeric:
var jsonObject interface{}
err = json.Unmarshal([]byte(value.(string)), &jsonObject)
result[item.Name] = jsonObject
case utils.Object:
var jsonObject interface{}
err = json.Unmarshal([]byte(value.(string)), &jsonObject)
result[item.Name] = jsonObject
case utils.Array:
var jsonArray []interface{}
err = json.Unmarshal([]byte(value.(string)), &jsonArray)
result[item.Name] = jsonArray
default:
result[item.Name] = value.(string)
}

if err != nil {
return nil, "", err
}
data, err = json.Marshal(result)
if err != nil {
return nil, "", err
}
}
} else {
if err = rows.Scan(&key, &data, &etag); err != nil {
return nil, "", err
}
}
result := state.QueryItem{
Key: key,
Expand Down
19 changes: 17 additions & 2 deletions common/component/postgresql/v1/postgresql_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"os"
"testing"

"github.com/dapr/components-contrib/state/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -26,8 +28,9 @@ import (

func TestPostgresqlQueryBuildQuery(t *testing.T) {
tests := []struct {
input string
query string
input string
query string
selectedAttributes string
}{
{
input: "../../../../tests/state/query/q1.json",
Expand Down Expand Up @@ -61,18 +64,30 @@ func TestPostgresqlQueryBuildQuery(t *testing.T) {
input: "../../../../tests/state/query/q8.json",
query: "SELECT key, value, xmin as etag FROM state WHERE (value->'person'->>'org'>=$1 OR (value->'person'->>'org'<$2 AND (value->>'state'=$3 OR value->>'state'=$4))) ORDER BY value->>'state' DESC, value->'person'->>'name' LIMIT 2",
},
{
input: "../../../../tests/state/query/q8.json",
query: "SELECT key, value->>'person' as Person, value->'person'->>'org' as Organization, xmin as etag FROM state WHERE (value->'person'->>'org'>=$1 OR (value->'person'->>'org'<$2 AND (value->>'state'=$3 OR value->>'state'=$4))) ORDER BY value->>'state' DESC, value->'person'->>'name' LIMIT 2",
selectedAttributes: `[{"name":"Person", "path":"person", "type":"Object"},{"name":"Organization", "path":"person.org", "type":"Text"}]`,
},
}
for _, test := range tests {
data, err := os.ReadFile(test.input)
require.NoError(t, err)
var qq query.Query

err = json.Unmarshal(data, &qq)
require.NoError(t, err)

q := &Query{
tableName: defaultTableName,
etagColumn: "xmin",
}

if len(test.selectedAttributes) > 0 {
q.querySelectedAttributes, err = utils.ParseQuerySelectedAttributes(test.selectedAttributes)
require.NoError(t, err)
}

qbuilder := query.NewQueryBuilder(q)
err = qbuilder.BuildQuery(&qq)
require.NoError(t, err)
Expand Down
11 changes: 11 additions & 0 deletions metadata/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
// QueryIndexName defines the metadata key for the name of query indexing schema (for redis).
QueryIndexName = "queryIndexName"

// QuerySelectedAttributes defines the metadata key for getting only the selected attributes in the result item (for redis)
QuerySelectedAttributes = "querySelectedAttributes"

// MaxBulkPubBytesKey defines the maximum bytes to publish in a bulk publish request metadata.
MaxBulkPubBytesKey string = "maxBulkPubBytes"
)
Expand Down Expand Up @@ -130,6 +133,14 @@ func TryGetQueryIndexName(props map[string]string) (string, bool) {
return "", false
}

func TryGetQuerySelectedAttributes(props map[string]string) (string, bool) {
if val, ok := props[QuerySelectedAttributes]; ok && val != "" {
return val, true
}

return "", false
}

// GetMetadataProperty returns a property from the metadata map, with support for aliases
func GetMetadataProperty(props map[string]string, keys ...string) (val string, ok bool) {
lcProps := make(map[string]string, len(props))
Expand Down
9 changes: 9 additions & 0 deletions state/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,15 @@ func (c *StateStore) Query(ctx context.Context, req *state.QueryRequest) (*state
q := &Query{}

qbuilder := query.NewQueryBuilder(q)

selectedAttributes, ok := contribmeta.TryGetQuerySelectedAttributes(req.Metadata)
if ok {
var err error
if q.querySelectedAttributes, err = stateutils.ParseQuerySelectedAttributes(selectedAttributes); err != nil {
return nil, fmt.Errorf("postgresql store: error parsing selected attributes: %w", err)
}
}

if err := qbuilder.BuildQuery(&req.Query); err != nil {
return &state.QueryResponse{}, err
}
Expand Down
41 changes: 34 additions & 7 deletions state/azure/cosmosdb/cosmosdb_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"strings"

"github.com/dapr/components-contrib/state/utils"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
jsoniter "github.com/json-iterator/go"

Expand All @@ -34,10 +36,11 @@ type InternalQuery struct {
}

type Query struct {
query InternalQuery
limit int
token string
partitionKey string
query InternalQuery
limit int
token string
partitionKey string
querySelectedAttributes []utils.Attribute
}

func (q *Query) VisitEQ(f *query.EQ) (string, error) {
Expand Down Expand Up @@ -226,7 +229,16 @@ func (q *Query) Finalize(filters string, qq *query.Query) error {
orderBy = " ORDER BY " + strings.Join(order, ", ")
}

q.query.query = "SELECT * FROM c" + filter + orderBy
if q.querySelectedAttributes != nil {
var columns string
columns = "c['id'], c['_etag']"
for _, item := range q.querySelectedAttributes {
columns += ", " + replaceKeywords("c.value."+item.Path) + " as '" + item.Name + "'"
}
q.query.query = "SELECT " + columns + " FROM c" + filter + orderBy
} else {
q.query.query = "SELECT * FROM c" + filter + orderBy
}
q.limit = qq.Page.Limit
q.token = qq.Page.Token

Expand Down Expand Up @@ -272,7 +284,6 @@ func (q *Query) execute(ctx context.Context, client *azcosmos.ContainerClient) (
} else {
pk = azcosmos.NewPartitionKeyBool(true)
}

queryPager := client.NewQueryItemsPager(q.query.query, pk, opts)

token := ""
Expand All @@ -291,7 +302,23 @@ func (q *Query) execute(ctx context.Context, client *azcosmos.ContainerClient) (
}
for _, item := range queryResponse.Items {
tempItem := CosmosItem{}
err := json.Unmarshal(item, &tempItem)
var err error
if q.querySelectedAttributes != nil {
properties := make(map[string]interface{})
err = json.Unmarshal(item, &properties)
tempItem.ID = properties["id"].(string)
tempItem.Etag = properties["_etag"].(string)
if err != nil {
return nil, "", err
}
value := make(map[string]interface{})
for _, item := range q.querySelectedAttributes {
value[item.Name] = properties[item.Name]
}
tempItem.Value = value
} else {
err = json.Unmarshal(item, &tempItem)
}
if err != nil {
return nil, "", err
}
Expand Down
Loading