Skip to content

Commit

Permalink
Synchronous queries are no longer stored in ApiJobStore.
Browse files Browse the repository at this point in the history
-- The `delay` operator considers `Observable::onCompleted` to be an
"emitted item." Therefore, the job row was still being stored in the
ApiJobStore even when the query was synchronous.

--This has been fixed by replacing `delay` with a subscription, and/or
zip when appropriate.
  • Loading branch information
Andrew Cholewa committed Sep 23, 2016
1 parent 369787a commit 2313131
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ public AsynchronousWorkflows buildAsynchronousWorkflows(
// JobRow has been stored because otherwise we have a race condition where the version of the JobRow with
// a success/failure status could be written to the store first, and then immediately overwritten by the
// original JobRow, leaving the JobRow in a state of "pending" for its lifetime.
Observable<JobRow> jobRowUpdatedNotification = buildUpdateRowChain(
ConnectableObservable<JobRow> jobRowUpdatedNotification = buildUpdateRowChain(
jobRowStoredNotification,
preResponseStoredNotification,
errorHandlingPreResponseEmitter,
jobMetadata
);
)
.replay(1);
jobRowUpdatedNotification.connect();

return new AsynchronousWorkflows(
synchronousPayload,
Expand Down Expand Up @@ -174,7 +176,7 @@ private Observable<String> buildStorePreResponseChain(
// We don't want to store the result in the PreResponseStore unless the query is asynchronous. The query is
// asynchronous iff the asynchronousPayload emits at least one item.
return preResponseEmitter
.delay(ignored -> asynchronousPayload)
.zipWith(asynchronousPayload, (preResponse, ignored) -> preResponse)
.flatMap(preResponse -> preResponseStore.save(jobRow.getId(), preResponse));
}

Expand Down Expand Up @@ -204,8 +206,8 @@ private Observable<JobRow> buildUpdateRowChain(
// The job status should not be updated until both the PreResponse has been stored, and the storage of the
// original Job ticket has been attempted.
return preResponseEmitter
.delay(ignored -> preResponseStoredNotification)
.delay(ignored -> jobRowStoredNotification)
.zipWith(jobRowStoredNotification, (preResponse, ignored) -> preResponse)
.zipWith(preResponseStoredNotification, (preResponse, ignored) -> preResponse)
.map(PreResponse::getResponseContext)
.map(responseContext -> responseContext.containsKey(ResponseContextKeys.ERROR_MESSAGE.getName()))
.map(isError -> isError ?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.yahoo.bard.webservice.async

import com.yahoo.bard.webservice.util.JsonSlurper

/**
* Created by acholewa on 9/23/16.
*/
class SynchronousQueriesAreNotStoredSpec extends AsyncFunctionalSpec {
@Override
Map<String, Closure<String>> getResultsToTargetFunctions() {
return [
data: {"data/shapes/day"},
jobs: {"jobs"}
]
}

@Override
Map<String, Closure<Void>> getResultAssertions() {
return [
data: {assert it.status == 200},
jobs: {
assert !new JsonSlurper().parseText(it.readEntity(String)).jobs
}
]
}

@Override
Map<String, Closure<Map<String, List<String>>>> getQueryParameters() {
return [
data: {[
metrics: ["height"],
asyncAfter: ["never"],
dateTime: ["2016-08-30/2016-08-31"]
]},
jobs: {[
filters: ["userId-eq[greg]"]
]}
]
}

@Override
Closure<String> getFakeDruidResponse() {
return {"[]"}
}
}

0 comments on commit 2313131

Please sign in to comment.