Skip to content

Commit

Permalink
JobResultFetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladislav Alekseev committed Sep 28, 2020
1 parent ac7e50f commit abd2741
Show file tree
Hide file tree
Showing 26 changed files with 286 additions and 383 deletions.
8 changes: 0 additions & 8 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,6 @@ let package = Package(
"DistWorkerModels",
"Logging",
"QueueModels",
"RESTInterfaces",
"RESTMethods",
"RequestSender",
"ScheduleStrategy",
Expand All @@ -1153,22 +1152,15 @@ let package = Package(
// MARK: QueueClientTests
name: "QueueClientTests",
dependencies: [
"BuildArtifactsTestHelpers",
"DistWorkerModels",
"DistWorkerModelsTestHelpers",
"QueueClient",
"QueueModels",
"QueueModelsTestHelpers",
"RESTInterfaces",
"RESTMethods",
"RequestSender",
"RequestSenderTestHelpers",
"RunnerModels",
"RunnerTestHelpers",
"SimulatorPoolTestHelpers",
"SocketModels",
"Swifter",
"SynchronousWaiter",
"TestHelpers",
"Types",
"WorkerAlivenessModels",
Expand Down
58 changes: 45 additions & 13 deletions Sources/EmceeLib/Commands/RunTestsOnRemoteQueueCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,27 @@ public final class RunTestsOnRemoteQueueCommand: Command {
),
for: TestDiscoveryQuerier.self
)

let queueClient = SynchronousQueueClient(queueServerAddress: queueServerAddress)
di.set(
JobStateFetcherImpl(
requestSender: try di.get(RequestSenderProvider.self).requestSender(socketAddress: queueServerAddress)
),
for: JobStateFetcher.self
)
di.set(
JobResultsFetcherImpl(
requestSender: try di.get(RequestSenderProvider.self).requestSender(socketAddress: queueServerAddress)
),
for: JobResultsFetcher.self
)
di.set(
JobDeleterImpl(
requestSender: try di.get(RequestSenderProvider.self).requestSender(socketAddress: queueServerAddress)
),
for: JobDeleter.self
)

defer {
Logger.info("Will delete job \(testArgFile.prioritizedJob)")
do {
_ = try queueClient.delete(jobId: testArgFile.prioritizedJob.jobId)
} catch {
Logger.error("Failed to delete job \(testArgFile.prioritizedJob): \(error)")
}
deleteJob(jobId: testArgFile.prioritizedJob.jobId)
}

try JobPreparer(di: di).formJob(
Expand All @@ -215,11 +226,8 @@ public final class RunTestsOnRemoteQueueCommand: Command {
testArgFile: testArgFile
)

Logger.debug("Will now wait for job queue to deplete")
try waitForJobQueueToDeplete(jobId: testArgFile.prioritizedJob.jobId)

Logger.debug("Fetching job results")
return try queueClient.jobResults(jobId: testArgFile.prioritizedJob.jobId)
return try fetchJobResults(jobId: testArgFile.prioritizedJob.jobId)
}

private func waitForJobQueueToDeplete(jobId: JobId) throws {
Expand All @@ -229,7 +237,7 @@ public final class RunTestsOnRemoteQueueCommand: Command {
caughtSignal = true
}

try di.get(Waiter.self).waitWhile(pollPeriod: 30.0, description: "Wait for job queue to deplete") {
try di.get(Waiter.self).waitWhile(pollPeriod: 30.0, description: "Waiting for job queue to deplete") {
if caughtSignal { return false }

let state = try fetchJobState(jobId: jobId)
Expand All @@ -240,6 +248,16 @@ public final class RunTestsOnRemoteQueueCommand: Command {
}
}

private func fetchJobResults(jobId: JobId) throws -> JobResults {
let callbackWaiter: CallbackWaiter<Either<JobResults, Error>> = try di.get(Waiter.self).createCallbackWaiter()
try di.get(JobResultsFetcher.self).fetch(
jobId: jobId,
callbackQueue: callbackQueue,
completion: callbackWaiter.set
)
return try callbackWaiter.wait(timeout: .infinity, description: "Fetching job results").dematerialize()
}

private func fetchJobState(jobId: JobId) throws -> JobState {
let callbackWaiter: CallbackWaiter<Either<JobState, Error>> = try di.get(Waiter.self).createCallbackWaiter()
try di.get(JobStateFetcher.self).fetch(
Expand All @@ -258,4 +276,18 @@ public final class RunTestsOnRemoteQueueCommand: Command {
guard let port = ports.sorted().last else { throw NoRunningQueueFoundError() }
return port
}

private func deleteJob(jobId: JobId) {
do {
let callbackWaiter: CallbackWaiter<Either<(), Error>> = try di.get(Waiter.self).createCallbackWaiter()
try di.get(JobDeleter.self).delete(
jobId: jobId,
callbackQueue: callbackQueue,
completion: callbackWaiter.set
)
try callbackWaiter.wait(timeout: .infinity, description: "Deleting job").dematerialize()
} catch {
Logger.error("Failed to delete job: \(error)")
}
}
}
12 changes: 12 additions & 0 deletions Sources/QueueClient/Clients/JobDeleter/JobDeleter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Dispatch
import Foundation
import QueueModels
import Types

public protocol JobDeleter {
func delete(
jobId: JobId,
callbackQueue: DispatchQueue,
completion: @escaping (Either<(), Error>) -> ()
)
}
32 changes: 32 additions & 0 deletions Sources/QueueClient/Clients/JobDeleter/JobDeleterImpl.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import Dispatch
import Foundation
import QueueModels
import RESTMethods
import RequestSender
import Types

public final class JobDeleterImpl: JobDeleter {
private let requestSender: RequestSender

public init(requestSender: RequestSender) {
self.requestSender = requestSender
}

public func delete(
jobId: JobId,
callbackQueue: DispatchQueue,
completion: @escaping (Either<(), Error>) -> ()
) {
let request = JobDeleteRequest(
payload: JobDeletePayload(
jobId: jobId
)
)
requestSender.sendRequestWithCallback(
request: request,
callbackQueue: callbackQueue
) { (result: Either<JobDeleteResponse, RequestSenderError>) in
completion(result.mapResult { _ in () })
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Dispatch
import Foundation
import QueueModels
import Types

public protocol JobResultsFetcher {
func fetch(
jobId: JobId,
callbackQueue: DispatchQueue,
completion: @escaping (Either<JobResults, Error>) -> ()
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import Foundation
import QueueModels
import RESTMethods
import RequestSender
import Types

public final class JobResultsFetcherImpl: JobResultsFetcher {
private let requestSender: RequestSender

public init(requestSender: RequestSender) {
self.requestSender = requestSender
}

public func fetch(
jobId: JobId,
callbackQueue: DispatchQueue,
completion: @escaping (Either<JobResults, Error>) -> ()
) {
let request = JobResultRequest(
payload: JobResultsPayload(
jobId: jobId
)
)

requestSender.sendRequestWithCallback(
request: request,
callbackQueue: callbackQueue,
callback: { (response: Either<JobResultsResponse, RequestSenderError>) in
completion(
response.mapResult { $0.jobResults }
)
}
)
}
}
111 changes: 0 additions & 111 deletions Sources/QueueClient/QueueClient.swift

This file was deleted.

8 changes: 0 additions & 8 deletions Sources/QueueClient/QueueClientDelegate.swift

This file was deleted.

27 changes: 0 additions & 27 deletions Sources/QueueClient/QueueClientError.swift

This file was deleted.

Loading

0 comments on commit abd2741

Please sign in to comment.