-
Notifications
You must be signed in to change notification settings - Fork 762
/
Copy pathQueryLanguage.Conversions.cs
109 lines (87 loc) · 3.67 KB
/
QueryLanguage.Conversions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Reactive.Concurrency;
namespace System.Reactive.Linq
{
using ObservableImpl;
internal partial class QueryLanguage
{
    #region + Subscribe +

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the enumerable <paramref name="source"/>,
    /// forwarding to the shared helper with <c>SchedulerDefaults.Iteration</c> as the scheduler.
    /// </summary>
    /// <typeparam name="TSource">Element type of the enumerable sequence.</typeparam>
    /// <param name="source">Enumerable sequence handed to the producer.</param>
    /// <param name="observer">Observer passed to the producer's <c>Subscribe</c>.</param>
    /// <returns>The disposable returned by the producer's <c>Subscribe</c> call.</returns>
    public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer)
        => Subscribe_(source, observer, SchedulerDefaults.Iteration);

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the enumerable <paramref name="source"/>,
    /// forwarding to the shared helper with the caller-supplied <paramref name="scheduler"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the enumerable sequence.</typeparam>
    /// <param name="source">Enumerable sequence handed to the producer.</param>
    /// <param name="observer">Observer passed to the producer's <c>Subscribe</c>.</param>
    /// <param name="scheduler">Scheduler handed to the producer.</param>
    /// <returns>The disposable returned by the producer's <c>Subscribe</c> call.</returns>
    public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
        => Subscribe_(source, observer, scheduler);

    /// <summary>
    /// Shared implementation of the <c>Subscribe</c> overloads. Picks the producer based on
    /// whether <c>scheduler.AsLongRunning()</c> yields a non-null scheduler, then subscribes
    /// <paramref name="observer"/> to that producer directly.
    /// </summary>
    private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
    {
        var longRunningScheduler = scheduler.AsLongRunning();

        if (longRunningScheduler == null)
        {
            //
            // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
            //
            return new ToObservableRecursive<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
        }

        //
        // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
        //
        return new ToObservableLongRunning<TSource>(source, longRunningScheduler).Subscribe/*Unsafe*/(observer);
    }

    #endregion

    #region + ToEnumerable +

    /// <summary>
    /// Wraps <paramref name="source"/> in an <c>AnonymousEnumerable</c> whose factory delegate
    /// calls <c>source.GetEnumerator()</c>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the observable sequence.</typeparam>
    /// <param name="source">Observable sequence to expose as an enumerable.</param>
    /// <returns>The newly created <c>AnonymousEnumerable</c>.</returns>
    public virtual IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source)
        => new AnonymousEnumerable<TSource>(() => source.GetEnumerator());

    #endregion

    #region ToEvent

    /// <summary>
    /// Creates an <c>EventSource</c> over <paramref name="source"/>. The supplied callback
    /// ignores its second argument and invokes the handler with <c>Unit.Default</c>.
    /// </summary>
    /// <param name="source">Observable sequence of <c>Unit</c> values.</param>
    /// <returns>The newly created <c>EventSource</c>.</returns>
    public virtual IEventSource<Unit> ToEvent(IObservable<Unit> source)
        => new EventSource<Unit>(source, (handler, _) => handler(Unit.Default));

    /// <summary>
    /// Creates an <c>EventSource</c> over <paramref name="source"/>. The supplied callback
    /// invokes the handler with the value it receives.
    /// </summary>
    /// <typeparam name="TSource">Element type of the observable sequence.</typeparam>
    /// <param name="source">Observable sequence to expose as an event source.</param>
    /// <returns>The newly created <c>EventSource</c>.</returns>
    public virtual IEventSource<TSource> ToEvent<TSource>(IObservable<TSource> source)
        => new EventSource<TSource>(source, (handler, item) => handler(item));

    #endregion

    #region ToEventPattern

    /// <summary>
    /// Creates an <c>EventPatternSource</c> over <paramref name="source"/>. The supplied callback
    /// invokes the handler with the <c>Sender</c> and <c>EventArgs</c> of each event pattern.
    /// </summary>
    /// <typeparam name="TEventArgs">Type of the event arguments carried by each pattern.</typeparam>
    /// <param name="source">Observable sequence of event patterns.</param>
    /// <returns>The newly created <c>EventPatternSource</c>.</returns>
    public virtual IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(IObservable<EventPattern<TEventArgs>> source)
        => new EventPatternSource<TEventArgs>(source, (handler, pattern) => handler(pattern.Sender, pattern.EventArgs));

    #endregion

    #region + ToObservable +

    /// <summary>
    /// Converts the enumerable <paramref name="source"/> to an observable sequence,
    /// forwarding to the shared helper with <c>SchedulerDefaults.Iteration</c> as the scheduler.
    /// </summary>
    /// <typeparam name="TSource">Element type of the enumerable sequence.</typeparam>
    /// <param name="source">Enumerable sequence to convert.</param>
    /// <returns>The producer created by the shared helper.</returns>
    public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
        => ToObservable_(source, SchedulerDefaults.Iteration);

    /// <summary>
    /// Converts the enumerable <paramref name="source"/> to an observable sequence,
    /// forwarding to the shared helper with the caller-supplied <paramref name="scheduler"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the enumerable sequence.</typeparam>
    /// <param name="source">Enumerable sequence to convert.</param>
    /// <param name="scheduler">Scheduler handed to the producer.</param>
    /// <returns>The producer created by the shared helper.</returns>
    public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
        => ToObservable_(source, scheduler);

    /// <summary>
    /// Shared implementation of the <c>ToObservable</c> overloads. Returns a
    /// <c>ToObservableLongRunning</c> when <c>scheduler.AsLongRunning()</c> yields a non-null
    /// scheduler, and a <c>ToObservableRecursive</c> otherwise. No subscription is made here.
    /// </summary>
    private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
    {
        var longRunningScheduler = scheduler.AsLongRunning();

        if (longRunningScheduler == null)
        {
            // No long-running variant available: build the producer on the original scheduler.
            return new ToObservableRecursive<TSource>(source, scheduler);
        }

        // Long-running variant available: build the producer on it.
        return new ToObservableLongRunning<TSource>(source, longRunningScheduler);
    }

    #endregion
}
}