Skip to content

Commit

Permalink
Allow multiple Streaming queries per panel
Browse files Browse the repository at this point in the history
  • Loading branch information
mikhail-vl committed Jul 15, 2021
1 parent 0e1299e commit 7d1a751
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 378 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

![Dashboard](https://raw.githubusercontent.com/RedisGrafana/grafana-redis-datasource/master/src/img/redis-dashboard.png)

[![Grafana 7](https://img.shields.io/badge/Grafana-7-orange)](https://www.grafana.com)
[![Grafana 8](https://img.shields.io/badge/Grafana-8-orange)](https://www.grafana.com)
[![Redis Data Source](https://img.shields.io/badge/dynamic/json?color=blue&label=Redis%20Data%20Source&query=%24.version&url=https%3A%2F%2Fgrafana.com%2Fapi%2Fplugins%2Fredis-datasource)](https://grafana.com/grafana/plugins/redis-datasource)
[![Redis Application plugin](https://img.shields.io/badge/dynamic/json?color=blue&label=Redis%20Application%20plugin&query=%24.version&url=https%3A%2F%2Fgrafana.com%2Fapi%2Fplugins%2Fredis-app)](https://grafana.com/grafana/plugins/redis-app)
[![Redis Explorer plugin](https://img.shields.io/badge/dynamic/json?color=blue&label=Redis%20Explorer%20plugin&query=%24.version&url=https%3A%2F%2Fgrafana.com%2Fapi%2Fplugins%2Fredis-explorer-app)](https://grafana.com/grafana/plugins/redis-explorer-app)
Expand Down
52 changes: 36 additions & 16 deletions src/components/query-editor/query-editor.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,42 @@ describe('QueryEditor', () => {
});
});

/**
* Streaming
*/
describe('Streaming', () => {
const getComponent = (wrapper: ShallowComponent) =>
wrapper.findWhere((node) => {
return node.name() === 'Switch' && node.prop('label') === 'Streaming';
});

it('Should set value from query', () => {
const query = getQuery({ streaming: true });
const wrapper = shallow<QueryEditor>(
<QueryEditor datasource={{} as any} query={query} onRunQuery={onRunQuery} onChange={onChange} />
);
const testedComponent = getComponent(wrapper);
expect(testedComponent.prop('checked')).toEqual(true);
});

it('Should call onStreamingChange when onChange prop was called', () => {
const query = getQuery({ streaming: true });
const wrapper = shallow<QueryEditor>(
<QueryEditor datasource={{} as any} query={query} onRunQuery={onRunQuery} onChange={onChange} />
);
const testedMethod = jest.spyOn(wrapper.instance(), 'onStreamingChange');
wrapper.instance().forceUpdate();
const testedComponent = getComponent(wrapper);
const newValue = true;
testedComponent.simulate('change', { currentTarget: { checked: newValue } });
expect(testedMethod).toHaveBeenCalledWith({ currentTarget: { checked: newValue } });
expect(onChange).toHaveBeenCalledWith({
...query,
streaming: newValue,
});
});
});

runQueryFieldsTest([
{
name: 'query',
Expand Down Expand Up @@ -371,22 +407,6 @@ describe('QueryEditor', () => {
*/
describe('Streaming fields', () => {
runQueryFieldsTest([
{
name: 'streaming',
getComponent: (wrapper: ShallowComponent) =>
wrapper.findWhere((node) => {
return node.name() === 'Switch' && node.prop('label') === 'Streaming';
}),
type: 'switch',
queryWhenShown: {
refId: 'A',
type: QueryTypeValue.TIMESERIES,
},
queryWhenHidden: {
refId: 'B',
type: QueryTypeValue.TIMESERIES,
},
},
{
name: 'streamingInterval',
getComponent: (wrapper: ShallowComponent) =>
Expand Down
111 changes: 53 additions & 58 deletions src/components/query-editor/query-editor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ export class QueryEditor extends PureComponent<Props> {
streamingInterval,
streamingCapacity,
streamingDataType,
refId,
} = this.props.query;
const { onRunQuery } = this.props;

Expand Down Expand Up @@ -495,66 +494,62 @@ export class QueryEditor extends PureComponent<Props> {
</div>
)}

{refId === 'A' && (
<>
<div className="gf-form">
<Switch
label="Streaming"
labelClass="width-8"
tooltip="If checked, the datasource will stream data. Only Ref A is supported."
checked={streaming || false}
onChange={this.onStreamingChange}
<div className="gf-form">
<Switch
label="Streaming"
labelClass="width-8"
tooltip="If checked, the datasource will stream data."
checked={streaming || false}
onChange={this.onStreamingChange}
/>
{streaming && (
<>
<FormField
labelWidth={8}
value={streamingInterval}
type="number"
onChange={this.onStreamingIntervalChange}
label="Interval"
tooltip="Streaming interval in milliseconds. Default is 1000ms. For multiple Streaming targets minimum value will be taken."
placeholder="1000"
/>
{streaming && (
<>
<FormField
labelWidth={8}
value={streamingInterval}
type="number"
onChange={this.onStreamingIntervalChange}
label="Interval"
tooltip="Streaming interval in milliseconds. Default is 1000ms."
placeholder="1000"
/>
<FormField
labelWidth={8}
value={streamingCapacity}
type="number"
onChange={this.onStreamingCapacityChange}
label="Capacity"
tooltip="Values will be constantly added and will never exceed the given capacity. Default is 1000."
placeholder="1000"
/>
</>
)}
</div>
<FormField
labelWidth={8}
value={streamingCapacity}
type="number"
onChange={this.onStreamingCapacityChange}
label="Capacity"
tooltip="Values will be constantly added and will never exceed the given capacity. Default is 1000."
placeholder="1000"
/>
</>
)}
</div>

{streaming && (
<div className="gf-form">
<InlineFormLabel width={8} tooltip="If checked Time series, the last line of data will be applied.">
Data type
</InlineFormLabel>
<RadioButtonGroup
options={StreamingDataTypes}
value={streamingDataType || StreamingDataType.TIMESERIES}
onChange={this.onStreamingDataTypeChange}
/>
{streamingDataType !== StreamingDataType.DATAFRAME && (
<FormField
className={css`
margin-left: 5px;
`}
labelWidth={8}
inputWidth={30}
value={field}
tooltip="Specify field(s) to return from backend. Required for Alerting to trigger on specific fields."
onChange={this.onFieldChange}
label="Filter Field"
/>
)}
</div>
{streaming && (
<div className="gf-form">
<InlineFormLabel width={8} tooltip="If checked Time series, the last line of data will be applied.">
Data type
</InlineFormLabel>
<RadioButtonGroup
options={StreamingDataTypes}
value={streamingDataType || StreamingDataType.TIMESERIES}
onChange={this.onStreamingDataTypeChange}
/>
{streamingDataType !== StreamingDataType.DATAFRAME && (
<FormField
className={css`
margin-left: 5px;
`}
labelWidth={8}
inputWidth={30}
value={field}
tooltip="Specify field(s) to return from backend. Required for Alerting to trigger on specific fields."
onChange={this.onFieldChange}
label="Filter Field"
/>
)}
</>
</div>
)}

<Button onClick={onRunQuery}>Run</Button>
Expand Down
2 changes: 2 additions & 0 deletions src/data-source/data-source.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ describe('DataSource', () => {
subscriber.next({
data: [
toDataFrame({
refId: 'A',
fields: [
{
name: 'get',
Expand Down Expand Up @@ -149,6 +150,7 @@ describe('DataSource', () => {
query: '',
streaming: true,
streamingCapacity: 1,
streamingInterval: 1,
},
],
});
Expand Down
55 changes: 29 additions & 26 deletions src/data-source/data-source.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { head } from 'lodash';
import { Observable } from 'rxjs';
import { map as map$, switchMap as switchMap$ } from 'rxjs/operators';
import {
Expand All @@ -12,8 +11,8 @@ import {
} from '@grafana/data';
import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime';
import { DefaultStreamingInterval, StreamingDataType } from '../constants';
import { DataFrameFormatter, TimeSeriesFormatter } from '../frame-formatters';
import { RedisQuery } from '../redis';
import { TimeSeriesStreaming } from '../time-series';
import { RedisDataSourceOptions } from '../types';

/**
Expand Down Expand Up @@ -92,57 +91,61 @@ export class DataSource extends DataSourceWithBackend<RedisQuery, RedisDataSourc
* Override query to support streaming
*/
query(request: DataQueryRequest<RedisQuery>): Observable<DataQueryResponse> {
const refA = head(request.targets);

/**
* No query
* Need to typescript types narrowing
*/
if (!refA) {
if (!request.targets.length) {
return super.query(request);
}

/**
* No streaming enabled
*/
if (!refA?.streaming) {
const streaming = request.targets.filter((target) => target.streaming);
if (!streaming.length) {
return super.query(request);
}

/**
* Streaming enabled
*/
return new Observable<DataQueryResponse>((subscriber) => {
const { streamingDataType = StreamingDataType.TIMESERIES } = refA;
const frames: { [id: string]: TimeSeriesStreaming } = {};
request.targets.forEach((target) => {
/**
* Time-series frame
*/
if (target.streamingDataType !== StreamingDataType.DATAFRAME) {
frames[target.refId] = new TimeSeriesStreaming(target);
}
});

/**
* Apply frame formatted by streamingDataType
* Get minimum Streaming Interval
*/
let frame: TimeSeriesFormatter | DataFrameFormatter;
if (streamingDataType === StreamingDataType.DATAFRAME) {
frame = new DataFrameFormatter();
} else {
frame = new TimeSeriesFormatter(refA);
}
const streamingInterval = request.targets.map((target) =>
target.streamingInterval ? target.streamingInterval : DefaultStreamingInterval
);

/**
* Interval
*/
const intervalId = setInterval(async () => {
/**
* Run Query
*/
const data = await frame.update(super.query(request));
if (!data) {
return;
}
const response = await super.query(request).toPromise();

subscriber.next({
data: [data],
key: refA.refId,
state: LoadingState.Streaming,
response.data.forEach(async (frame) => {
if (frames[frame.refId]) {
frame = await frames[frame.refId].update(frame.fields);
}

subscriber.next({
data: [frame],
key: frame.refId,
state: LoadingState.Streaming,
});
});
}, refA.streamingInterval || DefaultStreamingInterval);
}, Math.min(...streamingInterval));

return () => {
clearInterval(intervalId);
Expand Down
32 changes: 0 additions & 32 deletions src/frame-formatters/data/data.test.ts

This file was deleted.

17 changes: 0 additions & 17 deletions src/frame-formatters/data/data.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/frame-formatters/data/index.ts

This file was deleted.

2 changes: 0 additions & 2 deletions src/frame-formatters/index.ts

This file was deleted.

Loading

0 comments on commit 7d1a751

Please sign in to comment.