Skip to content

Commit

Permalink
fix: ReaderListenerProxy will make a segfault (#376)
Browse files Browse the repository at this point in the history
(cherry picked from commit e3bf582)
  • Loading branch information
shibd committed Apr 9, 2024
1 parent c9f029e commit 262918a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
24 changes: 13 additions & 11 deletions src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,19 @@ struct ReaderListenerProxyData {
void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Reader *reader = data->reader;

Napi::Value ret = jsCallback.Call({msg, reader->Value()});
if (ret.IsPromise()) {
Napi::Promise promise = ret.As<Napi::Promise>();
Napi::Value thenValue = promise.Get("then");
if (thenValue.IsFunction()) {
Napi::Function then = thenValue.As<Napi::Function>();
Napi::Function callback =
Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
then.Call(promise, {callback});
return;
// `reader` might be null in certain cases, segmentation fault might happend without this null check.
if (reader) {
Napi::Value ret = jsCallback.Call({msg, reader->Value()});
if (ret.IsPromise()) {
Napi::Promise promise = ret.As<Napi::Promise>();
Napi::Value thenValue = promise.Get("then");
if (thenValue.IsFunction()) {
Napi::Function then = thenValue.As<Napi::Function>();
Napi::Function callback =
Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
then.Call(promise, {callback});
return;
}
}
}
data->callback();
Expand Down
41 changes: 41 additions & 0 deletions tests/reader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,46 @@ const baseUrl = 'http://localhost:8080';
await reader.close();
await client.close();
});

test('Reader should not throw segmentation fault when create and close', async () => {
const NUM_ITS = 1000;
const its = Array.from({ length: NUM_ITS }, (_, i) => i);

const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});

const producer = await client.createProducer({
topic: 'persistent://public/default/my-topic',
sendTimeoutMs: 30000,
batchingEnabled: true,
});

// Send messages
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();

await Promise.all(
its.map(async () => {
const reader = await client.createReader({
topic: 'persistent://public/default/my-topic',
startMessageId: Pulsar.MessageId.earliest(),
listener: (message) => {
console.log(message.getData().toString());
},
});
await reader.close();
}),
);
await producer.close();
await client.close();
expect(true).toBe(true);
});
});
})();

0 comments on commit 262918a

Please sign in to comment.