Skip to content

Commit

Permalink
Merge pull request #124 from megawac/streams
Browse files Browse the repository at this point in the history
Make Topic streams pipeable
  • Loading branch information
rctoris committed Oct 30, 2014
2 parents b010aaa + 9356845 commit f1a9279
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 19 deletions.
25 changes: 22 additions & 3 deletions src/node/TopicStream.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
var Topic = require('../core/Topic');
var ReadableStream = require('stream').Readable;
var DuplexStream = require('stream').Duplex;

Topic.prototype.toStream = function() {
var stream = new ReadableStream({
/**
* Publish a connected ROS topic to a duplex
* stream. This stream can be piped to, which will
* publish to the topic
*/
Topic.prototype.toStream = function(transform) {
var topic = this;
var hasTransform = typeof transform === 'function';

var stream = new DuplexStream({
objectMode: true
});
stream._read = function() {};

// Publish to the topic if someone pipes to stream
stream._write = function(chunk, encoding, callback) {
if (hasTransform) {
chunk = transform(chunk);
}
if (chunk) {
topic.publish(chunk);
}
callback();
};

this.subscribe(function(message) {
stream.push(message);
});
Expand Down
63 changes: 47 additions & 16 deletions test/examples/topic-listener.example.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
var expect = require('chai').expect;
var ROSLIB = require('../..');

describe('Topics Example', function() {
this.timeout(1000);
var ros = new ROSLIB.Ros({
url: 'ws://localhost:9090'
});

var ros = new ROSLIB.Ros({
url: 'ws://localhost:9090'
});
var example = ros.Topic({
name: '/test_topic',
messageType: 'std_msgs/String'
});

var example = ros.Topic({
name: '/test_topic',
messageType: 'std_msgs/String'
});
function format(msg) {
return {data: msg};
}
var messages = ['1', '2', '3', '4'].map(format);

function format(msg) {
return {data: msg};
}
var messages = ['1', '2', '3', '4'].map(format);
describe('Topics Example', function() {
this.timeout(1000);

function createAndStreamTopic(topicName) {
var topic = ros.Topic({
Expand Down Expand Up @@ -53,8 +53,39 @@ describe('Topics Example', function() {

topic.on('unsubscribe', done);
});
});

if (ROSLIB.Topic.prototype.toStream) {
var TransformStream = require('stream').Transform;
describe('Topic Streams are readable and writable', function() {
this.timeout(1000);

function createAndStreamTopic(topicName) {
var stream = new TransformStream({objectMode: true});
var topic = ros.Topic({
name: topicName,
messageType: 'std_msgs/String'
});

var idx = 0;
function emit() {
setTimeout(function() {
stream.push(messages[idx++]);
if (idx < messages.length) {
emit();
} else {
stream.end();
topic.unsubscribe();
topic.unadvertise();
}
}, 50);
}
emit();

stream.pipe(topic.toStream());
return topic;
}

if (ROSLIB.Topic.prototype.toStream) {
it('Topic.toStream()', function(done) {
var stream = createAndStreamTopic('/echo/test-stream').toStream();
var expected = messages.slice();
Expand All @@ -65,5 +96,5 @@ describe('Topics Example', function() {
});
stream.on('end', done);
});
}
});
});
}

0 comments on commit f1a9279

Please sign in to comment.