-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(java) postgres destination #539
Conversation
4ba6055
to
45cbc82
Compare
@@ -0,0 +1,44 @@ | |||
{ | |||
"destinationId": "", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to adapt this to the new format.
bee0350
to
e95f4cd
Compare
private final ExecutorService[] threadPools; | ||
|
||
public SchedulerShutdownHandler(final ExecutorService... threadPools) { | ||
public GracefulShutdownHandler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: weird line break
@@ -60,7 +61,7 @@ protected JsonNode getInvalidConfig() { | |||
} | |||
|
|||
@Override | |||
protected List<JsonNode> recordRetriever(TestDestinationEnv testEnv) throws Exception { | |||
protected Set<JsonNode> recordRetriever(TestDestinationEnv testEnv, String streamName) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should keep List and sort or create an assert that is not sensitive to order. Set will make it hard to test for duplicated values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point.
final String tableName = stream.getName(); | ||
final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); | ||
DatabaseHelper.query(connectionPool, ctx -> ctx.execute(String.format( | ||
"CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am injecting our own id in the output as a uuid. in order to do that we need to make sure that the uuid lib is installed in the database. the alternative here is to just save the uuid as a string so that we don't need to install the lib.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should prefer inserting as a string so we aren't modifying the user's db in unexpected ways. I wouldn't expect any ETL tool to install a plugin in my database.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Onboard with @jrhizor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kk
* @param writeBuffers - map of stream name to its respective buffer. | ||
* @param connectionPool - connection to the db. | ||
*/ | ||
private static void writeStreamsWithNRecords( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: weird line break.
final StringBuilder query = new StringBuilder(String.format("INSERT INTO %s(%s)\n", tmpTableName, COLUMN_NAME)) | ||
.append("VALUES \n"); | ||
boolean firstRecordInQuery = true; | ||
for (int i = 0; i <= batchSize; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have an index error here? If it is expected, you should add a comment to explain it since it not a normal code structure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made it standard.
writerPool.shutdown(); | ||
writerPool.awaitTermination(1, TimeUnit.SECONDS); | ||
} else { | ||
LOGGER.error("executing on success close procedure."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be an error log
writeConfigs.values().forEach(writeConfig -> writeConfig.getWriteBuffer().closeInput()); | ||
|
||
if (hasFailed) { | ||
LOGGER.error("executing on failed close procedure."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shoudln't be an error log
e95f4cd
to
3be63f8
Compare
integrationTestImplementation project(':airbyte-integrations:integration-test-lib') | ||
integrationTestImplementation project(':airbyte-integrations:postgres-destination') | ||
|
||
testImplementation "org.testcontainers:postgresql:1.15.0-rc2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: group integrationTestImplementation
together
return DatabaseHelper.getConnectionPool(db.getUsername(), db.getPassword(), db.getJdbcUrl()); | ||
} | ||
|
||
private Set<JsonNode> recordRetriever(String streamName) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is duplicated from the integration test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.
private final ExecutorService[] threadPools; | ||
|
||
public SchedulerShutdownHandler(final ExecutorService... threadPools) { | ||
public GracefulShutdownHandler( | ||
long terminationWaitTime, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I slightly prefer Duration
here so the time and unit are tied together by whatever is providing it.
return new StandardCheckConnectionOutput().withStatus(Status.FAILURE).withMessage(e.getMessage()); | ||
} | ||
|
||
return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this into the end of the try
so the method reads linearly except in the exceptional case?
try { | ||
final BasicDataSource connectionPool = getConnectionPool(config); | ||
DatabaseHelper.query(connectionPool, ctx -> ctx.execute( | ||
"SELECT *\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to do this query instead of SELECT 1
or something for a faster check
?
} | ||
|
||
@Override | ||
public StandardDiscoverSchemaOutput discover(JsonNode config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Destination
need this as part of the interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eh. i dunno. we got to decide what we want out of discover with destination.
final String tableName = stream.getName(); | ||
final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); | ||
DatabaseHelper.query(connectionPool, ctx -> ctx.execute(String.format( | ||
"CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should prefer inserting as a string so we aren't modifying the user's db in unexpected ways. I wouldn't expect any ETL tool to install a plugin in my database.
tmpTableName, COLUMN_NAME))); | ||
|
||
final Path queueRoot = Files.createTempDirectory("queues"); | ||
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getName()), stream.getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we handle two sources with the same named streams syncing at the same time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assumed these would be running in separate workers, but I guess the source shouldn't assume it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
different workers. why shouldn't the source assume this? we add noise at the end of the paths to fix this, but i think the contract is that the source runs on its own docker image (with its own filesystem).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right -- each source should run in its own container. i was being silly in my head .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % comment about batch size/missed batches
private final Map<String, WriteConfig> writeConfigs; | ||
private final Schema schema; | ||
|
||
public RecordConsumer(BasicDataSource connectionPool, Map<String, WriteConfig> writeConfigs, Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public RecordConsumer(BasicDataSource connectionPool, Map<String, WriteConfig> writeConfigs, Schema schema) { | |
public RecordConsumer(BasicDataSource connectionPool, Map<String, WriteConfig> streamNameToWriteConfigs, Schema schema) { |
* @param writeBuffers - map of stream name to its respective buffer. | ||
* @param connectionPool - connection to the db. | ||
*/ | ||
private static void writeStreamsWithNRecords(int minRecords, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't this miss the last batch if it is smaller than the minimum batch size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check out the close function on consumer.
.stream() | ||
.map(Record::intoMap) | ||
.map(r -> r.entrySet().stream().map(e -> { | ||
if (e.getValue().getClass().equals(org.jooq.JSONB.class)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why would this not be the case for all records?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a jooq issue. it couldn't create a record with a value that had a jsonb class:
adding this comment:
// jooq needs more configuration to handle jsonb natively. coerce it to a string for now and handle deserializing later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
awesome, thanks for retaining the context in a comment
} | ||
|
||
@Override | ||
protected List<JsonNode> recordRetriever(TestDestinationEnv env, String streamName) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected List<JsonNode> recordRetriever(TestDestinationEnv env, String streamName) throws Exception { | |
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { |
a0c96ec
to
2f525f9
Compare
c9e65b6
to
936a3e9
Compare
3be63f8
to
ff18d12
Compare
8b763e3
to
718528f
Compare
What
GracefulShutdownHandler
forExecutorService
s to commons.transaction
method to DatabaseHelper.closes #540
First Impressions
Checklist
Recommended reading order
I recommend focusing on the postgres-destination code and then the little updates to commons and utility methods will make sense from there.
PostgresDestination.java
,PostgresDestinationTest.java
,PostgresDestinationIntegrationTest.java