From b0b2cc2c3a999d416c16d6e154417eacb831108b Mon Sep 17 00:00:00 2001 From: John Zhou <37914490+johnnzhou@users.noreply.github.com> Date: Thu, 5 Sep 2024 22:58:33 -0700 Subject: [PATCH] Refactor test client (#5) --- Package.swift | 7 +- Sources/Demo/Client.swift | 71 +++++++ Sources/{ => LCLSpeedtest}/Constants.swift | 8 +- .../{ => LCLSpeedtest}/DownloadClient.swift | 102 +++++----- Sources/{ => LCLSpeedtest}/Errors.swift | 0 .../Models/Measurement.swift | 6 + .../Models/TestServer.swift | 0 .../{ => LCLSpeedtest}/Models/TestType.swift | 0 Sources/{ => LCLSpeedtest}/Networking.swift | 0 .../{ => LCLSpeedtest}/SpeedTestClient.swift | 10 +- .../{ => LCLSpeedtest}/SpeedTestable.swift | 10 +- Sources/LCLSpeedtest/UploadClient.swift | 191 ++++++++++++++++++ .../Utils/WebSocketClosCode+Extension.swift | 0 Sources/UploadClient.swift | 185 ----------------- Sources/Utils/Date+Extension.swift | 26 --- 15 files changed, 339 insertions(+), 277 deletions(-) create mode 100644 Sources/Demo/Client.swift rename Sources/{ => LCLSpeedtest}/Constants.swift (81%) rename Sources/{ => LCLSpeedtest}/DownloadClient.swift (51%) rename Sources/{ => LCLSpeedtest}/Errors.swift (100%) rename Sources/{ => LCLSpeedtest}/Models/Measurement.swift (94%) rename Sources/{ => LCLSpeedtest}/Models/TestServer.swift (100%) rename Sources/{ => LCLSpeedtest}/Models/TestType.swift (100%) rename Sources/{ => LCLSpeedtest}/Networking.swift (100%) rename Sources/{ => LCLSpeedtest}/SpeedTestClient.swift (90%) rename Sources/{ => LCLSpeedtest}/SpeedTestable.swift (95%) create mode 100644 Sources/LCLSpeedtest/UploadClient.swift rename Sources/{ => LCLSpeedtest}/Utils/WebSocketClosCode+Extension.swift (100%) delete mode 100644 Sources/UploadClient.swift delete mode 100644 Sources/Utils/Date+Extension.swift diff --git a/Package.swift b/Package.swift index 23801f1..073e9fa 100644 --- a/Package.swift +++ b/Package.swift @@ -12,7 +12,7 @@ let package = Package( .library(name: "LCLSpeedtest", targets: ["LCLSpeedtest"]) ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.62.0"), + .package(url: "https://github.com/johnnzhou/swift-nio.git", branch: "main"), .package(url: "https://github.com/apple/swift-log.git", from: "1.5.3"), .package(url: "https://github.com/johnnzhou/websocket-kit.git", branch: "main") ], @@ -23,10 +23,11 @@ let package = Package( .product(name: "NIO", package: "swift-nio"), .product(name: "NIOCore", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), - .product(name: "NIOHTTP1", package: "swift-nio"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), .product(name: "WebSocketKit", package: "websocket-kit") ] - ) + ), + .executableTarget(name: "Demo", dependencies: ["LCLSpeedtest"]) ] ) diff --git a/Sources/Demo/Client.swift b/Sources/Demo/Client.swift new file mode 100644 index 0000000..b70993b --- /dev/null +++ b/Sources/Demo/Client.swift @@ -0,0 +1,71 @@ +import Foundation +import LCLSpeedtest + +@main +struct Client { + static func main() async throws { + var client = SpeedTestClient() + + client.onDownloadProgress = { progress in + print("progress: \(progress.convertTo(unit: .Mbps)) mbps") + } + try await client.start(with: .download) + } +} + +enum MeasurementUnit: String, CaseIterable, Identifiable, Encodable { + + case Mbps + case MBps + + var id: Self {self} + + var string: String { + switch self { + case .Mbps: + return "mbps" + case .MBps: + return "MB/s" + } + } +} + +extension MeasurementProgress { + + /// data in Mbps + var defaultValueInMegaBits: Double { + get { + self.convertTo(unit: .Mbps) + } + } + + /// data in MB/s + var defaultValueInMegaBytes: Double { + get { + self.convertTo(unit: .MBps) + } + } + + /** + Convert the measurement data to the given unit + + - Parameters: + unit: the target unit to convert to + - Returns: the value in `Double` under the specified unit measurement + */ + func convertTo(unit: MeasurementUnit) -> Double { + let elapsedTime = appInfo.elapsedTime + let numBytes = appInfo.numBytes + let time = Float64(elapsedTime) / 1000000 + var speed = Float64(numBytes) / time + switch unit { + case .Mbps: + speed *= 8 + case .MBps: + speed *= 1 + } + + speed /= 1000000 + return speed + } +} diff --git a/Sources/Constants.swift b/Sources/LCLSpeedtest/Constants.swift similarity index 81% rename from Sources/Constants.swift rename to Sources/LCLSpeedtest/Constants.swift index 6bd3e44..3e6cad1 100644 --- a/Sources/Constants.swift +++ b/Sources/LCLSpeedtest/Constants.swift @@ -19,13 +19,15 @@ let MAX_RETRY_COUNT: UInt8 = 5 let DISCOVER_SERVER_URL = "https://locate.measurementlab.net/v2/nearest/ndt/ndt7?client_name=ndt7-client-ios" /// Maximum message size to send in one websocket frame. -let MAX_MESSAGE_SIZE: Int = 1 << 24 +let MAX_MESSAGE_SIZE: Int = 1 << 23 /// Minimum message size to send in one websocket frame. let MIN_MESSAGE_SIZE: Int = 1 << 13 /// The number of second to update measurement report to the caller -let MEASUREMENT_REPORT_INTERVAL: Int64 = 250000 // 250 ms in microsecond +let MEASUREMENT_REPORT_INTERVAL: Int64 = 250 // 250 ms /// The number of second to measure the throughput. -let MEASUREMENT_DURATION: Int64 = 10000000 // 10 second in microsecond +let MEASUREMENT_DURATION: Int64 = 10 // 10 seconds + +let SCALING_FACTOR = 16 diff --git a/Sources/DownloadClient.swift b/Sources/LCLSpeedtest/DownloadClient.swift similarity index 51% rename from Sources/DownloadClient.swift rename to Sources/LCLSpeedtest/DownloadClient.swift index f0f38e8..806fb7c 100644 --- a/Sources/DownloadClient.swift +++ b/Sources/LCLSpeedtest/DownloadClient.swift @@ -18,44 +18,38 @@ import NIOWebSocket internal final class DownloadClient: SpeedTestable { private let url: URL - private let eventloop: MultiThreadedEventLoopGroup + private let eventloopGroup: MultiThreadedEventLoopGroup - private var startTime: Int64 - private var numBytes: Int64 - private var previousTimeMark: Int64 + private var startTime: NIODeadline + private var totalBytes: Int + private var previousTimeMark: NIODeadline private let jsonDecoder: JSONDecoder + private let emitter = DispatchQueue(label: "downloader", qos: .userInteractive) required init(url: URL) { self.url = url - self.eventloop = MultiThreadedEventLoopGroup.singleton - self.startTime = 0 - self.previousTimeMark = 0 - self.numBytes = 0 + self.eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) + self.startTime = .now() + self.previousTimeMark = .now() + self.totalBytes = 0 self.jsonDecoder = JSONDecoder() } - deinit { - do { - try self.eventloop.syncShutdownGracefully() - } catch { - fatalError("Failed to close channel gracefully: \(error)") - } - } - var onMeasurement: ((SpeedTestMeasurement) -> Void)? var onProgress: ((MeasurementProgress) -> Void)? var onFinish: ((MeasurementProgress, Error?) -> Void)? func start() throws -> EventLoopFuture { - let promise = self.eventloop.next().makePromise(of: Void.self) - try WebSocket.connect( + let promise = self.eventloopGroup.next().makePromise(of: Void.self) + WebSocket.connect( to: self.url, headers: self.httpHeaders, configuration: self.configuration, - on: self.eventloop + on: self.eventloopGroup ) { ws in print("websocket connected") - self.startTime = Date.nowInMicroSecond + + self.startTime = .now() ws.onText(self.onText) ws.onBinary(self.onBinary) @@ -68,36 +62,40 @@ internal final class DownloadClient: SpeedTestable { switch closeResult { case .success: if let onFinish = self.onFinish { - onFinish( - DownloadClient.generateMeasurementProgress( - startTime: self.startTime, - numBytes: self.numBytes, - direction: .download - ), - nil - ) + self.emitter.async { + onFinish( + DownloadClient.generateMeasurementProgress( + startTime: self.startTime, + numBytes: self.totalBytes, + direction: .download + ), + nil + ) + } } ws.close(code: .normalClosure, promise: promise) case .failure(let error): if let onFinish = self.onFinish { - onFinish( - DownloadClient.generateMeasurementProgress( - startTime: self.startTime, - numBytes: self.numBytes, - direction: .download - ), - error - ) + self.emitter.async { + onFinish( + DownloadClient.generateMeasurementProgress( + startTime: self.startTime, + numBytes: self.totalBytes, + direction: .download + ), + error + ) + } } ws.close(code: .goingAway, promise: promise) } } - }.wait() + }.cascadeFailure(to: promise) return promise.futureResult } func stop() throws { - var itr = self.eventloop.makeIterator() + var itr = self.eventloopGroup.makeIterator() while let next = itr.next() { try next.close() } @@ -107,9 +105,11 @@ internal final class DownloadClient: SpeedTestable { let buffer = ByteBuffer(string: text) do { let measurement: SpeedTestMeasurement = try jsonDecoder.decode(SpeedTestMeasurement.self, from: buffer) - self.numBytes += Int64(buffer.readableBytes) + self.totalBytes += buffer.readableBytes if let onMeasurement = self.onMeasurement { - onMeasurement(measurement) + self.emitter.async { + onMeasurement(measurement) + } } } catch { print("onText Error: \(error)") @@ -117,18 +117,20 @@ internal final class DownloadClient: SpeedTestable { } func onBinary(ws: WebSocket, bytes: ByteBuffer) { - self.numBytes += Int64(bytes.readableBytes) + self.totalBytes += bytes.readableBytes if let onProgress = self.onProgress { - let current = Date.nowInMicroSecond - if current - previousTimeMark >= MEASUREMENT_REPORT_INTERVAL { - onProgress( - DownloadClient.generateMeasurementProgress( - startTime: self.startTime, - numBytes: self.numBytes, - direction: .download + let current = NIODeadline.now() + if (current - self.previousTimeMark) > TimeAmount.milliseconds(MEASUREMENT_REPORT_INTERVAL) { + self.emitter.async { + onProgress( + DownloadClient.generateMeasurementProgress( + startTime: self.startTime, + numBytes: self.totalBytes, + direction: .download + ) ) - ) - previousTimeMark = current + } + self.previousTimeMark = current } } } diff --git a/Sources/Errors.swift b/Sources/LCLSpeedtest/Errors.swift similarity index 100% rename from Sources/Errors.swift rename to Sources/LCLSpeedtest/Errors.swift diff --git a/Sources/Models/Measurement.swift b/Sources/LCLSpeedtest/Models/Measurement.swift similarity index 94% rename from Sources/Models/Measurement.swift rename to Sources/LCLSpeedtest/Models/Measurement.swift index d6851bd..26eb902 100644 --- a/Sources/Models/Measurement.swift +++ b/Sources/LCLSpeedtest/Models/Measurement.swift @@ -127,4 +127,10 @@ extension MeasurementProgress { test: direction.rawValue ) } + + var mbps: Double { + let elapsedTime = appInfo.elapsedTime // microsecond + let numBytes = appInfo.numBytes + return Double(numBytes * 8) / Double(elapsedTime) + } } diff --git a/Sources/Models/TestServer.swift b/Sources/LCLSpeedtest/Models/TestServer.swift similarity index 100% rename from Sources/Models/TestServer.swift rename to Sources/LCLSpeedtest/Models/TestServer.swift diff --git a/Sources/Models/TestType.swift b/Sources/LCLSpeedtest/Models/TestType.swift similarity index 100% rename from Sources/Models/TestType.swift rename to Sources/LCLSpeedtest/Models/TestType.swift diff --git a/Sources/Networking.swift b/Sources/LCLSpeedtest/Networking.swift similarity index 100% rename from Sources/Networking.swift rename to Sources/LCLSpeedtest/Networking.swift diff --git a/Sources/SpeedTestClient.swift b/Sources/LCLSpeedtest/SpeedTestClient.swift similarity index 90% rename from Sources/SpeedTestClient.swift rename to Sources/LCLSpeedtest/SpeedTestClient.swift index 32c29c0..6b2aecb 100644 --- a/Sources/SpeedTestClient.swift +++ b/Sources/LCLSpeedtest/SpeedTestClient.swift @@ -66,11 +66,11 @@ public struct SpeedTestClient { } /// Run the download test using the available test servers - private mutating func runDownloadTest(using testServers: [TestServer]) async throws { - guard let downloadPath = testServers.first?.urls.downloadPath, - let downloadURL = URL(string: downloadPath) else { - throw SpeedTestError.invalidTestURL("Cannot locate URL for download test") - } + private mutating func runDownloadTest(using testServers: [TestServer]) async throws { + guard let downloadPath = testServers.first?.urls.downloadPath, + let downloadURL = URL(string: downloadPath) else { + throw SpeedTestError.invalidTestURL("Cannot locate URL for download test") + } downloader = DownloadClient(url: downloadURL) downloader?.onProgress = self.onDownloadProgress diff --git a/Sources/SpeedTestable.swift b/Sources/LCLSpeedtest/SpeedTestable.swift similarity index 95% rename from Sources/SpeedTestable.swift rename to Sources/LCLSpeedtest/SpeedTestable.swift index 7f34ac1..7db98a7 100644 --- a/Sources/SpeedTestable.swift +++ b/Sources/LCLSpeedtest/SpeedTestable.swift @@ -13,7 +13,7 @@ import Foundation import WebSocketKit import NIOWebSocket -import NIOCore + import NIOCore /// This protocol defines callbacks to monitor the speed test progress, including the measurement progress, /// measurement result, and potential errors when test finishes. @@ -91,13 +91,13 @@ extension SpeedTestable { /// /// - Returns: a `MeasurementProgress` containing the sampling period, number of bytes transmitted and test direction. static func generateMeasurementProgress( - startTime: Int64, - numBytes: Int64, + startTime: NIODeadline, + numBytes: Int, direction: TestDirection ) -> MeasurementProgress { return MeasurementProgress.create( - elapedTime: Date.nowInMicroSecond - startTime, - numBytes: numBytes, + elapedTime: (NIODeadline.now() - startTime).nanoseconds / 1000, + numBytes: Int64(numBytes), direction: direction ) } diff --git a/Sources/LCLSpeedtest/UploadClient.swift b/Sources/LCLSpeedtest/UploadClient.swift new file mode 100644 index 0000000..b497912 --- /dev/null +++ b/Sources/LCLSpeedtest/UploadClient.swift @@ -0,0 +1,191 @@ +// +// This source file is part of the LCL open source project +// +// Copyright (c) 2021-2024 Local Connectivity Lab and the project authors +// Licensed under Apache License v2.0 +// +// See LICENSE for license information +// See CONTRIBUTORS for the list of project authors +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Foundation +import WebSocketKit +import NIOCore +import NIOPosix +import NIOWebSocket + +internal final class UploadClient: SpeedTestable { + private let url: URL + private let eventloopGroup: MultiThreadedEventLoopGroup + + private var startTime: NIODeadline + private var totalBytes: Int + private var previousTimeMark: NIODeadline + private let jsonDecoder: JSONDecoder + private let emitter = DispatchQueue(label: "uploader", qos: .userInteractive) + + required init(url: URL) { + self.url = url + self.eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) + self.startTime = .now() + self.previousTimeMark = .now() + self.totalBytes = 0 + self.jsonDecoder = JSONDecoder() + } + + var onMeasurement: ((SpeedTestMeasurement) -> Void)? + + var onProgress: ((MeasurementProgress) -> Void)? + + var onFinish: ((MeasurementProgress, Error?) -> Void)? + + func start() throws -> EventLoopFuture { + let promise = self.eventloopGroup.next().makePromise(of: Void.self) + WebSocket.connect( + to: self.url, + headers: self.httpHeaders, + queueSize: 1 << 26, + configuration: self.configuration, + on: self.eventloopGroup + ) { ws in + print("websocket connected") + + ws.onText(self.onText) + ws.onBinary(self.onBinary) + ws.onClose.whenComplete { result in + let closeResult = self.onClose( + closeCode: ws.closeCode ?? .unknown(WebSocketErrorCode.missingErrorCode), + closingResult: result + ) + switch closeResult { + case .success: + if let onFinish = self.onFinish { + self.emitter.async { + onFinish( + UploadClient.generateMeasurementProgress( + startTime: self.startTime, + numBytes: self.totalBytes, + direction: .upload + ), + nil + ) + } + } + ws.close(code: .normalClosure, promise: promise) + case .failure(let error): + if let onFinish = self.onFinish { + self.emitter.async { + onFinish( + UploadClient.generateMeasurementProgress( + startTime: self.startTime, + numBytes: self.totalBytes, + direction: .upload + ), + error + ) + } + } + ws.close(code: .goingAway, promise: promise) + } + } + + self.upload(using: ws).cascadeFailure(to: promise) + }.cascadeFailure(to: promise) + return promise.futureResult + } + + func stop() throws { + var itr = self.eventloopGroup.makeIterator() + while let next = itr.next() { + try next.close() + } + } + + func onText(ws: WebSocketKit.WebSocket, text: String) { + let buffer = ByteBuffer(string: text) + do { + let measurement: SpeedTestMeasurement = try self.jsonDecoder.decode(SpeedTestMeasurement.self, from: buffer) + if let onMeasurement = self.onMeasurement { + self.emitter.async { + onMeasurement(measurement) + } + } + } catch { + print("onText Error: \(error)") + } + } + + func onBinary(ws: WebSocket, bytes: ByteBuffer) { + do { + // this should not be invoked in upload test + try ws.close(code: .policyViolation).wait() + } catch { + print("Cannot close connection due to policy violation") + } + } + + /// Send as many bytes to the server as possible within the `MEASUREMENT_DURATION`. + /// Start the message size from `MIN_MESSAGE_SIZE` and increment the size according to the number of bytes queued. + /// We always assume that the buffer size in the websocket is 7 times the current load. + /// The system will always try to send as many bytes as possible, and will try to update the progress to the caller. + private func upload(using ws: WebSocket) -> EventLoopFuture { + self.startTime = NIODeadline.now() + + let el = self.eventloopGroup.next() + let promise = el.makePromise(of: Void.self) + + func send(newLoadSize: Int) { + guard NIODeadline.now() - self.startTime < TimeAmount.seconds(MEASUREMENT_DURATION) else { + promise.succeed() + return + } + let el = self.eventloopGroup.next() + ws.getBufferedBytes().hop(to: el).map { bufferedBytes in + let nextIncrementSize = newLoadSize > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : SCALING_FACTOR * newLoadSize + let loadSize = (self.totalBytes - bufferedBytes > nextIncrementSize) ? newLoadSize * 2 : newLoadSize + if bufferedBytes < 7 * loadSize { + let payload = ws.allocator.buffer(repeating: 0, count: loadSize) + let p = el.makePromise(of: Void.self) + ws.send(payload, promise: p) + p.futureResult.cascadeFailure(to: promise) + self.totalBytes += loadSize + } + + ws.getBufferedBytes().hop(to: el).map { buffered in + self.reportToClient(currentBufferSize: buffered) + }.cascadeFailure(to: promise) + + send(newLoadSize: loadSize) + }.cascadeFailure(to: promise) + } + + self.previousTimeMark = .now() + send(newLoadSize: MIN_MESSAGE_SIZE) + + return promise.futureResult + } + + /// report the current measurement result to the caller using the current buffer size + /// if the time elapsed from the last report is greater than `MEASUREMENT_REPORT_INTERVAL`. + private func reportToClient(currentBufferSize: Int) { + guard let onProgress = self.onProgress else { + return + } + + let current = NIODeadline.now() + if (current - self.previousTimeMark) > TimeAmount.milliseconds(MEASUREMENT_REPORT_INTERVAL) { + self.emitter.async { + onProgress( + UploadClient.generateMeasurementProgress( + startTime: self.startTime, + numBytes: self.totalBytes - currentBufferSize, + direction: .upload + ) + ) + } + self.previousTimeMark = current + } + } +} diff --git a/Sources/Utils/WebSocketClosCode+Extension.swift b/Sources/LCLSpeedtest/Utils/WebSocketClosCode+Extension.swift similarity index 100% rename from Sources/Utils/WebSocketClosCode+Extension.swift rename to Sources/LCLSpeedtest/Utils/WebSocketClosCode+Extension.swift diff --git a/Sources/UploadClient.swift b/Sources/UploadClient.swift deleted file mode 100644 index ee9e0e4..0000000 --- a/Sources/UploadClient.swift +++ /dev/null @@ -1,185 +0,0 @@ -// -// This source file is part of the LCL open source project -// -// Copyright (c) 2021-2024 Local Connectivity Lab and the project authors -// Licensed under Apache License v2.0 -// -// See LICENSE for license information -// See CONTRIBUTORS for the list of project authors -// -// SPDX-License-Identifier: Apache-2.0 -// - -import Foundation -import WebSocketKit -import NIOCore -import NIOPosix -import NIOWebSocket - -internal final class UploadClient: SpeedTestable { - private let url: URL - private let eventloop: MultiThreadedEventLoopGroup - - private var startTime: Int64 - private var numBytes: Int64 - private var previousTimeMark: Int64 - private let jsonDecoder: JSONDecoder - - required init(url: URL) { - self.url = url - self.eventloop = MultiThreadedEventLoopGroup.singleton - self.startTime = 0 - self.previousTimeMark = 0 - self.numBytes = 0 - self.jsonDecoder = JSONDecoder() - } - - deinit { - do { - try self.eventloop.syncShutdownGracefully() - } catch { - fatalError("Failed to close channel gracefully: \(error)") - } - } - - var onMeasurement: ((SpeedTestMeasurement) -> Void)? - - var onProgress: ((MeasurementProgress) -> Void)? - - var onFinish: ((MeasurementProgress, Error?) -> Void)? - - func start() throws -> NIOCore.EventLoopFuture { - let promise = self.eventloop.next().makePromise(of: Void.self) - try WebSocket.connect( - to: self.url, - headers: self.httpHeaders, - queueSize: 1 << 26, - configuration: self.configuration, - on: self.eventloop - ) { ws in - print("websocket connected") - self.startTime = Date.nowInMicroSecond - - ws.onText(self.onText) - ws.onBinary(self.onBinary) - ws.onClose.whenComplete { result in - let closeResult = self.onClose( - closeCode: ws.closeCode ?? .unknown(WebSocketErrorCode.missingErrorCode), - closingResult: result - ) - switch closeResult { - case .success: - if let onFinish = self.onFinish { - onFinish( - UploadClient.generateMeasurementProgress( - startTime: self.startTime, - numBytes: self.numBytes, - direction: .upload - ), - nil - ) - } - ws.close(code: .normalClosure, promise: promise) - case .failure(let error): - if let onFinish = self.onFinish { - onFinish( - UploadClient.generateMeasurementProgress( - startTime: self.startTime, - numBytes: self.numBytes, - direction: .upload - ), - error - ) - } - ws.close(code: .goingAway, promise: promise) - } - } - - self.upload(using: ws) - }.wait() - return promise.futureResult - } - - func stop() throws { - var itr = self.eventloop.makeIterator() - while let next = itr.next() { - try next.close() - } - } - - func onText(ws: WebSocketKit.WebSocket, text: String) { - let buffer = ByteBuffer(string: text) - do { - let measurement: SpeedTestMeasurement = try jsonDecoder.decode(SpeedTestMeasurement.self, from: buffer) - if let onMeasurement = self.onMeasurement { - onMeasurement(measurement) - } - } catch { - print("onText Error: \(error)") - } - } - - func onBinary(ws: WebSocket, bytes: ByteBuffer) { - do { - // this should not be invoked in upload test - try ws.close(code: .policyViolation).wait() - } catch { - print("Cannot close connection due to policy violation") - } - } - - /// Send as many bytes to the server as possible within the `MEASUREMENT_DURATION`. - /// Start the message size from `MIN_MESSAGE_SIZE` and increment the size according to the number of bytes queued. - /// We always assume that the buffer size in the websocket is 7 times the current load. - /// The system will always try to send as many bytes as possible, and will try to update the progress to the caller. - private func upload(using ws: WebSocket) { - let start = Date.nowInMicroSecond - var currentLoad = MIN_MESSAGE_SIZE - while Date.nowInMicroSecond - start < MEASUREMENT_DURATION { - let loadSize = calibrateLoadSize(initial: currentLoad, ws: ws) - - if ws.bufferedBytes < 7 * loadSize { - let payload = ByteBuffer(repeating: 0, count: loadSize) - ws.send(payload) - currentLoad = loadSize - self.numBytes += Int64(currentLoad) - } - reportToClient(currentBufferSize: ws.bufferedBytes) - } - } - - /// report the current measurement result to the caller using the current buffer size - /// if the time elapsed from the last report is greater than `MEASUREMENT_REPORT_INTERVAL`. - private func reportToClient(currentBufferSize: Int) { - let current = Date.nowInMicroSecond - if let onProgress = self.onProgress { - if current - previousTimeMark >= MEASUREMENT_REPORT_INTERVAL { - onProgress( - UploadClient.generateMeasurementProgress( - startTime: self.startTime, - numBytes: self.numBytes - Int64(currentBufferSize), - direction: .upload - ) - ) - previousTimeMark = current - } - } - } - - /// Calibrate the buffer size that will be sent to the server according to the initial buffer size and the amount of buffer - /// currently queued in the websocket pipeline. - /// - /// The new size increment by a factor of 16 by default. However, if the new size exceeds the `MAX_MESSAGE_SIZE` limit, - /// then the size will be set to `MAX_MESSAGE_SIZE`. - /// If there are sufficient amount of spaces available in the websocket buffer, then the system will double the buffer size to maximize the network load. - /// - /// - Parameters: - /// - initial: the initial buffer size - /// - ws: the websocket object that knows the number of bytes currently queued in the system. - /// - /// - Returns: the number of bytes for next round of upload. - private func calibrateLoadSize(initial size: Int, ws: WebSocket) -> Int { - let nextSizeIncrement: Int = size >= MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : 16 * size - return (self.numBytes - Int64(ws.bufferedBytes) >= nextSizeIncrement) ? size * 2 : size - } -} diff --git a/Sources/Utils/Date+Extension.swift b/Sources/Utils/Date+Extension.swift deleted file mode 100644 index 67a55db..0000000 --- a/Sources/Utils/Date+Extension.swift +++ /dev/null @@ -1,26 +0,0 @@ -// -// This source file is part of the LCL open source project -// -// Copyright (c) 2021-2024 Local Connectivity Lab and the project authors -// Licensed under Apache License v2.0 -// -// See LICENSE for license information -// See CONTRIBUTORS for the list of project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -import Foundation - -extension Date { - - /// Get the current time in microsecond - /// - /// - Returns: current time in microsecond. - static var nowInMicroSecond: Int64 { - if #available(macOS 12, iOS 15, *) { - Int64(Date.now.timeIntervalSince1970 * 1000_000) - } else { - Int64(Date().timeIntervalSince1970 * 1000_000) - } - } -}