Skip to content

Commit

Permalink
Synchronize After Merge - specify connection through variables #3839
Browse files Browse the repository at this point in the history
  • Loading branch information
nadment authored and hansva committed May 16, 2024
1 parent 2cf44e4 commit 3f7cf41
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,7 @@ public IRowMeta getRequiredFields(IVariables variables) throws HopException {
getParentTransformMeta().getParentPipelineMeta().findDatabase(connection, variables);

if (databaseMeta != null) {
Database db = new Database(loggingObject, variables, databaseMeta);
try {
try (Database db = new Database(loggingObject, variables, databaseMeta)) {
db.connect();

if (!Utils.isEmpty(realTableName)) {
Expand All @@ -647,8 +646,6 @@ public IRowMeta getRequiredFields(IVariables variables) throws HopException {
} catch (Exception e) {
throw new HopException(
BaseMessages.getString(PKG, "InsertUpdateMeta.Exception.ErrorGettingFields"), e);
} finally {
db.disconnect();
}
} else {
throw new HopException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private synchronized void lookupValues(Object[] row) throws HopException {
meta.getOperationOrderField()));
}

if (meta.istablenameInField()) {
if (meta.isTableNameInField()) {
// get dynamic table name
data.realTableName = data.inputRowMeta.getString(row, data.indexOfTableNameField);
if (Utils.isEmpty(data.realTableName)) {
Expand Down Expand Up @@ -112,7 +112,7 @@ private synchronized void lookupValues(Object[] row) throws HopException {
insertRowData[i] = row[data.valuenrs[i]];
}

if (meta.istablenameInField()) {
if (meta.isTableNameInField()) {
data.insertStatement = data.preparedStatements.get(data.realSchemaTable + "insert");
if (data.insertStatement == null) {
String sql =
Expand Down Expand Up @@ -165,7 +165,7 @@ private synchronized void lookupValues(Object[] row) throws HopException {

// LOOKUP

if (meta.istablenameInField()) {
if (meta.isTableNameInField()) {
// Prepare Lookup statement
data.lookupStatement = data.preparedStatements.get(data.realSchemaTable + "lookup");
if (data.lookupStatement == null) {
Expand Down Expand Up @@ -241,7 +241,7 @@ private synchronized void lookupValues(Object[] row) throws HopException {
if (!meta.isPerformLookup() || updateorDelete) {
// UPDATE :

if (meta.istablenameInField()) {
if (meta.isTableNameInField()) {
data.updateStatement = data.preparedStatements.get(data.realSchemaTable + "update");
if (data.updateStatement == null) {
String sql = getUpdateStatement(data.inputRowMeta);
Expand Down Expand Up @@ -297,7 +297,7 @@ private synchronized void lookupValues(Object[] row) throws HopException {
} else if (operation.equals(data.deleteValue)) {
// DELETE

if (meta.istablenameInField()) {
if (meta.isTableNameInField()) {
data.deleteStatement = data.preparedStatements.get(data.realSchemaTable + "delete");

if (data.deleteStatement == null) {
Expand Down Expand Up @@ -574,15 +574,13 @@ public String getLookupStatement(IRowMeta rowMeta) throws HopDatabaseException {
data.lookupParameterRowMeta = new RowMeta();
data.lookupReturnRowMeta = new RowMeta();

DatabaseMeta databaseMeta = meta.getDatabaseMeta();

String sql = "SELECT ";

for (int i = 0; i < meta.getUpdateLookup().length; i++) {
if (i != 0) {
sql += ", ";
}
sql += databaseMeta.quoteField(meta.getUpdateLookup()[i]);
sql += data.databaseMeta.quoteField(meta.getUpdateLookup()[i]);
data.lookupReturnRowMeta.addValueMeta(
rowMeta.searchValueMeta(meta.getUpdateStream()[i]).clone());
}
Expand All @@ -593,7 +591,7 @@ public String getLookupStatement(IRowMeta rowMeta) throws HopDatabaseException {
if (i != 0) {
sql += " AND ";
}
sql += databaseMeta.quoteField(meta.getKeyLookup()[i]);
sql += data.databaseMeta.quoteField(meta.getKeyLookup()[i]);
if ("BETWEEN".equalsIgnoreCase(meta.getKeyCondition()[i])) {
sql += " BETWEEN ? AND ? ";
data.lookupParameterRowMeta.addValueMeta(rowMeta.searchValueMeta(meta.getKeyStream()[i]));
Expand All @@ -613,7 +611,6 @@ public String getLookupStatement(IRowMeta rowMeta) throws HopDatabaseException {

// Lookup certain fields in a table
public String getUpdateStatement(IRowMeta rowMeta) throws HopDatabaseException {
DatabaseMeta databaseMeta = meta.getDatabaseMeta();
data.updateParameterRowMeta = new RowMeta();

String sql = "UPDATE " + data.realSchemaTable + Const.CR;
Expand All @@ -629,7 +626,7 @@ public String getUpdateStatement(IRowMeta rowMeta) throws HopDatabaseException {
comma = true;
}

sql += databaseMeta.quoteField(meta.getUpdateLookup()[i]);
sql += data.databaseMeta.quoteField(meta.getUpdateLookup()[i]);
sql += " = ?" + Const.CR;
data.updateParameterRowMeta.addValueMeta(
rowMeta.searchValueMeta(meta.getUpdateStream()[i]).clone());
Expand All @@ -642,7 +639,7 @@ public String getUpdateStatement(IRowMeta rowMeta) throws HopDatabaseException {
if (i != 0) {
sql += "AND ";
}
sql += databaseMeta.quoteField(meta.getKeyLookup()[i]);
sql += data.databaseMeta.quoteField(meta.getKeyLookup()[i]);
if ("BETWEEN".equalsIgnoreCase(meta.getKeyCondition()[i])) {
sql += " BETWEEN ? AND ? ";
data.updateParameterRowMeta.addValueMeta(rowMeta.searchValueMeta(meta.getKeyStream()[i]));
Expand All @@ -660,7 +657,6 @@ public String getUpdateStatement(IRowMeta rowMeta) throws HopDatabaseException {
}

public String getDeleteStatement(IRowMeta rowMeta) throws HopDatabaseException {
DatabaseMeta databaseMeta = meta.getDatabaseMeta();
data.deleteParameterRowMeta = new RowMeta();

String sql = "DELETE FROM " + data.realSchemaTable + Const.CR;
Expand All @@ -671,7 +667,7 @@ public String getDeleteStatement(IRowMeta rowMeta) throws HopDatabaseException {
if (i != 0) {
sql += "AND ";
}
sql += databaseMeta.quoteField(meta.getKeyLookup()[i]);
sql += data.databaseMeta.quoteField(meta.getKeyLookup()[i]);
if ("BETWEEN".equalsIgnoreCase(meta.getKeyCondition()[i])) {
sql += " BETWEEN ? AND ? ";
data.deleteParameterRowMeta.addValueMeta(rowMeta.searchValueMeta(meta.getKeyStream()[i]));
Expand Down Expand Up @@ -702,14 +698,14 @@ public boolean processRow() throws HopException {
data.inputRowMeta = data.outputRowMeta;
meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);

if (meta.istablenameInField()) {
if (meta.isTableNameInField()) {
// ICache the position of the table name field
if (data.indexOfTableNameField < 0) {
data.indexOfTableNameField = data.inputRowMeta.indexOfValue(meta.gettablenameField());
data.indexOfTableNameField = data.inputRowMeta.indexOfValue(meta.getTableNameField());
if (data.indexOfTableNameField < 0) {
String message =
"It was not possible to find table ["
+ meta.gettablenameField()
+ meta.getTableNameField()
+ "] in the input fields.";
logError(message);
throw new HopTransformException(message);
Expand Down Expand Up @@ -823,7 +819,7 @@ public boolean processRow() throws HopException {
}
}

if (!meta.istablenameInField()) {
if (!meta.isTableNameInField()) {
// Prepare Lookup statement
if (meta.isPerformLookup()) {
data.lookupStatement = data.preparedStatements.get(data.realSchemaTable + "lookup");
Expand Down Expand Up @@ -906,21 +902,28 @@ public boolean processRow() throws HopException {
public boolean init() {
if (super.init()) {
try {

DatabaseMeta databaseMeta = getPipelineMeta().findDatabase(meta.getConnection(), variables);
if (databaseMeta == null) {
logError(
BaseMessages.getString(
PKG, "SynchronizeAfterMerge.Init.ConnectionMissing", getTransformName()));
return false;
}

meta.normalizeAllocationFields();
data.realSchemaName = resolve(meta.getSchemaName());
if (meta.istablenameInField()) {
if (Utils.isEmpty(meta.gettablenameField())) {
if (meta.isTableNameInField()) {
if (Utils.isEmpty(meta.getTableNameField())) {
logError(
BaseMessages.getString(PKG, "SynchronizeAfterMerge.Log.Error.TableFieldnameEmpty"));
return false;
}
}

data.databaseMeta = meta.getDatabaseMeta();

// if we are using Oracle then set releaseSavepoint to false
// TODO: change when we remove those variants of IDatabase
if (data.databaseMeta.getIDatabase().isOracleVariant()) {
if (databaseMeta.getIDatabase().isOracleVariant()) {
data.releaseSavepoint = false;
}

Expand All @@ -932,9 +935,9 @@ public boolean init() {
//
data.specialErrorHandling =
getTransformMeta().isDoingErrorHandling()
&& meta.getDatabaseMeta().supportsErrorHandlingOnBatchUpdates();
&& databaseMeta.supportsErrorHandlingOnBatchUpdates();

data.supportsSavepoints = meta.getDatabaseMeta().getIDatabase().isUseSafePoints();
data.supportsSavepoints = databaseMeta.getIDatabase().isUseSafePoints();

if (data.batchMode && data.specialErrorHandling) {
data.batchMode = false;
Expand All @@ -943,13 +946,8 @@ public boolean init() {
}
}

if (meta.getDatabaseMeta() == null) {
logError(
BaseMessages.getString(
PKG, "SynchronizeAfterMerge.Init.ConnectionMissing", getTransformName()));
return false;
}
data.db = new Database(this, this, meta.getDatabaseMeta());
data.databaseMeta = databaseMeta;
data.db = new Database(this, this, databaseMeta);
data.db.connect();
data.db.setCommit(data.commitSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hop.pipeline.transforms.synchronizeaftermerge;

import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -168,7 +169,7 @@ public void widgetSelected(SelectionEvent e) {
shell.setText(BaseMessages.getString(PKG, "SynchronizeAfterMergeDialog.Shell.Title"));

int middle = props.getMiddlePct();
int margin = props.getMargin();
int margin = PropsUi.getMargin();

// THE BUTTONS go at the bottom
wOk = new Button(shell, SWT.PUSH);
Expand Down Expand Up @@ -223,7 +224,7 @@ public void widgetSelected(SelectionEvent e) {
wGeneralComp.setLayout(generalLayout);

// Connection line
wConnection = addConnectionLine(wGeneralComp, wTransformName, input.getDatabaseMeta(), lsMod);
wConnection = addConnectionLine(wGeneralComp, wTransformName, input.getConnection(), lsMod);
wConnection.addSelectionListener(lsSelection);

// Schema line...
Expand Down Expand Up @@ -788,7 +789,7 @@ private void generateMappings() {
}

// refresh data
input.setDatabaseMeta(pipelineMeta.findDatabase(wConnection.getText(), variables));
input.setConnection(wConnection.getText());
input.setTableName(variables.resolve(wTable.getText()));
ITransformMeta transformMetaInterface = transformMeta.getTransform();
try {
Expand Down Expand Up @@ -1011,9 +1012,9 @@ public void getData() {
}

wCommit.setText(input.getCommitSize());
wTablenameInField.setSelection(input.istablenameInField());
if (input.gettablenameField() != null) {
wTableField.setText(input.gettablenameField());
wTablenameInField.setSelection(input.isTableNameInField());
if (input.getTableNameField() != null) {
wTableField.setText(input.getTableNameField());
}
wBatch.setSelection(input.useBatchUpdate());
if (input.getOperationOrderField() != null) {
Expand Down Expand Up @@ -1071,8 +1072,8 @@ public void getData() {
if (input.getTableName() != null) {
wTable.setText(input.getTableName());
}
if (input.getDatabaseMeta() != null) {
wConnection.setText(input.getDatabaseMeta().getName());
if (input.getConnection() != null) {
wConnection.setText(input.getConnection());
}

wKey.setRowNums();
Expand All @@ -1097,8 +1098,8 @@ private void getInfo(SynchronizeAfterMergeMeta inf) {
inf.allocate(nrkeys, nrFields);

inf.setCommitSize(wCommit.getText());
inf.settablenameInField(wTablenameInField.getSelection());
inf.settablenameField(wTableField.getText());
inf.setTableNameInField(wTablenameInField.getSelection());
inf.setTableNameField(wTableField.getText());
inf.setUseBatchUpdate(wBatch.getSelection());
inf.setPerformLookup(wPerformLookup.getSelection());

Expand Down Expand Up @@ -1132,9 +1133,9 @@ private void getInfo(SynchronizeAfterMergeMeta inf) {
inf.getUpdate()[i] = "Y".equals(item.getText(3));
}

inf.setConnection(wConnection.getText());
inf.setSchemaName(wSchema.getText());
inf.setTableName(wTable.getText());
inf.setDatabaseMeta(pipelineMeta.findDatabase(wConnection.getText(), variables));

transformName = wTransformName.getText(); // return value
}
Expand All @@ -1147,7 +1148,7 @@ private void ok() {
// Get the information for the dialog into the input structure.
getInfo(input);

if (input.getDatabaseMeta() == null) {
if (Strings.isNullOrEmpty(input.getConnection())) {
MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_ERROR);
mb.setMessage(
BaseMessages.getString(
Expand Down Expand Up @@ -1253,18 +1254,15 @@ private void create() {
info);
IRowMeta prev = pipelineMeta.getPrevTransformFields(variables, transformName);

DatabaseMeta databaseMeta = pipelineMeta.findDatabase(wConnection.getText(), variables);

SqlStatement sql =
info.getSqlStatements(variables, pipelineMeta, transformMeta, prev, metadataProvider);
if (!sql.hasError()) {
if (sql.hasSql()) {
SqlEditor sqledit =
new SqlEditor(
shell,
SWT.NONE,
variables,
info.getDatabaseMeta(),
DbCache.getInstance(),
sql.getSql());
shell, SWT.NONE, variables, databaseMeta, DbCache.getInstance(), sql.getSql());
sqledit.open();
} else {
MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_INFORMATION);
Expand Down Expand Up @@ -1292,8 +1290,7 @@ private void create() {
private void getSchemaNames() {
DatabaseMeta databaseMeta = pipelineMeta.findDatabase(wConnection.getText(), variables);
if (databaseMeta != null) {
Database database = new Database(loggingObject, variables, databaseMeta);
try {
try (Database database = new Database(loggingObject, variables, databaseMeta)) {
database.connect();
String[] schemas = database.getSchemas();

Expand Down Expand Up @@ -1329,8 +1326,6 @@ private void getSchemaNames() {
BaseMessages.getString(PKG, "System.Dialog.Error.Title"),
BaseMessages.getString(PKG, "SynchronizeAfterMergeDialog.ErrorGettingSchemas"),
e);
} finally {
database.disconnect();
}
}
}
Expand Down
Loading

0 comments on commit 3f7cf41

Please sign in to comment.