Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sorted Merge (Help Needed) #325

Closed
svozza opened this issue Jun 28, 2015 · 11 comments
Closed

Sorted Merge (Help Needed) #325

svozza opened this issue Jun 28, 2015 · 11 comments
Labels

Comments

@svozza
Copy link
Collaborator

svozza commented Jun 28, 2015

I'm having a bit of difficulty implementing a transform that merges an arbitrary number of sorted streams. I hacked around a bit with the zipAll implementation but for some reason only the first element of the sorted stream gets emited when I call each (toArray doesn't even produce output). However, I know that the function emits the rest of the items in the correct order because if I put some console.log statements in I can track the elements as they're processed. Here's what I have so far (apologies for the code dump):

function sortedMergeBy(pred, streams) {
    var buffer = [];

    return _(streams).collect().flatMap(function (srcs) {

        function nextValue(index, src, push, next) {
            src.pull(function (err, x) {
                if (err) {
                    push(err);
                    nextValue(index, src, push, next);
                }
                else if (x === _.nil) {
                    // push last element in buffer
                    push(null, buffer[index]);
                    // must be final stream
                    if (srcs.length === 1) {
                        push(null, _.nil);
                    }
                    else {
                        // remove stream from list of streams to be
                        // pulled from and remove corresponding index in
                        // buffer
                        buffer.splice(index, 1);
                        srcs.splice(index, 1);
                        next();
                    }
                }
                else {
                    // buffer now contains an element from each stream
                    if (buffer.length === srcs.length) {
                        push(null, buffer[index]);
                    }
                    // replace old buffer value with new one, using the same index
                    buffer[index] = x;
                    next();
                }
            });
        }

        if (!srcs.length) {
            return _([]);
        }

        return _(function (push, next) {
            // need to buffer first element of all streams first before 
            // beginning comparisons
            if (buffer.length < srcs.length) {
                for (var i = 0, length = srcs.length; i < length; i++) {
                    nextValue(i, srcs[i], push, next);
                }
            }
            // apply comparison function to find index in buffer of
            // element to push
            var pair = buffer.reduce(function (memo, item, idx) {
                return pred(item, memo[1]) ? [idx, item] : memo;
            }, [0, buffer[0]]);
            // index of element in buffer allows us to know which stream to pull from
            nextValue(pair[0], srcs[pair[0]], push, next);
        });
    });
};

Here's the output of running it:

var arr = [8, 20, 303, 304, 555, 666];
var arr2 = [3, 4, 5, 6, 77, 88];
var arr3 = [77, 89, 97 ,1000, 1100, 1200];
var arr4 = [6, 7, 8, 9, 10, 11];
var arr5 = [13, 17, 18, 19, 111, 211];

var xs = [_(arr), _(arr2), _(arr3), _(arr4), _(arr5)];

sortedMergeBy(function(a, b) {
    return b > a;
}, xs).each(_.log); //=> 3

Is there anything obvious I'm doing wrong here?

@vqvu
Copy link
Collaborator

vqvu commented Jun 28, 2015

I think this is a bug in how generator streams interact with pull in 2.x. For some reason, the stream is uncapped if you pull from it. It seems to happen if you call next first without pushing anything. Here's a simplified test case.

var i = 0;
var a = _(function (push, next) {
    // This is perfectly legal. It should act the same as if you called next *after* push.
    if (i === 0) {
        next();
    }
    push(null, i++);
    if (i === 5) {
        push(null, _.nil);
    }
    else {
        next();
    }
});
a.pull(_.log); // => null, 1
a.each(_.log); // => nothing

I'll look more into this tomorrow.

@vqvu
Copy link
Collaborator

vqvu commented Jun 28, 2015

Bug aside, your code doesn't work with async sources. You need to find a different way to determine when all streams have emitted their first value. The buffer.length === srcs.length check doesn't work because of this

var s = [];
s[5] = 5;
console.log(s.length); // => 6

So if your last stream is the fastest, you'll think buffer is filled when it's not.

Also, you probably want an else here

if (bufferNotFilled) {
    for (var i = 0, length = srcs.length; i < length; i++) {
        nextValue(i, srcs[i], push, next);
    }
}
else {
    // apply comparison function to find index in buffer of
    // element to push
    var pair = buffer.reduce(function (memo, item, idx) {
        return pred(item, memo[1]) ? [idx, item] : memo;
    }, [0, buffer[0]]);
    // index of element in buffer allows us to know which stream to pull from
    nextValue(pair[0], srcs[pair[0]], push, next);
}

@vqvu vqvu added the bug label Jun 28, 2015
@svozza
Copy link
Collaborator Author

svozza commented Jun 28, 2015

Ah right, I hadn't really considered the async case because my streams are really just a bunch of arrays I get all at once that I dump into Highland. I guess I could have another array that the sources do an array.push to on their first pass and then when that equals src.length I'll know I've got all of them.

@svozza
Copy link
Collaborator Author

svozza commented Jun 28, 2015

Or a much more elegant solution given that I'm running Node 0.12 is to use a hashmap.

function sortedMergeBy(pred, streams) {
    var buffer = new Map();

    return _(streams).collect().flatMap(function (srcs) {

        function nextValue(src, push, next) {
            src.pull(function (err, x) {
                if (err) {
                    push(err);
                    nextValue(src, push, next);
                }
                else if (x === _.nil) {
                    // push last element in buffer
                    push(null, buffer.get(src));
                    // must be final stream
                    if (buffer.size === 1) {
                        push(null, _.nil);
                    }
                    else {
                        // remove stream from map of streams and
                        // from array of source streams
                        buffer.delete(src);
                        srcs.splice(srcs.indexOf(src), 1);
                        next();
                    }
                }
                else {
                    // buffer now contains all streams
                    if (buffer.size === srcs.length) {
                        push(null, buffer.get(src));
                    }
                    // replace old buffer key/value with new one
                    buffer.set(src, x);
                    next();
                }
            });
        }

        if (!srcs.length) {
            return _([]);
        }

        return _(function (push, next) {
            // need to buffer first element of all streams first before beginning 
            // comparisons
            if (buffer.size < srcs.length) {
                for (var i = 0, length = srcs.length; i < length; i++) {
                    nextValue(srcs[i], push, next);
                }
            }
            // apply comparison function to find which stream in buffer to
            // push from next
            var srcToPull;
            for(var pair of buffer.entries()) {
                srcToPull = srcToPull == null || pred(pair[1], srcToPull[1]) ? pair : srcToPull;
            }  
            nextValue(srcToPull[0], push, next);
        });
    });
};

@vqvu
Copy link
Collaborator

vqvu commented Jun 28, 2015

Probable fix in #326.

@svozza
Copy link
Collaborator Author

svozza commented Jun 28, 2015

Brilliant!

@vqvu
Copy link
Collaborator

vqvu commented Jun 28, 2015

This might not matter to you since you don't have async streams, but the current implementation still doesn't work. You only want to run the first for loop once, but this may run multiple times if you have async streams, since you call next in nextValue, the first async stream to complete will call next, which will cause the generator to be executed again. That will execute nextValue a bunch more times, and possible try to pull from a stream that already has a pull pending (which is illegal).

return _(function (push, next) {
    // need to buffer first element of all streams first before beginning 
    // comparisons
    if (firstTime) {
        for (var i = 0, length = srcs.length; i < length; i++) {
            nextValue(srcs[i], push, next);
        }
    }
    // apply comparison function to find which stream in buffer to
    // push from next
    if (buffer.length === srcs.length) {
        var srcToPull;
        for(var pair of buffer.entries()) {
            srcToPull = srcToPull == null || pred(pair[1], srcToPull[1]) ? pair : srcToPull;
        }  
        nextValue(srcToPull[0], push, next);
    }
});

@svozza
Copy link
Collaborator Author

svozza commented Jun 29, 2015

I see what you mean. Back to the drawing board!

@vqvu vqvu closed this as completed in ade706b Jun 29, 2015
@svozza
Copy link
Collaborator Author

svozza commented Dec 3, 2015

I finally got round to doing this and it looks like the fix in #326 didn't solve the issue but it does work perfectly in Highland 3.0.0!

https://github.com/svozza/sortedmergeby/blob/master/sortedMergeBy.js

(I swear I'm not procrastinating over my using PR 😃 )

@vqvu
Copy link
Collaborator

vqvu commented Dec 5, 2015

How do you know the fix doesn't work? I just ran your tests against master and they all pass.

Edit: typo.

@vqvu vqvu mentioned this issue Dec 7, 2015
@svozza
Copy link
Collaborator Author

svozza commented Dec 7, 2015

Completely missed your response here! I just realised I was running my initial tests against 2.5.1, which doesn't work but master does.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants