-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Make metrics reporter serializable (alternative impl) #8032
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Make metrics reporter serializable (alternative impl) #8032
Conversation
aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
Outdated
Show resolved
Hide resolved
|
|
||
| this.httpClient = clientBuilder.build(); | ||
| try { | ||
| return DynMethods.builder("mapper").hiddenImpl(impl).buildStaticChecked().invoke(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're relying on a static mapper() method, which isn't ideal. Currently we only have 2 classes that follow this pattern, namely RESTObjectMapper and S3ObjectMapper
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the other alternative to lazily load the object mapper I can think of is that we could make all Ser/De classes in RESTSerializers and S3ObjectMapper implement Serializable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, while making all Ser/De classes implement Serializable would work for normal Java ser/de, it doesn't for Kryo without kryo.register(ObjectMapper.class, new JavaSerializer()); as it fails with
Unable to find class: sun.reflect.GeneratedConstructorAccessor48
Serialization trace:
delegate (sun.reflect.DelegatingConstructorAccessorImpl)
constructorAccessor (java.lang.reflect.Constructor)
_constructor (com.fasterxml.jackson.databind.introspect.AnnotatedConstructor)
_defaultCreator (com.fasterxml.jackson.databind.deser.std.StdValueInstantiator)
_valueInstantiator (com.fasterxml.jackson.databind.deser.BeanDeserializer)
_rootDeserializers (com.fasterxml.jackson.databind.ObjectMapper)
mapper (org.apache.iceberg.rest.HTTPClient)
client (org.apache.iceberg.rest.RESTMetricsReporter)
reporters (org.apache.iceberg.metrics.MetricsReporters$CompositeMetricsReporter)
metricsReporter (org.apache.iceberg.SerializableTable)
com.esotericsoftware.kryo.KryoException: Unable to find class: sun.reflect.GeneratedConstructorAccessor48
Serialization trace:
delegate (sun.reflect.DelegatingConstructorAccessorImpl)
constructorAccessor (java.lang.reflect.Constructor)
_constructor (com.fasterxml.jackson.databind.introspect.AnnotatedConstructor)
_defaultCreator (com.fasterxml.jackson.databind.deser.std.StdValueInstantiator)
_valueInstantiator (com.fasterxml.jackson.databind.deser.BeanDeserializer)
_rootDeserializers (com.fasterxml.jackson.databind.ObjectMapper)
mapper (org.apache.iceberg.rest.HTTPClient)
client (org.apache.iceberg.rest.RESTMetricsReporter)
reporters (org.apache.iceberg.metrics.MetricsReporters$CompositeMetricsReporter)
metricsReporter (org.apache.iceberg.SerializableTable)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
40ff541 to
54ea53e
Compare
| if (null == mapper) { | ||
| synchronized (this) { | ||
| if (null == mapper) { | ||
| mapper = loadObjectMapperDynamically(properties.getOrDefault(OBJECT_MAPPER_IMPL, null)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should load this dynamically. Can we just add the signer request and response objects to the same object mapper? I'm sure there are other options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've moved away from dynamically loading this and instead went with #8032 (comment)
|
|
||
| Object writeReplace() { | ||
| // fetch the latest headers from the AuthSession and carry them over in a separate supplier so | ||
| // that AuthSession doesn't have to be Serializable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change AuthSession to use a SerializableMap if it doesn't need to be serializable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to change this for Kryo ser/de, otherwise it will fail with the below error. This is because we're passing a Lambda with the headers from AuthSession and Kryo ser/des everything in the scope of that Lambda
headers (org.apache.iceberg.rest.RESTMetricsReporter)
reporters (org.apache.iceberg.metrics.MetricsReporters$CompositeMetricsReporter)
metricsReporter (org.apache.iceberg.SerializableTable)
com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
headers (org.apache.iceberg.rest.RESTMetricsReporter)
reporters (org.apache.iceberg.metrics.MetricsReporters$CompositeMetricsReporter)
metricsReporter (org.apache.iceberg.SerializableTable)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at org.apache.iceberg.TestHelpers$KryoHelpers.roundTripSerialize(TestHelpers.java:294)
at org.apache.iceberg.rest.TestRESTCatalog.testScanReportingOnSerializableTable(TestRESTCatalog.java:2033)
....
Caused by: java.lang.RuntimeException: Could not serialize lambda
at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:78)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
... 97 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
headers (org.apache.iceberg.rest.auth.OAuth2Util$AuthSession)
capturedArgs (java.lang.invoke.SerializedLambda)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:75)
... 99 more
Caused by: java.lang.UnsupportedOperationException
at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:781)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
For standard Java ser/de we're relying on the writeReplace() method. I've updated the comment to make it clear that this is only for standard Java ser/de
| @Test | ||
| public void testScanReportingOnSerializableTable() throws IOException, ClassNotFoundException { | ||
| Table table = catalog().buildTable(TABLE, SCHEMA).create(); | ||
| table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we not send scan metrics if the table is empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct, we don't send anything if the table doesn't have any files to do scan planning for
|
Thanks, @nastra! This looks like it's a lot closer than the alternative. |
ecb2f65 to
9f01740
Compare
28b02bd to
a55a0a3
Compare
a55a0a3 to
74e0263
Compare
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
@manuzhang the PR is still valid but I didn't have time to re-visit it and fix the merge conflicts. I'll do that next week or so and hopefully @rdblue can review it then |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
This is an alternative implementation to #7370. This PR avoids breaking the API and rather makes the
HTTPClientserializable, where the underlyingCloseableHttpClientandObjectMapperare both loaded dynamically