Skip to content

Commit

Permalink
Add partitioned topic unit test for Reader. (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jun 2, 2023
1 parent 2e1f8fd commit c959474
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 34 deletions.
52 changes: 18 additions & 34 deletions tests/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,10 @@
* under the License.
*/

const http = require('http');
const httpRequest = require('./http_utils.js');
const Pulsar = require('../index.js');

const baseUrl = 'http://localhost:8080';
const requestAdminApi = (url, { headers, data = {}, method = 'PUT' }) => new Promise((resolve, reject) => {
const req = http.request(url, {
headers,
method,
}, (res) => {
let responseBody = '';
res.on('data', (chunk) => {
responseBody += chunk;
});
res.on('end', () => {
resolve({ responseBody, statusCode: res.statusCode });
});
});

req.on('error', (error) => {
reject(error);
});

req.write(JSON.stringify(data));

req.end();
});

(() => {
describe('Client', () => {
Expand Down Expand Up @@ -96,11 +74,14 @@ const requestAdminApi = (url, { headers, data = {}, method = 'PUT' }) => new Pro
const nonPartitionedTopicName = 'test-non-partitioned-topic';
const nonPartitionedTopic = `persistent://public/default/${nonPartitionedTopicName}`;
const nonPartitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`;
const createNonPartitionedTopicRes = await requestAdminApi(nonPartitionedTopicAdminURL, {
headers: {
'Content-Type': 'application/json',
const createNonPartitionedTopicRes = await httpRequest(
nonPartitionedTopicAdminURL, {
headers: {
'Content-Type': 'application/json',
},
method: 'PUT',
},
});
);
expect(createNonPartitionedTopicRes.statusCode).toBe(204);

const nonPartitionedTopicList = await client.getPartitionsForTopic(nonPartitionedTopic);
Expand All @@ -110,12 +91,15 @@ const requestAdminApi = (url, { headers, data = {}, method = 'PUT' }) => new Pro
const partitionedTopicName = 'test-partitioned-topic-1';
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
const createPartitionedTopicRes = await requestAdminApi(partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',
const createPartitionedTopicRes = await httpRequest(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',
},
data: 4,
method: 'PUT',
},
data: 4,
});
);
expect(createPartitionedTopicRes.statusCode).toBe(204);

const partitionedTopicList = await client.getPartitionsForTopic(partitionedTopic);
Expand All @@ -126,9 +110,9 @@ const requestAdminApi = (url, { headers, data = {}, method = 'PUT' }) => new Pro
'persistent://public/default/test-partitioned-topic-1-partition-3',
]);

const deleteNonPartitionedTopicRes = await requestAdminApi(nonPartitionedTopicAdminURL, { method: 'DELETE' });
const deleteNonPartitionedTopicRes = await httpRequest(nonPartitionedTopicAdminURL, { method: 'DELETE' });
expect(deleteNonPartitionedTopicRes.statusCode).toBe(204);
const deletePartitionedTopicRes = await requestAdminApi(partitionedTopicAdminURL, { method: 'DELETE' });
const deletePartitionedTopicRes = await httpRequest(partitionedTopicAdminURL, { method: 'DELETE' });
expect(deletePartitionedTopicRes.statusCode).toBe(204);

await client.close();
Expand Down
45 changes: 45 additions & 0 deletions tests/http_utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const http = require('http');

const request = (url, { headers, data = {}, method }) => new Promise((resolve, reject) => {
const req = http.request(url, {
headers,
method,
}, (res) => {
let responseBody = '';
res.on('data', (chunk) => {
responseBody += chunk;
});
res.on('end', () => {
resolve({ responseBody, statusCode: res.statusCode });
});
});

req.on('error', (error) => {
reject(error);
});

req.write(JSON.stringify(data));

req.end();
});

module.exports = request;
64 changes: 64 additions & 0 deletions tests/reader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
* under the License.
*/

const lodash = require('lodash');
const Pulsar = require('../index.js');
const httpRequest = require('./http_utils');

const baseUrl = 'http://localhost:8080';

(() => {
describe('Reader', () => {
Expand Down Expand Up @@ -66,5 +70,65 @@ const Pulsar = require('../index.js');
})).rejects.toThrow('StartMessageId is required and must be specified as a MessageId object when creating reader');
await client.close();
});

test('Reader by Partitioned Topic', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});

// Create partitioned topic.
const partitionedTopicName = 'test-reader-partitioned-topic';
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
const createPartitionedTopicRes = await httpRequest(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',
},
data: 4,
method: 'PUT',
},
);
expect(createPartitionedTopicRes.statusCode).toBe(204);

const producer = await client.createProducer({
topic: partitionedTopic,
sendTimeoutMs: 30000,
batchingEnabled: true,
});
expect(producer).not.toBeNull();

const reader = await client.createReader({
topic: partitionedTopic,
startMessageId: Pulsar.MessageId.latest(),
});
expect(reader).not.toBeNull();

const messages = [];
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
});
messages.push(msg);
}
await producer.flush();

expect(reader.hasNext()).toBe(true);

const results = [];
for (let i = 0; i < 10; i += 1) {
const msg = await reader.readNext();
results.push(msg.getData().toString());
}
expect(lodash.difference(messages, results)).toEqual([]);

expect(reader.hasNext()).toBe(false);

await producer.close();
await reader.close();
await client.close();
});
});
})();

0 comments on commit c959474

Please sign in to comment.