Skip to content

Commit 4c75de0

Browse files
authored
Merge pull request #3985 from nirga/master
Initial version of new Kafka plugin
2 parents fc7fde4 + 5edf0ad commit 4c75de0

36 files changed

+1478
-4
lines changed

packages/app/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"@backstage/plugin-graphiql": "^0.2.3",
2121
"@backstage/plugin-org": "^0.3.2",
2222
"@backstage/plugin-jenkins": "^0.3.4",
23+
"@backstage/plugin-kafka": "^0.1.0",
2324
"@backstage/plugin-kubernetes": "^0.3.3",
2425
"@backstage/plugin-lighthouse": "^0.2.6",
2526
"@backstage/plugin-newrelic": "^0.2.2",

packages/app/src/components/catalog/EntityPage.tsx

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import {
6363
UserProfileCard,
6464
} from '@backstage/plugin-org';
6565
import { Router as SentryRouter } from '@backstage/plugin-sentry';
66+
import { Router as KafkaRouter } from '@backstage/plugin-kafka';
6667
import { EmbeddedDocsRouter as DocsRouter } from '@backstage/plugin-techdocs';
6768
import { Button, Grid } from '@material-ui/core';
6869
import {
@@ -243,6 +244,11 @@ const ServiceEntityPage = ({ entity }: { entity: Entity }) => (
243244
title="Code Insights"
244245
element={<GitHubInsightsRouter entity={entity} />}
245246
/>
247+
<EntityPageLayout.Content
248+
path="/kafka/*"
249+
title="Kafka"
250+
element={<KafkaRouter entity={entity} />}
251+
/>
246252
</EntityPageLayout>
247253
);
248254

packages/app/src/plugins.ts

+1
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,4 @@ export { plugin as PagerDuty } from '@backstage/plugin-pagerduty';
4343
export { plugin as Buildkite } from '@roadiehq/backstage-plugin-buildkite';
4444
export { plugin as Search } from '@backstage/plugin-search';
4545
export { plugin as Org } from '@backstage/plugin-org';
46+
export { plugin as Kafka } from '@backstage/plugin-kafka';

packages/backend/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
"@backstage/plugin-catalog-backend": "^0.5.3",
3636
"@backstage/plugin-graphql-backend": "^0.1.4",
3737
"@backstage/plugin-kubernetes-backend": "^0.2.4",
38+
"@backstage/plugin-kafka-backend": "^0.1.0",
3839
"@backstage/plugin-proxy-backend": "^0.2.3",
3940
"@backstage/plugin-rollbar-backend": "^0.1.5",
4041
"@backstage/plugin-scaffolder-backend": "^0.4.1",

packages/backend/src/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import healthcheck from './plugins/healthcheck';
3838
import auth from './plugins/auth';
3939
import catalog from './plugins/catalog';
4040
import kubernetes from './plugins/kubernetes';
41+
import kafka from './plugins/kafka';
4142
import rollbar from './plugins/rollbar';
4243
import scaffolder from './plugins/scaffolder';
4344
import proxy from './plugins/proxy';
@@ -77,6 +78,7 @@ async function main() {
7778
const rollbarEnv = useHotMemoize(module, () => createEnv('rollbar'));
7879
const techdocsEnv = useHotMemoize(module, () => createEnv('techdocs'));
7980
const kubernetesEnv = useHotMemoize(module, () => createEnv('kubernetes'));
81+
const kafkaEnv = useHotMemoize(module, () => createEnv('kafka'));
8082
const graphqlEnv = useHotMemoize(module, () => createEnv('graphql'));
8183
const appEnv = useHotMemoize(module, () => createEnv('app'));
8284

@@ -87,6 +89,7 @@ async function main() {
8789
apiRouter.use('/auth', await auth(authEnv));
8890
apiRouter.use('/techdocs', await techdocs(techdocsEnv));
8991
apiRouter.use('/kubernetes', await kubernetes(kubernetesEnv));
92+
apiRouter.use('/kafka', await kafka(kafkaEnv));
9093
apiRouter.use('/proxy', await proxy(proxyEnv));
9194
apiRouter.use('/graphql', await graphql(graphqlEnv));
9295
apiRouter.use(notFoundHandler());

packages/backend/src/plugins/kafka.ts

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2020 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { createRouter } from '@backstage/plugin-kafka-backend';
18+
import { PluginEnvironment } from '../types';
19+
20+
export default async function createPlugin({
21+
logger,
22+
config,
23+
}: PluginEnvironment) {
24+
return await createRouter({ logger, config });
25+
}

plugins/kafka-backend/.eslintrc.js

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module.exports = {
2+
extends: [require.resolve('@backstage/cli/config/eslint.backend')],
3+
};

plugins/kafka-backend/README.md

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Kafka Backend
2+
3+
This is the backend part of the Kafka plugin. It responds to Kafka requests from the frontend.
4+
5+
## Configuration
6+
7+
This configures how to connect to the brokers in your Kafka cluster.
8+
9+
### clientId
10+
11+
The name of the client to use when connecting to the cluster.
12+
13+
### brokers
14+
15+
A list of the brokers' host names and ports to connect to.
16+
17+
### SSL (optional)
18+
19+
Configure TLS connection to the Kafka cluster. The options are passed directly to [tls.connect] and used to create the TLS secure context. Normally these would include `key` and `cert`.
20+
21+
Example:
22+
23+
```yaml
24+
kafka:
25+
clientId: backstage
26+
brokers:
27+
- localhost:9092
28+
```

plugins/kafka-backend/config.d.ts

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
export interface Config {
17+
kafka?: {
18+
/**
19+
* Client ID used to Backstage uses to identify when connecting to the Kafka cluster.
20+
*/
21+
clientId: string;
22+
/**
23+
* List of brokers in the Kafka cluster to connect to.
24+
*/
25+
brokers: string[];
26+
27+
/**
28+
* Optional SSL connection parameters to connect to the cluster. Passed directly to Node tls.connect.
29+
* See https://nodejs.org/dist/latest-v8.x/docs/api/tls.html#tls_tls_createsecurecontext_options
30+
*/
31+
ssl?: {
32+
ca: string[];
33+
/** @visibility secret */
34+
key: string;
35+
cert: string;
36+
};
37+
};
38+
}

plugins/kafka-backend/package.json

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"name": "@backstage/plugin-kafka-backend",
3+
"version": "0.1.0",
4+
"main": "src/index.ts",
5+
"types": "src/index.ts",
6+
"license": "Apache-2.0",
7+
"private": false,
8+
"publishConfig": {
9+
"access": "public",
10+
"main": "dist/index.cjs.js",
11+
"types": "dist/index.d.ts"
12+
},
13+
"homepage": "https://backstage.io",
14+
"repository": {
15+
"type": "git",
16+
"url": "https://github.com/backstage/backstage",
17+
"directory": "plugins/kafka-backend"
18+
},
19+
"keywords": [
20+
"backstage",
21+
"kafka"
22+
],
23+
"configSchema": "config.d.ts",
24+
"scripts": {
25+
"start": "backstage-cli backend:dev",
26+
"build": "backstage-cli backend:build",
27+
"lint": "backstage-cli lint",
28+
"test": "backstage-cli test",
29+
"prepack": "backstage-cli prepack",
30+
"postpack": "backstage-cli postpack",
31+
"clean": "backstage-cli clean"
32+
},
33+
"dependencies": {
34+
"@backstage/backend-common": "^0.4.1",
35+
"@backstage/catalog-model": "^0.6.0",
36+
"@backstage/config": "^0.1.2",
37+
"@types/express": "^4.17.6",
38+
"express": "^4.17.1",
39+
"express-promise-router": "^3.0.3",
40+
"kafkajs": "^1.16.0-beta.6",
41+
"lodash": "^4.17.15",
42+
"winston": "^3.2.1"
43+
},
44+
"devDependencies": {
45+
"@backstage/cli": "^0.4.3",
46+
"@types/jest-when": "^2.7.2",
47+
"@types/lodash": "^4.14.151",
48+
"jest-when": "^3.1.0",
49+
"supertest": "^4.0.2"
50+
},
51+
"files": [
52+
"dist",
53+
"schema.d.ts"
54+
]
55+
}

plugins/kafka-backend/src/index.ts

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright 2020 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
export { createRouter } from './service/router';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2020 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { Kafka, SeekEntry } from 'kafkajs';
18+
import { Logger } from 'winston';
19+
import { ConnectionOptions } from 'tls';
20+
21+
export type PartitionOffset = {
22+
id: number;
23+
offset: string;
24+
};
25+
26+
export type TopicOffset = {
27+
topic: string;
28+
partitions: PartitionOffset[];
29+
};
30+
31+
export type Options = {
32+
clientId: string;
33+
brokers: string[];
34+
ssl?: ConnectionOptions;
35+
logger: Logger;
36+
};
37+
38+
export interface KafkaApi {
39+
fetchTopicOffsets(topic: string): Promise<Array<PartitionOffset>>;
40+
fetchGroupOffsets(groupId: string): Promise<Array<TopicOffset>>;
41+
}
42+
43+
export class KafkaJsApiImpl implements KafkaApi {
44+
private readonly kafka: Kafka;
45+
private readonly logger: Logger;
46+
47+
constructor(options: Options) {
48+
options.logger.debug(
49+
`creating kafka client with clientId=${options.clientId} and brokers=${options.brokers}`,
50+
);
51+
52+
this.kafka = new Kafka(options);
53+
this.logger = options.logger;
54+
}
55+
56+
async fetchTopicOffsets(topic: string): Promise<Array<PartitionOffset>> {
57+
this.logger.debug(`fetching topic offsets for ${topic}`);
58+
59+
const admin = this.kafka.admin();
60+
await admin.connect();
61+
62+
try {
63+
return KafkaJsApiImpl.toPartitionOffsets(
64+
await admin.fetchTopicOffsets(topic),
65+
);
66+
} finally {
67+
await admin.disconnect();
68+
}
69+
}
70+
71+
async fetchGroupOffsets(groupId: string): Promise<Array<TopicOffset>> {
72+
this.logger.debug(`fetching consumer group offsets for ${groupId}`);
73+
74+
const admin = this.kafka.admin();
75+
await admin.connect();
76+
77+
try {
78+
const groupOffsets = await admin.fetchOffsets({ groupId });
79+
80+
return groupOffsets.map(topicOffset => ({
81+
topic: topicOffset.topic,
82+
partitions: KafkaJsApiImpl.toPartitionOffsets(topicOffset.partitions),
83+
}));
84+
} finally {
85+
await admin.disconnect();
86+
}
87+
}
88+
89+
private static toPartitionOffsets(
90+
result: Array<SeekEntry>,
91+
): Array<PartitionOffset> {
92+
return result.map(seekEntry => ({
93+
id: seekEntry.partition,
94+
offset: seekEntry.offset,
95+
}));
96+
}
97+
}

0 commit comments

Comments
 (0)