Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for earlier OS versions + Change stream line separator to CRLF #59

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import PackageDescription
let package = Package(
name: "Twift",
platforms: [
.macOS(.v12), .iOS(.v15)
.macOS(.v10_15), .iOS(.v13)
],
products: [
.library(name: "Twift", targets: ["Twift"])
Expand Down
75 changes: 75 additions & 0 deletions Sources/AsyncCRLFLineSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import Foundation

// AsyncLineSequence but separated by CRLF only
struct AsyncCRLFLineSequence<Base: AsyncSequence>: AsyncSequence where Base.Element == UInt8 {
typealias Element = String

private let base: Base

struct AsyncIterator: AsyncIteratorProtocol {
private var byteSource: Base.AsyncIterator
private var buffer = [UInt8]()

init(underlyingIterator: Base.AsyncIterator) {
byteSource = underlyingIterator
}

mutating func next() async rethrows -> String? {
let _CR: UInt8 = 0x0D
let _LF: UInt8 = 0x0A

func yield() -> String? {
defer {
buffer.removeAll(keepingCapacity: true)
}
if buffer.isEmpty {
return nil
}
return String(decoding: buffer, as: UTF8.self)
}

while let first = try await byteSource.next() {
switch first {
case _CR:
// Try to read: 0D [0A].
guard let next = try await byteSource.next() else {
buffer.append(first)
return yield()
}
guard next == _LF else {
buffer.append(first)
buffer.append(next)
continue
}
if let result = yield() {
return result
}
default:
buffer.append(first)
}
}
// Don't emit an empty newline when there is no more content (e.g. end of file)
if !buffer.isEmpty {
return yield()
}
return nil
}
}

func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(underlyingIterator: base.makeAsyncIterator())
}

init(underlyingSequence: Base) {
base = underlyingSequence
}
}

extension AsyncSequence where Self.Element == UInt8 {
/**
A non-blocking sequence of CRLF-separated `Strings` created by decoding the elements of `self` as UTF8.
*/
var linesCRLF: AsyncCRLFLineSequence<Self> {
AsyncCRLFLineSequence(underlyingSequence: self)
}
}
118 changes: 118 additions & 0 deletions Sources/Polyfill.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import Foundation

extension Date {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would put extensions & polyfills each in their own file based on what type they're extending, e.g. DateExtensions.swift (common Swift style), Date+TwiftExtensions.swift (older Obj-C style), or following the existing naming of files in Twift, Date++.swift , or maybe Date+Polyfill.swift considering the use-case.

func _ISO8601Format() -> String {
if #available(iOS 15.0, macOS 12.0, *) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can wrap functions & structs in available (by using @available instead of #available), so they can have the same name as Apple's identifiers, but not conflict with them on newer OSes.

E.G. Change this:

extension Date {
  func _ISO8601Format() -> String {
    if #available(iOS 15.0, macOS 12.0, *) {
      
    }
  }
}

to this:

@available(iOS 15.0, macOS 12.0, *)
extension Date {
  func ISO8601Format() -> String {
     // only your custom implementation here; no need to call out to `ISO8601Format()`
  }
}

return ISO8601Format()
} else {
return ISO8601DateFormatter.string(from: self,
timeZone: TimeZone(secondsFromGMT: 0)!,
formatOptions: [.withInternetDateTime])
}
}
}

struct _AsyncBytes: AsyncSequence {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise for AsyncBytes regarding the @available(…) attribute comments above.

typealias Element = UInt8

private let makeUnderlyingIterator: () -> AsyncIterator

init<I: AsyncIteratorProtocol>(_ underlyingIterator: I) where I.Element == UInt8 {
makeUnderlyingIterator = { AsyncIterator(underlyingIterator) }
}

public func makeAsyncIterator() -> AsyncIterator {
return makeUnderlyingIterator()
}

struct AsyncIterator: AsyncIteratorProtocol {
private let _next: () async throws -> Element?

init<I: AsyncIteratorProtocol>(_ base: I) where I.Element == Element {
var iterator = base
_next = { try await iterator.next() }
}

public func next() async throws -> Element? {
return try await _next()
}
}
}

extension _AsyncBytes {
static func bytes(for request: URLRequest) async throws -> (_AsyncBytes, URLResponse) {
return try await _URLSessionAsyncBytesDelegate().bytes(for: request)
}
}

private class _URLSessionAsyncBytesDelegate: NSObject, URLSessionDataDelegate {
private var responseContinuation: CheckedContinuation<URLResponse, Error>!

private let stream: AsyncThrowingStream<UInt8, Error>
private let streamContinuation: AsyncThrowingStream<UInt8, Error>.Continuation

override init() {
var continuation: AsyncThrowingStream<UInt8, Error>.Continuation!
stream = AsyncThrowingStream { continuation = $0 }
streamContinuation = continuation

super.init()
}

func bytes(for request: URLRequest) async throws -> (_AsyncBytes, URLResponse) {
let response = try await withCheckedThrowingContinuation { continuation in
responseContinuation = continuation

let session = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
streamContinuation.onTermination = { @Sendable _ in session.invalidateAndCancel() }
session.dataTask(with: request).resume()
}
let iterator = AsyncIterator(stream.makeAsyncIterator()) { [streamContinuation] in
streamContinuation.finish()
}
return (_AsyncBytes(iterator), response)
}

func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
if task.response == nil, let error = error {
// Client-side error
responseContinuation.resume(throwing: error)
}
streamContinuation.finish(throwing: error)
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse) async -> URLSession.ResponseDisposition {
responseContinuation.resume(returning: response)
return .allow
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
for b in data {
streamContinuation.yield(b)
}
}

struct AsyncIterator: AsyncIteratorProtocol {
private var base: AsyncThrowingStream<UInt8, Error>.AsyncIterator
private let token: Token

init(_ underlyingIterator: AsyncThrowingStream<UInt8, Error>.AsyncIterator, onDeinit: @escaping () -> Void) {
base = underlyingIterator
token = Token(onDeinit: onDeinit)
}

mutating func next() async throws -> UInt8? {
return try await base.next()
}

private final class Token {
private let onDeinit: () -> Void

init(onDeinit: @escaping () -> Void) {
self.onDeinit = onDeinit
}

deinit { onDeinit() }
}
}
}
2 changes: 1 addition & 1 deletion Sources/Twift+Authentication.swift
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public struct OAuth2User: Codable {

/// Whether or not the access token has expired (i.e. whether `expiresAt` is in the past).
public var expired: Bool {
expiresAt < .now
expiresAt.timeIntervalSinceNow < 0
}

internal enum CodingKeys: String, CodingKey {
Expand Down
8 changes: 4 additions & 4 deletions Sources/Twift+Search.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ extension Twift {
if let nextToken = nextToken { queryItems.append(URLQueryItem(name: "next_token", value: nextToken)) }
if let sinceId = sinceId { queryItems.append(URLQueryItem(name: "since_id", value: sinceId)) }
if let untilId = untilId { queryItems.append(URLQueryItem(name: "until_id", value: untilId)) }
if let startTime = startTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }
if let startTime = startTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }

let fieldsAndExpansions = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)

Expand Down Expand Up @@ -74,8 +74,8 @@ extension Twift {
if let nextToken = nextToken { queryItems.append(URLQueryItem(name: "next_token", value: nextToken)) }
if let sinceId = sinceId { queryItems.append(URLQueryItem(name: "since_id", value: sinceId)) }
if let untilId = untilId { queryItems.append(URLQueryItem(name: "until_id", value: untilId)) }
if let startTime = startTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }
if let startTime = startTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }

let fieldsAndExpansions = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)

Expand Down
58 changes: 33 additions & 25 deletions Sources/Twift+Streams.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ extension Twift {
/// - backfillMinutes: By passing this parameter, you can request up to five (5) minutes worth of streaming data that you might have missed during a disconnection to be delivered to you upon reconnection. The backfilled Tweets will automatically flow through the reconnected stream, with older Tweets generally being delivered before any newly matching Tweets. You must include a whole number between 1 and 5 as the value to this parameter.
/// This feature will deliver duplicate Tweets, meaning that if you were disconnected for 90 seconds, and you requested two minutes of backfill, you will receive 30 seconds worth of duplicate Tweets. Due to this, you should make sure your system is tolerant of duplicate data.
/// This feature is currently only available to the Academic Research product track.
/// - Returns: An `AsyncSequence` of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
/// - Returns: A stream of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
public func volumeStream(fields: Set<Tweet.Field> = [],
expansions: [Tweet.Expansions] = [],
backfillMinutes: Int? = nil
) async throws -> AsyncThrowingCompactMapSequence<AsyncLineSequence<URLSession.AsyncBytes>, TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
) async throws -> Stream<TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
guard case .appOnly(_) = authenticationType else { throw TwiftError.WrongAuthenticationType(needs: .appOnly) }

var queryItems = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)
Expand All @@ -30,17 +30,7 @@ extension Twift {

signURLRequest(method: .GET, request: &request)

let (bytes, response) = try await URLSession.shared.bytes(for: request)

guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw URLError.init(.resourceUnavailable)
}

return bytes.lines
.compactMap {
try? await self.decodeOrThrow(decodingType: TwitterAPIDataAndIncludes.self, data: Data($0.utf8))
}
return try await stream(for: request)
}

/// Streams Tweets in real-time based on a specific set of filter rules.
Expand All @@ -52,11 +42,11 @@ extension Twift {
/// - backfillMinutes: By passing this parameter, you can request up to five (5) minutes worth of streaming data that you might have missed during a disconnection to be delivered to you upon reconnection. The backfilled Tweets will automatically flow through the reconnected stream, with older Tweets generally being delivered before any newly matching Tweets. You must include a whole number between 1 and 5 as the value to this parameter.
/// This feature will deliver duplicate Tweets, meaning that if you were disconnected for 90 seconds, and you requested two minutes of backfill, you will receive 30 seconds worth of duplicate Tweets. Due to this, you should make sure your system is tolerant of duplicate data.
/// This feature is currently only available to the Academic Research product track.
/// - Returns: An `AsyncSequence` of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
/// - Returns: A stream of `TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>` objects.
public func filteredStream(fields: Set<Tweet.Field> = [],
expansions: [Tweet.Expansions] = [],
backfillMinutes: Int? = nil
) async throws -> AsyncThrowingCompactMapSequence<AsyncLineSequence<URLSession.AsyncBytes>, TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
) async throws -> Stream<TwitterAPIDataAndIncludes<Tweet, Tweet.Includes>> {
guard case .appOnly(_) = authenticationType else { throw TwiftError.WrongAuthenticationType(needs: .appOnly) }

var queryItems = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)
Expand All @@ -70,17 +60,35 @@ extension Twift {

signURLRequest(method: .GET, request: &request)

let (bytes, response) = try await URLSession.shared.bytes(for: request)
guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw URLError.init(.resourceUnavailable)
}

return bytes.lines
.compactMap {
try? await self.decodeOrThrow(decodingType: TwitterAPIDataAndIncludes.self, data: Data($0.utf8))
return try await stream(for: request)
}

func stream<T: Codable>(for request: URLRequest) async throws -> Stream<T> {
if #available(iOS 15.0, macOS 12.0, *) {
let (bytes, response) = try await URLSession.shared.bytes(for: request)
guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw TwiftError.UnknownError(response)
}

return Stream(
bytes.linesCRLF
.compactMap { try? await self.decodeOrThrow(decodingType: T.self, data: Data($0.utf8)) }
)
} else {
let (bytes, response) = try await _AsyncBytes.bytes(for: request)

guard let response = response as? HTTPURLResponse,
response.statusCode == 200 else {
throw TwiftError.UnknownError(response)
}

return Stream(
bytes.linesCRLF
.compactMap { try? await self.decodeOrThrow(decodingType: T.self, data: Data($0.utf8)) }
)
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/Twift+Tweets.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ extension Twift {
if let exclude = exclude { queryItems.append(URLQueryItem(name: "exclude", value: exclude.map(\.rawValue).joined(separator: ","))) }
if let sinceId = sinceId { queryItems.append(URLQueryItem(name: "since_id", value: sinceId)) }
if let untilId = untilId { queryItems.append(URLQueryItem(name: "until_id", value: untilId)) }
if let startTime = startTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }
if let startTime = startTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }

let fieldsAndExpansions = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)

Expand Down Expand Up @@ -111,8 +111,8 @@ extension Twift {
if let exclude = exclude { queryItems.append(URLQueryItem(name: "exclude", value: exclude.map(\.rawValue).joined(separator: ","))) }
if let sinceId = sinceId { queryItems.append(URLQueryItem(name: "since_id", value: sinceId)) }
if let untilId = untilId { queryItems.append(URLQueryItem(name: "until_id", value: untilId)) }
if let startTime = startTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }
if let startTime = startTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }

let fieldsAndExpansions = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)

Expand Down Expand Up @@ -151,8 +151,8 @@ extension Twift {
if let exclude = exclude { queryItems.append(URLQueryItem(name: "exclude", value: exclude.map(\.rawValue).joined(separator: ","))) }
if let sinceId = sinceId { queryItems.append(URLQueryItem(name: "since_id", value: sinceId)) }
if let untilId = untilId { queryItems.append(URLQueryItem(name: "until_id", value: untilId)) }
if let startTime = startTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?.ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }
if let startTime = startTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "start_time", value: startTime)) }
if let endTime = endTime?._ISO8601Format() { queryItems.append(URLQueryItem(name: "end_time", value: endTime)) }

let fieldsAndExpansions = fieldsAndExpansions(for: Tweet.self, fields: fields, expansions: expansions)

Expand Down
2 changes: 1 addition & 1 deletion Sources/Twift.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class Twift: NSObject, ObservableObject {

if dateStr == "string" && isTestEnvironment {
print("Test environment detected: simulating date for data decoder")
return .now
return Date()
}

throw TwiftError.UnknownError("Couldn't decode date from returned data: \(decoder.codingPath.description)")
Expand Down
26 changes: 26 additions & 0 deletions Sources/Types+Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,29 @@ public struct MutableFilteredStreamRule: Codable {
/// The optional tag for this stream rule
public var tag: String?
}

/// An asychronous sequence of stream objects.
public struct Stream<Element>: AsyncSequence {
private let makeUnderlyingIterator: () -> AsyncIterator

init<S: AsyncSequence>(_ base: S) where S.Element == Element {
makeUnderlyingIterator = { AsyncIterator(base.makeAsyncIterator()) }
}

public func makeAsyncIterator() -> AsyncIterator {
return makeUnderlyingIterator()
}

public struct AsyncIterator: AsyncIteratorProtocol {
private let _next: () async throws -> Element?

init<I: AsyncIteratorProtocol>(_ base: I) where I.Element == Element {
var iterator = base
_next = { try await iterator.next() }
}

public func next() async throws -> Element? {
return try await _next()
}
}
}