Skip to content

Commit

Permalink
remove obsolete TODOs and refactor (Netflix#3161)
Browse files Browse the repository at this point in the history
* remove obsolete TODOs and refactor

* spotless

* using mockbean

* Revert "using mockbean"

This reverts commit e1cb986.

* revert changes to event processor

* fix tests
  • Loading branch information
apanicker-nflx authored Aug 10, 2022
1 parent 7738287 commit 290ab2b
Show file tree
Hide file tree
Showing 32 changed files with 42,806 additions and 3,593 deletions.
906 changes: 841 additions & 65 deletions annotations-processor/dependencies.lock

Large diffs are not rendered by default.

656 changes: 642 additions & 14 deletions annotations/dependencies.lock

Large diffs are not rendered by default.

1,373 changes: 1,223 additions & 150 deletions awss3-storage/dependencies.lock

Large diffs are not rendered by default.

1,463 changes: 1,311 additions & 152 deletions awssqs-event-queue/dependencies.lock

Large diffs are not rendered by default.

2,248 changes: 2,094 additions & 154 deletions cassandra-persistence/dependencies.lock

Large diffs are not rendered by default.

1,977 changes: 1,827 additions & 150 deletions client-spring/dependencies.lock

Large diffs are not rendered by default.

2,381 changes: 2,281 additions & 100 deletions client/dependencies.lock

Large diffs are not rendered by default.

1,213 changes: 1,124 additions & 89 deletions common/dependencies.lock

Large diffs are not rendered by default.

1,653 changes: 1,550 additions & 103 deletions core/dependencies.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public DefaultEventProcessor(
this.objectMapper = objectMapper;
this.jsonUtils = jsonUtils;
this.evaluators = evaluators;
this.retryTemplate = retryTemplate;

if (properties.getEventProcessorThreadCount() <= 0) {
throw new IllegalStateException(
Expand All @@ -105,7 +106,6 @@ public DefaultEventProcessor(
properties.getEventProcessorThreadCount(), threadFactory);

this.isEventMessageIndexingEnabled = properties.isEventMessageIndexingEnabled();
this.retryTemplate = retryTemplate;
LOGGER.info("Event Processing is ENABLED");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@
*/
package com.netflix.conductor.core.events;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,11 +838,6 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
throw new ConflictException(msg);
}

// FIXME Backwards compatibility for legacy workflows already running.
// This code will be removed in a future version.
if (workflow.getWorkflowDefinition() == null) {
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);
}
deciderService.updateWorkflowOutput(workflow, null);

workflow.setStatus(WorkflowModel.Status.COMPLETED);
Expand Down Expand Up @@ -906,12 +901,6 @@ public WorkflowModel terminateWorkflow(
workflow.setStatus(WorkflowModel.Status.TERMINATED);
}

// FIXME Backwards compatibility for legacy workflows already running.
// This code will be removed in a future version.
if (workflow.getWorkflowDefinition() == null) {
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);
}

try {
deciderService.updateWorkflowOutput(workflow, null);
} catch (Exception e) {
Expand Down Expand Up @@ -1037,13 +1026,6 @@ public void updateTask(TaskResult taskResult) {
String workflowId = taskResult.getWorkflowInstanceId();
WorkflowModel workflowInstance = executionDAOFacade.getWorkflowModel(workflowId, false);

// FIXME Backwards compatibility for legacy workflows already running.
// This code will be removed in a future version.
if (workflowInstance.getWorkflowDefinition() == null) {
workflowInstance =
metadataMapperService.populateWorkflowWithDefinitions(workflowInstance);
}

TaskModel task =
Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId()))
.orElseThrow(
Expand Down Expand Up @@ -1288,10 +1270,6 @@ public boolean decide(String workflowId) {
// If it is a new workflow, the tasks will be still empty even though include tasks is true
WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true);

// FIXME Backwards compatibility for legacy workflows already running.
// This code will be removed in a future version.
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);

if (workflow.getStatus().isTerminal()) {
if (!workflow.getStatus().isSuccessful()) {
cancelNonTerminalTasks(workflow);
Expand Down Expand Up @@ -1539,10 +1517,6 @@ public void skipTaskFromWorkflow(

WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true);

// FIXME Backwards compatibility for legacy workflows already running.
// This code will be removed in a future version.
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);

// If the workflow is not running then cannot skip any task
if (!workflow.getStatus().equals(WorkflowModel.Status.RUNNING)) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,18 @@
*/
package com.netflix.conductor.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.common.metadata.tasks.*;
import com.netflix.conductor.common.run.*;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.common.utils.ExternalPayloadStorage.Operation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType;
Expand Down Expand Up @@ -612,13 +597,25 @@ public List<TaskExecLog> getTaskLogs(String taskId) {
/**
* Get external uri for the payload
*
* @param operation the type of {@link Operation} to be performed
* @param payloadType the {@link PayloadType} at the external uri
* @param path the path for which the external storage location is to be populated
* @param operation the type of {@link Operation} to be performed
* @param type the {@link PayloadType} at the external uri
* @return the external uri at which the payload is stored/to be stored
*/
public ExternalStorageLocation getExternalStorageLocation(
Operation operation, PayloadType payloadType, String path) {
return externalPayloadStorage.getLocation(operation, payloadType, path);
String path, String operation, String type) {
try {
ExternalPayloadStorage.Operation payloadOperation =
ExternalPayloadStorage.Operation.valueOf(StringUtils.upperCase(operation));
ExternalPayloadStorage.PayloadType payloadType =
ExternalPayloadStorage.PayloadType.valueOf(StringUtils.upperCase(type));
return externalPayloadStorage.getLocation(payloadOperation, payloadType, path);
} catch (Exception e) {
String errorMsg =
String.format(
"Invalid input - Operation: %s, PayloadType: %s", operation, type);
LOGGER.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
Expand Down Expand Up @@ -365,22 +364,6 @@ public SearchResult<Task> searchV2(
*/
public ExternalStorageLocation getExternalStorageLocation(
String path, String operation, String type) {
try {
ExternalPayloadStorage.Operation payloadOperation =
ExternalPayloadStorage.Operation.valueOf(StringUtils.upperCase(operation));
ExternalPayloadStorage.PayloadType payloadType =
ExternalPayloadStorage.PayloadType.valueOf(StringUtils.upperCase(type));
return executionService.getExternalStorageLocation(payloadOperation, payloadType, path);
} catch (Exception e) {
// FIXME: for backwards compatibility
LOGGER.error(
"Invalid input - Operation: {}, PayloadType: {}, defaulting to WRITE/TASK_OUTPUT",
operation,
type);
return executionService.getExternalStorageLocation(
ExternalPayloadStorage.Operation.WRITE,
ExternalPayloadStorage.PayloadType.TASK_OUTPUT,
path);
}
return executionService.getExternalStorageLocation(path, operation, type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
Expand All @@ -32,7 +31,6 @@
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.Utils;
Expand Down Expand Up @@ -520,22 +518,6 @@ public SearchResult<Workflow> searchWorkflowsByTasksV2(
*/
public ExternalStorageLocation getExternalStorageLocation(
String path, String operation, String type) {
try {
ExternalPayloadStorage.Operation payloadOperation =
ExternalPayloadStorage.Operation.valueOf(StringUtils.upperCase(operation));
ExternalPayloadStorage.PayloadType payloadType =
ExternalPayloadStorage.PayloadType.valueOf(StringUtils.upperCase(type));
return executionService.getExternalStorageLocation(payloadOperation, payloadType, path);
} catch (Exception e) {
// FIXME: for backwards compatibility
LOGGER.error(
"Invalid input - Operation: {}, PayloadType: {}, defaulting to WRITE/WORKFLOW_INPUT",
operation,
type);
return executionService.getExternalStorageLocation(
ExternalPayloadStorage.Operation.WRITE,
ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT,
path);
}
return executionService.getExternalStorageLocation(path, operation, type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

@ContextConfiguration(classes = {TestObjectMapperConfiguration.class})
@RunWith(SpringRunner.class)
Expand Down
Loading

0 comments on commit 290ab2b

Please sign in to comment.