Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 79 additions & 7 deletions pkg/opensearch/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,79 @@ import (
"net/url"
"path"
"strings"
"sync"
"time"

"github.com/bitly/go-simplejson"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/opensearch-datasource/pkg/opensearch/client"
)

// This is a list of interfaces that the Service implements.
// Go will not compile if we don't implement all of these interfaces.
var (
_ backend.QueryDataHandler = (*OpenSearchDatasource)(nil)
_ backend.StreamHandler = (*OpenSearchDatasource)(nil)
_ backend.CallResourceHandler = (*OpenSearchDatasource)(nil)
)

// datasourceInfo is the per-instance state created by the instance manager
// for each configured OpenSearch datasource.
type datasourceInfo struct {
	HTTPClient *http.Client // HTTP client built from the instance settings
	URL        string       // datasource URL from the instance settings

	// open streams, keyed by stream path; guarded by streamsMu.
	// NOTE(review): not read or written anywhere in the code visible in
	// this file — confirm intended usage before relying on it.
	streams   map[string]data.FrameJSONCache
	streamsMu sync.RWMutex
}

// newInstanceSettings returns the factory function the instance manager uses
// to build a datasourceInfo for every configured datasource instance.
func newInstanceSettings(httpClient *http.Client) datasource.InstanceFactoryFunc {
	return func(_ context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		// Each instance shares the supplied HTTP client but carries its own
		// URL and stream cache.
		return &datasourceInfo{
			HTTPClient: httpClient,
			URL:        settings.URL,
			streams:    make(map[string]data.FrameJSONCache),
		}, nil
	}
}

// OpenSearchExecutor represents a handler for handling OpenSearch datasource requests.
// NOTE(review): it carries no state and is not referenced in the code visible
// in this file — confirm it is still needed.
type OpenSearchExecutor struct{}

// OpenSearchDatasource is an OpenSearch data source.
//
// Note: this span previously carried merged-diff residue that left the
// removed exported HttpClient field alongside the new unexported httpClient;
// only the current fields remain.
type OpenSearchDatasource struct {
	// instance manager just holds a pointer to the open search client
	// (per-instance state; see datasourceInfo).
	im instancemgmt.InstanceManager

	// httpClient is the general httpClient for the datasource to use for
	// miscellaneous calls (health checks, resource calls, query execution).
	httpClient *http.Client

	// logger is the structured logger for this datasource.
	logger log.Logger

	// streamQueries stores active queries for streaming sessions, keyed by a
	// unique ID (e.g., refId).
	streamQueries sync.Map
}

// NewOpenSearchDatasource creates a new OpenSearchDatasource and sets up its
// configuration: a shared HTTP client, an instance manager for per-instance
// state, and a named logger.
//
// Note: this span previously carried merged-diff residue that kept both the
// old return statement and the new one; only the new version remains.
func NewOpenSearchDatasource(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
	log.DefaultLogger.Debug("Initializing new data source instance")

	// Use a default http client for now; per-instance clients are resolved
	// through the instance manager.
	httpClient, err := client.NewDatasourceHttpClient(ctx, &settings)
	if err != nil {
		return nil, err
	}

	ds := &OpenSearchDatasource{
		im:         datasource.NewInstanceManager(newInstanceSettings(httpClient)),
		httpClient: httpClient,
		logger:     backend.NewLoggerWith("logger", "tsdb.opensearch"),
	}
	return ds, nil
}

// CheckHealth handles health checks sent from Grafana to the plugin.
Expand Down Expand Up @@ -112,7 +157,7 @@ func (ds *OpenSearchDatasource) CheckHealth(ctx context.Context, req *backend.Ch
}
request.Header = req.GetHTTPHeaders()

response, err := ds.HttpClient.Do(request)
response, err := ds.httpClient.Do(request)
if err != nil {
res.Status = backend.HealthStatusError
res.Message = err.Error()
Expand Down Expand Up @@ -179,12 +224,18 @@ func (ds *OpenSearchDatasource) CheckHealth(ctx context.Context, req *backend.Ch
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
// contains Frames ([]*Frame).
func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
ds.logger.Info("QueryData called", "numQueries", len(req.Queries))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we do a review of all the lines where we log stuff, both FE and BE? They're good for debugging, but I think it's better to be conservative since they might be a burden on our cloud logs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to remove or update the levels afterward. The main reason for adding them was to gauge how far along we are in the pipeline.

if len(req.Queries) == 0 {
return nil, fmt.Errorf("query contains no queries")
}

if len(req.Queries) > 0 {
jsonData, _ := req.Queries[0].JSON.MarshalJSON()
ds.logger.Info("QueryData - First query JSON", "refId", req.Queries[0].RefID, "json", string(jsonData), "interval", req.Queries[0].Interval.String(), "timeRangeFrom", req.Queries[0].TimeRange.From.String(), "timeRangeTo", req.Queries[0].TimeRange.To.String())
}

timeRange := req.Queries[0].TimeRange
osClient, err := client.NewClient(ctx, req.PluginContext.DataSourceInstanceSettings, ds.HttpClient, &timeRange)
osClient, err := client.NewClient(ctx, req.PluginContext.DataSourceInstanceSettings, ds.httpClient, &timeRange)
if err != nil {
return nil, err
}
Expand All @@ -195,7 +246,11 @@ func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.Quer
}

query := newQueryRequest(osClient, req.Queries, req.PluginContext.DataSourceInstanceSettings)
ds.logger.Info("QueryData - About to execute query")
response, err = wrapError(query.execute(ctx))
if err != nil {
ds.logger.Error("QueryData - Error executing query", "error", err.Error())
}
return response, err
}

Expand Down Expand Up @@ -302,6 +357,9 @@ func extractParametersFromServiceMapFrames(resp *backend.QueryDataResponse) ([]s
}

func (ds *OpenSearchDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ds.logger.Info("CallResource called", "path", req.Path, "method", req.Method)

// Existing resource call logic
// allowed paths for resource calls:
// - empty string for fetching db version
// - /_mapping for fetching index mapping, e.g. requests going to `index/_mapping`
Expand All @@ -322,7 +380,7 @@ func (ds *OpenSearchDatasource) CallResource(ctx context.Context, req *backend.C
}
request.Header = req.GetHTTPHeaders()

response, err := ds.HttpClient.Do(request)
response, err := ds.httpClient.Do(request)
if err != nil {
return err
}
Expand Down Expand Up @@ -362,3 +420,17 @@ func createOpensearchURL(reqPath string, urlStr string) (string, error) {
}
return osUrlString, nil
}

// getDSInfo resolves the per-instance datasourceInfo for the given plugin
// context via the instance manager.
func (o *OpenSearchDatasource) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) {
	raw, err := o.im.Get(ctx, pluginCtx)
	if err != nil {
		return nil, err
	}

	// The instance manager hands back an opaque instance; narrow it to our
	// concrete type before returning it.
	info, ok := raw.(*datasourceInfo)
	if !ok {
		return nil, fmt.Errorf("failed to cast data source info")
	}
	return info, nil
}
170 changes: 170 additions & 0 deletions pkg/opensearch/stream_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package opensearch

import (
"context"
"fmt"
"strings"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/opensearch-datasource/pkg/opensearch/client"
)

func (o *OpenSearchDatasource) SubscribeStream(
ctx context.Context, req *backend.SubscribeStreamRequest,
) (*backend.SubscribeStreamResponse, error) {
o.logger.Info("SubscribeStream called", "path", req.Path)
o.logger.Debug("SubscribeStream", "full_req_details", req)

if !strings.HasPrefix(req.Path, "tail/") {
o.logger.Error("SubscribeStream: invalid path format", "path", req.Path)
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("invalid path format for stream: %s, expected 'tail/{refId}'", req.Path)
}

refIdParts := strings.Split(strings.TrimPrefix(req.Path, "tail/"), "/")
if len(refIdParts) == 0 || refIdParts[0] == "" {
o.logger.Error("SubscribeStream: missing refId in path", "path", req.Path)
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("missing refId in stream path: %s", req.Path)
}

_, err := o.getDSInfo(ctx, req.PluginContext)
if err != nil {
o.logger.Error("SubscribeStream: failed to get datasource info", "error", err)
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("failed to get datasource info: %w", err)
}

o.logger.Info("SubscribeStream: path validated, authorizing stream", "path", req.Path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add valiadation for OrgId (like here) too, to ensure that the requests are coming from the correct instance:

return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, nil
}

func (o *OpenSearchDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
o.logger.Info("RunStream (Polling Mode) called", "path", req.Path)
o.logger.Debug("RunStream", "full_req_details", req)

if !strings.HasPrefix(req.Path, "tail/") {
o.logger.Error("RunStream: invalid path format", "path", req.Path)
return fmt.Errorf("RunStream: invalid path format for stream: %s, expected 'tail/{refId}'", req.Path)
}
refId := strings.TrimPrefix(req.Path, "tail/")
if refId == "" {
o.logger.Error("RunStream: missing refId in path after trim", "path", req.Path)
return fmt.Errorf("RunStream: missing refId in stream path after trim: %s", req.Path)
}

// Get query data directly from the stream request (standard Grafana pattern)
rawQueryJSON := req.Data
if len(rawQueryJSON) == 0 {
o.logger.Error("RunStream: no query data provided in stream request", "refId", refId, "path", req.Path)
return fmt.Errorf("no query data provided for streaming refId: %s", refId)
}

o.logger.Info("RunStream: starting polling for query", "refId", refId, "rawQuery", string(rawQueryJSON))

ticker := time.NewTicker(2 * time.Second) // Faster polling for more responsive live tailing
defer ticker.Stop()

lastTo := time.Now().Add(-1 * time.Second) // Start slightly in the past to catch any timing issues

for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we extract OpenSearch polling logic (request and time range building & response processing) to a separate method on o *OpenSearchDatasource? It could return a channel, which we would just need to subscribe to in RunStream handler. It would probably be easier to add a test for opensearch streaming function it in that case then (which would be great to have!).

select {
case <-ctx.Done():
o.logger.Info("RunStream: context canceled, stopping stream", "refId", refId)
return ctx.Err()
case <-ticker.C:
currentTime := time.Now()
backendQuery := backend.DataQuery{
RefID: refId,
TimeRange: backend.TimeRange{From: lastTo, To: currentTime},
JSON: rawQueryJSON,
}

if !backendQuery.TimeRange.To.After(backendQuery.TimeRange.From) {
o.logger.Debug("RunStream: 'To' time is not after 'From' time, skipping poll to avoid empty range", "from", backendQuery.TimeRange.From, "to", backendQuery.TimeRange.To)
continue
}

o.logger.Info("RunStream: Polling OpenSearch", "refId", refId, "from", backendQuery.TimeRange.From, "to", backendQuery.TimeRange.To, "duration", backendQuery.TimeRange.To.Sub(backendQuery.TimeRange.From))

osClient, err := client.NewClient(ctx, req.PluginContext.DataSourceInstanceSettings, o.httpClient, &backendQuery.TimeRange)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we we should be able to just run

queryDataResponse, err :=  o.QueryData(ctx, &backend.QueryDataRequest{
				Queries:  []backend.DataQuery{backendQuery},
				PluginContext: req.PluginContext,	
				Headers:  req.Headers,
			})

instead of this and the following lines, since we have all the data to construct the data query request.

if err != nil {
o.logger.Error("RunStream: failed to create OpenSearch client for poll", "refId", refId, "error", err)
continue
}

queryExecutor := newQueryRequest(osClient, []backend.DataQuery{backendQuery}, req.PluginContext.DataSourceInstanceSettings)
queryDataResponse, err := queryExecutor.execute(ctx)

if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query response returned contains an .Error field instead of returning an error, so this error wouldn't get logged or passed down to the user

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that be just passing things down as JSON?

json, err := respForRefId.MarshalJSON()
if err != nil {
	o.logger.Error("RunStream: failed to marshal query response to JSON", "refId", refId, "error", err)
	return err
}
err = sender.SendJSON(json)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good now. I think if we add the queryData() call, it will mostly take care of the error being added to the response already too

o.logger.Error("RunStream: error executing OpenSearch query poll", "refId", refId, "error", err)
if respForRefId, found := queryDataResponse.Responses[refId]; found {
respForRefId.Error = err
json, err := respForRefId.MarshalJSON()
if err != nil {
o.logger.Error("RunStream: failed to marshal query response to JSON", "refId", refId, "error", err)
return err
}
err = sender.SendJSON(json)
if err != nil {
o.logger.Error("RunStream: failed to send JSON to frontend", "refId", refId, "error", err)
return err
}
}
}

var framesToUpdate data.Frames
if queryDataResponse != nil && queryDataResponse.Responses != nil {
if respForRefId, found := queryDataResponse.Responses[refId]; found {
if respForRefId.Error != nil {
o.logger.Error("RunStream: error in query response for poll", "refId", refId, "error", respForRefId.Error)
continue
}
framesToUpdate = respForRefId.Frames
}
}

var nonEmptyFrames data.Frames
for _, frame := range framesToUpdate {
if frameHasRows(frame) {
nonEmptyFrames = append(nonEmptyFrames, frame)
}
}
if len(nonEmptyFrames) > 0 {
o.logger.Info("RunStream: new non-empty data found", "refId", refId, "frameCount", len(nonEmptyFrames))
for _, frame := range nonEmptyFrames {
err = sender.SendFrame(frame, data.IncludeAll)
if err != nil {
o.logger.Error("RunStream: failed to send frame to frontend", "refId", refId, "error", err)
return err
}
}
} else {
o.logger.Debug("RunStream: no new non-empty data in this interval", "refId", refId)
}
// Always advance lastTo to avoid querying the same time range repeatedly
lastTo = currentTime
}
}
}

// PublishStream rejects all client publish attempts: this datasource only
// streams data from the server to the client.
func (*OpenSearchDatasource) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
	resp := &backend.PublishStreamResponse{
		Status: backend.PublishStreamStatusPermissionDenied,
	}
	return resp, nil
}

// frameHasRows reports whether frame carries at least one row of data.
// A nil frame or a frame without fields counts as empty.
func frameHasRows(frame *data.Frame) bool {
	switch {
	case frame == nil:
		return false
	case len(frame.Fields) == 0:
		return false
	default:
		// All fields should have the same length, so just check the first
		return frame.Fields[0].Len() > 0
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ export const SettingsEditor = ({ bucketAgg }: Props) => {
/>
</InlineField>

<InlineField label="Execution Hint" {...inlineFieldProps} tooltip="Determines how the aggregation should be executed. OpenSearch automatically chooses the optimal hint based on field type (global_ordinals for keyword fields, map for scripts) if not specified.">
<InlineField
label="Execution Hint"
{...inlineFieldProps}
tooltip="Determines how the aggregation should be executed. OpenSearch automatically chooses the optimal hint based on field type (global_ordinals for keyword fields, map for scripts) if not specified."
>
<Select
data-testid="execution-hint-select"
onChange={(e) =>
Expand Down
Loading