Skip to content

Commit

Permalink
get basically working
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 31, 2024
1 parent 8f82a71 commit 8c5f996
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
8 changes: 6 additions & 2 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName
}

func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) {
return h.CDCBatches(ctx, req)
}

func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) {
limit := req.Limit
limitClause := ""
if limit > 0 {
Expand Down Expand Up @@ -540,15 +544,15 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC
firstId := batches[0].BatchId
if err := h.pool.QueryRow(
ctx,
"select count(distinct batch_id), count(distinct batch_id) filter (where id > $2) from peerdb_stats.flow_errors where flow_name=$1",
"select count(distinct batch_id), count(distinct batch_id) filter (where batch_id > $2) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null",
req.FlowJobName,
firstId,
).Scan(&total, &rowsBehind); err != nil {
return nil, err
}
} else if err := h.pool.QueryRow(
ctx,
"select count(distinct batch_id) from peerdb_stats.flow_errors where flow_name=$1",
"select count(distinct batch_id) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null",
req.FlowJobName,
).Scan(&total); err != nil {
return nil, err
Expand Down
8 changes: 6 additions & 2 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,15 @@ service FlowService {
}

rpc GetCDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}"};
option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}" };
}

rpc CDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) {
option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" };
}

rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}"};
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" };
}

rpc GetPeerInfo(PeerInfoRequest) returns (PeerInfoResponse) {
Expand Down
1 change: 1 addition & 0 deletions ui/app/mirrors/[mirrorId]/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const getMirrorState = async (
body: JSON.stringify({
flow_job_name,
include_flow_info: true,
exclude_batches: true,
}),
});
if (!res.ok) throw res.json();
Expand Down
21 changes: 10 additions & 11 deletions ui/app/mirrors/[mirrorId]/syncStatusTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,14 @@ export const SyncStatusTable = ({ mirrorName }: SyncStatusTableProps) => {
beforeId: beforeId,
afterId: afterId,
};
const res = await fetch(
`/api/v1/mirrors/cdc/batches/${encodeURIComponent(mirrorName)}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
cache: 'no-store',
body: JSON.stringify(req),
}
);
const res = await fetch(`/api/v1/mirrors/cdc/batches`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
cache: 'no-store',
body: JSON.stringify(req),
});
const data: GetCDCBatchesResponse = await res.json();
setBatches(data.cdcBatches);
setCurrentPage(data.page);
Expand All @@ -100,11 +97,13 @@ export const SyncStatusTable = ({ mirrorName }: SyncStatusTableProps) => {
if (batches.length === 0) {
setBeforeAfterId([-1, -1]);
}
setBeforeAfterId([batches[batches.length - 1].batchId, -1]);
}, [batches]);
const prevPage = useCallback(() => {
if (batches.length === 0 || currentPage < 3) {
setBeforeAfterId([-1, -1]);
}
setBeforeAfterId([-1, batches[0].batchId]);
}, [batches, currentPage]);

return (
Expand Down

0 comments on commit 8c5f996

Please sign in to comment.