Commit

fixing lint complains
andrewstanovsky committed May 3, 2023
1 parent 34aed2e commit b3cb60f
Showing 8 changed files with 172 additions and 167 deletions.
@@ -144,18 +144,19 @@ export class AlfredResourcesFactory implements core.IResourcesFactory<AlfredReso
const kafkaSslCACertFilePath: string = config.get("kafka:lib:sslCACertFilePath");
const eventHubConnString: string = config.get("kafka:lib:eventHubConnString");

const producer = services.createProducer(
kafkaLibrary,
kafkaEndpoint,
kafkaClientId,
topic,
false,
kafkaProducerPollIntervalMs,
kafkaNumberOfPartitions,
kafkaReplicationFactor,
kafkaMaxBatchSize,
kafkaSslCACertFilePath,
eventHubConnString);
const producer = services.createProducer(
kafkaLibrary,
kafkaEndpoint,
kafkaClientId,
topic,
false,
kafkaProducerPollIntervalMs,
kafkaNumberOfPartitions,
kafkaReplicationFactor,
kafkaMaxBatchSize,
kafkaSslCACertFilePath,
eventHubConnString,
);

const redisConfig = config.get("redis");
const webSocketLibrary = config.get("alfred:webSocketLib");
6 changes: 3 additions & 3 deletions server/routerlicious/packages/routerlicious/src/deli/index.ts
@@ -28,7 +28,7 @@ export async function deliCreate(
const kafkaReplicationFactor = config.get("kafka:lib:replicationFactor");
const kafkaMaxBatchSize = config.get("kafka:lib:maxBatchSize");
const kafkaSslCACertFilePath: string = config.get("kafka:lib:sslCACertFilePath");
const eventHubConnString: string = config.get("kafka:lib:eventHubConnString");
const eventHubConnString: string = config.get("kafka:lib:eventHubConnString");

const kafkaForwardClientId = config.get("deli:kafkaClientId");
const kafkaReverseClientId = config.get("alfred:kafkaClientId");
@@ -82,7 +82,7 @@ export async function deliCreate(
kafkaReplicationFactor,
kafkaMaxBatchSize,
kafkaSslCACertFilePath,
eventHubConnString,
eventHubConnString,
);
const reverseProducer = services.createProducer(
kafkaLibrary,
@@ -95,7 +95,7 @@ export async function deliCreate(
kafkaReplicationFactor,
kafkaMaxBatchSize,
kafkaSslCACertFilePath,
eventHubConnString,
eventHubConnString,
);

const redisConfig = config.get("redis");
@@ -42,7 +42,7 @@ export async function scribeCreate(
const kafkaReplicationFactor = config.get("kafka:lib:replicationFactor");
const kafkaMaxBatchSize = config.get("kafka:lib:maxBatchSize");
const kafkaSslCACertFilePath: string = config.get("kafka:lib:sslCACertFilePath");
const eventHubConnString: string = config.get("kafka:lib:eventHubConnString");
const eventHubConnString: string = config.get("kafka:lib:eventHubConnString");
const sendTopic = config.get("lambdas:deli:topic");
const kafkaClientId = config.get("scribe:kafkaClientId");
const mongoExpireAfterSeconds = config.get("mongo:expireAfterSeconds") as number;
@@ -118,7 +118,7 @@ export async function scribeCreate(
kafkaReplicationFactor,
kafkaMaxBatchSize,
kafkaSslCACertFilePath,
eventHubConnString,
eventHubConnString,
);

const externalOrdererUrl = config.get("worker:serverUrl");
@@ -38,7 +38,7 @@
"nconf": "^0.12.0",
"node-rdkafka": "^2.15.0",
"sillyname": "^0.1.0",
"winston": "^3.6.0"
"winston": "^3.6.0"
},
"devDependencies": {
"@fluid-tools/build-cli": "^0.15.0",
@@ -10,145 +10,151 @@ import * as log from "winston";
import { tryImportNodeRdkafka } from "./tryImport";

export interface IKafkaBaseOptions {
numberOfPartitions: number;
replicationFactor: number;
disableTopicCreation?: boolean;
sslCACertFilePath?: string;
restartOnKafkaErrorCodes?: number[];
eventHubConnString?: string;
numberOfPartitions: number;
replicationFactor: number;
disableTopicCreation?: boolean;
sslCACertFilePath?: string;
restartOnKafkaErrorCodes?: number[];
eventHubConnString?: string;
}

export interface IKafkaEndpoints {
kafka: string[];
zooKeeper?: string[];
kafka: string[];
zooKeeper?: string[];
}

export abstract class RdkafkaBase extends EventEmitter {
protected readonly kafka: typeof kafkaTypes;
protected readonly sslOptions?: kafkaTypes.ConsumerGlobalConfig;
protected defaultRestartOnKafkaErrorCodes: number[] = [];
private readonly options: IKafkaBaseOptions;

constructor(
protected readonly endpoints: IKafkaEndpoints,
public readonly clientId: string,
public readonly topic: string,
options?: Partial<IKafkaBaseOptions>,
) {
super();

const kafka = tryImportNodeRdkafka();
if (!kafka) {
throw new Error("Invalid node-rdkafka package");
}

this.kafka = kafka;
this.options = {
...options,
numberOfPartitions: options?.numberOfPartitions ?? 32,
replicationFactor: options?.replicationFactor ?? 3,
};

// In RdKafka, we can check what features are enabled using kafka.features. If "ssl" is listed,
// it means RdKafka has been built with support for SSL.
// To build node-rdkafka with SSL support, make sure OpenSSL libraries are available in the
// environment node-rdkafka would be running. Once OpenSSL is available, building node-rdkafka
// as usual will automatically include SSL support.
const rdKafkaHasSSLEnabled =
kafka.features.filter((feature) => feature.toLowerCase().includes("ssl"));

if (options?.sslCACertFilePath) {
// If the use of SSL is desired, but rdkafka has not been built with SSL support,
// throw an error making that clear to the user.
if (!rdKafkaHasSSLEnabled) {
throw new Error(
"Attempted to configure SSL, but rdkafka has not been built to support it. " +
"Please make sure OpenSSL is available and build rdkafka again.");
}

this.sslOptions = {
"security.protocol": "ssl",
"ssl.ca.location": options?.sslCACertFilePath,
};
} else if (options?.eventHubConnString) {
if (!kafka.features.filter((feature) => feature.toLowerCase().includes("sasl_ssl"))) {
throw new Error(
"Attempted to configure SASL_SSL for Event Hubs, but rdkafka has not been built to support it. " +
"Please make sure OpenSSL is available and build rdkafka again.");
}

log.error(`EventHubConnString in resourceFactory.ts: ${options?.eventHubConnString}`)

this.sslOptions = {
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "PLAIN",
"sasl.username": "$ConnectionString",
"sasl.password": options?.eventHubConnString,
};
}

setTimeout(() => void this.initialize(), 1);
}

protected abstract connect(): void;

private async initialize() {
try {
if (!this.options.disableTopicCreation) {
await this.ensureTopics();
}

this.connect();
} catch (ex) {
this.error(ex);

// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.initialize();

return;
}
}

protected async ensureTopics() {
const options: kafkaTypes.GlobalConfig = {
"client.id": `${this.clientId}-admin`,
"metadata.broker.list": this.endpoints.kafka.join(","),
...this.sslOptions,
};

const adminClient = this.kafka.AdminClient.create(options);

const newTopic: kafkaTypes.NewTopic = {
topic: this.topic,
num_partitions: this.options.numberOfPartitions,
replication_factor: this.options.replicationFactor,
};

return new Promise<void>((resolve, reject) => {
adminClient.createTopic(newTopic, 10000, (err) => {
adminClient.disconnect();

if (err && err.code !== this.kafka.CODES.ERRORS.ERR_TOPIC_ALREADY_EXISTS) {
reject(err);
} else {
resolve();
}
});
});
}

protected error(error: any, errorData: IContextErrorData = { restart: false }) {
const errorCodesToCauseRestart = this.options.restartOnKafkaErrorCodes ?? this.defaultRestartOnKafkaErrorCodes;

if (RdkafkaBase.isObject(error)
&& errorCodesToCauseRestart.includes((error as kafkaTypes.LibrdKafkaError).code)) {
errorData.restart = true;
}

this.emit("error", error, errorData);
}

protected static isObject(value: any): value is object {
return value !== null && typeof (value) === "object";
}
protected readonly kafka: typeof kafkaTypes;
protected readonly sslOptions?: kafkaTypes.ConsumerGlobalConfig;
protected defaultRestartOnKafkaErrorCodes: number[] = [];
private readonly options: IKafkaBaseOptions;

constructor(
protected readonly endpoints: IKafkaEndpoints,
public readonly clientId: string,
public readonly topic: string,
options?: Partial<IKafkaBaseOptions>,
) {
super();

const kafka = tryImportNodeRdkafka();
if (!kafka) {
throw new Error("Invalid node-rdkafka package");
}

this.kafka = kafka;
this.options = {
...options,
numberOfPartitions: options?.numberOfPartitions ?? 32,
replicationFactor: options?.replicationFactor ?? 3,
};

// In RdKafka, we can check what features are enabled using kafka.features. If "ssl" is listed,
// it means RdKafka has been built with support for SSL.
// To build node-rdkafka with SSL support, make sure OpenSSL libraries are available in the
// environment node-rdkafka would be running. Once OpenSSL is available, building node-rdkafka
// as usual will automatically include SSL support.
const rdKafkaHasSSLEnabled = kafka.features.filter((feature) =>
feature.toLowerCase().includes("ssl"),
);

if (options?.sslCACertFilePath) {
// If the use of SSL is desired, but rdkafka has not been built with SSL support,
// throw an error making that clear to the user.
if (!rdKafkaHasSSLEnabled) {
throw new Error(
"Attempted to configure SSL, but rdkafka has not been built to support it. " +
"Please make sure OpenSSL is available and build rdkafka again.",
);
}

this.sslOptions = {
"security.protocol": "ssl",
"ssl.ca.location": options?.sslCACertFilePath,
};
} else if (options?.eventHubConnString) {
if (!kafka.features.filter((feature) => feature.toLowerCase().includes("sasl_ssl"))) {
throw new Error(
"Attempted to configure SASL_SSL for Event Hubs, but rdkafka has not been built to support it. " +
"Please make sure OpenSSL is available and build rdkafka again.",
);
}

log.error(`EventHubConnString in resourceFactory.ts: ${options?.eventHubConnString}`);

this.sslOptions = {
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "PLAIN",
"sasl.username": "$ConnectionString",
"sasl.password": options?.eventHubConnString,
};
}

setTimeout(() => void this.initialize(), 1);
}

protected abstract connect(): void;

private async initialize() {
try {
if (!this.options.disableTopicCreation) {
await this.ensureTopics();
}

this.connect();
} catch (ex) {
this.error(ex);

// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.initialize();

return;
}
}

protected async ensureTopics() {
const options: kafkaTypes.GlobalConfig = {
"client.id": `${this.clientId}-admin`,
"metadata.broker.list": this.endpoints.kafka.join(","),
...this.sslOptions,
};

const adminClient = this.kafka.AdminClient.create(options);

const newTopic: kafkaTypes.NewTopic = {
topic: this.topic,
num_partitions: this.options.numberOfPartitions,
replication_factor: this.options.replicationFactor,
};

return new Promise<void>((resolve, reject) => {
adminClient.createTopic(newTopic, 10000, (err) => {
adminClient.disconnect();

if (err && err.code !== this.kafka.CODES.ERRORS.ERR_TOPIC_ALREADY_EXISTS) {
reject(err);
} else {
resolve();
}
});
});
}

protected error(error: any, errorData: IContextErrorData = { restart: false }) {
const errorCodesToCauseRestart =
this.options.restartOnKafkaErrorCodes ?? this.defaultRestartOnKafkaErrorCodes;

if (
RdkafkaBase.isObject(error) &&
errorCodesToCauseRestart.includes((error as kafkaTypes.LibrdKafkaError).code)
) {
errorData.restart = true;
}

this.emit("error", error, errorData);
}

protected static isObject(value: any): value is object {
return value !== null && typeof value === "object";
}
}
@@ -122,7 +122,7 @@ export class RdkafkaConsumer extends RdkafkaBase implements IConsumer {
"offset_commit_cb": true,
"rebalance_cb": this.consumerOptions.optimizedRebalance
? (err: kafkaTypes.LibrdKafkaError, assignments: kafkaTypes.Assignment[]) =>
this.rebalance(consumer, err, assignments)
this.rebalance(consumer, err, assignments)
: true,
...this.consumerOptions.additionalOptions,
...this.sslOptions,