Skip to content

Commit

Permalink
Synchronous queries are no longer stored in ApiJobStore.
Browse files Browse the repository at this point in the history
-- The `delay` operator considers `Observable::onCompleted` to be an
"emitted item." Therefore, the job row was still being stored in the
ApiJobStore even when the query was synchronous.

--This has been fixed by replacing `delay` with a subscription, and/or
zip when appropriate.
  • Loading branch information
Andrew Cholewa committed Sep 23, 2016
1 parent 33787a2 commit 878a367
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ private Observable<String> buildStorePreResponseChain(
) {
// We don't want to store the result in the PreResponseStore unless the query is asynchronous. The query is
// asynchronous iff the asynchronousPayload emits at least one item.
return preResponseEmitter
.delay(ignored -> asynchronousPayload)
return asynchronousPayload
.flatMap(ignored -> preResponseEmitter)
.flatMap(preResponse -> preResponseStore.save(jobRow.getId(), preResponse));
}

Expand Down Expand Up @@ -203,9 +203,12 @@ private Observable<JobRow> buildUpdateRowChain(
) {
// The job status should not be updated until both the PreResponse has been stored, and the storage of the
// original Job ticket has been attempted.
return preResponseEmitter
.delay(ignored -> preResponseStoredNotification)
.delay(ignored -> jobRowStoredNotification)
return Observable.zip(
preResponseStoredNotification,
jobRowStoredNotification,
(ignored1, ignored2) -> preResponseEmitter
)
.flatMap(x -> x)
.map(PreResponse::getResponseContext)
.map(responseContext -> responseContext.containsKey(ResponseContextKeys.ERROR_MESSAGE.getName()))
.map(isError -> isError ?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.yahoo.bard.webservice.async

import com.yahoo.bard.webservice.util.JsonSlurper

/**
* Created by acholewa on 9/23/16.
*/
class SynchronousQueriesAreNotStoredSpec extends AsyncFunctionalSpec {
@Override
Map<String, Closure<String>> getResultsToTargetFunctions() {
return [
data: {"data/shapes/day"},
jobs: {"jobs"}
]
}

@Override
Map<String, Closure<Void>> getResultAssertions() {
return [
data: {assert it.status == 200},
jobs: {
assert !new JsonSlurper().parseText(it.readEntity(String)).jobs
}
]
}

@Override
Map<String, Closure<Map<String, List<String>>>> getQueryParameters() {
return [
data: {[
metrics: ["height"],
asyncAfter: ["never"],
dateTime: ["2016-08-30/2016-08-31"]
]},
jobs: {[
filters: ["userId-eq[greg]"]
]}
]
}

@Override
Closure<String> getFakeDruidResponse() {
return {"[]"}
}
}

0 comments on commit 878a367

Please sign in to comment.