Skip to content

Commit

Permalink
graph
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 31, 2024
1 parent 8c5f996 commit 1e92f7e
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 46 deletions.
33 changes: 28 additions & 5 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,32 @@ func (h *FlowRequestHandler) cdcFlowStatus(
}, nil
}

func (h *FlowRequestHandler) CDCGraph(ctx context.Context, req *protos.GraphRequest) (*protos.GraphResponse, error) {
truncField := "minute"
switch req.AggregateType {
case "hour", "day", "month":
truncField = req.AggregateType
}
rows, err := h.pool.Query(ctx, `select tm, coalesce(sum(rows_in_batch), 0)
from generate_series(date_trunc($2, now() - $1::INTERVAL * 30), now(), $1::INTERVAL) tm
left join peerdb_stats.cdc_batches on start_time >= tm and start_time < tm + $1::INTERVAL
group by 1 order by 1`, req.AggregateType, truncField)
if err != nil {
return nil, err
}
var data []*protos.GraphResponseItem
var t time.Time
var r int64
if _, err := pgx.ForEachRow(rows, []any{&t, &r}, func() error {
data = append(data, &protos.GraphResponseItem{Time: float64(t.UnixMilli()), Rows: float64(r)})
return nil
}); err != nil {
return nil, err
}

return &protos.GraphResponse{Data: data}, nil
}

func (h *FlowRequestHandler) InitialLoadSummary(
ctx context.Context,
req *protos.InitialLoadSummaryRequest,
Expand Down Expand Up @@ -542,11 +568,8 @@ func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCB
var rowsBehind int32
if len(batches) > 0 {
firstId := batches[0].BatchId
if err := h.pool.QueryRow(
ctx,
"select count(distinct batch_id), count(distinct batch_id) filter (where batch_id > $2) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null",
req.FlowJobName,
firstId,
if err := h.pool.QueryRow(ctx, `select count(distinct batch_id), count(distinct batch_id) filter (where batch_id > $2)
from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null`, req.FlowJobName, firstId,
).Scan(&total, &rowsBehind); err != nil {
return nil, err
}
Expand Down
18 changes: 18 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,20 @@ message GetCDCBatchesResponse {
int32 page = 3;
}

message GraphRequest {
string flow_job_name = 1;
string aggregate_type = 2; // TODO name?
}

message GraphResponseItem {
double time = 1;
double rows = 2;
}

message GraphResponse {
repeated GraphResponseItem data = 1;
}

message MirrorLog {
string flow_name = 1;
string error_message = 2;
Expand Down Expand Up @@ -557,6 +571,10 @@ service FlowService {
option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" };
}

rpc CDCGraph(GraphRequest) returns (GraphResponse) {
option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" };
}

rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" };
}
Expand Down
4 changes: 1 addition & 3 deletions ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type timestampType = {
count: number;
};

function aggregateCountsByInterval(
export default function aggregateCountsByInterval(
timestamps: timestampType[],
interval: TimeAggregateTypes
): [string, number][] {
Expand Down Expand Up @@ -83,5 +83,3 @@ function aggregateCountsByInterval(

return resultArray;
}

export default aggregateCountsByInterval;
58 changes: 38 additions & 20 deletions ui/app/mirrors/[mirrorId]/cdcGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,31 +1,51 @@
'use client';
import SelectTheme from '@/app/styles/select';
import { TimeAggregateTypes, timeOptions } from '@/app/utils/graph';
import {
formatGraphLabel,
TimeAggregateTypes,
timeOptions,
} from '@/app/utils/graph';
import { Label } from '@/lib/Label';
import { BarChart } from '@tremor/react';
import { useMemo, useState } from 'react';
import { useEffect, useState } from 'react';
import ReactSelect from 'react-select';

type CdcGraphProps = {};
type CdcGraphProps = { mirrorName: string };

function CdcGraph({}: CdcGraphProps) {
let [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
export default function CdcGraph({ mirrorName }: CdcGraphProps) {
const [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
TimeAggregateTypes.HOUR
);
const [graphValues, setGraphValues] = useState<
{ name: string; 'Rows synced at a point in time': number }[]
>([]);

const graphValues = useMemo(() => {
return []; /* TODO
const rows = syncs.map((sync) => ({
timestamp: sync.endTime,
count: sync.numRows,
}));
let timedRowCounts = aggregateCountsByInterval(rows, aggregateType);
timedRowCounts = timedRowCounts.slice(0, 29).reverse();
return timedRowCounts.map((count) => ({
name: formatGraphLabel(new Date(count[0]), aggregateType),
'Rows synced at a point in time': Number(count[1]),
})); */
}, [aggregateType]);
useEffect(() => {
const fetchData = async () => {
const req: any = {
flowJobName: mirrorName,
aggregateType,
};

const res = await fetch('/api/v1/mirrors/cdc/graph', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
cache: 'no-store',
body: JSON.stringify(req),
});
const data: { data: { time: number; rows: number }[] } = await res.json();
setGraphValues(
data.data.map(({ time, rows }) => ({
name: formatGraphLabel(new Date(time), aggregateType),
'Rows synced at a point in time': Number(rows),
}))
);
};

fetchData();
}, [mirrorName, aggregateType]);

return (
<div>
Expand All @@ -51,5 +71,3 @@ function CdcGraph({}: CdcGraphProps) {
</div>
);
}

export default CdcGraph;
11 changes: 2 additions & 9 deletions ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) {
let actionsDropdown = null;

if (mirrorState?.cdcStatus) {
syncStatusChild = (
<SyncStatus
flowJobName={mirrorId}
/>
);
syncStatusChild = <SyncStatus flowJobName={mirrorId} />;

const dbType = dBTypeFromJSON(mirrorState.cdcStatus.destinationType);

Expand Down Expand Up @@ -92,10 +88,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) {
<Header variant='title2'>{mirrorId}</Header>
{actionsDropdown}
</div>
<CDCMirror
syncStatusChild={syncStatusChild}
status={mirrorState}
/>
<CDCMirror syncStatusChild={syncStatusChild} status={mirrorState} />
</LayoutMain>
);
} else if (mirrorState?.qrepStatus) {
Expand Down
12 changes: 5 additions & 7 deletions ui/app/mirrors/[mirrorId]/qrepGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@ type QRepGraphProps = {
};

function QrepGraph({ syncs }: QRepGraphProps) {
let [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
const [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
TimeAggregateTypes.HOUR
);
const initialCount: [string, number][] = [];
let [counts, setCounts] = useState(initialCount);
const [counts, setCounts] = useState(initialCount);

useEffect(() => {
let rows = syncs.map((sync) => ({
const rows = syncs.map((sync) => ({
timestamp: sync.startTime!,
count: Number(sync.rowsInPartition) ?? 0,
}));

let counts = aggregateCountsByInterval(rows, aggregateType);
counts = counts.slice(0, 29);
counts = counts.reverse();
setCounts(counts);
const counts = aggregateCountsByInterval(rows, aggregateType);
setCounts(counts.slice(0, 29).reverse());
}, [aggregateType, syncs]);

return (
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/[mirrorId]/syncStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export default function SyncStatus({ flowJobName }: SyncStatusProps) {
<div>
<RowsDisplay totalRowsData={tableStats.totalData} />
<div className='my-10'>
<CdcGraph />
<CdcGraph mirrorName={flowJobName} />
</div>
<SyncStatusTable mirrorName={flowJobName} />
<TableStats tableSyncs={tableStats.tablesData} />
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/[mirrorId]/syncStatusTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export const SyncStatusTable = ({ mirrorName }: SyncStatusTableProps) => {
beforeId: beforeId,
afterId: afterId,
};
const res = await fetch(`/api/v1/mirrors/cdc/batches`, {
const res = await fetch('/api/v1/mirrors/cdc/batches', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Expand Down

0 comments on commit 1e92f7e

Please sign in to comment.