Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for earlier OS versions + Change stream line separator to CRLF #59

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import PackageDescription
let package = Package(
name: "Twift",
platforms: [
.macOS(.v12), .iOS(.v15)
.macOS(.v10_15), .iOS(.v13)
],
products: [
.library(name: "Twift", targets: ["Twift"])
Expand Down
75 changes: 75 additions & 0 deletions Sources/AsyncCRLFLineSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import Foundation

// AsyncLineSequence but separated by CRLF only
struct AsyncCRLFLineSequence<Base: AsyncSequence>: AsyncSequence where Base.Element == UInt8 {
typealias Element = String

private let base: Base

struct AsyncIterator: AsyncIteratorProtocol {
private var byteSource: Base.AsyncIterator
private var buffer = [UInt8]()

init(underlyingIterator: Base.AsyncIterator) {
byteSource = underlyingIterator
}

mutating func next() async rethrows -> String? {
let _CR: UInt8 = 0x0D
let _LF: UInt8 = 0x0A

func yield() -> String? {
defer {
buffer.removeAll(keepingCapacity: true)
}
if buffer.isEmpty {
return nil
}
return String(decoding: buffer, as: UTF8.self)
}

while let first = try await byteSource.next() {
switch first {
case _CR:
// Try to read: 0D [0A].
guard let next = try await byteSource.next() else {
buffer.append(first)
return yield()
}
guard next == _LF else {
buffer.append(first)
buffer.append(next)
continue
}
if let result = yield() {
return result
}
default:
buffer.append(first)
}
}
// Don't emit an empty newline when there is no more content (e.g. end of file)
if !buffer.isEmpty {
return yield()
}
return nil
}
}

func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(underlyingIterator: base.makeAsyncIterator())
}

init(underlyingSequence: Base) {
base = underlyingSequence
}
}

extension AsyncSequence where Self.Element == UInt8 {
/**
A non-blocking sequence of CRLF-separated `Strings` created by decoding the elements of `self` as UTF8.
*/
var linesCRLF: AsyncCRLFLineSequence<Self> {
AsyncCRLFLineSequence(underlyingSequence: self)
}
}
13 changes: 13 additions & 0 deletions Sources/Date++.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import Foundation

extension Date {
func ISO8601Format() -> String {
if #available(iOS 15.0, macOS 12.0, *) {
return ISO8601Format(.init())
} else {
return ISO8601DateFormatter.string(from: self,
timeZone: TimeZone(secondsFromGMT: 0)!,
formatOptions: [.withInternetDateTime])
}
}
}
2 changes: 1 addition & 1 deletion Sources/Twift+Authentication.swift
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public struct OAuth2User: Codable {

/// Whether or not the access token has expired (i.e. whether `expiresAt` is in the past).
public var expired: Bool {
expiresAt < .now
expiresAt.timeIntervalSinceNow < 0
}

internal enum CodingKeys: String, CodingKey {
Expand Down
58 changes: 33 additions & 25 deletions Sources/Twift+Streams.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ extension Twift {
/// - backfillMinutes: By passing this parameter, you can request up to five (5) minutes worth of streaming data that you might have missed during a disconnection to be delivered to you upon reconnection. The backfilled Tweets will automatically flow through the reconnected stream, with older Tweets generally being delivered before any newly matching Tweets. You must include a whole number between 1 and 5 as the value to this parameter.
/// This feature will deliver duplicate Tweets, meaning that if you were disconnected for 90 seconds, and you requested two minutes of backfill, you will receive 30 seconds worth of duplicate Tweets. Due to this, you should make sure your system is tolerant of duplicate data.
/// This feature is currently only available to the Academic Research product track.
/// - Returns: An `AsyncSequence` of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
/// - Returns: A stream of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
public func volumeStream(fields: Set<Tweet.Field> = [],
expansions: [Tweet.Expansions] = [],
backfillMinutes: Int? = nil
) async throws -> AsyncThrowingCompactMapSequence<AsyncLineSequence<URLSession.AsyncBytes>, TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
) async throws -> Stream<TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
guard case .appOnly(_) = authenticationType else { throw TwiftError.WrongAuthenticationType(needs: .appOnly) }

var queryItems = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)
Expand All @@ -30,17 +30,7 @@ extension Twift {

signURLRequest(method: .GET, request: &request)

let (bytes, response) = try await URLSession.shared.bytes(for: request)

guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw URLError.init(.resourceUnavailable)
}

return bytes.lines
.compactMap {
try? await self.decodeOrThrow(decodingType: TwitterAPIDataAndIncludes.self, data: Data($0.utf8))
}
return try await stream(for: request)
}

/// Streams Tweets in real-time based on a specific set of filter rules.
Expand All @@ -52,11 +42,11 @@ extension Twift {
/// - backfillMinutes: By passing this parameter, you can request up to five (5) minutes worth of streaming data that you might have missed during a disconnection to be delivered to you upon reconnection. The backfilled Tweets will automatically flow through the reconnected stream, with older Tweets generally being delivered before any newly matching Tweets. You must include a whole number between 1 and 5 as the value to this parameter.
/// This feature will deliver duplicate Tweets, meaning that if you were disconnected for 90 seconds, and you requested two minutes of backfill, you will receive 30 seconds worth of duplicate Tweets. Due to this, you should make sure your system is tolerant of duplicate data.
/// This feature is currently only available to the Academic Research product track.
/// - Returns: An `AsyncSequence` of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
/// - Returns: A stream of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
public func filteredStream(fields: Set<Tweet.Field> = [],
expansions: [Tweet.Expansions] = [],
backfillMinutes: Int? = nil
) async throws -> AsyncThrowingCompactMapSequence<AsyncLineSequence<URLSession.AsyncBytes>, TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
) async throws -> Stream<TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
guard case .appOnly(_) = authenticationType else { throw TwiftError.WrongAuthenticationType(needs: .appOnly) }

var queryItems = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)
Expand All @@ -70,17 +60,35 @@ extension Twift {

signURLRequest(method: .GET, request: &request)

let (bytes, response) = try await URLSession.shared.bytes(for: request)
guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw URLError.init(.resourceUnavailable)
}

return bytes.lines
.compactMap {
try? await self.decodeOrThrow(decodingType: TwitterAPIDataAndIncludes.self, data: Data($0.utf8))
return try await stream(for: request)
}

func stream<T: Codable>(for request: URLRequest) async throws -> Stream<T> {
if #available(iOS 15.0, macOS 12.0, *) {
let (bytes, response) = try await URLSession.shared.bytes(for: request)
guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw TwiftError.UnknownError(response)
}

return Stream(
bytes.linesCRLF
.compactMap { try? await self.decodeOrThrow(decodingType: T.self, data: Data($0.utf8)) }
)
} else {
let (bytes, response) = try await _AsyncBytes.bytes(for: request)

guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw TwiftError.UnknownError(response)
}

return Stream(
bytes.linesCRLF
.compactMap { try? await self.decodeOrThrow(decodingType: T.self, data: Data($0.utf8)) }
)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/Twift.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class Twift: NSObject, ObservableObject {

if dateStr == "string" && isTestEnvironment {
print("Test environment detected: simulating date for data decoder")
return .now
return Date()
}

throw TwiftError.UnknownError("Couldn't decode date from returned data: \(decoder.codingPath.description)")
Expand Down
26 changes: 26 additions & 0 deletions Sources/Types+Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,29 @@ public struct MutableFilteredStreamRule: Codable {
/// The optional tag for this stream rule
public var tag: String?
}

/// An asychronous sequence of stream objects.
public struct Stream<Element>: AsyncSequence {
private let makeUnderlyingIterator: () -> AsyncIterator

init<S: AsyncSequence>(_ base: S) where S.Element == Element {
makeUnderlyingIterator = { AsyncIterator(base.makeAsyncIterator()) }
}

public func makeAsyncIterator() -> AsyncIterator {
return makeUnderlyingIterator()
}

public struct AsyncIterator: AsyncIteratorProtocol {
private let _next: () async throws -> Element?

init<I: AsyncIteratorProtocol>(_ base: I) where I.Element == Element {
var iterator = base
_next = { try await iterator.next() }
}

public func next() async throws -> Element? {
return try await _next()
}
}
}
106 changes: 106 additions & 0 deletions Sources/_AsyncBytes.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import Foundation

struct _AsyncBytes: AsyncSequence {
typealias Element = UInt8

private let makeUnderlyingIterator: () -> AsyncIterator

init<I: AsyncIteratorProtocol>(_ underlyingIterator: I) where I.Element == UInt8 {
makeUnderlyingIterator = { AsyncIterator(underlyingIterator) }
}

public func makeAsyncIterator() -> AsyncIterator {
return makeUnderlyingIterator()
}

struct AsyncIterator: AsyncIteratorProtocol {
private let _next: () async throws -> Element?

init<I: AsyncIteratorProtocol>(_ base: I) where I.Element == Element {
var iterator = base
_next = { try await iterator.next() }
}

public func next() async throws -> Element? {
return try await _next()
}
}
}

extension _AsyncBytes {
static func bytes(for request: URLRequest) async throws -> (_AsyncBytes, URLResponse) {
return try await _URLSessionAsyncBytesDelegate().bytes(for: request)
}
}

private class _URLSessionAsyncBytesDelegate: NSObject, URLSessionDataDelegate {
private var responseContinuation: CheckedContinuation<URLResponse, Error>!

private let stream: AsyncThrowingStream<UInt8, Error>
private let streamContinuation: AsyncThrowingStream<UInt8, Error>.Continuation

override init() {
var continuation: AsyncThrowingStream<UInt8, Error>.Continuation!
stream = AsyncThrowingStream { continuation = $0 }
streamContinuation = continuation

super.init()
}

func bytes(for request: URLRequest) async throws -> (_AsyncBytes, URLResponse) {
let response = try await withCheckedThrowingContinuation { continuation in
responseContinuation = continuation

let session = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
streamContinuation.onTermination = { @Sendable _ in session.invalidateAndCancel() }
session.dataTask(with: request).resume()
}
let iterator = AsyncIterator(stream.makeAsyncIterator()) { [streamContinuation] in
streamContinuation.finish()
}
return (_AsyncBytes(iterator), response)
}

func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
if task.response == nil, let error = error {
// Client-side error
responseContinuation.resume(throwing: error)
}
streamContinuation.finish(throwing: error)
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse) async -> URLSession.ResponseDisposition {
responseContinuation.resume(returning: response)
return .allow
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
for b in data {
streamContinuation.yield(b)
}
}

struct AsyncIterator: AsyncIteratorProtocol {
private var base: AsyncThrowingStream<UInt8, Error>.AsyncIterator
private let token: Token

init(_ underlyingIterator: AsyncThrowingStream<UInt8, Error>.AsyncIterator, onDeinit: @escaping () -> Void) {
base = underlyingIterator
token = Token(onDeinit: onDeinit)
}

mutating func next() async throws -> UInt8? {
return try await base.next()
}

private final class Token {
private let onDeinit: () -> Void

init(onDeinit: @escaping () -> Void) {
self.onDeinit = onDeinit
}

deinit { onDeinit() }
}
}
}