Skip to content

Commit 4c301e3

Browse files
committed
Connecting live tail webSocket
1 parent b87596a commit 4c301e3

File tree

5 files changed

+326
-169
lines changed

5 files changed

+326
-169
lines changed

pkg/opensearch/lucene_handler.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ func (h *luceneHandler) processQuery(q *Query) error {
4444
if len(q.BucketAggs) == 0 {
4545
// If no aggregations, only trace, document, and logs queries are valid
4646
if q.luceneQueryType != "Traces" {
47-
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType) {
47+
// Add logsType to the allowed types when no bucketAggs are present
48+
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType || q.Metrics[0].Type == logsType) {
4849
return fmt.Errorf("invalid query, missing metrics and aggregations")
4950
}
5051
}
@@ -132,20 +133,6 @@ func processLogsQuery(q *Query, b *client.SearchRequestBuilder, from, to int64,
132133
size = defaultLogsSize
133134
}
134135
b.Size(size)
135-
136-
// For log query, we use only date histogram aggregation
137-
aggBuilder := b.Agg()
138-
defaultBucketAgg := &BucketAgg{
139-
Type: dateHistType,
140-
Field: defaultTimeField,
141-
ID: "1",
142-
Settings: utils.NewJsonFromAny(map[string]interface{}{
143-
"interval": "auto",
144-
})}
145-
defaultBucketAgg.Settings = utils.NewJsonFromAny(
146-
defaultBucketAgg.generateSettingsForDSL(),
147-
)
148-
_ = addDateHistogramAgg(aggBuilder, defaultBucketAgg, from, to, defaultTimeField)
149136
}
150137

151138
func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]interface{} {

pkg/opensearch/opensearch.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import (
2525
// This is a list of interfaces that the Service implements.
2626
// Go will not compile if we don't implement all of these interfaces.
2727
var (
28-
_ backend.StreamHandler = (*OpenSearchDatasource)(nil)
28+
_ backend.QueryDataHandler = (*OpenSearchDatasource)(nil)
29+
_ backend.StreamHandler = (*OpenSearchDatasource)(nil)
30+
_ backend.CallResourceHandler = (*OpenSearchDatasource)(nil)
2931
)
3032

3133
type datasourceInfo struct {
@@ -51,26 +53,35 @@ func newInstanceSettings(client *http.Client) datasource.InstanceFactoryFunc {
5153
// OpenSearchExecutor represents a handler for handling OpenSearch datasource request
5254
type OpenSearchExecutor struct{}
5355

56+
// OpenSearchDatasource is an OpenSearch data source.
5457
type OpenSearchDatasource struct {
55-
HttpClient *http.Client
56-
backend.StreamHandler
57-
im instancemgmt.InstanceManager
58-
logger log.Logger
58+
// instance manager just holds a pointer to the open search client
59+
im instancemgmt.InstanceManager
60+
61+
// httpClient is the general httpClient for the datasource to use for miscellaneous calls
62+
httpClient *http.Client
63+
logger log.Logger
64+
65+
// streamQueries stores active queries for streaming sessions, keyed by a unique ID (e.g., refId)
66+
streamQueries sync.Map
5967
}
6068

69+
// NewOpenSearchDatasource creates a new OpenSearchDatasource and sets up its configuration.
6170
func NewOpenSearchDatasource(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
6271
log.DefaultLogger.Debug("Initializing new data source instance")
6372

73+
// Use a default http client for now
6474
httpClient, err := client.NewDatasourceHttpClient(ctx, &settings)
6575
if err != nil {
6676
return nil, err
6777
}
6878

69-
return &OpenSearchDatasource{
70-
HttpClient: httpClient,
79+
ds := &OpenSearchDatasource{
7180
im: datasource.NewInstanceManager(newInstanceSettings(httpClient)),
81+
httpClient: httpClient,
7282
logger: backend.NewLoggerWith("logger", "tsdb.opensearch"),
73-
}, nil
83+
}
84+
return ds, nil
7485
}
7586

7687
// CheckHealth handles health checks sent from Grafana to the plugin.
@@ -146,7 +157,7 @@ func (ds *OpenSearchDatasource) CheckHealth(ctx context.Context, req *backend.Ch
146157
}
147158
request.Header = req.GetHTTPHeaders()
148159

149-
response, err := ds.HttpClient.Do(request)
160+
response, err := ds.httpClient.Do(request)
150161
if err != nil {
151162
res.Status = backend.HealthStatusError
152163
res.Message = err.Error()
@@ -213,12 +224,18 @@ func (ds *OpenSearchDatasource) CheckHealth(ctx context.Context, req *backend.Ch
213224
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
214225
// contains Frames ([]*Frame).
215226
func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
227+
ds.logger.Info("QueryData called", "numQueries", len(req.Queries))
216228
if len(req.Queries) == 0 {
217229
return nil, fmt.Errorf("query contains no queries")
218230
}
219231

232+
if len(req.Queries) > 0 {
233+
jsonData, _ := req.Queries[0].JSON.MarshalJSON()
234+
ds.logger.Info("QueryData - First query JSON", "refId", req.Queries[0].RefID, "json", string(jsonData), "interval", req.Queries[0].Interval.String(), "timeRangeFrom", req.Queries[0].TimeRange.From.String(), "timeRangeTo", req.Queries[0].TimeRange.To.String())
235+
}
236+
220237
timeRange := req.Queries[0].TimeRange
221-
osClient, err := client.NewClient(ctx, req.PluginContext.DataSourceInstanceSettings, ds.HttpClient, &timeRange)
238+
osClient, err := client.NewClient(ctx, req.PluginContext.DataSourceInstanceSettings, ds.httpClient, &timeRange)
222239
if err != nil {
223240
return nil, err
224241
}
@@ -229,7 +246,11 @@ func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.Quer
229246
}
230247

231248
query := newQueryRequest(osClient, req.Queries, req.PluginContext.DataSourceInstanceSettings)
249+
ds.logger.Info("QueryData - About to execute query")
232250
response, err = wrapError(query.execute(ctx))
251+
if err != nil {
252+
ds.logger.Error("QueryData - Error executing query", "error", err.Error())
253+
}
233254
return response, err
234255
}
235256

@@ -335,6 +356,14 @@ func extractParametersFromServiceMapFrames(resp *backend.QueryDataResponse) ([]s
335356
}
336357

337358
func (ds *OpenSearchDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
359+
ds.logger.Info("CallResource called", "path", req.Path, "method", req.Method)
360+
361+
// Route for registering stream queries
362+
if strings.HasPrefix(req.Path, "_stream_query_register/") {
363+
return ds.handleRegisterStreamQuery(ctx, req, sender)
364+
}
365+
366+
// Existing resource call logic
338367
// allowed paths for resource calls:
339368
// - empty string for fetching db version
340369
// - /_mapping for fetching index mapping, e.g. requests going to `index/_mapping`
@@ -355,7 +384,7 @@ func (ds *OpenSearchDatasource) CallResource(ctx context.Context, req *backend.C
355384
}
356385
request.Header = req.GetHTTPHeaders()
357386

358-
response, err := ds.HttpClient.Do(request)
387+
response, err := ds.httpClient.Do(request)
359388
if err != nil {
360389
return err
361390
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package opensearch
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"strings"
8+
9+
"github.com/grafana/grafana-plugin-sdk-go/backend"
10+
)
11+
12+
// handleRegisterStreamQuery handles the HTTP request to register a query for a streaming session.
13+
func (ds *OpenSearchDatasource) handleRegisterStreamQuery(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
14+
ds.logger.Info("handleRegisterStreamQuery called", "path", req.Path, "method", req.Method)
15+
16+
if req.Method != http.MethodPost {
17+
return sender.Send(&backend.CallResourceResponse{
18+
Status: http.StatusMethodNotAllowed,
19+
Body: []byte("Method not allowed"),
20+
})
21+
}
22+
23+
// Extract refId from path, e.g., /_stream_query_register/{refId}
24+
// The path in CallResourceRequest does not include the initial /resources prefix.
25+
pathParts := strings.Split(strings.TrimPrefix(req.Path, "_stream_query_register/"), "/")
26+
if len(pathParts) == 0 || pathParts[0] == "" {
27+
ds.logger.Error("handleRegisterStreamQuery: missing refId in path", "path", req.Path)
28+
return sender.Send(&backend.CallResourceResponse{
29+
Status: http.StatusBadRequest,
30+
Body: []byte("Missing refId in path"),
31+
})
32+
}
33+
refId := pathParts[0]
34+
35+
// req.Body is already []byte, no need for io.ReadAll
36+
bodyBytes := req.Body
37+
// if err != nil { // This was for io.ReadAll, not needed now
38+
// ds.logger.Error("handleRegisterStreamQuery: failed to read request body", "refId", refId, "error", err)
39+
// return sender.Send(&backend.CallResourceResponse{
40+
// Status: http.StatusInternalServerError,
41+
// Body: []byte("Failed to read request body"),
42+
// })
43+
// }
44+
// req.Body is automatically closed by the SDK's resource handling (if it were an io.Closer, which it isn't directly here)
45+
46+
ds.logger.Debug("handleRegisterStreamQuery: received body", "refId", refId, "body", string(bodyBytes))
47+
48+
var query Query // Using the Query struct from models.go
49+
if err := json.Unmarshal(bodyBytes, &query); err != nil {
50+
ds.logger.Error("handleRegisterStreamQuery: failed to unmarshal query", "refId", refId, "error", err, "body", string(bodyBytes))
51+
return sender.Send(&backend.CallResourceResponse{
52+
Status: http.StatusBadRequest,
53+
Body: []byte("Failed to unmarshal query payload: " + err.Error()),
54+
})
55+
}
56+
57+
// Store the query. streamQueries is a sync.Map declared in OpenSearchDatasource struct.
58+
ds.streamQueries.Store(refId, query)
59+
ds.logger.Info("handleRegisterStreamQuery: successfully stored query for streaming", "refId", refId, "queryIsLuceQueryType", query.luceneQueryType)
60+
61+
// Send a JSON response
62+
responseBody := map[string]string{"message": "Query registered for streaming"}
63+
jsonBody, err := json.Marshal(responseBody)
64+
if err != nil {
65+
ds.logger.Error("handleRegisterStreamQuery: failed to marshal JSON response", "refId", refId, "error", err)
66+
return sender.Send(&backend.CallResourceResponse{
67+
Status: http.StatusInternalServerError,
68+
Body: []byte("Error creating success response"), // Keep simple error for brevity
69+
})
70+
}
71+
72+
return sender.Send(&backend.CallResourceResponse{
73+
Status: http.StatusOK,
74+
Body: jsonBody,
75+
Headers: map[string][]string{"Content-Type": {"application/json"}}, // Explicitly set Content-Type for the response
76+
})
77+
}

0 commit comments

Comments
 (0)