Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: iceberg timestamp value handling #49807

Merged
merged 4 commits into from
Dec 16, 2024

Conversation

subodh1810
Copy link
Contributor

@subodh1810 subodh1810 commented Dec 16, 2024

@subodh1810 subodh1810 self-assigned this Dec 16, 2024
@subodh1810 subodh1810 requested a review from a team as a code owner December 16, 2024 10:16
Copy link

vercel bot commented Dec 16, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
airbyte-docs ✅ Ready (Inspect) Visit Preview 💬 Add feedback Dec 16, 2024 3:55pm

@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.13
dockerImageTag: 0.1.14
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: since we are bumping this, we should fix this icon reference :)

@@ -20,6 +20,9 @@ import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.types.Type

class AirbyteValueToIcebergRecord {

private val timeStringUtility = TimeStringUtility()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are not going to let Micronaut manage/inject this, it may make sense to make it a Kotlin object instead of a class to make it a singleton. That would allow us to reference the methods in a static-like way.

import java.time.ZoneOffset
import java.time.ZonedDateTime

class TimeStringUtility {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: my previous comment -- this seems like this could be converted to an object.

Copy link
Contributor

@jdpgrailsdev jdpgrailsdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

Copy link
Contributor

@edgao edgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one nit about file location + code reuse, otherwise lgtm

import java.time.ZoneOffset
import java.time.ZonedDateTime

object TimeStringUtility {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this class could probably live in core? and then TimeStringToInteger could use these functions

@subodh1810 subodh1810 merged commit 369cde7 into master Dec 16, 2024
36 checks passed
@subodh1810 subodh1810 deleted the fix-timestamp-processing-iceberg branch December 16, 2024 18:25
barduinor pushed a commit to box-community/airbyte that referenced this pull request Dec 17, 2024
@edgao
Copy link
Contributor

edgao commented Dec 17, 2024

something might still be busted here? I tried enabling testBasicTypes on #49848

and the connector crashes on some timestamp value:

java.lang.ClassCastException: class java.time.OffsetDateTime cannot be cast to class java.time.LocalDateTime (java.time.OffsetDateTime and java.time.LocalDateTime are in module java.base of loader 'bootstrap')
	at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestampWriter.write(BaseParquetWriter.java:269)
	at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:348)
	at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:581)
	at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
	at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:392)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:375)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:307)
	at org.apache.iceberg.io.UnpartitionedWriter.write(UnpartitionedWriter.java:42)
	at io.airbyte.integrations.destination.iceberg.v2.IcebergStreamLoader.processRecords(IcebergStreamLoader.kt:54)
	at io.airbyte.cdk.load.task.implementor.DefaultProcessRecordsTask$execute$2$1.emit(ProcessRecordsTask.kt:61)
	at io.airbyte.cdk.load.task.implementor.DefaultProcessRecordsTask$execute$2$1$emit$1.invokeSuspend(ProcessRecordsTask.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)

@subodh1810
Copy link
Contributor Author

something might still be busted here? I tried enabling testBasicTypes on #49848

and the connector crashes on some timestamp value:

java.lang.ClassCastException: class java.time.OffsetDateTime cannot be cast to class java.time.LocalDateTime (java.time.OffsetDateTime and java.time.LocalDateTime are in module java.base of loader 'bootstrap')
	at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestampWriter.write(BaseParquetWriter.java:269)
	at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:348)
	at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:581)
	at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
	at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:392)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:375)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:307)
	at org.apache.iceberg.io.UnpartitionedWriter.write(UnpartitionedWriter.java:42)
	at io.airbyte.integrations.destination.iceberg.v2.IcebergStreamLoader.processRecords(IcebergStreamLoader.kt:54)
	at io.airbyte.cdk.load.task.implementor.DefaultProcessRecordsTask$execute$2$1.emit(ProcessRecordsTask.kt:61)
	at io.airbyte.cdk.load.task.implementor.DefaultProcessRecordsTask$execute$2$1$emit$1.invokeSuspend(ProcessRecordsTask.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)

@edgao raised PR to fix this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues CDK Connector Development Kit connectors/destination/iceberg-v2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants