Skip to content

Commit

Permalink
Adds merge operator collection overloads. #579
Browse files Browse the repository at this point in the history
  • Loading branch information
kzaher committed Feb 26, 2017
1 parent 250590e commit 6eb6853
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 16 deletions.
32 changes: 16 additions & 16 deletions Rx.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -965,12 +965,12 @@
C8D132451C42D15E00B59FFF /* SectionedViewDataSourceType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D132431C42D15E00B59FFF /* SectionedViewDataSourceType.swift */; };
C8D132461C42D15E00B59FFF /* SectionedViewDataSourceType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D132431C42D15E00B59FFF /* SectionedViewDataSourceType.swift */; };
C8D132471C42D15E00B59FFF /* SectionedViewDataSourceType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D132431C42D15E00B59FFF /* SectionedViewDataSourceType.swift */; };
C8D82B9A1E5A348200A8AB2D /* Observable+MultipleTest2.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D82B991E5A348200A8AB2D /* Observable+MultipleTest2.swift */; };
C8D82B9B1E5A348200A8AB2D /* Observable+MultipleTest2.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D82B991E5A348200A8AB2D /* Observable+MultipleTest2.swift */; };
C8D82B9C1E5A348200A8AB2D /* Observable+MultipleTest2.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D82B991E5A348200A8AB2D /* Observable+MultipleTest2.swift */; };
C8D82B9E1E5A34B100A8AB2D /* Observable+MultipleTest3.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D82B9D1E5A34B100A8AB2D /* Observable+MultipleTest3.swift */; };
C8D82B9F1E5A34B100A8AB2D /* Observable+MultipleTest3.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D82B9D1E5A34B100A8AB2D /* Observable+MultipleTest3.swift */; };
C8D82BA01E5A34B100A8AB2D /* Observable+MultipleTest3.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D82B9D1E5A34B100A8AB2D /* Observable+MultipleTest3.swift */; };
C8D9D6151E63747100AFCA6C /* Observable+MultipleTest2.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D9D6111E636C1D00AFCA6C /* Observable+MultipleTest2.swift */; };
C8D9D6161E63747200AFCA6C /* Observable+MultipleTest2.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D9D6111E636C1D00AFCA6C /* Observable+MultipleTest2.swift */; };
C8D9D6171E63747200AFCA6C /* Observable+MultipleTest2.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D9D6111E636C1D00AFCA6C /* Observable+MultipleTest2.swift */; };
C8D9D6181E63747600AFCA6C /* Observable+MultipleTest3.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D9D6121E636C1D00AFCA6C /* Observable+MultipleTest3.swift */; };
C8D9D61A1E63747700AFCA6C /* Observable+MultipleTest3.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D9D6121E636C1D00AFCA6C /* Observable+MultipleTest3.swift */; };
C8D9D61B1E63747800AFCA6C /* Observable+MultipleTest3.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8D9D6121E636C1D00AFCA6C /* Observable+MultipleTest3.swift */; };
C8E8BA401E2BBDC800A4AC2C /* (null) in Sources */ = {isa = PBXBuildFile; };
C8E8BA5A1E2C181A00A4AC2C /* RxSwift.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C8A56AD71AD7424700B4673B /* RxSwift.framework */; };
C8E8BA641E2C186200A4AC2C /* Benchmarks.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8E8BA621E2C186200A4AC2C /* Benchmarks.swift */; };
Expand Down Expand Up @@ -1935,8 +1935,8 @@
C8D132431C42D15E00B59FFF /* SectionedViewDataSourceType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SectionedViewDataSourceType.swift; sourceTree = "<group>"; };
C8D132521C42DA7F00B59FFF /* SectionedViewDataSourceMock.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SectionedViewDataSourceMock.swift; sourceTree = "<group>"; };
C8D2C1501D4F3CD6006E2431 /* Rx.playground */ = {isa = PBXFileReference; lastKnownFileType = file.playground; path = Rx.playground; sourceTree = "<group>"; };
C8D82B991E5A348200A8AB2D /* Observable+MultipleTest2.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+MultipleTest2.swift"; sourceTree = "<group>"; };
C8D82B9D1E5A34B100A8AB2D /* Observable+MultipleTest3.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+MultipleTest3.swift"; sourceTree = "<group>"; };
C8D9D6111E636C1D00AFCA6C /* Observable+MultipleTest2.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+MultipleTest2.swift"; sourceTree = "<group>"; };
C8D9D6121E636C1D00AFCA6C /* Observable+MultipleTest3.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+MultipleTest3.swift"; sourceTree = "<group>"; };
C8E8BA551E2C181A00A4AC2C /* Benchmarks.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = Benchmarks.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
C8E8BA621E2C186200A4AC2C /* Benchmarks.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Benchmarks.swift; sourceTree = "<group>"; };
C8E8BA631E2C186200A4AC2C /* Info.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
Expand Down Expand Up @@ -2511,8 +2511,8 @@
C835090E1C38706D0027C24C /* Observable+CreationTest.swift */,
C8E9E42A1D43B26C0049644E /* Observable+DebugTest.swift */,
C83509131C38706D0027C24C /* Observable+MultipleTest.swift */,
C8D82B991E5A348200A8AB2D /* Observable+MultipleTest2.swift */,
C8D82B9D1E5A34B100A8AB2D /* Observable+MultipleTest3.swift */,
C8D9D6111E636C1D00AFCA6C /* Observable+MultipleTest2.swift */,
C8D9D6121E636C1D00AFCA6C /* Observable+MultipleTest3.swift */,
C835090F1C38706D0027C24C /* Observable+MultipleTest+CombineLatest.swift */,
C83509101C38706D0027C24C /* Observable+MultipleTest+CombineLatest.tt */,
C83509111C38706D0027C24C /* Observable+MultipleTest+Zip.swift */,
Expand Down Expand Up @@ -3955,7 +3955,7 @@
C85218011E33FC160015DD38 /* RecursiveLock.swift in Sources */,
C822BACA1DB4058000F98810 /* Event+Test.swift in Sources */,
C83509421C38706E0027C24C /* MainThreadPrimitiveHotObservable.swift in Sources */,
C8D82B9E1E5A34B100A8AB2D /* Observable+MultipleTest3.swift in Sources */,
C8D9D6181E63747600AFCA6C /* Observable+MultipleTest3.swift in Sources */,
C835093A1C38706E0027C24C /* RuntimeStateSnapshot.swift in Sources */,
C8561B661DFE1169005E97F1 /* ExampleTests.swift in Sources */,
C86B1E221D42BF5200130546 /* SchedulerTests.swift in Sources */,
Expand All @@ -3965,7 +3965,7 @@
C8C217D71CB710200038A2E6 /* UICollectionView+RxTests.swift in Sources */,
C83509451C38706E0027C24C /* Observable.Extensions.swift in Sources */,
C835093B1C38706E0027C24C /* RXObjCRuntime+Testing.m in Sources */,
C8D82B9A1E5A348200A8AB2D /* Observable+MultipleTest2.swift in Sources */,
C8D9D6151E63747100AFCA6C /* Observable+MultipleTest2.swift in Sources */,
C83509641C38706E0027C24C /* VariableTest.swift in Sources */,
C83509461C38706E0027C24C /* PrimitiveHotObservable.swift in Sources */,
C835097E1C38726E0027C24C /* RxMutableBox.swift in Sources */,
Expand Down Expand Up @@ -4132,7 +4132,7 @@
C8350A171C38756A0027C24C /* SubjectConcurrencyTest.swift in Sources */,
C83509EA1C3875580027C24C /* BackgroundThreadPrimitiveHotObservable.swift in Sources */,
C84CB1721C3876B800EB63CC /* UIView+RxTests.swift in Sources */,
C8D82B9B1E5A348200A8AB2D /* Observable+MultipleTest2.swift in Sources */,
C8D9D6161E63747200AFCA6C /* Observable+MultipleTest2.swift in Sources */,
C8353CEA1DA19BC500BE3F5C /* TestErrors.swift in Sources */,
C83509F81C38755D0027C24C /* HistoricalSchedulerTest.swift in Sources */,
C8379EF51D1DD326003EF8FC /* UIButton+RxTests.swift in Sources */,
Expand All @@ -4144,7 +4144,7 @@
C83509D31C3875390027C24C /* RXObjCRuntime+Testing.m in Sources */,
C83509FD1C38755D0027C24C /* Observable+ConcurrencyTest.swift in Sources */,
C8C4F17B1DE9DF0200003FA7 /* UICollectionView+RxTests.swift in Sources */,
C8D82B9F1E5A34B100A8AB2D /* Observable+MultipleTest3.swift in Sources */,
C8D9D61B1E63747800AFCA6C /* Observable+MultipleTest3.swift in Sources */,
C8350A141C38756A0027C24C /* Observable+TimeTest.swift in Sources */,
C8C4F1841DE9DF0200003FA7 /* UISegmentedControl+RxTests.swift in Sources */,
C83509D01C38752E0027C24C /* RuntimeStateSnapshot.swift in Sources */,
Expand All @@ -4164,7 +4164,7 @@
C8350A201C38756B0027C24C /* QueueTests.swift in Sources */,
1AF67DA41CED427D00C310FA /* PublishSubjectTest.swift in Sources */,
C894A1A41E183CA00098327C /* Observable+StandardSequenceOperatorsTest2.swift in Sources */,
C8D82B9C1E5A348200A8AB2D /* Observable+MultipleTest2.swift in Sources */,
C8D9D6171E63747200AFCA6C /* Observable+MultipleTest2.swift in Sources */,
0BA9496E1E224B9C0036DD06 /* AsyncSubjectTests.swift in Sources */,
C83509E71C3875580027C24C /* PrimitiveHotObservable.swift in Sources */,
C83509CE1C3875230027C24C /* NSObject+RxTests.swift in Sources */,
Expand All @@ -4187,7 +4187,7 @@
C8350A0E1C3875630027C24C /* Observable+MultipleTest+Zip.swift in Sources */,
C83509CF1C3875260027C24C /* NSView+RxTests.swift in Sources */,
C8350A1F1C38756B0027C24C /* ObserverTests.swift in Sources */,
C8D82BA01E5A34B100A8AB2D /* Observable+MultipleTest3.swift in Sources */,
C8D9D61A1E63747700AFCA6C /* Observable+MultipleTest3.swift in Sources */,
C834F6C61DB3950600C29244 /* NSControl+RxTests.swift in Sources */,
C83509D61C3875420027C24C /* SentMessageTest.swift in Sources */,
C8350A021C38755E0027C24C /* BagTest.swift in Sources */,
Expand Down
24 changes: 24 additions & 0 deletions RxSwift/Observables/Implementations/Merge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,17 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType>
iterDisposable.setDisposable(subscription)
}
}

func run(_ sources: [SourceType]) -> Disposable {
let _ = _group.insert(_sourceSubscription)
_stopped = true

for source in sources {
self.on(.next(source))
}

return _group
}

func run(_ source: Observable<SourceType>) -> Disposable {
let _ = _group.insert(_sourceSubscription)
Expand Down Expand Up @@ -420,3 +431,16 @@ final class Merge<S: ObservableConvertibleType> : Producer<S.E> {
}
}

final class MergeArray<E> : Producer<E> {
private let _sources: [Observable<E>]

init(sources: [Observable<E>]) {
_sources = sources
}

override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = MergeBasicSink<Observable<E>, O>(observer: observer, cancel: cancel)
let subscription = sink.run(_sources)
return (sink: sink, subscription: subscription)
}
}
38 changes: 38 additions & 0 deletions RxSwift/Observables/Observable+Multiple.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,44 @@ extension ObservableType where E : ObservableConvertibleType {

// MARK: merge

extension Observable {
/**
Merges elements from all observable sequences from collection into a single observable sequence.
- seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html)
- parameter sources: Collection of observable sequences to merge.
- returns: The observable sequence that merges the elements of the observable sequences.
*/
public static func merge<C: Collection>(_ sources: C) -> Observable<E> where C.Iterator.Element == Observable<E> {
return MergeArray(sources: Array(sources))
}

/**
Merges elements from all observable sequences from array into a single observable sequence.
- seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html)
- parameter sources: Array of observable sequences to merge.
- returns: The observable sequence that merges the elements of the observable sequences.
*/
public static func merge(_ sources: [Observable<E>]) -> Observable<E> {
return MergeArray(sources: sources)
}

/**
Merges elements from all observable sequences into a single observable sequence.
- seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html)
- parameter sources: Collection of observable sequences to merge.
- returns: The observable sequence that merges the elements of the observable sequences.
*/
public static func merge(_ sources: Observable<E>...) -> Observable<E> {
return MergeArray(sources: sources)
}
}

extension ObservableType where E : ObservableConvertibleType {

/**
Expand Down
2 changes: 2 additions & 0 deletions Sources/AllTestz/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,8 @@ final class ObservableMultipleTest_ : ObservableMultipleTest, RxTestCase {
("testMerge_MergeConcat_Disposed", ObservableMultipleTest.testMerge_MergeConcat_Disposed),
("testMerge_MergeConcat_OuterError", ObservableMultipleTest.testMerge_MergeConcat_OuterError),
("testMerge_MergeConcat_InnerError", ObservableMultipleTest.testMerge_MergeConcat_InnerError),
("testMergeSync_Data", ObservableMultipleTest.testMergeSync_Data),
("testMergeSync_ObservableOfObservable_InnerThrows", ObservableMultipleTest.testMergeSync_ObservableOfObservable_InnerThrows),
("testCombineLatest_DeadlockErrorAfterN", ObservableMultipleTest.testCombineLatest_DeadlockErrorAfterN),
("testCombineLatest_DeadlockErrorImmediatelly", ObservableMultipleTest.testCombineLatest_DeadlockErrorImmediatelly),
("testReplay_DeadlockEmpty", ObservableMultipleTest.testReplay_DeadlockEmpty),
Expand Down
122 changes: 122 additions & 0 deletions Tests/RxSwiftTests/Observable+MultipleTest2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,128 @@ extension ObservableMultipleTest {
#endif
}

extension ObservableMultipleTest {
func testMergeSync_Data() {
let test: (@escaping (Observable<Int>, Observable<Int>, Observable<Int>) -> (Observable<Int>)) -> () = { make in
let scheduler = TestScheduler(initialClock: 0)

let ys1 = scheduler.createColdObservable([
next(10, 101),
next(20, 102),
completed(230)
])

let ys2 = scheduler.createColdObservable([
next(10, 201),
next(20, 202),
completed(50)
])

let ys3 = scheduler.createColdObservable([
next(10, 301),
next(20, 302),
completed(150)
])

let res = scheduler.start {
make(ys1.asObservable(), ys2.asObservable(), ys3.asObservable())
}

let messages = [
next(210, 101),
next(210, 201),
next(210, 301),
next(220, 102),
next(220, 202),
next(220, 302),
completed(430)
]

XCTAssertEqual(res.events, messages)

XCTAssertEqual(ys1.subscriptions, [
Subscription(200, 430),
])

XCTAssertEqual(ys2.subscriptions, [
Subscription(200, 250),
])

XCTAssertEqual(ys3.subscriptions, [
Subscription(200, 350),
])
}

test { ys1, ys2, ys3 in
return Observable.merge(ys1, ys2, ys3)
}
test { ys1, ys2, ys3 in
return Observable.merge(AnyCollection([ys1, ys2, ys3]))
}
test { ys1, ys2, ys3 in
return Observable.merge([ys1, ys2, ys3])
}
}

func testMergeSync_ObservableOfObservable_InnerThrows() {
let test: (@escaping (Observable<Int>, Observable<Int>, Observable<Int>) -> (Observable<Int>)) -> () = { make in
let scheduler = TestScheduler(initialClock: 0)

let ys1 = scheduler.createColdObservable([
next(10, 101),
next(20, 102),
completed(230)
])

let ys2 = scheduler.createColdObservable([
next(10, 201),
error(15, testError)
])

let ys3 = scheduler.createColdObservable([
next(10, 301),
next(20, 302),
completed(150)
])

let res = scheduler.start {
make(ys1.asObservable(), ys2.asObservable(), ys3.asObservable())
}

let messages = [
next(210, 101),
next(210, 201),
next(210, 301),
error(215, testError)
]

XCTAssertEqual(res.events, messages)

XCTAssertEqual(ys1.subscriptions, [
Subscription(200, 215),
])

XCTAssertEqual(ys2.subscriptions, [
Subscription(200, 215),
])

XCTAssertEqual(ys3.subscriptions, [
Subscription(200, 215),
])
}

test { ys1, ys2, ys3 in
return Observable.merge(ys1, ys2, ys3)
}
test { ys1, ys2, ys3 in
return Observable.merge(AnyCollection([ys1, ys2, ys3]))
}
test { ys1, ys2, ys3 in
return Observable.merge([ys1, ys2, ys3])
}
}
}

// MARK: combine latest
extension ObservableMultipleTest {

Expand Down

0 comments on commit 6eb6853

Please sign in to comment.