AWS: Add SQS MetricsReporter #7444
amogh-jahagirdar left a comment
Thanks for publishing, @rajarshisarkar. I had some comments. Also, can we add unit tests?
Feels like we should use the async client for this? We should not be blocked by metrics reporting calls.
269d051 to 2ecb6b5
This would mean we introduce the AsyncHttp clients too: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration.html#http-clients-available. Thoughts?
172fca2 to f18775e
Added
bf2cabc to 5812918
Introduced
73d443e to 9d4fd29
9d4fd29 to 9c919ce
   * <p>For more details, see
   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
   */
  public static final String NETTYNIO_WRITE_TIMEOUT = "http-client.nettynio.write-timeout";
nit: can you add time units to this, like -ms?
   * <p>For more details, see
   * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
   */
  public static final String NETTYNIO_READ_TIMEOUT = "http-client.nettynio.read-timeout";
nit: can you add time units to this, like -ms?
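A minimal sketch of what the unit-suffixed property could look like, assuming the key is renamed with a `-ms` suffix and parsed as milliseconds. The `TimeoutProperties` class, the renamed constant, and the `timeoutMs` helper are hypothetical names for illustration and are not taken from the PR.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper: the "-ms" key below is an assumed rename, not the PR's final name.
final class TimeoutProperties {
  static final String NETTYNIO_WRITE_TIMEOUT_MS = "http-client.nettynio.write-timeout-ms";

  private TimeoutProperties() {}

  // Reads the value as milliseconds; falls back to defaultMs when the key is absent.
  static Duration timeoutMs(Map<String, String> properties, String key, long defaultMs) {
    String value = properties.get(key);
    return Duration.ofMillis(value == null ? defaultMs : Long.parseLong(value));
  }
}
```

With the unit in the key, a value such as `1500` is unambiguous to whoever sets it.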
PropertyUtil.propertyAsString(
    properties,
    SqsMetricsReporterProperties.METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL,
    DefaultSqsMetricsReporterAwsClientFactory.class.getName());
similar to the old client factory, we should be able to make DefaultSqsMetricsReporterAwsClientFactory a singleton, and directly return that if METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL is not set.
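One way the singleton suggestion could be sketched, assuming a simplified factory interface. `ClientFactory`, `ClientFactories`, and the property key string are hypothetical stand-ins, not the PR's actual types or configuration names; the default implementation is nested and package-private inside the factories class.

```java
import java.util.Map;

// Hypothetical stand-in for the PR's client factory interface.
interface ClientFactory {
  void initialize(Map<String, String> properties);
}

final class ClientFactories {
  // Assumed property key, for illustration only.
  static final String CLIENT_FACTORY_IMPL = "metrics-reporter.sqs.client-factory-impl";

  // Single shared default instance.
  private static final ClientFactory DEFAULT = new DefaultClientFactory();

  private ClientFactories() {}

  static ClientFactory from(Map<String, String> properties) {
    String impl = properties.get(CLIENT_FACTORY_IMPL);
    if (impl == null) {
      return DEFAULT; // no custom impl configured: return the singleton, skip reflection
    }
    try {
      ClientFactory factory =
          (ClientFactory) Class.forName(impl).getDeclaredConstructor().newInstance();
      factory.initialize(properties);
      return factory;
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Cannot load client factory: " + impl, e);
    }
  }

  // Package-private nested default, not visible outside the package.
  static class DefaultClientFactory implements ClientFactory {
    @Override
    public void initialize(Map<String, String> properties) {}
  }
}
```

Reflection is only paid for when a custom implementation is configured.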
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class DefaultSqsMetricsReporterAwsClientFactory
this can be an inner package-private class in the factories class.
  message = ScanReportParser.toJson((ScanReport) report);
}

if (null == message) {
Can this just be the else branch of the last if? The parsers cannot return null.
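A rough sketch of the branch structure being suggested, using stand-in types so it runs on its own. `Report`, `ScanReport`, `CommitReport`, and `ReportMessages` here are hypothetical placeholders for the Iceberg report types, and the JSON strings stand in for the parser calls.

```java
import java.util.Optional;

// Hypothetical stand-ins for the Iceberg report types.
interface Report {}

final class ScanReport implements Report {}

final class CommitReport implements Report {}

final class ReportMessages {
  private ReportMessages() {}

  // The unsupported case is the final else branch, so no separate null check is needed.
  static Optional<String> toMessage(Report report) {
    if (report instanceof CommitReport) {
      return Optional.of("{\"type\":\"commit\"}"); // placeholder for the commit parser call
    } else if (report instanceof ScanReport) {
      return Optional.of("{\"type\":\"scan\"}"); // placeholder for ScanReportParser.toJson(...)
    } else {
      return Optional.empty(); // unsupported report type: caller can warn and skip
    }
  }
}
```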
        }
      }
    });
} catch (Exception e) {
can we at least use RuntimeException? Exception is too generic
      }
    });
} catch (Exception e) {
  LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);
I think this deserves a LOG.error?
future.whenComplete(
    (response, error) -> {
      if (response != null) {
        LOG.info("Metrics {} reported to: {}", response, sqsQueueUrl);
I don't think we need to log the response? Just logging error when it exists seems sufficient to me, what do you think?
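Taken together, the last three comments (narrower catch, error-level logging, no success logging) could look roughly like the sketch below. It assumes a generic async sender in place of the SQS async client: `AsyncReporter` and the `Function<String, CompletableFuture<String>>` sender are hypothetical, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical reporter; the sender function stands in for an async SQS send call.
final class AsyncReporter {
  private static final Logger LOG = Logger.getLogger(AsyncReporter.class.getName());

  private final Function<String, CompletableFuture<String>> sender;
  private final String queueUrl;

  AsyncReporter(Function<String, CompletableFuture<String>> sender, String queueUrl) {
    this.sender = sender;
    this.queueUrl = queueUrl;
  }

  // Returns immediately; the caller is never blocked on the send.
  CompletableFuture<String> report(String message) {
    try {
      return sender
          .apply(message)
          .whenComplete(
              (response, error) -> {
                if (error != null) { // log failures only, stay silent on success
                  LOG.log(Level.SEVERE, "Failed to report metrics to SQS queue " + queueUrl, error);
                }
              });
    } catch (RuntimeException e) { // narrower than catching Exception
      LOG.log(Level.SEVERE, "Failed to report metrics to SQS queue " + queueUrl, e);
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(e);
      return failed;
    }
  }
}
```

Returning the future is a choice made for this sketch so that tests can observe the outcome; a reporter with a `void report(...)` method would simply drop it.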
}

@Test
public void report() {
nit: use testXXX for testing methods
@Test
public void report() {
  sqsMetricsReporter.report(commitReport);
Can we test both scan and commit reports, and add more test cases for the success and failure paths that the implementation handles?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This PR adds support for AWS SQS MetricsReporter.
Spark SQL launch command:
cc: @jackye1995 @amogh-jahagirdar @singhpk234