Skip to content

Commit

Permalink
fix: nit
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Nov 1, 2024
1 parent eafd270 commit ce78585
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 36 deletions.
23 changes: 6 additions & 17 deletions flow/activities/maintenance_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ type MaintenanceMirrorInfoItem struct {

func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (MaintenanceMirrorsInfo, error) {
rows, err := a.CatalogPool.Query(ctx, `
select distinct on(f.name)
f.id, f.name, f.workflow_id,
f.created_at, coalesce(f.query_string, '')='' is_cdc
from flows f
select distinct on(name)
id, name, workflow_id,
created_at, coalesce(query_string, '')='' is_cdc
from flows
`)
if err != nil {
return MaintenanceMirrorsInfo{}, err
Expand All @@ -63,18 +63,7 @@ func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (MaintenanceMir
}

func (a *MaintenanceActivity) getMirrorStatus(ctx context.Context, mirrorInfo MaintenanceMirrorInfoItem) (protos.FlowStatus, error) {
encodedState, err := a.TemporalClient.QueryWorkflow(ctx, mirrorInfo.WorkflowId, "", shared.FlowStatusQuery)
if err != nil {
slog.Error("Error querying mirror status for maintenance", "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err)
return protos.FlowStatus_STATUS_UNKNOWN, err
}
var state protos.FlowStatus
if err = encodedState.Get(&state); err != nil {
slog.Error("Error decoding mirror status for maintenance", "mirror", mirrorInfo.Name, "workflowId", mirrorInfo.WorkflowId, "error", err)
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("error decoding mirror status for maintenance: %w", err)
}
return state, nil
return shared.GetWorkflowStatus(ctx, a.TemporalClient, mirrorInfo.WorkflowId)
}

func (a *MaintenanceActivity) WaitForRunningSnapshots(ctx context.Context) (MaintenanceMirrorsInfo, error) {
Expand Down Expand Up @@ -212,7 +201,7 @@ func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirrorIn
return true, nil
}

func (a *MaintenanceActivity) CleanupBackupedFlows(ctx context.Context) error {
func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error {
_, err := a.CatalogPool.Exec(ctx, `
update maintenance.maintenance_flows
set state = $1,
Expand Down
15 changes: 1 addition & 14 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,20 +406,7 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
}

func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
var state protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
return state, nil
return shared.GetWorkflowStatus(ctx, h.temporalClient, workflowID)
}

func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
Expand Down
26 changes: 26 additions & 0 deletions flow/shared/worklow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package shared

import (
"context"
"fmt"
"github.com/PeerDB-io/peer-flow/generated/protos"
"go.temporal.io/sdk/client"
"log/slog"
)

func GetWorkflowStatus(ctx context.Context, temporalClient client.Client, workflowID string) (protos.FlowStatus, error) {
res, err := temporalClient.QueryWorkflow(ctx, workflowID, "", FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
var state protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
return state, nil
}
2 changes: 1 addition & 1 deletion flow/workflows/maintenance_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func endMaintenance(ctx workflow.Context, logger log.Logger) (*protos.EndMainten
return nil, err
}

clearBackupsFuture := workflow.ExecuteActivity(ctx, maintenance.CleanupBackupedFlows)
clearBackupsFuture := workflow.ExecuteActivity(ctx, maintenance.CleanBackedUpFlows)
err = clearBackupsFuture.Get(ctx, nil)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions nexus/catalog/migrations/V40__maintenance_flows.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ CREATE SCHEMA IF NOT EXISTS maintenance;

CREATE TABLE IF NOT EXISTS maintenance.maintenance_flows
(
id bigint PRIMARY KEY,
flow_id serial NOT NULL,
id serial PRIMARY KEY,
flow_id bigint NOT NULL,
flow_name TEXT NOT NULL,
workflow_id TEXT NOT NULL,
flow_created_at TIMESTAMP NOT NULL,
Expand Down
5 changes: 3 additions & 2 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,9 @@ message InstanceInfoResponse {
}

enum MaintenanceStatus {
MAINTENANCE_STATUS_START = 0;
MAINTENANCE_STATUS_END = 1;
MAINTENANCE_STATUS_UNKOWN = 0;
MAINTENANCE_STATUS_START = 1;
MAINTENANCE_STATUS_END = 2;
}

message MaintenanceRequest {
Expand Down

0 comments on commit ce78585

Please sign in to comment.