Skip to content

Commit

Permalink
WIP: Consumer groups list with search. (provectus#17)
Browse files Browse the repository at this point in the history
* Added concumer groups list with search.

* added endpoint for group consumers

* removed redundand code and imports

* changed method to async mono

* method located better

* changes after review

* changed foreach to map

Co-authored-by: Sofia Shnaidman <sshnaidman@provectus.com>
Co-authored-by: Roman Nedzvetskiy <roman@Romans-MacBook-Pro.local>
  • Loading branch information
3 people authored Apr 7, 2020
1 parent a96c67b commit 59b6679
Show file tree
Hide file tree
Showing 24 changed files with 374 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -58,4 +61,15 @@ public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData>
if (cluster == null) return null;
return kafkaService.createTopic(cluster, topicFormData);
}

@SneakyThrows
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
var cluster = clustersStorage.getClusterByName(clusterName);
return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
.flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
.map(s -> s.values().stream()
.map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
.map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.provectus.kafka.ui.cluster.util;

import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.ConsumerGroup;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;

import java.util.HashSet;
import java.util.Set;

public class ClusterUtil {

public static <T> Mono<T> toMono(KafkaFuture<T> future){
return Mono.create(sink -> future.whenComplete((res, ex)->{
if (ex!=null) {
sink.error(ex);
} else {
sink.success(res);
}
}));
}

public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setClusterId(cluster.getCluster().getId());
consumerGroup.setConsumerGroupId(c.groupId());
consumerGroup.setNumConsumers(c.members().size());
Set<String> topics = new HashSet<>();
c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
consumerGroup.setNumTopics(topics.size());
return consumerGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -53,4 +54,9 @@ public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<Top
public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
}

@Override
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
return clusterService.getConsumerGroup(clusterName);
}
}
36 changes: 35 additions & 1 deletion kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,28 @@ paths:
items:
$ref: '#/components/schemas/TopicConfig'

/api/clusters/{clusterName}/consumerGroups:
get:
tags:
- /api/clusters
summary: getConsumerGroup
operationId: getConsumerGroup
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ConsumerGroup'

components:
schemas:
Cluster:
Expand Down Expand Up @@ -307,4 +329,16 @@ components:
type: object
properties:
id:
type: string
type: string

ConsumerGroup:
type: object
properties:
clusterId:
type: string
consumerGroupId:
type: string
numConsumers:
type: integer
numTopics:
type: integer
2 changes: 2 additions & 0 deletions kafka-ui-react-app/mock/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const brokerMetrics = require('./payload/brokerMetrics.json');
const topics = require('./payload/topics.json');
const topicDetails = require('./payload/topicDetails.json');
const topicConfigs = require('./payload/topicConfigs.json');
const consumerGroups = require('./payload/consumerGroups.json');

const db = {
clusters,
Expand All @@ -13,6 +14,7 @@ const db = {
topics: topics.map((topic) => ({...topic, id: topic.name})),
topicDetails,
topicConfigs,
consumerGroups: consumerGroups.map((group) => ({...group, id: group.consumerGroupId}))
};
const server = jsonServer.create();
const router = jsonServer.router(db);
Expand Down
39 changes: 39 additions & 0 deletions kafka-ui-react-app/mock/payload/consumerGroups.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_1",
"numConsumers": 1,
"numTopics": 11
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_2",
"numConsumers": 2,
"numTopics": 22
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_3",
"numConsumers": 3,
"numTopics": 33
},

{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_1",
"numConsumers": 4,
"numTopics": 44
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_2",
"numConsumers": 5,
"numTopics": 55
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_3",
"numConsumers": 6,
"numTopics": 66
}
]
2 changes: 2 additions & 0 deletions kafka-ui-react-app/src/components/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import TopicsContainer from './Topics/TopicsContainer';
import NavConatiner from './Nav/NavConatiner';
import PageLoader from './common/PageLoader/PageLoader';
import Dashboard from './Dashboard/Dashboard';
import ConsumersGroupsContainer from './ConsumerGroups/ConsumersGroupsContainer';

interface AppProps {
isClusterListFetched: boolean;
Expand Down Expand Up @@ -39,6 +40,7 @@ const App: React.FC<AppProps> = ({
<Route exact path="/clusters" component={Dashboard} />
<Route path="/clusters/:clusterName/topics" component={TopicsContainer} />
<Route path="/clusters/:clusterName/brokers" component={BrokersContainer} />
<Route path="/clusters/:clusterName/consumer-groups" component={ConsumersGroupsContainer} />
<Redirect from="/clusters/:clusterName" to="/clusters/:clusterName/brokers" />
</Switch>
) : (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import React from 'react';
import { ClusterName } from 'redux/interfaces';
import {
Switch,
Route,
} from 'react-router-dom';
import ListContainer from './List/ListContainer';
import PageLoader from 'components/common/PageLoader/PageLoader';

interface Props {
clusterName: ClusterName;
isFetched: boolean;
fetchConsumerGroupsList: (clusterName: ClusterName) => void;
}

const ConsumerGroups: React.FC<Props> = ({
clusterName,
isFetched,
fetchConsumerGroupsList,
}) => {
React.useEffect(() => { fetchConsumerGroupsList(clusterName); }, [fetchConsumerGroupsList, clusterName]);

if (isFetched) {
return (
<Switch>
<Route exact path="/clusters/:clusterName/consumer-groups" component={ListContainer} />
</Switch>
);
}

return (<PageLoader />);
};

export default ConsumerGroups;
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { connect } from 'react-redux';
import { fetchConsumerGroupsList } from 'redux/actions';
import { RootState, ClusterName } from 'redux/interfaces';
import { RouteComponentProps } from 'react-router-dom';
import ConsumerGroups from './ConsumerGroups';
import { getIsConsumerGroupsListFetched } from '../../redux/reducers/consumerGroups/selectors';


interface RouteProps {
clusterName: ClusterName;
}

interface OwnProps extends RouteComponentProps<RouteProps> { }

const mapStateToProps = (state: RootState, { match: { params: { clusterName } }}: OwnProps) => ({
isFetched: getIsConsumerGroupsListFetched(state),
clusterName,
});

const mapDispatchToProps = {
fetchConsumerGroupsList: (clusterName: ClusterName) => fetchConsumerGroupsList(clusterName),
};

export default connect(mapStateToProps, mapDispatchToProps)(ConsumerGroups);
64 changes: 64 additions & 0 deletions kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import React, { ChangeEvent } from 'react';
import { ConsumerGroup, ClusterName } from 'redux/interfaces';
import ListItem from './ListItem';
import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';

interface Props {
clusterName: ClusterName;
consumerGroups: (ConsumerGroup)[];
}

const List: React.FC<Props> = ({
consumerGroups,
}) => {

const [searchText, setSearchText] = React.useState<string>('');

const handleInputChange = (event: React.ChangeEvent<HTMLInputElement>) => {
setSearchText(event.target.value);
};

const items = consumerGroups;

return (
<div className="section">
<Breadcrumb>All Consumer Groups</Breadcrumb>

<div className="box">
<div className="columns">
<div className="column is-half is-offset-half">
<input id="searchText"
type="text"
name="searchText"
className="input"
placeholder="Search"
value={searchText}
onChange={handleInputChange}
/>
</div>
</div>
<table className="table is-striped is-fullwidth">
<thead>
<tr>
<th>Consumer group ID</th>
<th>Num of consumers</th>
<th>Num of topics</th>
</tr>
</thead>
<tbody>
{items
.filter( (consumerGroup) => !searchText || consumerGroup?.consumerGroupId?.indexOf(searchText) >= 0)
.map((consumerGroup, index) => (
<ListItem
key={`consumer-group-list-item-key-${index}`}
{...consumerGroup}
/>
))}
</tbody>
</table>
</div>
</div>
);
};

export default List;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { connect } from 'react-redux';
import {ClusterName, RootState} from 'redux/interfaces';
import { getConsumerGroupsList } from 'redux/reducers/consumerGroups/selectors';
import List from './List';
import { withRouter, RouteComponentProps } from 'react-router-dom';

interface RouteProps {
clusterName: ClusterName;
}

interface OwnProps extends RouteComponentProps<RouteProps> { }

const mapStateToProps = (state: RootState, { match: { params: { clusterName } } }: OwnProps) => ({
clusterName,
consumerGroups: getConsumerGroupsList(state)
});

export default withRouter(
connect(mapStateToProps)(List)
);
24 changes: 24 additions & 0 deletions kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import React from 'react';
import { NavLink } from 'react-router-dom';
import { ConsumerGroup } from 'redux/interfaces';

const ListItem: React.FC<ConsumerGroup> = ({
consumerGroupId,
numConsumers,
numTopics,
}) => {
return (
<tr>
{/* <td>
<NavLink exact to={`consumer-groups/${consumerGroupId}`} activeClassName="is-active" className="title is-6">
{consumerGroupId}
</NavLink>
</td> */}
<td>{consumerGroupId}</td>
<td>{numConsumers}</td>
<td>{numTopics}</td>
</tr>
);
}

export default ListItem;
5 changes: 4 additions & 1 deletion kafka-ui-react-app/src/components/Nav/ClusterMenu.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import React, { CSSProperties } from 'react';
import { Cluster } from 'redux/interfaces';
import { NavLink } from 'react-router-dom';
import { clusterBrokersPath, clusterTopicsPath } from 'lib/paths';
import { clusterBrokersPath, clusterTopicsPath, clusterConsumerGroupsPath } from 'lib/paths';

interface Props extends Cluster {}

Expand Down Expand Up @@ -37,6 +37,9 @@ const ClusterMenu: React.FC<Props> = ({
<NavLink to={clusterTopicsPath(name)} activeClassName="is-active" title="Topics">
Topics
</NavLink>
<NavLink to={clusterConsumerGroupsPath(name)} activeClassName="is-active" title="Consumers">
Consumers
</NavLink>
</ul>
</li>
</ul>
Expand Down
2 changes: 2 additions & 0 deletions kafka-ui-react-app/src/lib/paths.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ export const clusterTopicNewPath = (clusterName: ClusterName) => `${clusterPath(
export const clusterTopicPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}`;
export const clusterTopicSettingsPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/settings`;
export const clusterTopicMessagesPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/messages`;

export const clusterConsumerGroupsPath = (clusterName: ClusterName) => `${clusterPath(clusterName)}/consumer-groups`;
Loading

0 comments on commit 59b6679

Please sign in to comment.