Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug Fix for issue: JoinOperation errors are not propagated to the main EtlProcess #19

Merged
merged 2 commits into from
Jul 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions Rhino.Etl.Core/Operations/AbstractJoinOperation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Rhino.Etl.Core.Operations
{
/// <summary>
/// Perform a join between two sources.
/// </summary>
public abstract class AbstractJoinOperation : AbstractOperation
{
/// <summary>
/// The left process
/// </summary>
protected readonly PartialProcessOperation left = new PartialProcessOperation();

/// <summary>
/// The rigth process
/// </summary>
protected readonly PartialProcessOperation right = new PartialProcessOperation();

/// <summary>
/// Is left registered?
/// </summary>
protected bool leftRegistered = false;

/// <summary>
/// Initializes this instance.
/// </summary>
protected virtual void Initialize()
{
}

/// <summary>
/// Called when a row on the right side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
protected virtual void RightOrphanRow(Row row)
{
}

/// <summary>
/// Called when a row on the left side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
/// <param name="row">The row.</param>
protected virtual void LeftOrphanRow(Row row)
{
}

/// <summary>
/// Check left/right branches are not null
/// </summary>
protected void PrepareForJoin()
{
Initialize();
Guard.Against(left == null, "Left branch of a join cannot be null");
Guard.Against(right == null, "Right branch of a join cannot be null");
}

/// <summary>
/// Initializes this instance
/// </summary>
/// <param name="pipelineExecuter">The current pipeline executer.</param>
public override void PrepareForExecution(IPipelineExecuter pipelineExecuter)
{
left.PrepareForExecution(pipelineExecuter);
right.PrepareForExecution(pipelineExecuter);
}

/// <summary>
/// Gets all errors that occured when running this operation
/// </summary>
/// <returns></returns>
public override IEnumerable<Exception> GetAllErrors()
{
foreach (Exception error in left.GetAllErrors())
{
yield return error;
}
foreach (Exception error in right.GetAllErrors())
{
yield return error;
}
foreach (Exception error in Errors)
{
yield return error;
}
}

/// <summary>
/// Merges the two rows into a single row
/// </summary>
/// <param name="leftRow">The left row.</param>
/// <param name="rightRow">The right row.</param>
/// <returns></returns>
protected abstract Row MergeRows(Row leftRow, Row rightRow);

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public override void Dispose()
{
left.Dispose();
right.Dispose();
}
}
}
120 changes: 16 additions & 104 deletions Rhino.Etl.Core/Operations/JoinOperation.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
namespace Rhino.Etl.Core.Operations
{
using Enumerables;
using System;
using System.Collections.Generic;
using Enumerables;

/// <summary>
/// Perform a join between two sources. The left part of the join is optional and if not specified it will use the current pipeline as input.
/// </summary>
public abstract class JoinOperation : AbstractOperation
public abstract class JoinOperation : AbstractJoinOperation
{
private readonly PartialProcessOperation left = new PartialProcessOperation();
private readonly PartialProcessOperation right = new PartialProcessOperation();
private JoinType jointype;
private string[] leftColumns;
private string[] rightColumns;
private Dictionary<Row, object> rightRowsWereMatched = new Dictionary<Row, object>();
private Dictionary<ObjectArrayKeys, List<Row>> rightRowsByJoinKey = new Dictionary<ObjectArrayKeys, List<Row>>();
private bool leftRegistered = false;

/// <summary>
/// Sets the right part of the join
Expand All @@ -28,7 +25,6 @@ public JoinOperation Right(IOperation value)
return this;
}


/// <summary>
/// Sets the left part of the join
/// </summary>
Expand All @@ -49,6 +45,10 @@ public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
{
PrepareForJoin();

SetupJoinConditions();
Guard.Against(leftColumns == null, "You must setup the left columns");
Guard.Against(rightColumns == null, "You must setup the right columns");

IEnumerable<Row> rightEnumerable = GetRightEnumerable();

IEnumerable<Row> execute = left.Execute(leftRegistered ? null : rows);
Expand Down Expand Up @@ -86,19 +86,6 @@ public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
}
}

private void PrepareForJoin()
{
Initialize();

Guard.Against(left == null, "Left branch of a join cannot be null");
Guard.Against(right == null, "Right branch of a join cannot be null");

SetupJoinConditions();

Guard.Against(leftColumns == null, "You must setup the left columns");
Guard.Against(rightColumns == null, "You must setup the right columns");
}

private IEnumerable<Row> GetRightEnumerable()
{
IEnumerable<Row> rightEnumerable = new CachingEnumerable<Row>(
Expand All @@ -117,42 +104,6 @@ private IEnumerable<Row> GetRightEnumerable()
return rightEnumerable;
}

/// <summary>
/// Called when a row on the right side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
protected virtual void RightOrphanRow(Row row)
{

}

/// <summary>
/// Called when a row on the left side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
/// <param name="row">The row.</param>
protected virtual void LeftOrphanRow(Row row)
{

}

/// <summary>
/// Merges the two rows into a single row
/// </summary>
/// <param name="leftRow">The left row.</param>
/// <param name="rightRow">The right row.</param>
/// <returns></returns>
protected abstract Row MergeRows(Row leftRow, Row rightRow);

/// <summary>
/// Initializes this instance.
/// </summary>
protected virtual void Initialize()
{
}

/// <summary>
/// Setups the join conditions.
/// </summary>
Expand All @@ -176,7 +127,6 @@ protected JoinBuilder LeftJoin
get { return new JoinBuilder(this, JoinType.Left); }
}


/// <summary>
/// Create a right outer join
/// </summary>
Expand All @@ -186,7 +136,6 @@ protected JoinBuilder RightJoin
get { return new JoinBuilder(this, JoinType.Right); }
}


/// <summary>
/// Create a full outer join
/// </summary>
Expand All @@ -196,43 +145,6 @@ protected JoinBuilder FullOuterJoin
get { return new JoinBuilder(this, JoinType.Full); }
}


/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public override void Dispose()
{
left.Dispose();
right.Dispose();
}


/// <summary>
/// Initializes this instance
/// </summary>
/// <param name="pipelineExecuter">The current pipeline executer.</param>
public override void PrepareForExecution(IPipelineExecuter pipelineExecuter)
{
left.PrepareForExecution(pipelineExecuter);
right.PrepareForExecution(pipelineExecuter);
}

/// <summary>
/// Gets all errors that occured when running this operation
/// </summary>
/// <returns></returns>
public override IEnumerable<Exception> GetAllErrors()
{
foreach (Exception error in left.GetAllErrors())
{
yield return error;
}
foreach (Exception error in right.GetAllErrors())
{
yield return error;
}
}

/// <summary>
/// Fluent interface to create joins
/// </summary>
Expand Down Expand Up @@ -277,36 +189,36 @@ public JoinBuilder Right(params string[] columns)
/// <summary>
/// Occurs when a row is processed.
/// </summary>
public override event Action<IOperation, Row> OnRowProcessed
public override event Action<IOperation, Row> OnRowProcessed
{
add
{
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed += value;
base.OnRowProcessed += value;
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed += value;
base.OnRowProcessed += value;
}
remove
{
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed -= value;
base.OnRowProcessed -= value;
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed -= value;
base.OnRowProcessed -= value;
}
}

/// <summary>
/// Occurs when all the rows has finished processing.
/// </summary>
public override event Action<IOperation> OnFinishedProcessing
public override event Action<IOperation> OnFinishedProcessing
{
add
{
foreach (IOperation operation in new[] { left, right })
foreach (IOperation operation in new[] { left, right })
operation.OnFinishedProcessing += value;
base.OnFinishedProcessing += value;
}
remove
{
foreach (IOperation operation in new[] { left, right })
foreach (IOperation operation in new[] { left, right })
operation.OnFinishedProcessing -= value;
base.OnFinishedProcessing -= value;
}
Expand Down
Loading