Skip to content

Commit

Permalink
Merge pull request #3898 from nadment/3715
Browse files Browse the repository at this point in the history
Add IExecutionStoppedListener to IWorkflowEngine  #3715
  • Loading branch information
hansva authored May 3, 2024
2 parents d9ac1b5 + 8462f11 commit fe01692
Show file tree
Hide file tree
Showing 17 changed files with 699 additions and 207 deletions.
111 changes: 74 additions & 37 deletions engine/src/main/java/org/apache/hop/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ public void prepareExecution() throws HopException {

// Just for safety, fire the pipeline finished listeners...
try {
firePipelineExecutionFinishedListeners();
fireExecutionFinishedListeners();
} catch (HopException e) {
// listeners produces errors
log.logError(BaseMessages.getString(PKG, "Pipeline.FinishListeners.Exception"));
Expand Down Expand Up @@ -1109,7 +1109,7 @@ public void startThreads() throws HopException {
ExtensionPointHandler.callExtensionPoint(
log, this, HopExtensionPoint.PipelineStartThreads.id, this);

firePipelineExecutionStartedListeners();
fireExecutionStartedListeners();

for (int i = 0; i < transforms.size(); i++) {
final TransformMetaDataCombi sid = transforms.get(i);
Expand Down Expand Up @@ -1137,7 +1137,7 @@ public void startThreads() throws HopException {
executionEndDate = new Date();

try {
firePipelineExecutionFinishedListeners();
fireExecutionFinishedListeners();
} catch (Exception e) {
transform.setErrors(transform.getErrors() + 1L);
log.logError(
Expand Down Expand Up @@ -1293,7 +1293,7 @@ public void run() {
// So we fire the execution finished listeners here.
//
if (transforms.isEmpty()) {
firePipelineExecutionFinishedListeners();
fireExecutionFinishedListeners();
}

if (log.isDetailed()) {
Expand All @@ -1312,16 +1312,23 @@ public void run() {
* @throws HopException if any errors occur during notification
*/
@Override
@Deprecated(since = "2.9", forRemoval = true)
public void firePipelineExecutionFinishedListeners() throws HopException {
fireExecutionFinishedListeners();
}

@Override
public void fireExecutionFinishedListeners() throws HopException {
synchronized (executionFinishedListeners) {
if (executionFinishedListeners.size() == 0) {
return;
}
// prevent Exception from one listener to block others execution
List<HopException> badGuys = new ArrayList<>(executionFinishedListeners.size());
for (IExecutionFinishedListener executionListener : executionFinishedListeners) {
for (IExecutionFinishedListener<IPipelineEngine<PipelineMeta>> listener :
executionFinishedListeners) {
try {
executionListener.finished(this);
listener.finished(this);
} catch (HopException e) {
badGuys.add(e);
}
Expand Down Expand Up @@ -1357,10 +1364,17 @@ public void pipelineCompleted() throws HopException {
* @throws HopException if any errors occur during notification
*/
@Override
@Deprecated(since = "2.9", forRemoval = true)
public void firePipelineExecutionStartedListeners() throws HopException {
fireExecutionStartedListeners();
}

@Override
public void fireExecutionStartedListeners() throws HopException {
synchronized (executionStartedListeners) {
for (IExecutionStartedListener executionListener : executionStartedListeners) {
executionListener.started(this);
for (IExecutionStartedListener<IPipelineEngine<PipelineMeta>> listener :
executionStartedListeners) {
listener.started(this);
}
}
}
Expand Down Expand Up @@ -1595,7 +1609,7 @@ public void safeStop() {
}
transforms.stream().filter(this::isInputTransform).forEach(combi -> stopTransform(combi, true));

firePipelineExecutionStoppedListeners();
fireExecutionStoppedListeners();
}

private boolean isInputTransform(TransformMetaDataCombi combi) {
Expand All @@ -1617,7 +1631,7 @@ public void stopAll() {
setStopped(true);
isAlreadyStopped.set(true);

firePipelineExecutionStoppedListeners();
fireExecutionStoppedListeners();
}

public void stopTransform(TransformMetaDataCombi combi, boolean safeStop) {
Expand All @@ -1638,11 +1652,16 @@ public void stopTransform(TransformMetaDataCombi combi, boolean safeStop) {
}

@Override
@Deprecated(since = "2.9", forRemoval = true)
public void firePipelineExecutionStoppedListeners() {
// Fire the stopped listener...
//
fireExecutionStoppedListeners();
}

@Override
public void fireExecutionStoppedListeners() {
synchronized (executionStoppedListeners) {
for (IExecutionStoppedListener listener : executionStoppedListeners) {
for (IExecutionStoppedListener<IPipelineEngine<PipelineMeta>> listener :
executionStoppedListeners) {
listener.stopped(this);
}
}
Expand Down Expand Up @@ -2483,39 +2502,51 @@ public void setTransformPerformanceSnapShots(
this.transformPerformanceSnapShots = transformPerformanceSnapShots;
}

/**
* Adds a pipeline started listener.
*
* @param executionStartedListener the pipeline started listener
*/
@Override
public void addExecutionStartedListener(IExecutionStartedListener executionStartedListener) {
synchronized (executionStartedListener) {
executionStartedListeners.add(executionStartedListener);
public void addExecutionStartedListener(
IExecutionStartedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStartedListeners) {
executionStartedListeners.add(listener);
}
}

/**
* Adds a pipeline finished listener.
*
* @param executionFinishedListener the pipeline finished listener
*/
@Override
public void addExecutionFinishedListener(IExecutionFinishedListener executionFinishedListener) {
synchronized (executionFinishedListener) {
executionFinishedListeners.add(executionFinishedListener);
public void removeExecutionStartedListener(
IExecutionStartedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStartedListeners) {
executionStartedListeners.remove(listener);
}
}

@Override
public void addExecutionFinishedListener(
IExecutionFinishedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionFinishedListeners) {
executionFinishedListeners.add(listener);
}
}

@Override
public void removeExecutionFinishedListener(
IExecutionFinishedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionFinishedListeners) {
executionFinishedListeners.remove(listener);
}
}

/**
* Adds a pipeline stopped listener.
*
* @param executionStoppedListener the pipeline stopped listener
*/
@Override
public void addExecutionStoppedListener(IExecutionStoppedListener executionStoppedListener) {
synchronized (executionStoppedListener) {
executionStoppedListeners.add(executionStoppedListener);
public void addExecutionStoppedListener(
IExecutionStoppedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStoppedListeners) {
executionStoppedListeners.add(listener);
}
}

@Override
public void removeExecutionStoppedListener(
IExecutionStoppedListener<IPipelineEngine<PipelineMeta>> listener) {
synchronized (executionStoppedListeners) {
executionStoppedListeners.remove(listener);
}
}

Expand All @@ -2524,6 +2555,7 @@ public void addExecutionStoppedListener(IExecutionStoppedListener executionStopp
*
* @param executionStoppedListeners the list of stop-event listeners to set
*/
@Deprecated(since = "2.9", forRemoval = true)
public void setExecutionStoppedListeners(
List<IExecutionStoppedListener<IPipelineEngine<PipelineMeta>>> executionStoppedListeners) {
this.executionStoppedListeners = Collections.synchronizedList(executionStoppedListeners);
Expand All @@ -2535,6 +2567,7 @@ public void setExecutionStoppedListeners(
*
* @return the list of stop-event listeners
*/
@Deprecated(since = "2.9", forRemoval = true)
public List<IExecutionStoppedListener<IPipelineEngine<PipelineMeta>>>
getExecutionStoppedListeners() {
return executionStoppedListeners;
Expand Down Expand Up @@ -3543,6 +3576,7 @@ public void setFeedbackSize(int feedbackSize) {
*
* @return value of executionStartedListeners
*/
@Deprecated(since = "2.9", forRemoval = true)
public List<IExecutionStartedListener<IPipelineEngine<PipelineMeta>>>
getExecutionStartedListeners() {
return executionStartedListeners;
Expand All @@ -3551,6 +3585,7 @@ public void setFeedbackSize(int feedbackSize) {
/**
* @param executionStartedListeners The executionStartedListeners to set
*/
@Deprecated(since = "2.9", forRemoval = true)
public void setExecutionStartedListeners(
List<IExecutionStartedListener<IPipelineEngine<PipelineMeta>>> executionStartedListeners) {
this.executionStartedListeners = executionStartedListeners;
Expand All @@ -3561,6 +3596,7 @@ public void setExecutionStartedListeners(
*
* @return value of executionFinishedListeners
*/
@Deprecated(since = "2.9", forRemoval = true)
public List<IExecutionFinishedListener<IPipelineEngine<PipelineMeta>>>
getExecutionFinishedListeners() {
return executionFinishedListeners;
Expand All @@ -3569,6 +3605,7 @@ public void setExecutionStartedListeners(
/**
* @param executionFinishedListeners The executionFinishedListeners to set
*/
@Deprecated(since = "2.9", forRemoval = true)
public void setExecutionFinishedListeners(
List<IExecutionFinishedListener<IPipelineEngine<PipelineMeta>>> executionFinishedListeners) {
this.executionFinishedListeners = executionFinishedListeners;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -195,38 +195,92 @@ public interface IPipelineEngine<T extends PipelineMeta>
boolean isPaused();

/**
* Call the given listener lambda when this pipeline engine has started execution.
* Attach a listener to notify when the pipeline has started execution.
*
* @param listener
* @throws HopException
* @param listener the pipeline started listener
*/
void addExecutionStartedListener(IExecutionStartedListener<IPipelineEngine<T>> listener)
throws HopException;
void addExecutionStartedListener(IExecutionStartedListener<IPipelineEngine<T>> listener);

void firePipelineExecutionStartedListeners() throws HopException;
/**
* Detach a listener to notify when the pipeline has started execution.
*
* @param listener the pipeline started listener
*/
void removeExecutionStartedListener(IExecutionStartedListener<IPipelineEngine<T>> listener);

/**
* Call the given listener lambda when this pipeline engine has completed execution.
* Use the {@link #fireExecutionStartedListeners} method.
*
* @param listener
* @throws HopException
*/
void addExecutionFinishedListener(IExecutionFinishedListener<IPipelineEngine<T>> listener)
throws HopException;
@Deprecated(since = "2.9", forRemoval = true)
void firePipelineExecutionStartedListeners() throws HopException;

void firePipelineExecutionFinishedListeners() throws HopException;
/**
* Make attempt to fire all registered started execution listeners if possible.
*
* @throws HopException if any errors occur during notification
*/
void fireExecutionStartedListeners() throws HopException;

/**
* Attach a listener to notify when the pipeline has completed execution.
*
* @param listener the pipeline finished listener
*/
void addExecutionFinishedListener(IExecutionFinishedListener<IPipelineEngine<T>> listener);

/**
* Call the given listener lambda when this pipeline engine has stopped execution.
* Detach a listener to notify when the pipeline has completed execution.
*
* @param listener the pipeline finished listener
*/
void removeExecutionFinishedListener(IExecutionFinishedListener<IPipelineEngine<T>> listener);

/**
* Use the {@link #fireExecutionFinishedListeners} method.
*
* @param listener
* @throws HopException
*/
void addExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> listener)
throws HopException;
@Deprecated(since = "2.9", forRemoval = true)
void firePipelineExecutionFinishedListeners() throws HopException;

/**
* Make attempt to fire all registered finished execution listeners if possible.
*
* @throws HopException if any errors occur during notification
*/
void fireExecutionFinishedListeners() throws HopException;

/**
* Attach a listener to notify when the pipeline has stopped execution.
*
* @param listener the pipeline stopped listener
*/
void addExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> listener);

/**
* Detach a listener to notify when the pipeline has stopped execution.
*
* @param listener the pipeline stopped listener
*/
void removeExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> listener);

/**
* Use the {@link #fireExecutionStoppedListeners} method.
*
* @throws HopException
*/
@Deprecated(since = "2.9", forRemoval = true)
void firePipelineExecutionStoppedListeners() throws HopException;

/**
* Make attempt to fire all registered stopped execution listeners if possible.
*
* @throws HopException if any errors occur during notification
*/
void fireExecutionStoppedListeners() throws HopException;

/**
* Retrieve the logging text of a particular component in the engine
*
Expand Down Expand Up @@ -301,7 +355,7 @@ void addExecutionStoppedListener(IExecutionStoppedListener<IPipelineEngine<T>> l
*
* @param parentPipeline
*/
void setParentPipeline(IPipelineEngine parentPipeline);
void setParentPipeline(IPipelineEngine<PipelineMeta> parentPipeline);

/**
* Inform the pipeline about a previous execution result in a workflow or pipeline
Expand Down
Loading

0 comments on commit fe01692

Please sign in to comment.