Skip to content

Commit

Permalink
Implement JobStateFetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladislav Alekseev committed Sep 25, 2020
1 parent c34dfa8 commit ac7e50f
Show file tree
Hide file tree
Showing 25 changed files with 210 additions and 150 deletions.
50 changes: 27 additions & 23 deletions Sources/EmceeLib/Commands/RunTestsOnRemoteQueueCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,44 +214,48 @@ public final class RunTestsOnRemoteQueueCommand: Command {
remoteCacheConfig: remoteCacheConfig,
testArgFile: testArgFile
)


Logger.debug("Will now wait for job queue to deplete")
try waitForJobQueueToDeplete(jobId: testArgFile.prioritizedJob.jobId)

Logger.debug("Fetching job results")
return try queueClient.jobResults(jobId: testArgFile.prioritizedJob.jobId)
}

private func waitForJobQueueToDeplete(jobId: JobId) throws {
var caughtSignal = false
SignalHandling.addSignalHandler(signals: [.int, .term]) { signal in
Logger.info("Caught \(signal) signal")
Logger.info("Will delete job \(testArgFile.prioritizedJob.jobId)")
_ = try? queueClient.delete(jobId: testArgFile.prioritizedJob.jobId)
caughtSignal = true
}

Logger.info("Will now wait for job queue to deplete")
try di.get(Waiter.self).waitWhile(pollPeriod: 30.0, description: "Wait for job queue to deplete") {
if caughtSignal { return false }
let jobState = try queueClient.jobState(jobId: testArgFile.prioritizedJob.jobId)
switch jobState.queueState {
case .deleted:
return false
case .running(let runningQueueState):
BucketQueueStateLogger(runningQueueState: runningQueueState).logQueueSize()
return !runningQueueState.isDepleted

let state = try fetchJobState(jobId: jobId)
switch state.queueState {
case .deleted: return false
case .running(let queueState): return !queueState.isDepleted
}
}
Logger.info("Will now fetch job results")
return try queueClient.jobResults(jobId: testArgFile.prioritizedJob.jobId)
}

private func fetchJobState(jobId: JobId) throws -> JobState {
let callbackWaiter: CallbackWaiter<Either<JobState, Error>> = try di.get(Waiter.self).createCallbackWaiter()
try di.get(JobStateFetcher.self).fetch(
jobId: jobId,
callbackQueue: callbackQueue,
completion: callbackWaiter.set
)
return try callbackWaiter.wait(timeout: .infinity, description: "").dematerialize()
}

private func selectPort(ports: Set<SocketModels.Port>) throws -> SocketModels.Port {
enum PortScanningError: Error, CustomStringConvertible {
case noQueuePortDetected

var description: String {
switch self {
case .noQueuePortDetected:
return "No running queue server found"
}
}
struct NoRunningQueueFoundError: Error, CustomStringConvertible {
var description: String { "No running queue server found" }
}

guard let port = ports.sorted().last else { throw PortScanningError.noQueuePortDetected }
guard let port = ports.sorted().last else { throw NoRunningQueueFoundError() }
return port
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ public final class BucketResultSenderImpl: BucketResultSender {
request: request,
callbackQueue: callbackQueue,
callback: { (response: Either<BucketResultAcceptResponse, RequestSenderError>) in
do {
let value = try response.dematerialize()
switch value {
case .bucketResultAccepted(let bucketId):
completion(Either<BucketId, Error>.success(bucketId))
completion(
response.mapResult {
switch $0 {
case .bucketResultAccepted(let bucketId): return bucketId
}
}
} catch {
completion(Either<BucketId, Error>.error(error))
}
)
}
)
}
Expand Down
12 changes: 12 additions & 0 deletions Sources/QueueClient/Clients/JobStateFetcher/JobStateFetcher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Dispatch
import Foundation
import QueueModels
import Types

public protocol JobStateFetcher {
func fetch(
jobId: JobId,
callbackQueue: DispatchQueue,
completion: @escaping ((Either<JobState, Error>) -> ())
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import Dispatch
import Foundation
import QueueModels
import RESTMethods
import RequestSender
import Types

public final class JobStateFetcherImpl: JobStateFetcher {
private let requestSender: RequestSender

public init(requestSender: RequestSender) {
self.requestSender = requestSender
}

public func fetch(
jobId: JobId,
callbackQueue: DispatchQueue,
completion: @escaping ((Either<JobState, Error>) -> ())
) {
let request = JobStateRequest(
payload: JobStatePayload(
jobId: jobId
)
)
requestSender.sendRequestWithCallback(
request: request,
callbackQueue: callbackQueue
) { (result: Either<JobStateResponse, RequestSenderError>) in
completion(result.mapResult { $0.jobState })
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ public final class QueueServerVersionFetcherImpl: QueueServerVersionFetcher {
request: QueueVersionRequest(),
callbackQueue: callbackQueue,
callback: { (result: Either<QueueVersionResponse, RequestSenderError>) in
do {
let response = try result.dematerialize()
switch response {
case .queueVersion(let version):
completion(Either.success(version))
completion(
result.mapResult {
switch $0 {
case .queueVersion(let version): return version
}
}
} catch {
completion(Either.error(error))
}
)
}
)
}
Expand Down
16 changes: 6 additions & 10 deletions Sources/QueueClient/Clients/TestScheduler/TestSchedulerImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,13 @@ public final class TestSchedulerImpl: TestScheduler {
),
callbackQueue: callbackQueue
) { (result: Either<ScheduleTestsResponse, RequestSenderError>) in
do {
let response = try result.dematerialize()
switch response {
case .scheduledTests:
Logger.debug("Successfully scheduled \(testEntryConfigurations.count) tests")
completion(.success(()))
completion(
result.mapResult {
switch $0 {
case .scheduledTests: return ()
}
}
} catch {
Logger.error("Failed to schedule \(testEntryConfigurations.count) tests: \(error)")
completion(.error(error))
}
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ public final class WorkerDisablerImpl: WorkerDisabler {
request: DisableWorkerRequest(payload: DisableWorkerPayload(workerId: workerId)),
callbackQueue: callbackQueue,
callback: { (result: Either<WorkerDisabledResponse, RequestSenderError>) in
do {
let response = try result.dematerialize()
completion(.success(response.workerId))
} catch {
completion(.error(error))
}
completion(
result.mapResult { $0.workerId }
)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ public final class WorkerEnablerImpl: WorkerEnabler {
request: EnableWorkerRequest(payload: EnableWorkerPayload(workerId: workerId)),
callbackQueue: callbackQueue,
callback: { (result: Either<WorkerEnabledResponse, RequestSenderError>) in
do {
let response = try result.dematerialize()
completion(.success(response.workerId))
} catch {
completion(.error(error))
}
completion(
result.mapResult { $0.workerId }
)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ public final class WorkerKickstarterImpl: WorkerKickstarter {
request: KickstartWorkerRequest(payload: KickstartWorkerPayload(workerId: workerId)),
callbackQueue: callbackQueue,
callback: { (result: Either<KickstartWorkerResponse, RequestSenderError>) in
do {
completion(.success(try result.dematerialize().workerId))
} catch {
completion(.error(error))
}
completion(
result.mapResult { $0.workerId }
)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ public final class WorkerRegistererImpl: WorkerRegisterer {
),
callbackQueue: callbackQueue,
callback: { (result: Either<RegisterWorkerResponse, RequestSenderError>) in
do {
let response = try result.dematerialize()
switch response {
case .workerRegisterSuccess(let workerConfiguration):
completion(Either.success(workerConfiguration))
completion(
result.mapResult {
switch $0 {
case .workerRegisterSuccess(let workerConfiguration):
return workerConfiguration
}
}
} catch {
completion(Either.error(error))
}
)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ public final class WorkerStatusFetcherImpl: WorkerStatusFetcher {
request: WorkerStatusRequest(payload: WorkerStatusPayload()),
callbackQueue: callbackQueue
) { (result: Either<WorkerStatusResponse, RequestSenderError>) in
do {
let response = try result.dematerialize()
completion(.success(response.workerAliveness))
} catch {
completion(.error(error))
}
completion(
result.mapResult { $0.workerAliveness }
)
}
}
}
12 changes: 0 additions & 12 deletions Sources/QueueClient/QueueClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ public final class QueueClient {
)
}

public func fetchJobState(jobId: JobId) throws {
try sendRequest(
.jobState,
payload: JobStateRequest(jobId: jobId),
completionHandler: handleJobStateResponse
)
}

public func deleteJob(jobId: JobId) throws {
try sendRequest(
.jobDelete,
Expand Down Expand Up @@ -109,10 +101,6 @@ public final class QueueClient {

// MARK: - Response Handlers

private func handleJobStateResponse(response: JobStateResponse) {
delegate?.queueClient(self, didFetchJobState: response.jobState)
}

private func handleJobResultsResponse(response: JobResultsResponse) {
delegate?.queueClient(self, didFetchJobResults: response.jobResults)
}
Expand Down
1 change: 0 additions & 1 deletion Sources/QueueClient/QueueClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import QueueModels

public protocol QueueClientDelegate: class {
func queueClient(_ sender: QueueClient, didFailWithError error: QueueClientError)
func queueClient(_ sender: QueueClient, didFetchJobState jobState: JobState)
func queueClient(_ sender: QueueClient, didFetchJobResults jobResults: JobResults)
func queueClient(_ sender: QueueClient, didDeleteJob jobId: JobId)
}
19 changes: 0 additions & 19 deletions Sources/QueueClient/SynchronousQueueClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import Types
public final class SynchronousQueueClient: QueueClientDelegate {
private let queueClient: QueueClient
private var jobResultsResult: Either<JobResults, QueueClientError>?
private var jobStateResult: Either<JobState, QueueClientError>?
private var jobDeleteResult: Either<JobId, QueueClientError>?
private let syncQueue = DispatchQueue(label: "ru.avito.SynchronousQueueClient")
private let requestTimeout: TimeInterval
Expand Down Expand Up @@ -51,19 +50,6 @@ public final class SynchronousQueueClient: QueueClientDelegate {
}
}

public func jobState(jobId: JobId) throws -> JobState {
return try synchronize {
jobStateResult = nil
return try runRetrying {
try queueClient.fetchJobState(jobId: jobId)
try SynchronousWaiter().waitWhile(timeout: requestTimeout, description: "Wait for \(jobId) job state") {
self.jobStateResult == nil
}
return try jobStateResult!.dematerialize()
}
}
}

public func delete(jobId: JobId) throws -> JobId {
return try synchronize {
jobDeleteResult = nil
Expand Down Expand Up @@ -102,13 +88,8 @@ public final class SynchronousQueueClient: QueueClientDelegate {

public func queueClient(_ sender: QueueClient, didFailWithError error: QueueClientError) {
jobResultsResult = Either.error(error)
jobStateResult = Either.error(error)
jobDeleteResult = Either.error(error)
}

public func queueClient(_ sender: QueueClient, didFetchJobState jobState: JobState) {
jobStateResult = Either.success(jobState)
}

public func queueClient(_ sender: QueueClient, didFetchJobResults jobResults: JobResults) {
jobResultsResult = Either.success(jobResults)
Expand Down
4 changes: 2 additions & 2 deletions Sources/QueueServer/Endpoints/JobStateEndpoint.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import RESTServer

public final class JobStateEndpoint: RESTEndpoint {
private let stateProvider: JobStateProvider
public let path: RESTPath = RESTMethod.jobState
public let path: RESTPath = JobStateRESTMethod()
public let requestIndicatesActivity = false

public init(stateProvider: JobStateProvider) {
self.stateProvider = stateProvider
}

public func handle(payload: JobStateRequest) throws -> JobStateResponse {
public func handle(payload: JobStatePayload) throws -> JobStateResponse {
let jobState = try stateProvider.state(jobId: payload.jobId)
return JobStateResponse(jobState: jobState)
}
Expand Down
1 change: 0 additions & 1 deletion Sources/RESTInterfaces/RESTMethod.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ public enum RESTMethod: String, RESTPath {
case getBucket
case jobDelete
case jobResults
case jobState
case queueVersion
case registerWorker
case reportAlive
Expand Down
10 changes: 10 additions & 0 deletions Sources/RESTMethods/JobState/JobStatePayload.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import Foundation
import QueueModels

public final class JobStatePayload: Codable {
public let jobId: JobId

public init(jobId: JobId) {
self.jobId = jobId
}
}
8 changes: 8 additions & 0 deletions Sources/RESTMethods/JobState/JobStateRESTMethod.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import Foundation
import RESTInterfaces

public final class JobStateRESTMethod: RESTPath {
public init() {}

public var pathWithLeadingSlash: String { "/jobState" }
}
Loading

0 comments on commit ac7e50f

Please sign in to comment.