Skip to content
Paul Westcott edited this page Apr 12, 2017 · 1 revision
/// Adapter that exposes an `Activity<'T>` as a `Folder<'T,NoValue>`.
/// The folder is constructed with `Unchecked.defaultof<_>` as its initial value, and
/// every item it receives is handed directly to the wrapped `consumer`.
and ConcatCommon<'T>(consumer:Activity<'T>) =
    inherit Folder<'T,NoValue>(Unchecked.defaultof<_>)

    // Pure pass-through: whatever the consumer's ProcessNext returns is returned as-is.
    override __.ProcessNext value =
        consumer.ProcessNext value

/// Composite `IOutOfBand` that keeps an ordered list of registered `IOutOfBand`
/// instances and relays `StopFurtherProcessing` to each of them.
and ConcatOutOfBand () =
    let registered = ResizeArray<IOutOfBand> ()

    interface IOutOfBand with
        /// Forwards the stop request for `pipeIdx` to every registered instance,
        /// in the order they were pushed.
        member __.StopFurtherProcessing pipeIdx =
            // The upper bound is read once, before the loop starts.
            let lastIdx = registered.Count - 1
            for idx in 0 .. lastIdx do
                registered.[idx].StopFurtherProcessing pipeIdx

    /// Appends `outOfBand` to the end of the registered list.
    member __.Push outOfBand =
        registered.Add outOfBand

    /// Removes the `n` most recently pushed entries, one at a time from the end.
    member __.Pop n =
        for _ in 1 .. n do
            registered.RemoveAt (registered.Count - 1)

/// Implemented by objects that can hand out the `ConcatOutOfBand` they hold.
/// Within this section it is implemented by `ConcatFold` and queried by
/// `ConcatEnumerable`'s `Fold` via a type test on the boxed folder.
and IConcatGetOutOfBand =
    /// Returns the `ConcatOutOfBand` associated with the implementing object.
    abstract GetOutOfBand : unit -> ConcatOutOfBand

/// Folder over a sequence of collections: each incoming `'Collection` is itself
/// folded, using `common` as the folder for its elements.
///
/// - `outOfBand`: shared `ConcatOutOfBand`; this folder, `result` and `common` are
///   pushed onto it at construction and popped again in `ChainComplete`.
/// - `result`: the downstream folder whose `HaltedIdx` and `Result` are read on completion.
/// - `consumer`: the activity that is completed and disposed along with this folder.
/// - `common`: the folder handed to every inner collection's `Fold`.
and ConcatFold<'T,'U,'Collection,'Result when 'Collection :> ISeq<'T>>(outOfBand:ConcatOutOfBand, result:Folder<'U,'Result>, consumer:Activity<'T,'U>, common:Folder<'T,NoValue>) as self =
    inherit Folder<'Collection, 'Result>(Unchecked.defaultof<_>)

    do
        // Three registrations; ChainComplete pops exactly this many.
        outOfBand.Push self
        outOfBand.Push result
        outOfBand.Push common

    /// Folds the incoming collection through `common` and discards the fold's own result.
    override __.ProcessNext value =
        ignore (value.Fold (fun _ -> common))
        Unchecked.defaultof<_> (* return value unused in Fold context *)

    /// Unregisters the three out-of-band entries, completes the consumer with the
    /// downstream folder's halted index, then copies the downstream result into this folder.
    override me.ChainComplete _ =
        outOfBand.Pop 3
        let haltedAt = result.HaltedIdx
        consumer.ChainComplete haltedAt
        me.Result <- result.Result

    /// Disposal is delegated to the consumer.
    override __.ChainDispose () =
        consumer.ChainDispose ()

    interface IConcatGetOutOfBand with
        /// Exposes the `ConcatOutOfBand` this folder was constructed with.
        member __.GetOutOfBand () = outOfBand

/// Sequence that concatenates the collections produced by `sources` and runs their
/// elements through `transformFactory`.
///
/// - `sources`: the outer sequence; each element is itself an `ISeq<'T>`.
/// - `transformFactory`: transform from inner elements (`'T`) to outputs (`'U`).
/// - `pipeIdx`: pipe index of this stage; it is incremented when a transform is pushed
///   and added to the lower index supplied during `Fold`.
and ConcatEnumerable<'T,'U,'Collection when 'Collection :> ISeq<'T>> (sources:ISeq<'Collection>, transformFactory:TransformFactory<'T,'U>, pipeIdx:PipeIdx) =
    inherit EnumerableBase<'U>()

    interface IEnumerable<'U> with
        /// Builds a fresh `Result<'U>` and a `ConcatEnumerator` over `sources` that shares it.
        member __.GetEnumerator () : IEnumerator<'U> =
            let sink = Result<'U> ()
            let activity = createFold transformFactory sink pipeIdx
            Upcast.enumerator (new ConcatEnumerator<'T,'U,'Collection>(sources, activity, sink))

    interface ISeq<'U> with
        /// Returns a new `ConcatEnumerable` over the same `sources`, with `next` combined
        /// after the current transform and the pipe index advanced by one.
        member __.PushTransform (next:TransformFactory<'U,'V>) : ISeq<'V> =
            let combined = ComposedFactory.Combine transformFactory next
            Upcast.seq (new ConcatEnumerable<'T,'V,'Collection>(sources, combined, pipeIdx+1))

        /// Folds the outer `sources` with a `ConcatFold` that wires the caller's folder
        /// (obtained from `createFolder`) to the composed transform.
        member __.Fold<'Result> (createFolder:PipeIdx->Folder<'U,'Result>) =
            sources.Fold (fun lowerPipeIdx ->
                let ownPipeIdx = lowerPipeIdx + pipeIdx

                // The caller's folder is created first, at the index after this stage.
                let folder = createFolder (ownPipeIdx+1)

                // Reuse the folder's ConcatOutOfBand when it exposes one; otherwise start a new one.
                let outOfBand =
                    match box folder with
                    | :? IConcatGetOutOfBand as provider -> provider.GetOutOfBand ()
                    | _ -> ConcatOutOfBand ()

                let activity =
                    transformFactory.Compose (Upcast.outOfBand outOfBand) ownPipeIdx folder

                // Wrap the composed activity in a ConcatCommon unless it already is one.
                let shared =
                    match box activity with
                    | :? ConcatCommon<'T> as existing -> existing
                    | _ -> upcast ConcatCommon activity

                upcast ConcatFold (outOfBand, folder, activity, shared))
Clone this wiki locally