Skip to content

Commit a47271f

Browse files
committed
Ensure all data is present in frame from websocket. Don't send empty frames
1 parent 4c301e3 commit a47271f

File tree

2 files changed

+36
-66
lines changed

2 files changed

+36
-66
lines changed

pkg/opensearch/resource_handlers.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,13 @@ func (ds *OpenSearchDatasource) handleRegisterStreamQuery(ctx context.Context, r
3434

3535
// req.Body is already []byte, no need for io.ReadAll
3636
bodyBytes := req.Body
37-
// if err != nil { // This was for io.ReadAll, not needed now
38-
// ds.logger.Error("handleRegisterStreamQuery: failed to read request body", "refId", refId, "error", err)
39-
// return sender.Send(&backend.CallResourceResponse{
40-
// Status: http.StatusInternalServerError,
41-
// Body: []byte("Failed to read request body"),
42-
// })
43-
// }
44-
// req.Body is automatically closed by the SDK's resource handling (if it were an io.Closer, which it isn't directly here)
4537

4638
ds.logger.Debug("handleRegisterStreamQuery: received body", "refId", refId, "body", string(bodyBytes))
4739

48-
var query Query // Using the Query struct from models.go
49-
if err := json.Unmarshal(bodyBytes, &query); err != nil {
50-
ds.logger.Error("handleRegisterStreamQuery: failed to unmarshal query", "refId", refId, "error", err, "body", string(bodyBytes))
51-
return sender.Send(&backend.CallResourceResponse{
52-
Status: http.StatusBadRequest,
53-
Body: []byte("Failed to unmarshal query payload: " + err.Error()),
54-
})
55-
}
56-
57-
// Store the query. streamQueries is a sync.Map declared in OpenSearchDatasource struct.
58-
ds.streamQueries.Store(refId, query)
59-
ds.logger.Info("handleRegisterStreamQuery: successfully stored query for streaming", "refId", refId, "queryIsLuceQueryType", query.luceneQueryType)
40+
// Store the raw query JSON as json.RawMessage for streaming
41+
var rawQueryJSON json.RawMessage = bodyBytes
42+
ds.streamQueries.Store(refId, rawQueryJSON)
43+
ds.logger.Info("handleRegisterStreamQuery: successfully stored raw query JSON for streaming", "refId", refId)
6044

6145
// Send a JSON response
6246
responseBody := map[string]string{"message": "Query registered for streaming"}

pkg/opensearch/stream_handler.go

Lines changed: 32 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -72,28 +72,18 @@ func (o *OpenSearchDatasource) RunStream(ctx context.Context, req *backend.RunSt
7272
o.streamQueries.Delete(refId)
7373
}()
7474

75-
streamQuery, queryOk := val.(Query)
76-
if !queryOk {
75+
rawQueryJSON, ok := val.(json.RawMessage)
76+
if !ok {
7777
o.logger.Error("RunStream: failed to assert query type from map", "refId", refId, "type", fmt.Sprintf("%T", val))
7878
return fmt.Errorf("failed to assert query type for refId: %s", refId)
7979
}
8080

81-
o.logger.Info("RunStream: starting polling for query", "refId", refId, "rawQuery", streamQuery.RawQuery)
81+
o.logger.Info("RunStream: starting polling for query", "refId", refId, "rawQuery", string(rawQueryJSON))
8282

8383
ticker := time.NewTicker(5 * time.Second)
8484
defer ticker.Stop()
8585

86-
var lastTo time.Time
87-
originalTimeRange := streamQuery.TimeRange
88-
if !originalTimeRange.To.IsZero() {
89-
lastTo = originalTimeRange.To
90-
} else {
91-
if originalTimeRange.From.IsZero() {
92-
originalTimeRange.From = time.Now().Add(-5 * time.Minute)
93-
}
94-
}
95-
96-
firstPoll := true
86+
lastTo := time.Now()
9787

9888
for {
9989
select {
@@ -102,43 +92,25 @@ func (o *OpenSearchDatasource) RunStream(ctx context.Context, req *backend.RunSt
10292
return ctx.Err()
10393
case <-ticker.C:
10494
currentTime := time.Now()
105-
queryForPoll := streamQuery
106-
107-
if firstPoll {
108-
queryForPoll.TimeRange.From = originalTimeRange.From
109-
queryForPoll.TimeRange.To = currentTime
110-
firstPoll = false
111-
} else {
112-
queryForPoll.TimeRange.From = lastTo
113-
queryForPoll.TimeRange.To = currentTime
95+
backendQuery := backend.DataQuery{
96+
RefID: refId,
97+
TimeRange: backend.TimeRange{From: lastTo, To: currentTime},
98+
JSON: rawQueryJSON,
11499
}
115100

116-
if !queryForPoll.TimeRange.To.After(queryForPoll.TimeRange.From) {
117-
o.logger.Debug("RunStream: 'To' time is not after 'From' time, skipping poll to avoid empty range", "from", queryForPoll.TimeRange.From, "to", queryForPoll.TimeRange.To)
101+
if !backendQuery.TimeRange.To.After(backendQuery.TimeRange.From) {
102+
o.logger.Debug("RunStream: 'To' time is not after 'From' time, skipping poll to avoid empty range", "from", backendQuery.TimeRange.From, "to", backendQuery.TimeRange.To)
118103
continue
119104
}
120105

121-
o.logger.Info("RunStream: Polling OpenSearch", "refId", refId, "from", queryForPoll.TimeRange.From, "to", queryForPoll.TimeRange.To)
106+
o.logger.Info("RunStream: Polling OpenSearch", "refId", refId, "from", backendQuery.TimeRange.From, "to", backendQuery.TimeRange.To)
122107

123-
osClient, err := client.NewClient(ctx, req.PluginContext.DataSourceInstanceSettings, o.httpClient, &queryForPoll.TimeRange)
108+
osClient, err := client.NewClient(ctx, req.PluginContext.DataSourceInstanceSettings, o.httpClient, &backendQuery.TimeRange)
124109
if err != nil {
125110
o.logger.Error("RunStream: failed to create OpenSearch client for poll", "refId", refId, "error", err)
126111
continue
127112
}
128113

129-
queryJSON, err := json.Marshal(queryForPoll)
130-
if err != nil {
131-
o.logger.Error("RunStream: failed to marshal stream query to JSON for backend.DataQuery", "refId", refId, "error", err)
132-
continue
133-
}
134-
135-
backendQuery := backend.DataQuery{
136-
RefID: queryForPoll.RefID,
137-
TimeRange: queryForPoll.TimeRange,
138-
JSON: queryJSON,
139-
QueryType: queryForPoll.QueryType,
140-
}
141-
142114
queryExecutor := newQueryRequest(osClient, []backend.DataQuery{backendQuery}, req.PluginContext.DataSourceInstanceSettings)
143115
queryDataResponse, err := queryExecutor.execute(ctx)
144116

@@ -149,7 +121,7 @@ func (o *OpenSearchDatasource) RunStream(ctx context.Context, req *backend.RunSt
149121

150122
var framesToUpdate data.Frames
151123
if queryDataResponse != nil && queryDataResponse.Responses != nil {
152-
if respForRefId, found := queryDataResponse.Responses[queryForPoll.RefID]; found {
124+
if respForRefId, found := queryDataResponse.Responses[refId]; found {
153125
if respForRefId.Error != nil {
154126
o.logger.Error("RunStream: error in query response for poll", "refId", refId, "error", respForRefId.Error)
155127
continue
@@ -158,18 +130,24 @@ func (o *OpenSearchDatasource) RunStream(ctx context.Context, req *backend.RunSt
158130
}
159131
}
160132

161-
if len(framesToUpdate) > 0 {
162-
o.logger.Info("RunStream: new data found", "refId", refId, "frameCount", len(framesToUpdate))
163-
for _, frame := range framesToUpdate {
133+
var nonEmptyFrames data.Frames
134+
for _, frame := range framesToUpdate {
135+
if frameHasRows(frame) {
136+
nonEmptyFrames = append(nonEmptyFrames, frame)
137+
}
138+
}
139+
if len(nonEmptyFrames) > 0 {
140+
o.logger.Info("RunStream: new non-empty data found", "refId", refId, "frameCount", len(nonEmptyFrames))
141+
for _, frame := range nonEmptyFrames {
164142
err = sender.SendFrame(frame, data.IncludeAll)
165143
if err != nil {
166144
o.logger.Error("RunStream: failed to send frame to frontend", "refId", refId, "error", err)
167145
return err
168146
}
169147
}
170-
lastTo = queryForPoll.TimeRange.To
148+
lastTo = currentTime
171149
} else {
172-
o.logger.Debug("RunStream: no new data in this interval", "refId", refId)
150+
o.logger.Debug("RunStream: no new non-empty data in this interval", "refId", refId)
173151
}
174152
}
175153
}
@@ -180,3 +158,11 @@ func (*OpenSearchDatasource) PublishStream(_ context.Context, _ *backend.Publish
180158
Status: backend.PublishStreamStatusPermissionDenied,
181159
}, nil
182160
}
161+
162+
func frameHasRows(frame *data.Frame) bool {
163+
if frame == nil || len(frame.Fields) == 0 {
164+
return false
165+
}
166+
// All fields should have the same length, so just check the first
167+
return frame.Fields[0].Len() > 0
168+
}

0 commit comments

Comments
 (0)