Skip to content

Commit

Permalink
Support query pulsar message by messageId
Browse files Browse the repository at this point in the history
  • Loading branch information
tyluffy committed Jan 16, 2022
1 parent 6b0b5a4 commit d4dae63
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 28 deletions.
27 changes: 27 additions & 0 deletions lib/api/pulsar/pulsar_partitioned_topic_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,33 @@ class PulsarPartitionedTopicApi {
return respList;
}

static Future<String> fetchConsumerMessage(
String host, int port, String tenant, String namespace, String topic, String ledgerId, String entryId) async {
var url =
'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/ledger/$ledgerId/entry/$entryId';
var response = await http.get(Uri.parse(url), headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
});
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
return "";
}
return response.body;
}

static Future<String> fetchMessageId(
String host, int port, String tenant, String namespace, String topic, String timestamp) async {
var url = 'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/messageid/$timestamp';
var response = await http.get(Uri.parse(url), headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
});
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
return "";
}
return response.body;
}

static Future<List<ProducerResp>> getProducers(
String host, int port, String tenant, String namespace, String topic) async {
String data = "";
Expand Down
16 changes: 15 additions & 1 deletion lib/api/pulsar/pulsar_topic_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,21 @@ class PulsarTopicApi {

static Future<String> fetchConsumerMessage(
String host, int port, String tenant, String namespace, String topic, String ledgerId, String entryId) async {
var url = 'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/ledger/$ledgerId/entry/$entryId';
var url =
'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/ledger/$ledgerId/entry/$entryId';
var response = await http.get(Uri.parse(url), headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
});
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
return "";
}
return response.body;
}

static Future<String> fetchMessageId(
String host, int port, String tenant, String namespace, String topic, String timestamp) async {
var url = 'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/messageid/$timestamp';
var response = await http.get(Uri.parse(url), headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
});
Expand Down
7 changes: 6 additions & 1 deletion lib/generated/intl/messages_en.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class MessageLookup extends MessageLookupByLibrary {
"forceDelete": MessageLookupByLibrary.simpleMessage("forceDelete"),
"isLeader": MessageLookupByLibrary.simpleMessage("Is Leader"),
"languageSettings": MessageLookupByLibrary.simpleMessage("Language Settings"),
"messageList": MessageLookupByLibrary.simpleMessage("message list"),
"name": MessageLookupByLibrary.simpleMessage("name"),
"namespace": MessageLookupByLibrary.simpleMessage("namespace"),
"namespaceName": MessageLookupByLibrary.simpleMessage("Namespace Name"),
Expand All @@ -58,10 +59,14 @@ class MessageLookup extends MessageLookupByLibrary {
"producer": MessageLookupByLibrary.simpleMessage("Producer"),
"producerList": MessageLookupByLibrary.simpleMessage("Producer List"),
"refresh": MessageLookupByLibrary.simpleMessage("Refresh"),
"searchByMessageIdWithHint": MessageLookupByLibrary.simpleMessage(
"searchByMessageId": MessageLookupByLibrary.simpleMessage(
"Search by MessageId, type is ledgerId*entryId, submit with enter key"),
"searchByMessageIdWithHint": MessageLookupByLibrary.simpleMessage(
"Search by MessageId, single search type should be ledgerId*entryId,multi search type should be ledgerId entryId entryId, submit with enter key"),
"searchByNamespace": MessageLookupByLibrary.simpleMessage("Search by Namespace Name"),
"searchByTenant": MessageLookupByLibrary.simpleMessage("Search by Tenant Name"),
"searchByTimestampWithHint":
MessageLookupByLibrary.simpleMessage("Search MessageId by timestamp,submit with enter key"),
"searchByTopic": MessageLookupByLibrary.simpleMessage("Search by Topic Name"),
"second": MessageLookupByLibrary.simpleMessage("second"),
"settings": MessageLookupByLibrary.simpleMessage("Settings"),
Expand Down
7 changes: 5 additions & 2 deletions lib/generated/intl/messages_zh.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class MessageLookup extends MessageLookupByLibrary {
"forceDelete": MessageLookupByLibrary.simpleMessage("强制删除"),
"isLeader": MessageLookupByLibrary.simpleMessage("是否是主节点"),
"languageSettings": MessageLookupByLibrary.simpleMessage("语言设置"),
"messageList": MessageLookupByLibrary.simpleMessage("消息列表"),
"name": MessageLookupByLibrary.simpleMessage("名称"),
"namespace": MessageLookupByLibrary.simpleMessage("命名空间"),
"namespaceName": MessageLookupByLibrary.simpleMessage("命名空间名称"),
Expand All @@ -58,10 +59,12 @@ class MessageLookup extends MessageLookupByLibrary {
"producer": MessageLookupByLibrary.simpleMessage("生产者"),
"producerList": MessageLookupByLibrary.simpleMessage("生产者列表"),
"refresh": MessageLookupByLibrary.simpleMessage("刷新"),
"searchByMessageIdWithHint":
MessageLookupByLibrary.simpleMessage("通过messageId查询消息,格式ledgerId*entryId,按enter键进行查询。"),
"searchByMessageId": MessageLookupByLibrary.simpleMessage("通过messageId查询消息,格式ledgerId*entryId,按enter键进行查询。"),
"searchByMessageIdWithHint": MessageLookupByLibrary.simpleMessage(
"通过messageId查询消息,单条查询格式ledgerId entryId,范围查询格式位ledgerId entryId entryId, 按enter键进行查询。"),
"searchByNamespace": MessageLookupByLibrary.simpleMessage("按命名空间名称搜索"),
"searchByTenant": MessageLookupByLibrary.simpleMessage("按租户名称搜索"),
"searchByTimestampWithHint": MessageLookupByLibrary.simpleMessage("通过时间戳查询消息Id, 按enter键进行查询"),
"searchByTopic": MessageLookupByLibrary.simpleMessage("按 Topic 名称搜索"),
"second": MessageLookupByLibrary.simpleMessage("秒"),
"settings": MessageLookupByLibrary.simpleMessage("设置"),
Expand Down
32 changes: 31 additions & 1 deletion lib/generated/l10n.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion lib/l10n/intl_en.arb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"forceDelete": "forceDelete",
"isLeader": "Is Leader",
"languageSettings": "Language Settings",
"messageList": "message list",
"name": "name",
"namespace": "namespace",
"namespaceName": "Namespace Name",
Expand All @@ -35,9 +36,11 @@
"producer": "Producer",
"producerList": "Producer List",
"refresh": "Refresh",
"searchByMessageIdWithHint": "Search by MessageId, type is ledgerId*entryId, submit with enter key",
"searchByMessageId": "Search by MessageId, type is ledgerId*entryId, submit with enter key",
"searchByMessageIdWithHint": "Search by MessageId, single search type should be ledgerId*entryId,multi search type should be ledgerId entryId entryId, submit with enter key",
"searchByNamespace": "Search by Namespace Name",
"searchByTenant": "Search by Tenant Name",
"searchByTimestampWithHint": "Search MessageId by timestamp,submit with enter key",
"searchByTopic": "Search by Topic Name",
"second": "second",
"settings": "Settings",
Expand Down
5 changes: 4 additions & 1 deletion lib/l10n/intl_zh.arb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"forceDelete": "强制删除",
"isLeader": "是否是主节点",
"languageSettings": "语言设置",
"messageList": "消息列表",
"name": "名称",
"namespace": "命名空间",
"namespaceName": "命名空间名称",
Expand All @@ -35,9 +36,11 @@
"producer": "生产者",
"producerList": "生产者列表",
"refresh": "刷新",
"searchByMessageIdWithHint": "通过messageId查询消息,格式ledgerId*entryId,按enter键进行查询。",
"searchByMessageId": "通过messageId查询消息,格式ledgerId*entryId,按enter键进行查询。",
"searchByMessageIdWithHint": "通过messageId查询消息,单条查询格式ledgerId entryId,范围查询格式位ledgerId entryId entryId, 按enter键进行查询。",
"searchByNamespace": "按命名空间名称搜索",
"searchByTenant": "按租户名称搜索",
"searchByTimestampWithHint": "通过时间戳查询消息Id, 按enter键进行查询",
"searchByTopic": "按 Topic 名称搜索",
"second": "秒",
"settings": "设置",
Expand Down
8 changes: 4 additions & 4 deletions lib/ui/pulsar/screen/pulsar_partitioned_topic.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import 'package:flutter/material.dart';
import 'package:paas_dashboard_flutter/generated/l10n.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_basic.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_consume.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_consumer.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_detail.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_producer.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_subscription.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_basic_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consume_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consumer_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_detail_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_producer_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_subscription_view_model.dart';
Expand Down Expand Up @@ -62,9 +62,9 @@ class _PulsarPartitionedTopicState extends State<PulsarPartitionedTopic> {
child: PulsarPartitionedTopicSubscriptionWidget(),
).build(context),
ChangeNotifierProvider(
create: (context) => PulsarPartitionedTopicConsumeViewModel(
create: (context) => PulsarPartitionedTopicConsumerViewModel(
vm.pulsarInstancePo, vm.tenantResp, vm.namespaceResp, vm.topicResp),
child: PulsarPartitionedTopicConsumeWidget(),
child: PulsarPartitionedTopicConsumerWidget(),
).build(context),
ChangeNotifierProvider(
create: (context) => PulsarPartitionedTopicProducerViewModel(
Expand Down
4 changes: 2 additions & 2 deletions lib/ui/pulsar/screen/pulsar_topic.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class _PulsarTopicState extends State<PulsarTopic> {
child: PulsarTopicProducerWidget(),
).build(context),
ChangeNotifierProvider(
create: (context) =>
PulsarTopicConsumeViewModel(vm.pulsarInstancePo, vm.tenantResp, vm.namespaceResp, vm.topicResp),
create: (context) => PulsarTopicConsumeViewModel(
vm.pulsarInstancePo, vm.tenantResp, vm.namespaceResp, vm.topicResp, "", "", []),
child: PulsarTopicConsumeWidget(),
).build(context),
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,29 @@ import 'package:flutter/material.dart';
import 'package:paas_dashboard_flutter/generated/l10n.dart';
import 'package:paas_dashboard_flutter/ui/util/exception_util.dart';
import 'package:paas_dashboard_flutter/ui/util/spinner_util.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consume_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consumer_view_model.dart';
import 'package:provider/provider.dart';

class PulsarPartitionedTopicConsumeWidget extends StatefulWidget {
PulsarPartitionedTopicConsumeWidget();
class PulsarPartitionedTopicConsumerWidget extends StatefulWidget {
PulsarPartitionedTopicConsumerWidget();

@override
State<StatefulWidget> createState() {
return new PulsarPartitionedTopicConsumeWidgetState();
return new PulsarPartitionedTopicConsumerWidgetState();
}
}

class PulsarPartitionedTopicConsumeWidgetState extends State<PulsarPartitionedTopicConsumeWidget> {
class PulsarPartitionedTopicConsumerWidgetState extends State<PulsarPartitionedTopicConsumerWidget> {
@override
void initState() {
super.initState();
final vm = Provider.of<PulsarPartitionedTopicConsumeViewModel>(context, listen: false);
final vm = Provider.of<PulsarPartitionedTopicConsumerViewModel>(context, listen: false);
vm.fetchConsumers();
}

@override
Widget build(BuildContext context) {
final vm = Provider.of<PulsarPartitionedTopicConsumeViewModel>(context);
final vm = Provider.of<PulsarPartitionedTopicConsumerViewModel>(context);
if (vm.loading) {
WidgetsBinding.instance!.addPostFrameCallback((timeStamp) {
SpinnerUtil.create();
Expand Down
Loading

0 comments on commit d4dae63

Please sign in to comment.