Skip to content

Commit

Permalink
fix: CSV to SQLServer
Browse files Browse the repository at this point in the history
  • Loading branch information
osalvador committed Aug 19, 2022
1 parent 4501f3c commit 4190724
Show file tree
Hide file tree
Showing 7 changed files with 606 additions and 485 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ ReplicaDB is written in Java and requires a Java Runtime Environment (JRE) Stand
Just download [latest](https://github.com/osalvador/ReplicaDB/releases) release and unzip it.

```bash
$ curl -o ReplicaDB-0.12.1.tar.gz -L "https://github.com/osalvador/ReplicaDB/releases/download/v0.12.1/ReplicaDB-0.12.1.tar.gz"
$ tar -xvzf ReplicaDB-0.12.1.tar.gz
$ curl -o ReplicaDB-0.12.2.tar.gz -L "https://github.com/osalvador/ReplicaDB/releases/download/v0.12.2/ReplicaDB-0.12.2.tar.gz"
$ tar -xvzf ReplicaDB-0.12.2.tar.gz
$ ./bin/replicadb --help
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.replicadb</groupId>
<artifactId>ReplicaDB</artifactId>
<version>0.12.1</version>
<version>0.12.2</version>

<name>ReplicaDB</name>
<url>https://github.com/osalvador/ReplicaDB</url>
Expand Down
36 changes: 20 additions & 16 deletions src/main/java/org/replicadb/ReplicaTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
import org.replicadb.manager.ManagerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final public class ReplicaTask implements Callable<Integer> {
public final class ReplicaTask implements Callable<Integer> {

private static final Logger LOG = LogManager.getLogger(ReplicaTask.class.getName());

private int taskId;
private String taskName;
private ToolOptions options;
private final int taskId;
private final ToolOptions options;


public ReplicaTask(int id, ToolOptions options) {
Expand All @@ -28,17 +28,12 @@ public ReplicaTask(int id, ToolOptions options) {
@Override
public Integer call() throws Exception {

//System.out.println("Task ID :" + this.taskId + " performed by " + Thread.currentThread().getName());
this.taskName = "TaskId-"+this.taskId;
String taskName = "TaskId-" + this.taskId;

Thread.currentThread().setName(taskName);

LOG.info("Starting " + Thread.currentThread().getName());
LOG.info("Starting {}", Thread.currentThread().getName());

// Do stuff...
// Obtener una instancia del DriverManager del Source
// Obtener una instancia del DriverManager del Sink
// Mover datos de Source a Sink.
ManagerFactory managerF = new ManagerFactory();
ConnManager sourceDs = managerF.accept(options, DataSourceType.SOURCE);
ConnManager sinkDs = managerF.accept(options, DataSourceType.SINK);
Expand All @@ -47,22 +42,22 @@ public Integer call() throws Exception {
try {
sourceDs.getConnection();
} catch (Exception e) {
LOG.error("ERROR in " + this.taskName+ " getting Source connection: " + e.getMessage());
LOG.error("ERROR in {} getting Source connection: {} ", taskName, e.getMessage());
throw e;
}

try {
sinkDs.getConnection();
} catch (Exception e) {
LOG.error("ERROR in " + this.taskName + " getting Sink connection: " + e.getMessage());
LOG.error("ERROR in {} getting Sink connection:{} ", taskName,e.getMessage());
throw e;
}

ResultSet rs;
try {
rs = sourceDs.readTable(null, null, taskId);
} catch (Exception e) {
LOG.error("ERROR in " + this.taskName + " reading source table: " + e.getMessage());
LOG.error("ERROR in {} reading source table: {}", taskName, e.getMessage());
throw e;
}

Expand All @@ -71,7 +66,7 @@ public Integer call() throws Exception {
// TODO determine the total rows processed in all the managers
LOG.info("A total of {} rows processed by task {}", processedRows, taskId);
} catch (Exception e) {
LOG.error("ERROR in " + this.taskName + " inserting data to sink table: " + e.getMessage());
LOG.error("ERROR in {} inserting data to sink table: {} ", taskName, getExceptionMessageChain(e));
throw e;
}

Expand All @@ -83,5 +78,14 @@ public Integer call() throws Exception {

return this.taskId;
}

public static List<String> getExceptionMessageChain(Throwable throwable) {
List<String> result = new ArrayList<>();
while (throwable != null) {
result.add(throwable.getMessage());
throwable = throwable.getCause();
}
return result;
}
}

Loading

0 comments on commit 4190724

Please sign in to comment.