Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(spanner): add json handling in executor #10327

Merged
merged 4 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type CloudStreamHandler struct {
func (h *CloudStreamHandler) Execute() error {
log.Println("ExecuteActionAsync RPC called. Start handling input stream")

// Enable UseNumberWithJSONDecoderEncoder so that JSON numbers are decoded
// as Number (preserving precision) and not float64 (risking loss).
spanner.UseNumberWithJSONDecoderEncoder(true)

var c *actions.ExecutionFlowContext
func() {
h.mu.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package utility

import (
"encoding/json"
"fmt"
"log"
"math/big"
Expand Down Expand Up @@ -43,6 +42,14 @@ func BuildQuery(queryAction *executorpb.QueryAction) (spanner.Statement, error)
return stmt, nil
}

// encodedJSON is a pre-encoded JSON value, so when marshaled the underlying
// bytes are returned as-is.
type encodedJSON []byte

func (v encodedJSON) MarshalJSON() ([]byte, error) {
return []byte(v), nil
}

// ExecutorValueToSpannerValue converts executorpb.Value with given spannerpb.Type into a cloud spanner interface.
// Parameter null indicates whether this value is NULL.
func ExecutorValueToSpannerValue(t *spannerpb.Type, v *executorpb.Value, null bool) (any, error) {
Expand Down Expand Up @@ -97,12 +104,7 @@ func ExecutorValueToSpannerValue(t *spannerpb.Type, v *executorpb.Value, null bo
return spanner.NullJSON{}, nil
}
x := v.GetStringValue()
var y interface{}
err := json.Unmarshal([]byte(x), &y)
if err != nil {
return nil, err
}
return spanner.NullJSON{Value: y, Valid: true}, nil
return spanner.NullJSON{Value: encodedJSON(x), Valid: true}, nil
case spannerpb.TypeCode_STRUCT:
return executorStructValueToSpannerValue(t, v.GetStructValue(), null)
case spannerpb.TypeCode_ARRAY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package utility

import (
"bytes"
"encoding/json"
"fmt"
"log"
"math/big"
"strings"
"time"

"cloud.google.com/go/civil"
Expand Down Expand Up @@ -124,7 +128,7 @@ func extractRowValue(row *spanner.Row, i int, t *sppb.Type) (*executorpb.Value,
if err != nil {
return nil, err
}
val.ValueType = &executorpb.Value_StringValue{StringValue: v.String()}
val.ValueType = &executorpb.Value_StringValue{StringValue: encodeJSON(v)}
case sppb.TypeCode_ARRAY:
val, err = extractRowArrayValue(row, i, t.GetArrayElementType())
if err != nil {
Expand All @@ -136,6 +140,25 @@ func extractRowValue(row *spanner.Row, i int, t *sppb.Type) (*executorpb.Value,
return val, nil
}

func encodeJSON(n spanner.NullJSON) string {
if !n.Valid {
return "<null>"
}
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
encoder.SetEscapeHTML(false)

err := encoder.Encode(n.Value)
if err != nil {
return fmt.Sprintf("error: %v", err)
}
resultString := buf.String()

// Trim the new line since json.Encoder.Encode adds a new line character at the end.
resultString = strings.TrimSuffix(resultString, "\n")
return resultString
}

// extractRowArrayValue extracts a single column's array value at given index i from result row.
func extractRowArrayValue(row *spanner.Row, i int, t *sppb.Type) (*executorpb.Value, error) {
val := &executorpb.Value{}
Expand Down Expand Up @@ -298,7 +321,7 @@ func extractRowArrayValue(row *spanner.Row, i int, t *sppb.Type) (*executorpb.Va
value := &executorpb.Value{ValueType: &executorpb.Value_IsNull{IsNull: true}}
arrayValue.Value = append(arrayValue.Value, value)
} else {
value := &executorpb.Value{ValueType: &executorpb.Value_StringValue{StringValue: vv.String()}}
value := &executorpb.Value{ValueType: &executorpb.Value_StringValue{StringValue: encodeJSON(vv)}}
arrayValue.Value = append(arrayValue.Value, value)
}
}
Expand Down
Loading