Skip to content

Commit

Permalink
Update receiver name to match across methods:
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Weinstock <jakobweinstock@gmail.com>
  • Loading branch information
jacobweinstock committed Jun 3, 2024
1 parent 58196fb commit 06e43a3
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
4 changes: 2 additions & 2 deletions internal/server/kubernetes_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namesp
}

// Register registers the service on the gRPC server.
func (s *KubernetesBackedServer) Register(server *grpc.Server) {
proto.RegisterWorkflowServiceServer(server, s)
func (k *KubernetesBackedServer) Register(server *grpc.Server) {
proto.RegisterWorkflowServiceServer(server, k)
}
50 changes: 25 additions & 25 deletions internal/server/kubernetes_api_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func getWorkflowContext(wf v1alpha1.Workflow) *proto.WorkflowContext {
}
}

func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {
func (k *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {
stored := &v1alpha1.WorkflowList{}
err := s.ClientFunc().List(ctx, stored, &client.MatchingFields{
err := k.ClientFunc().List(ctx, stored, &client.MatchingFields{
workflowByNonTerminalState: workerID,
})
if err != nil {
Expand All @@ -52,43 +52,43 @@ func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker
return wfs, nil
}

func (s *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID, namespace string) (*v1alpha1.Workflow, error) {
func (k *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID, namespace string) (*v1alpha1.Workflow, error) {
wflw := &v1alpha1.Workflow{}
err := s.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowID, Namespace: namespace}, wflw)
err := k.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowID, Namespace: namespace}, wflw)
if err != nil {
s.logger.Error(err, "get client", "workflow", workflowID)
k.logger.Error(err, "get client", "workflow", workflowID)
return nil, err
}
return wflw, nil
}

// The following APIs are used by the worker.

func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {
func (k *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {
if req.GetWorkerId() == "" {
return status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
wflows, err := s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
wflows, err := k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
if err != nil {
return err
}

ctx := context.TODO()
id := req.WorkerId
if s.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (s.AutoCapMode == AutoCapModeDiscovery || s.AutoCapMode == AutoCapModeEnrollment) && !s.hardwareObjectExists(ctx, id) {
if k.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (k.AutoCapMode == AutoCapModeDiscovery || k.AutoCapMode == AutoCapModeEnrollment) && !k.hardwareObjectExists(ctx, id) {
// In the future, the worker could be signaled to send hardware device information to be used in creation of the Hardware object.
// or the proto.WorkflowContextRequest could be extended to include Hardware information.
if err := s.createHardwareObject(ctx, id); err != nil {
s.logger.Error(err, "failed to create hardware object")
if err := k.createHardwareObject(ctx, id); err != nil {
k.logger.Error(err, "failed to create hardware object")
return err
}

if s.AutoCapMode == AutoCapModeEnrollment {
if err := s.createWorkflowObject(ctx, id); err != nil {
s.logger.Error(err, "failed to create workflow object")
if k.AutoCapMode == AutoCapModeEnrollment {
if err := k.createWorkflowObject(ctx, id); err != nil {
k.logger.Error(err, "failed to create workflow object")
return err
}
wflows, err = s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
wflows, err = k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
if err != nil {
return err
}
Expand All @@ -103,20 +103,20 @@ func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextR
return nil
}

func (s *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {
func (k *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {
wfID := req.GetWorkflowId()
if wfID == "" {
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
wf, err := s.getWorkflowByName(ctx, wfID, s.namespace)
wf, err := k.getWorkflowByName(ctx, wfID, k.namespace)
if err != nil {
return nil, err
}
return workflow.ActionListCRDToProto(wf), nil
}

// Modifies a workflow for a given workflowContext.
func (s *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
func (k *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
if wf == nil {
return errors.New("no workflow provided")
}
Expand Down Expand Up @@ -157,19 +157,19 @@ cont:
// Workflow is running, so set the start time to now
wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt = func() *metav1.Time {
t := metav1.NewTime(s.nowFunc())
t := metav1.NewTime(k.nowFunc())
return &t
}()
case proto.State_STATE_FAILED, proto.State_STATE_TIMEOUT:
// Handle terminal statuses by updating the workflow state and time
wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
}
case proto.State_STATE_SUCCESS:
// Handle a success by marking the task as complete
if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
}
// Mark success on last action success
if wfContext.CurrentActionIndex+1 == wfContext.TotalNumberOfActions {
Expand Down Expand Up @@ -204,15 +204,15 @@ func getWorkflowContextForRequest(req *proto.WorkflowActionStatus, wf *v1alpha1.
return wfContext
}

func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {
func (k *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {
err := validateActionStatusRequest(req)
if err != nil {
return nil, err
}
wfID := req.GetWorkflowId()
l := s.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)
l := k.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)

wf, err := s.getWorkflowByName(ctx, wfID, s.namespace)
wf, err := k.getWorkflowByName(ctx, wfID, k.namespace)
if err != nil {
l.Error(err, "get workflow")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
Expand All @@ -225,13 +225,13 @@ func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *pr
}

wfContext := getWorkflowContextForRequest(req, wf)
err = s.modifyWorkflowState(wf, wfContext)
err = k.modifyWorkflowState(wf, wfContext)
if err != nil {
l.Error(err, "modify workflow state")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
l.Info("updating workflow in Kubernetes")
err = s.ClientFunc().Status().Update(ctx, wf)
err = k.ClientFunc().Status().Update(ctx, wf)
if err != nil {
l.Error(err, "applying update to workflow")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
Expand Down

0 comments on commit 06e43a3

Please sign in to comment.