Skip to content

Commit

Permalink
Admin examples for available APIs (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab authored Sep 15, 2024
1 parent 8ce7cfb commit d014984
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 40 deletions.
40 changes: 0 additions & 40 deletions examples/kafkajs/admin.js

This file was deleted.

79 changes: 79 additions & 0 deletions examples/kafkajs/admin/create-topics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function adminStart() {
const args = parseArgs({
options: {
'bootstrap-servers': {
type: 'string',
short: 'b',
default: 'localhost:9092',
},
'topic': {
type: 'string',
short: 't',
default: 'test-topic',
},
'timeout': {
type: 'string',
short: 'm',
default: undefined,
},
'num-partitions': {
type: 'string',
short: 'p',
default: '3',
},
'replication-factor': {
type: 'string',
short: 'r',
default: '1',
}
},
});

let {
'bootstrap-servers': bootstrapServers,
timeout,
'num-partitions': numPartitions,
'replication-factor': replicationFactor,
topic,
} = args.values;

if (timeout) {
timeout = Number(timeout) || 0;
}

numPartitions = Number(numPartitions) || 3;
replicationFactor = Number(replicationFactor) || 1;

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
}
});

const admin = kafka.admin();
await admin.connect();

try {
await admin.createTopics({
topics: [
{
topic: topic,
numPartitions: numPartitions,
replicationFactor: replicationFactor,
}
],
timeout,
});
console.log(`Topic "${topic}" created successfully`);
} catch(err) {
console.log(`Topic creation failed`, err);
}

await admin.disconnect();
}

adminStart();
64 changes: 64 additions & 0 deletions examples/kafkajs/admin/delete-groups.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function adminStart() {
const args = parseArgs({
options: {
'bootstrap-servers': {
type: 'string',
short: 'b',
default: 'localhost:9092',
},
'timeout': {
type: 'string',
short: 'm',
default: undefined,
},
'group-ids': {
type: 'string',
short: 'g',
multiple: true,
default: [],
},
},
});

let {
'bootstrap-servers': bootstrapServers,
timeout,
'group-ids': groupIds,
} = args.values;

if (!groupIds.length) {
console.error('Group ids are required');
process.exit(1);
}

if (timeout) {
timeout = Number(timeout) || 0;
}

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
}
});

const admin = kafka.admin();
await admin.connect();

try {
await admin.deleteGroups(
groupIds,
{ timeout },
);
console.log(`Groups "${groupIds.join(',')}" deleted successfully`);
} catch(err) {
console.log(`Group deletion failed`, err);
}

await admin.disconnect();
}

adminStart();
64 changes: 64 additions & 0 deletions examples/kafkajs/admin/delete-topics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function adminStart() {
const args = parseArgs({
options: {
'bootstrap-servers': {
type: 'string',
short: 'b',
default: 'localhost:9092',
},
'timeout': {
type: 'string',
short: 'm',
default: undefined,
},
'topics': {
type: 'string',
short: 't',
multiple: true,
default: [],
},
},
});

let {
'bootstrap-servers': bootstrapServers,
timeout,
topics,
} = args.values;

if (!topics.length) {
console.error('Topics names is required');
process.exit(1);
}

if (timeout) {
timeout = Number(timeout) || 0;
}

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
}
});

const admin = kafka.admin();
await admin.connect();

try {
await admin.deleteTopics({
topics,
timeout,
});
console.log(`Topics "${topics.join(',')}" deleted successfully`);
} catch(err) {
console.log(`Topic deletion failed`, err);
}

await admin.disconnect();
}

adminStart();
88 changes: 88 additions & 0 deletions examples/kafkajs/admin/describe-groups.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

function printNode(node, prefix = '') {
if (!node)
return;
console.log(`${prefix}\tHost: ${node.host}`);
console.log(`${prefix}\tPort: ${node.port}`);
console.log(`${prefix}\tRack: ${node.rack}`);
}

async function adminStart() {
const args = parseArgs({
options: {
'bootstrap-servers': {
type: 'string',
short: 'b',
default: 'localhost:9092',
},
'timeout': {
type: 'string',
short: 'm',
default: undefined,
},
'groups': {
type: 'string',
short: 'g',
multiple: true,
default: [],
},
'include-authorized-operations': {
type: 'boolean',
short: 'i',
default: false,
}
},
});

let {
'bootstrap-servers': bootstrapServers,
timeout,
groups,
'include-authorized-operations': includeAuthorizedOperations,
} = args.values;

if (timeout) {
timeout = Number(timeout) || 0;
}

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
}
});

const admin = kafka.admin();
await admin.connect();

try {
const groupDescriptions = await admin.describeGroups(
groups,
{
timeout,
includeAuthorizedOperations,
}
);
for (const group of groupDescriptions.groups) {
console.log(`Group id: ${group.groupId}`);
console.log(`\tError: ${group.error}`);
console.log(`\tProtocol: ${group.protocol}`);
console.log(`\tProtocol type: ${group.protocolType}`);
console.log(`\tPartition assignor: ${group.partitionAssignor}`);
console.log(`\tState: ${group.state}`);
console.log(`\tCoordinator: ${group.coordinator ? group.coordinator.id : group.coordinator}`);
printNode(group.coordinator, '\t');
console.log(`\tAuthorized operations: ${group.authorizedOperations}`);
console.log(`\tIs simple: ${group.isSimpleConsumerGroup}`);
console.log(`\tState: ${group.state}`);
}
} catch(err) {
console.log('Describe groups failed', err);
}

await admin.disconnect();
}

adminStart();
Loading

0 comments on commit d014984

Please sign in to comment.