-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement EtagCacheResponseProcessor #311
Conversation
1d5f943
to
41dd73d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs tests.
try { | ||
((ObjectNode) json).set( | ||
DruidJsonResponseContentKeys.RESPONSE.getName(), | ||
mapper.readTree(dataCache.getDataValue(DruidJsonResponseContentKeys.RESPONSE.getName())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be slightly more efficient to keep this around rather than reserialize on cache hit. But I don't think it matters much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean assigning it to a variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
private final ResponseProcessor next; | ||
private final QuerySigningService<Long> querySigningService; | ||
private final @NotNull TupleDataCache<String, Long, String> dataCache; | ||
private final JsonNode respnoseCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/respnose/response
private final ObjectWriter writer; | ||
private final ResponseProcessor next; | ||
private final QuerySigningService<Long> querySigningService; | ||
private final @NotNull TupleDataCache<String, Long, String> dataCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put this annotation on the constructor parameter
} | ||
|
||
int statusCode = json.get(DruidJsonResponseContentKeys.STATUS_CODE.getName()).asInt(); | ||
if (statusCode == Status.NOT_MODIFIED.getStatusCode()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we get some more comments in here around what's supposed to be happening? I'd much rather follow comments for understanding than having to trace code closely to understand it.
try { | ||
((ObjectNode) json).set( | ||
DruidJsonResponseContentKeys.RESPONSE.getName(), | ||
respnoseCache == null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this always be null
? Perhaps I'm missing something, but why are we bothering with a conditional? Or perhaps the constructor was supposed to allow this to be set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
|
||
try { | ||
dataCache.set( | ||
DruidJsonResponseContentKeys.RESPONSE.getName(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using "response" as the cache key seems like a horrible idea... every query is going to overwrite this. Why not use the Druid query, like the V2 processor does?
String.format( | ||
'{"%s": %d}', | ||
DruidJsonResponseContentKeys.STATUS_CODE.getName(), | ||
NOT_MODIFIED.getStatusCode() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Groovy's string interpolation here, it's much cleaner:
"""{"${DruidJsonResponseContentKeys.STATUS_CODE.name}": $NOT_MODIFIED.statusCode}"""
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Beautiful!
etagCacheResponseProcessor.processResponse(json, druidAggregationQuery, Mock(LoggingContext)) | ||
|
||
then: | ||
then: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 then:
tags?
String.format( | ||
'{"%s": %d}', | ||
DruidJsonResponseContentKeys.STATUS_CODE.getName(), | ||
OK.getStatusCode() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use groovy string templating
given: | ||
querySigningService.getSegmentSetId(_ as DruidAggregationQuery) >> Optional.empty() | ||
JsonNode json = MAPPER.readTree( | ||
String.format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use string templates
|
||
then: | ||
1 * dataCache.set(DruidJsonResponseContentKeys.RESPONSE.getName(), null, 'null') | ||
1 * dataCache.set(DruidJsonResponseContentKeys.ETAG.getName(), null, '"someEtag"') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks wrong... are we really setting the cache value to "someEtag"
? (ie. including the quotes around the string)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I have removed it. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ask me if you have questions or doubt.
private final @NotNull TupleDataCache<String, Long, String> dataCache; | ||
private final JsonNode respnoseCache; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line
public EtagCacheResponseProcessor( | ||
ResponseProcessor next, | ||
TupleDataCache<String, Long, String> dataCache, | ||
QuerySigningService<Long> querySigningService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not need this anymore, since we have etag now.
private final ObjectWriter writer; | ||
private final ResponseProcessor next; | ||
private final QuerySigningService<Long> querySigningService; | ||
private final @NotNull TupleDataCache<String, Long, String> dataCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a implementation of this as TupleDataCache<String, String, String>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I've changed it.
private final ResponseProcessor next; | ||
private final QuerySigningService<Long> querySigningService; | ||
private final @NotNull TupleDataCache<String, Long, String> dataCache; | ||
private final JsonNode respnoseCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo respnose -> response
? mapper.readTree( | ||
dataCache.getDataValue(DruidJsonResponseContentKeys.RESPONSE.getName()) | ||
) | ||
: respnoseCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this respnoseCache? This thing will always be null
right? It never got assigned from what I can see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
// make sure JSON response comes with etag | ||
if (!json.has(DruidJsonResponseContentKeys.ETAG.getName())) { | ||
logAndGetErrorCallback(ErrorMessageFormat.ETAG_MISSING_FROM_RESPONSE.format(), druidQuery); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe the better behavior is to log a warning and simply move on without caching? @michael-mclawhorn @cdeszaq
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, cache should be best-effort. Log a warning and move on is the right thing to do. Not being able to safely cache doesn't prevent the response from being correctly returned.
querySigningService.getSegmentSetId(druidQuery).orElse(null), | ||
writer.writeValueAsString(json.get(DruidJsonResponseContentKeys.RESPONSE.getName())) | ||
); | ||
dataCache.set( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should only be one dataCache
set only.
dataCache.set(
cacheKey,
etag,
writer.writeValueAsString(json.get(DruidJsonResponseContentKeys.RESPONSE.getName()))
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cache key should be passed in to the class as constructor argument.
It looks like Gary caught most of my pending comments already and you've already fixed them. I think I'm about to 👍 |
: responseCache; | ||
((ObjectNode) json).set( | ||
DruidJsonResponseContentKeys.RESPONSE.getName(), | ||
responseCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't see what is the meaning of responseCache
. This code is going to be only touched once so responseCache
is going to be null
and get assigned and never been used again outside?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last comment most likely
} else if (statusCode == Status.OK.getStatusCode()) { // If response is a OK, cache it, including etag | ||
// make sure JSON response comes with etag | ||
if (!json.has(DruidJsonResponseContentKeys.ETAG.getName())) { | ||
logAndGetErrorCallback(ErrorMessageFormat.ETAG_MISSING_FROM_RESPONSE.format(), druidQuery); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont throw error as commented before, instead LOG.warn and skip dataCache update and directly falls to next responseProcessor
json.get(DruidJsonResponseContentKeys.RESPONSE.getName()) == MAPPER.readTree('[{"k1":"v1"}]') | ||
} | ||
|
||
def "processResponse reports error when status code is OK but etag is missing and processResponse moves on without caching"() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change behavior accordingly
350efed
to
1b85fea
Compare
You can make codacy happy if you use fewer raw exception types and instead use a mix of illegal state or illegal argument as appropriate. |
@cdeszaq I'm going to clear your review if you don't have time to rereview in the near future. |
8efbe6a
to
0f3977e
Compare
1b85fea
to
dc8e413
Compare
mapper.readTree(dataCache.getDataValue(cacheKey)) | ||
); | ||
} catch (IOException ioe) { | ||
throw new IllegalStateException(ioe); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should use the logAndGetErrorCallback
?
} catch (JsonProcessingException exception) { | ||
String message = "Unable to parse JSON response while caching"; | ||
LOG.error(message, exception); | ||
throw new RuntimeException(message, exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should use the logAndGetErrorCallback
?
// response processor | ||
if (statusCode == Status.NOT_MODIFIED.getStatusCode()) { | ||
try { | ||
((ObjectNode) json).set( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make a copy of the json? It feels a bit dangerous modifying the json we get by passing in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can make a copy, except that it makes it impossible for test to check what processed json looks like
try { | ||
dataCache.set( | ||
cacheKey, | ||
json.get(DruidJsonResponseContentKeys.ETAG.getName()).asText(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a bug in the prepare for cache pr that is merged. here, we need to fix it in this pr. The way to fix it is to add a additional key and use the new key where appropriate. Refer to the RFC for the key we need.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last few things, otherwise good.
json.get(DruidJsonResponseContentKeys.RESPONSE.getName()) == MAPPER.readTree('[{"k1":"v1"}]') | ||
} | ||
|
||
def "processResponse reports error when status code is OK but etag is missing and processResponse moves on without caching"() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does this test check error is reported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot to update test name. It keeps going without cache without reporting error
); | ||
} catch (JsonProcessingException exception) { | ||
logAndGetErrorCallback("Unable to parse JSON response while caching", druidQuery); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not need
); | ||
} catch (IOException ioe) { | ||
logAndGetErrorCallback(ioe.getLocalizedMessage(), druidQuery); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not need
// make sure JSON response comes with status code | ||
if (!json.has(DruidJsonResponseContentKeys.STATUS_CODE.getName())) { | ||
logAndGetErrorCallback(ErrorMessageFormat.STATUS_CODE_MISSING_FROM_RESPONSE.format(), druidQuery); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not need
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, if you assume logAndGetErrorCallback
does return
, it's actually not the case.
2 approvals already; the last reviewer probably don't have time
The 2nd PR implementing #255