Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ReplaySubject and Publisher.share(replay:) #23

Merged
merged 11 commits into from
Apr 19, 2020
24 changes: 24 additions & 0 deletions CombineExt.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@
78C193E4241D63620001B7FD /* DematerializeTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193E3241D63620001B7FD /* DematerializeTests.swift */; };
AAEAF0E72436D346007C35E0 /* SetOutputType.swift in Sources */ = {isa = PBXBuildFile; fileRef = AAEAF0E62436D346007C35E0 /* SetOutputType.swift */; };
AAEAF0E92436D785007C35E0 /* SetOutputTypeTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = AAEAF0E82436D785007C35E0 /* SetOutputTypeTests.swift */; };
BF3C6B6C24421D27004D4A8A /* ShareReplay.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF3C6B6B24421D27004D4A8A /* ShareReplay.swift */; };
BF50924B241FFE8E00600DF4 /* ZipManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF50924A241FFE8E00600DF4 /* ZipManyTests.swift */; };
BF8121BC241FF42C006A93B8 /* ZipMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF8121BB241FF42C006A93B8 /* ZipMany.swift */; };
BF84B7412426B786001BFA88 /* RemoveAllDuplicates.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF84B7402426B786001BFA88 /* RemoveAllDuplicates.swift */; };
BF84B7432426C332001BFA88 /* RemoveAllDuplicatesTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF84B7422426C332001BFA88 /* RemoveAllDuplicatesTests.swift */; };
BF9D85D32444BB92001783E6 /* ReplaySubject.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF9D85D22444BB92001783E6 /* ReplaySubject.swift */; };
BF9D85D52444D12F001783E6 /* ReplaySubjectTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF9D85D42444D12F001783E6 /* ReplaySubjectTests.swift */; };
BF9D85D724450090001783E6 /* ShareReplayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF9D85D624450090001783E6 /* ShareReplayTests.swift */; };
BFB4EA132428256B0096E9E9 /* CombineLatestMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = BFB4EA122428256B0096E9E9 /* CombineLatestMany.swift */; };
BFB4EA1524283ECF0096E9E9 /* CombineLatestManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BFB4EA1424283ECF0096E9E9 /* CombineLatestManyTests.swift */; };
DC16910F24281A1800B234C4 /* MapMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = DC16910E24281A1800B234C4 /* MapMany.swift */; };
Expand Down Expand Up @@ -93,10 +97,14 @@
78C193E3241D63620001B7FD /* DematerializeTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DematerializeTests.swift; sourceTree = "<group>"; };
AAEAF0E62436D346007C35E0 /* SetOutputType.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SetOutputType.swift; sourceTree = "<group>"; };
AAEAF0E82436D785007C35E0 /* SetOutputTypeTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SetOutputTypeTests.swift; sourceTree = "<group>"; };
BF3C6B6B24421D27004D4A8A /* ShareReplay.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ShareReplay.swift; sourceTree = "<group>"; };
BF50924A241FFE8E00600DF4 /* ZipManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ZipManyTests.swift; sourceTree = "<group>"; };
BF8121BB241FF42C006A93B8 /* ZipMany.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ZipMany.swift; sourceTree = "<group>"; };
BF84B7402426B786001BFA88 /* RemoveAllDuplicates.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RemoveAllDuplicates.swift; sourceTree = "<group>"; };
BF84B7422426C332001BFA88 /* RemoveAllDuplicatesTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RemoveAllDuplicatesTests.swift; sourceTree = "<group>"; };
BF9D85D22444BB92001783E6 /* ReplaySubject.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplaySubject.swift; sourceTree = "<group>"; };
BF9D85D42444D12F001783E6 /* ReplaySubjectTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplaySubjectTests.swift; sourceTree = "<group>"; };
BF9D85D624450090001783E6 /* ShareReplayTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ShareReplayTests.swift; sourceTree = "<group>"; };
BFB4EA122428256B0096E9E9 /* CombineLatestMany.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CombineLatestMany.swift; sourceTree = "<group>"; };
BFB4EA1424283ECF0096E9E9 /* CombineLatestManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CombineLatestManyTests.swift; sourceTree = "<group>"; };
"CombineExt::CombineExt::Product" /* CombineExt.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; path = CombineExt.framework; sourceTree = BUILT_PRODUCTS_DIR; };
Expand Down Expand Up @@ -163,6 +171,14 @@
path = Common;
sourceTree = "<group>";
};
BF9D85D12444BB7F001783E6 /* Subjects */ = {
isa = PBXGroup;
children = (
BF9D85D22444BB92001783E6 /* ReplaySubject.swift */,
);
path = Subjects;
sourceTree = "<group>";
};
OBJ_11 /* Tests */ = {
isa = PBXGroup;
children = (
Expand All @@ -181,6 +197,8 @@
AAEAF0E82436D785007C35E0 /* SetOutputTypeTests.swift */,
788CD8F4242F9DFB0015B3C7 /* AmbTests.swift */,
BF84B7422426C332001BFA88 /* RemoveAllDuplicatesTests.swift */,
BF9D85D42444D12F001783E6 /* ReplaySubjectTests.swift */,
BF9D85D624450090001783E6 /* ShareReplayTests.swift */,
);
path = Tests;
sourceTree = SOURCE_ROOT;
Expand Down Expand Up @@ -209,6 +227,7 @@
OBJ_7 /* Sources */ = {
isa = PBXGroup;
children = (
BF9D85D12444BB7F001783E6 /* Subjects */,
78C193DA241D07160001B7FD /* Common */,
78C193D5241C2E4F0001B7FD /* Models */,
78002BB3241E90FE0018AA28 /* Relays */,
Expand All @@ -233,6 +252,7 @@
AAEAF0E62436D346007C35E0 /* SetOutputType.swift */,
788CD8FA2431228C0015B3C7 /* Amb.swift */,
BF84B7402426B786001BFA88 /* RemoveAllDuplicates.swift */,
BF3C6B6B24421D27004D4A8A /* ShareReplay.swift */,
);
path = Operators;
sourceTree = "<group>";
Expand Down Expand Up @@ -341,6 +361,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 0;
files = (
BF3C6B6C24421D27004D4A8A /* ShareReplay.swift in Sources */,
DC16910F24281A1800B234C4 /* MapMany.swift in Sources */,
78C193CF241C16C40001B7FD /* FlatMapLatest.swift in Sources */,
78C193DC241D0A9F0001B7FD /* Sink.swift in Sources */,
Expand All @@ -355,6 +376,7 @@
BF84B7412426B786001BFA88 /* RemoveAllDuplicates.swift in Sources */,
78C193D4241C2DE00001B7FD /* Create.swift in Sources */,
OBJ_22 /* AssignToMany.swift in Sources */,
BF9D85D32444BB92001783E6 /* ReplaySubject.swift in Sources */,
AAEAF0E72436D346007C35E0 /* SetOutputType.swift in Sources */,
78C193D7241C2E580001B7FD /* Event.swift in Sources */,
OBJ_23 /* WithLatestFrom.swift in Sources */,
Expand All @@ -378,6 +400,7 @@
78C193D2241C1B750001B7FD /* FlatMapLatestTests.swift in Sources */,
78AA9297241B8532009BD68B /* AssignToManyTests.swift in Sources */,
BFB4EA1524283ECF0096E9E9 /* CombineLatestManyTests.swift in Sources */,
BF9D85D52444D12F001783E6 /* ReplaySubjectTests.swift in Sources */,
AAEAF0E92436D785007C35E0 /* SetOutputTypeTests.swift in Sources */,
78988A25241FFE2E00F3A4AF /* PartitionTests.swift in Sources */,
OBJ_41 /* WithLatestFromTests.swift in Sources */,
Expand All @@ -387,6 +410,7 @@
78C193E4241D63620001B7FD /* DematerializeTests.swift in Sources */,
BF50924B241FFE8E00600DF4 /* ZipManyTests.swift in Sources */,
788CD8F5242F9DFB0015B3C7 /* AmbTests.swift in Sources */,
BF9D85D724450090001783E6 /* ShareReplayTests.swift in Sources */,
78988A1E241EAFDD00F3A4AF /* PassthroughRelayTests.swift in Sources */,
DC1691122428228200B234C4 /* MapManyTests.swift in Sources */,
78C193D9241CEEA80001B7FD /* CreateTests.swift in Sources */,
Expand Down
77 changes: 77 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ All operators, utilities and helpers respect Combine's publisher contract, inclu
* [mapMany(_:)](#MapMany)
* [setOutputType(to:)](#setOutputType)
* [removeAllDuplicates and removeAllDuplicates(by:) ](#removeAllDuplicates)
* [share(replay:)](#sharereplay)

### Publishers
* [AnyPublisher.create](#AnypublisherCreate)
* [CurrentValueRelay](#CurrentValueRelay)
* [PassthroughRelay](#PassthroughRelay)

### Subjects
* [ReplaySubject](#ReplaySubject)

> **Note**: This is still a relatively early version of CombineExt, with much more to be desired. I gladly accept PRs, ideas, opinions, or improvements. Thank you! :)

## Installation
Expand Down Expand Up @@ -174,6 +178,8 @@ amb: 6
amb: completed with .finished
```

------

### materialize

Convert any publisher to a publisher of its events. Given a `Publisher<Output, MyError>`, this operator will return a `Publisher<Event<Output, MyError>, Never>`, which means your failure will actually be a regular value, which makes error handling much simpler in many use cases.
Expand Down Expand Up @@ -348,6 +354,8 @@ You may also use `.zip()` directly on a collection of publishers with the same o
zipped: 10
```

------

### CombineLatestMany

This repo includes two overloads on Combine’s `Publisher.combineLatest` methods (which, at the time of writing only go up to arity three) and an `Collection.combineLatest` constrained extension.
Expand Down Expand Up @@ -379,6 +387,8 @@ combineLatest: [true, true, true, true]
combineLatest: [false, true, true, true]
```

------

### MapMany

Projects each element of a publisher collection into a new publisher collection form.
Expand All @@ -399,10 +409,14 @@ intArrayPublisher.send([10, 2, 2, 4, 3, 8])
["10", "2", "2", "4", "3", "8"]
```

------

### setOutputType

`Publisher.setOutputType(to:)` is an analog to [`.setFailureType(to:)`](https://developer.apple.com/documentation/combine/publisher/3204753-setfailuretype) for when `Output` is constrained to `Never`. This is especially helpful when chaining operators after an [`.ignoreOutput()`](https://developer.apple.com/documentation/combine/publisher/3204714-ignoreoutput) call.

------

### removeAllDuplicates

`Publisher.removeAllDuplicates` and `.removeAllDuplicates(by:)` are stricter forms of Apple’s [`Publisher.removeDuplicates`](https://developer.apple.com/documentation/combine/publisher/3204745-removeduplicates) and [`.removeDuplicates(by:)`](https://developer.apple.com/documentation/combine/publisher/3204746-removeduplicates)—the operators de-duplicate across _all_ previous value events, instead of pairwise.
Expand All @@ -422,6 +436,40 @@ removeAllDuplicates: 3
removeAllDuplicates: 4
```

------

### share(replay:)

Similar to [`Publisher.share`](https://developer.apple.com/documentation/combine/publisher/3204754-share), `.share(replay:)` can be used to create a publisher instance with reference semantics which replays a pre-defined amount of value events to further subscribers.

```swift
let subject = PassthroughSubject<Int, Never>()

let replayedPublisher = subject
.share(replay: 3)

subscription1 = replayedPublisher
.sink(receiveValue: { print("first subscriber: \($0)") })

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)

subscription2 = replayedPublisher
.sink(receiveValue: { print("second subscriber: \($0)") })
```

```none
first subscriber: 1
first subscriber: 2
first subscriber: 3
first subscriber: 4
second subscriber: 2
second subscriber: 3
second subscriber: 4
```

## Publishers

This section outlines some of the custom Combine publishers CombineExt provides
Expand Down Expand Up @@ -524,6 +572,35 @@ great
guarantees
```

## Subjects

### ReplaySubject

A Combine analog to Rx’s [`ReplaySubject` type](http://reactivex.io/documentation/subject.html). It’s similar to a [`CurrentValueSubject`](https://developer.apple.com/documentation/combine/currentvaluesubject) in that it buffers values, but, it takes it a step further in allowing consumers to specify the number of values to buffer and replay to future subscribers. Also, it will handle forwarding any completion events after the buffer is cleared upon subscription.

```swift
let subject = ReplaySubject<Int, Never>(bufferSize: 3)

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)

subject
.sink(receiveValue: { print($0) })

subject.send(5)
```

#### Output:

```none
2
3
4
5
```

## License

MIT, of course ;-) See the [LICENSE](LICENSE) file.
Expand Down
21 changes: 21 additions & 0 deletions Sources/Operators/ShareReplay.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//
// ShareReplay.swift
// CombineExt
//
// Created by Jasdev Singh on 13/04/2020.
// Copyright © 2020 Combine Community. All rights reserved.
//

import Combine

public extension Publisher {
/// A variation on [share()](https://developer.apple.com/documentation/combine/publisher/3204754-share)
/// that allows for buffering and replaying a `replay` amount of value events to future subscribers.
///
/// - Parameter count: The number of value events to buffer in a first-in-first-out manner.
/// - Returns: A publisher that replays the specified number of value events to future subscribers.
func share(replay count: Int) -> Publishers.Autoconnect<Publishers.Multicast<Self, ReplaySubject<Output, Failure>>> {
multicast { ReplaySubject(bufferSize: count) }
.autoconnect()
}
}
129 changes: 129 additions & 0 deletions Sources/Subjects/ReplaySubject.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// ReplaySubject.swift
// CombineExt
//
// Created by Jasdev Singh on 13/04/2020.
// Copyright © 2020 Combine Community. All rights reserved.
//

import Combine

/// A `ReplaySubject` is a subject that can buffer one or more values. It stores value events, up to its `bufferSize` in a
/// first-in-first-out manner and then replays it to
/// future subscribers and also forwards completion events.
///
/// The implementation borrows heavily from [Entwine’s](https://github.com/tcldr/Entwine/blob/b839c9fcc7466878d6a823677ce608da998b95b9/Sources/Entwine/Operators/ReplaySubject.swift).
public final class ReplaySubject<Output, Failure: Error>: Subject {
public typealias Output = Output
public typealias Failure = Failure

private let bufferSize: Int
private var buffer = [Output]()

// Keeping track of all live subscriptions, so `send` events can be forwarded to them.
private var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]()

// We also track subscriber identifiers, to more quickly bottom-out double subscribes instead of having to do a
// linear pass over `subscriptions`.
private var subscriberIdentifiers = Set<CombineIdentifier>()

private var completion: Subscribers.Completion<Failure>?
private var isActive: Bool { completion == nil }

/// Create a `ReplaySubject`, buffering up to `bufferSize` values and replaying them to new subscribers
/// - Parameter bufferSize: The maximum number of value events to buffer and replay to all future subscribers.
public init(bufferSize: Int) {
self.bufferSize = bufferSize
}

public func send(_ value: Output) {
guard isActive else { return }

buffer.append(value)

if buffer.count > bufferSize {
buffer.removeFirst()
}

subscriptions.forEach { $0.forwardValueToBuffer(value) }
}

public func send(completion: Subscribers.Completion<Failure>) {
guard isActive else { return }

self.completion = completion

subscriptions.forEach { $0.forwardCompletionToBuffer(completion) }
}

public func send(subscription: Combine.Subscription) {
subscription.request(.unlimited)
}

public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output == Subscriber.Input {
let subscriberIdentifier = subscriber.combineIdentifier

guard !subscriberIdentifiers.contains(subscriberIdentifier) else {
subscriber.receive(subscription: Subscriptions.empty)
return
}

let subscription = Subscription(downstream:
AnySubscriber(subscriber)) { [weak self] in
guard let self = self,
let subscriptionIndex = self.subscriptions
.firstIndex(where: { $0.innerSubscriberIdentifier == subscriberIdentifier }) else { return }

self.subscriberIdentifiers.remove(subscriberIdentifier)
self.subscriptions.remove(at: subscriptionIndex)
}

subscriberIdentifiers.insert(subscriberIdentifier)
subscriptions.append(subscription)

subscriber.receive(subscription: subscription)
subscription.replay(buffer, completion: completion)
}
}

extension ReplaySubject {
final class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
private var demandBuffer: DemandBuffer<Downstream>?
private var cancellationHandler: (() -> Void)?

fileprivate let innerSubscriberIdentifier: CombineIdentifier

init(downstream: Downstream, cancellationHandler: (() -> Void)?) {
self.demandBuffer = DemandBuffer(subscriber: downstream)
self.innerSubscriberIdentifier = downstream.combineIdentifier
self.cancellationHandler = cancellationHandler
}

func replay(_ buffer: [Output], completion: Subscribers.Completion<Failure>?) {
buffer.forEach(forwardValueToBuffer)

if let completion = completion {
forwardCompletionToBuffer(completion)
}
}

func forwardValueToBuffer(_ value: Output) {
_ = demandBuffer?.buffer(value: value)
}

func forwardCompletionToBuffer(_ completion: Subscribers.Completion<Failure>) {
demandBuffer?.complete(completion: completion)
}

func request(_ demand: Subscribers.Demand) {
_ = demandBuffer?.demand(demand)
}

func cancel() {
cancellationHandler?()
cancellationHandler = nil

demandBuffer = nil
}
}
}
Loading