From ce2f8e05439a0825ce21e5eb79e05ec2f0326593 Mon Sep 17 00:00:00 2001 From: zxJin-x Date: Mon, 17 Jan 2022 10:56:58 +0800 Subject: [PATCH] Support query pulsar message by messageId --- lib/api/pulsar/pulsar_topic_api.dart | 16 ++- lib/generated/intl/messages_en.dart | 8 +- lib/generated/intl/messages_zh.dart | 8 +- lib/generated/l10n.dart | 44 ++++++- lib/l10n/intl_en.arb | 6 +- lib/l10n/intl_zh.arb | 6 +- .../screen/pulsar_partitioned_topic.dart | 8 +- ...=> pulsar_partitioned_topic_consumer.dart} | 14 +-- .../pulsar/widget/pulsar_topic_consume.dart | 119 +++++++++++++++++- ...artitioned_topic_consumer_view_model.dart} | 8 +- .../pulsar_topic_consume_view_model.dart | 61 ++++++++- 11 files changed, 268 insertions(+), 30 deletions(-) rename lib/ui/pulsar/widget/{pulsar_partitioned_topic_consume.dart => pulsar_partitioned_topic_consumer.dart} (86%) rename lib/vm/pulsar/{pulsar_partitioned_topic_consume_view_model.dart => pulsar_partitioned_topic_consumer_view_model.dart} (83%) diff --git a/lib/api/pulsar/pulsar_topic_api.dart b/lib/api/pulsar/pulsar_topic_api.dart index f073dcb..b60697e 100644 --- a/lib/api/pulsar/pulsar_topic_api.dart +++ b/lib/api/pulsar/pulsar_topic_api.dart @@ -68,7 +68,21 @@ class PulsarTopicApi { static Future fetchConsumerMessage( String host, int port, String tenant, String namespace, String topic, String ledgerId, String entryId) async { - var url = 'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/ledger/$ledgerId/entry/$entryId'; + var url = + 'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/ledger/$ledgerId/entry/$entryId'; + var response = await http.get(Uri.parse(url), headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + return ""; + } + return response.body; + } + + static Future fetchMessageId( + String host, int port, String tenant, String namespace, String topic, String timestamp) async { + var url = 'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/messageid/$timestamp'; var response = await http.get(Uri.parse(url), headers: { 'Content-Type': 'application/json; charset=UTF-8', }); diff --git a/lib/generated/intl/messages_en.dart b/lib/generated/intl/messages_en.dart index 73d0377..0713413 100644 --- a/lib/generated/intl/messages_en.dart +++ b/lib/generated/intl/messages_en.dart @@ -44,10 +44,12 @@ class MessageLookup extends MessageLookupByLibrary { "deleteTopic": MessageLookupByLibrary.simpleMessage("Delete Topic"), "detail": MessageLookupByLibrary.simpleMessage("detail"), "email": MessageLookupByLibrary.simpleMessage("email"), + "entryId": MessageLookupByLibrary.simpleMessage("entryId"), "execute": MessageLookupByLibrary.simpleMessage("execute"), "forceDelete": MessageLookupByLibrary.simpleMessage("forceDelete"), "isLeader": MessageLookupByLibrary.simpleMessage("Is Leader"), "languageSettings": MessageLookupByLibrary.simpleMessage("Language Settings"), + "messageList": MessageLookupByLibrary.simpleMessage("message list"), "name": MessageLookupByLibrary.simpleMessage("name"), "namespace": MessageLookupByLibrary.simpleMessage("namespace"), "namespaceName": MessageLookupByLibrary.simpleMessage("Namespace Name"), @@ -58,10 +60,14 @@ class MessageLookup extends MessageLookupByLibrary { "producer": MessageLookupByLibrary.simpleMessage("Producer"), "producerList": MessageLookupByLibrary.simpleMessage("Producer List"), "refresh": MessageLookupByLibrary.simpleMessage("Refresh"), + "searchByMessageId": MessageLookupByLibrary.simpleMessage( + "Search by MessageId, type is ledgerId entryId, submit with enter key"), "searchByMessageIdWithHint": MessageLookupByLibrary.simpleMessage( - "Search by MessageId, type is ledgerId*entryId, submit with enter key"), + "Search by MessageId, single search type should be ledgerId entryId, multi search type should be ledgerId entryId entryId, submit with enter key"), "searchByNamespace": MessageLookupByLibrary.simpleMessage("Search by Namespace Name"), "searchByTenant": MessageLookupByLibrary.simpleMessage("Search by Tenant Name"), + "searchByTimestampWithHint": + MessageLookupByLibrary.simpleMessage("Search MessageId by timestamp,submit with enter key"), "searchByTopic": MessageLookupByLibrary.simpleMessage("Search by Topic Name"), "second": MessageLookupByLibrary.simpleMessage("second"), "settings": MessageLookupByLibrary.simpleMessage("Settings"), diff --git a/lib/generated/intl/messages_zh.dart b/lib/generated/intl/messages_zh.dart index 59663f3..250791b 100644 --- a/lib/generated/intl/messages_zh.dart +++ b/lib/generated/intl/messages_zh.dart @@ -44,10 +44,12 @@ class MessageLookup extends MessageLookupByLibrary { "deleteTopic": MessageLookupByLibrary.simpleMessage("删除 Topic"), "detail": MessageLookupByLibrary.simpleMessage("详细信息"), "email": MessageLookupByLibrary.simpleMessage("邮箱"), + "entryId": MessageLookupByLibrary.simpleMessage("编号"), "execute": MessageLookupByLibrary.simpleMessage("执行"), "forceDelete": MessageLookupByLibrary.simpleMessage("强制删除"), "isLeader": MessageLookupByLibrary.simpleMessage("是否是主节点"), "languageSettings": MessageLookupByLibrary.simpleMessage("语言设置"), + "messageList": MessageLookupByLibrary.simpleMessage("消息列表"), "name": MessageLookupByLibrary.simpleMessage("名称"), "namespace": MessageLookupByLibrary.simpleMessage("命名空间"), "namespaceName": MessageLookupByLibrary.simpleMessage("命名空间名称"), @@ -58,10 +60,12 @@ class MessageLookup extends MessageLookupByLibrary { "producer": MessageLookupByLibrary.simpleMessage("生产者"), "producerList": MessageLookupByLibrary.simpleMessage("生产者列表"), "refresh": MessageLookupByLibrary.simpleMessage("刷新"), - "searchByMessageIdWithHint": - MessageLookupByLibrary.simpleMessage("通过messageId查询消息,格式ledgerId*entryId,按enter键进行查询。"), + "searchByMessageId": MessageLookupByLibrary.simpleMessage("通过messageId查询消息,格式ledgerId entryId,按enter键进行查询。"), + "searchByMessageIdWithHint": MessageLookupByLibrary.simpleMessage( + "通过messageId查询消息,单条查询格式ledgerId entryId,范围查询格式位ledgerId entryId entryId,按enter键进行查询。"), "searchByNamespace": MessageLookupByLibrary.simpleMessage("按命名空间名称搜索"), "searchByTenant": MessageLookupByLibrary.simpleMessage("按租户名称搜索"), + "searchByTimestampWithHint": MessageLookupByLibrary.simpleMessage("通过时间戳查询消息Id, 按enter键进行查询"), "searchByTopic": MessageLookupByLibrary.simpleMessage("按 Topic 名称搜索"), "second": MessageLookupByLibrary.simpleMessage("秒"), "settings": MessageLookupByLibrary.simpleMessage("设置"), diff --git a/lib/generated/l10n.dart b/lib/generated/l10n.dart index 5777a5f..56d027f 100644 --- a/lib/generated/l10n.dart +++ b/lib/generated/l10n.dart @@ -258,6 +258,16 @@ class S { ); } + /// `entryId` + String get entryId { + return Intl.message( + 'entryId', + name: 'entryId', + desc: '', + args: [], + ); + } + /// `execute` String get execute { return Intl.message( @@ -298,6 +308,16 @@ class S { ); } + /// `message list` + String get messageList { + return Intl.message( + 'message list', + name: 'messageList', + desc: '', + args: [], + ); + } + /// `name` String get name { return Intl.message( @@ -398,10 +418,20 @@ class S { ); } - /// `Search by MessageId, type is ledgerId*entryId, submit with enter key` + /// `Search by MessageId, type is ledgerId entryId, submit with enter key` + String get searchByMessageId { + return Intl.message( + 'Search by MessageId, type is ledgerId entryId, submit with enter key', + name: 'searchByMessageId', + desc: '', + args: [], + ); + } + + /// `Search by MessageId, single search type should be ledgerId entryId, multi search type should be ledgerId entryId entryId, submit with enter key` String get searchByMessageIdWithHint { return Intl.message( - 'Search by MessageId, type is ledgerId*entryId, submit with enter key', + 'Search by MessageId, single search type should be ledgerId entryId, multi search type should be ledgerId entryId entryId, submit with enter key', name: 'searchByMessageIdWithHint', desc: '', args: [], @@ -428,6 +458,16 @@ class S { ); } + /// `Search MessageId by timestamp,submit with enter key` + String get searchByTimestampWithHint { + return Intl.message( + 'Search MessageId by timestamp,submit with enter key', + name: 'searchByTimestampWithHint', + desc: '', + args: [], + ); + } + /// `Search by Topic Name` String get searchByTopic { return Intl.message( diff --git a/lib/l10n/intl_en.arb b/lib/l10n/intl_en.arb index 492886e..a2a596d 100644 --- a/lib/l10n/intl_en.arb +++ b/lib/l10n/intl_en.arb @@ -21,10 +21,12 @@ "deleteTopic": "Delete Topic", "detail": "detail", "email": "email", + "entryId": "entryId", "execute": "execute", "forceDelete": "forceDelete", "isLeader": "Is Leader", "languageSettings": "Language Settings", + "messageList": "message list", "name": "name", "namespace": "namespace", "namespaceName": "Namespace Name", @@ -35,9 +37,11 @@ "producer": "Producer", "producerList": "Producer List", "refresh": "Refresh", - "searchByMessageIdWithHint": "Search by MessageId, type is ledgerId*entryId, submit with enter key", + "searchByMessageId": "Search by MessageId, type is ledgerId entryId, submit with enter key", + "searchByMessageIdWithHint": "Search by MessageId, single search type should be ledgerId entryId, multi search type should be ledgerId entryId entryId, submit with enter key", "searchByNamespace": "Search by Namespace Name", "searchByTenant": "Search by Tenant Name", + "searchByTimestampWithHint": "Search MessageId by timestamp,submit with enter key", "searchByTopic": "Search by Topic Name", "second": "second", "settings": "Settings", diff --git a/lib/l10n/intl_zh.arb b/lib/l10n/intl_zh.arb index c3bb531..6f5454a 100644 --- a/lib/l10n/intl_zh.arb +++ b/lib/l10n/intl_zh.arb @@ -21,10 +21,12 @@ "deleteTopic": "删除 Topic", "detail": "详细信息", "email": "邮箱", + "entryId": "编号", "execute": "执行", "forceDelete": "强制删除", "isLeader": "是否是主节点", "languageSettings": "语言设置", + "messageList": "消息列表", "name": "名称", "namespace": "命名空间", "namespaceName": "命名空间名称", @@ -35,9 +37,11 @@ "producer": "生产者", "producerList": "生产者列表", "refresh": "刷新", - "searchByMessageIdWithHint": "通过messageId查询消息,格式ledgerId*entryId,按enter键进行查询。", + "searchByMessageId": "通过messageId查询消息,格式ledgerId entryId,按enter键进行查询。", + "searchByMessageIdWithHint": "通过messageId查询消息,单条查询格式ledgerId entryId,范围查询格式位ledgerId entryId entryId,按enter键进行查询。", "searchByNamespace": "按命名空间名称搜索", "searchByTenant": "按租户名称搜索", + "searchByTimestampWithHint": "通过时间戳查询消息Id, 按enter键进行查询", "searchByTopic": "按 Topic 名称搜索", "second": "秒", "settings": "设置", diff --git a/lib/ui/pulsar/screen/pulsar_partitioned_topic.dart b/lib/ui/pulsar/screen/pulsar_partitioned_topic.dart index 356e8e2..b794f5e 100644 --- a/lib/ui/pulsar/screen/pulsar_partitioned_topic.dart +++ b/lib/ui/pulsar/screen/pulsar_partitioned_topic.dart @@ -1,12 +1,12 @@ import 'package:flutter/material.dart'; import 'package:paas_dashboard_flutter/generated/l10n.dart'; import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_basic.dart'; -import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_consume.dart'; +import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_consumer.dart'; import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_detail.dart'; import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_producer.dart'; import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_partitioned_topic_subscription.dart'; import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_basic_view_model.dart'; -import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consume_view_model.dart'; +import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consumer_view_model.dart'; import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_detail_view_model.dart'; import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_producer_view_model.dart'; import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_subscription_view_model.dart'; @@ -62,9 +62,9 @@ class _PulsarPartitionedTopicState extends State { child: PulsarPartitionedTopicSubscriptionWidget(), ).build(context), ChangeNotifierProvider( - create: (context) => PulsarPartitionedTopicConsumeViewModel( + create: (context) => PulsarPartitionedTopicConsumerViewModel( vm.pulsarInstancePo, vm.tenantResp, vm.namespaceResp, vm.topicResp), - child: PulsarPartitionedTopicConsumeWidget(), + child: PulsarPartitionedTopicConsumerWidget(), ).build(context), ChangeNotifierProvider( create: (context) => PulsarPartitionedTopicProducerViewModel( diff --git a/lib/ui/pulsar/widget/pulsar_partitioned_topic_consume.dart b/lib/ui/pulsar/widget/pulsar_partitioned_topic_consumer.dart similarity index 86% rename from lib/ui/pulsar/widget/pulsar_partitioned_topic_consume.dart rename to lib/ui/pulsar/widget/pulsar_partitioned_topic_consumer.dart index ab3a8e6..e2c1883 100644 --- a/lib/ui/pulsar/widget/pulsar_partitioned_topic_consume.dart +++ b/lib/ui/pulsar/widget/pulsar_partitioned_topic_consumer.dart @@ -2,29 +2,29 @@ import 'package:flutter/material.dart'; import 'package:paas_dashboard_flutter/generated/l10n.dart'; import 'package:paas_dashboard_flutter/ui/util/exception_util.dart'; import 'package:paas_dashboard_flutter/ui/util/spinner_util.dart'; -import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consume_view_model.dart'; +import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_consumer_view_model.dart'; import 'package:provider/provider.dart'; -class PulsarPartitionedTopicConsumeWidget extends StatefulWidget { - PulsarPartitionedTopicConsumeWidget(); +class PulsarPartitionedTopicConsumerWidget extends StatefulWidget { + PulsarPartitionedTopicConsumerWidget(); @override State createState() { - return new PulsarPartitionedTopicConsumeWidgetState(); + return new PulsarPartitionedTopicConsumerWidgetState(); } } -class PulsarPartitionedTopicConsumeWidgetState extends State { +class PulsarPartitionedTopicConsumerWidgetState extends State { @override void initState() { super.initState(); - final vm = Provider.of(context, listen: false); + final vm = Provider.of(context, listen: false); vm.fetchConsumers(); } @override Widget build(BuildContext context) { - final vm = Provider.of(context); + final vm = Provider.of(context); if (vm.loading) { WidgetsBinding.instance!.addPostFrameCallback((timeStamp) { SpinnerUtil.create(); diff --git a/lib/ui/pulsar/widget/pulsar_topic_consume.dart b/lib/ui/pulsar/widget/pulsar_topic_consume.dart index 81b81be..549b1b4 100644 --- a/lib/ui/pulsar/widget/pulsar_topic_consume.dart +++ b/lib/ui/pulsar/widget/pulsar_topic_consume.dart @@ -1,4 +1,9 @@ import 'package:flutter/material.dart'; +import 'package:paas_dashboard_flutter/generated/l10n.dart'; +import 'package:paas_dashboard_flutter/ui/util/exception_util.dart'; +import 'package:paas_dashboard_flutter/ui/util/spinner_util.dart'; +import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_consume_view_model.dart'; +import 'package:provider/provider.dart'; class PulsarTopicConsumeWidget extends StatefulWidget { PulsarTopicConsumeWidget(); @@ -10,15 +15,125 @@ class PulsarTopicConsumeWidget extends StatefulWidget { } class PulsarTopicConsumeWidgetState extends State { - final searchTextController = TextEditingController(); + final searchTimestampController = TextEditingController(); + final searchMessageIdController = TextEditingController(); @override void initState() { super.initState(); + searchTimestampController.addListener(() {}); + searchMessageIdController.addListener(() {}); } @override Widget build(BuildContext context) { - throw UnimplementedError(); + final vm = Provider.of(context); + if (vm.loading) { + WidgetsBinding.instance!.addPostFrameCallback((timeStamp) { + SpinnerUtil.create(); + }); + } + ExceptionUtil.processLoadException(vm, context); + ExceptionUtil.processOpException(vm, context); + var messageId = TextEditingController(text: vm.messageId); + var messageList = SingleChildScrollView(); + if (vm.messageList.length != 0) { + messageList = SingleChildScrollView( + scrollDirection: Axis.vertical, + child: DataTable( + showCheckboxColumn: false, + columns: [ + DataColumn(label: Text(S.of(context).entryId)), + DataColumn(label: Text(S.of(context).messageList)), + ], + rows: vm.messageList + .map((data) => DataRow(cells: [ + DataCell( + TextField( + controller: TextEditingController(text: data.entryId), + maxLines: null, + keyboardType: TextInputType.multiline, + decoration: InputDecoration(border: InputBorder.none), + readOnly: true, + ), + ), + DataCell( + TextField( + controller: TextEditingController(text: data.message), + maxLines: null, + keyboardType: TextInputType.multiline, + decoration: InputDecoration(border: InputBorder.none), + readOnly: true, + ), + ), + ])) + .toList()), + ); + } + + var body = ListView( + children: [ + Container( + margin: EdgeInsetsDirectional.only(top: 10), + child: TextField( + controller: searchMessageIdController, + decoration: InputDecoration( + fillColor: Colors.green, + labelText: Text(S.of(context).searchByTimestampWithHint).data, + hintText: Text(S.of(context).searchByTimestampWithHint).data, + enabledBorder: OutlineInputBorder( + borderSide: BorderSide( + color: Colors.blueGrey, + width: 5.0, + ))), + cursorColor: Colors.orange, + onSubmitted: (text) { + vm.fetchMessageId(text); + }, + ), + ), + Container( + margin: EdgeInsetsDirectional.only(top: 10), + child: TextField( + controller: messageId, + maxLines: null, + keyboardType: TextInputType.multiline, + decoration: InputDecoration(border: InputBorder.none), + readOnly: true, + ), + ), + Container( + margin: EdgeInsetsDirectional.only(top: 10), + child: TextField( + controller: searchTimestampController, + decoration: InputDecoration( + fillColor: Colors.green, + labelText: Text(S.of(context).searchByMessageIdWithHint).data, + hintText: Text(S.of(context).searchByMessageIdWithHint).data, + enabledBorder: OutlineInputBorder( + borderSide: BorderSide( + color: Colors.blueGrey, + width: 5.0, + ))), + cursorColor: Colors.orange, + onSubmitted: (text) { + vm.fetchConsumerMessage(text); + }, + ), + ), + Container( + margin: EdgeInsetsDirectional.only(top: 10), + child: TextField( + controller: TextEditingController(text: vm.message), + maxLines: null, + keyboardType: TextInputType.multiline, + decoration: InputDecoration(border: InputBorder.none), + readOnly: true, + ), + ), + messageList + ], + ); + return body; } } diff --git a/lib/vm/pulsar/pulsar_partitioned_topic_consume_view_model.dart b/lib/vm/pulsar/pulsar_partitioned_topic_consumer_view_model.dart similarity index 83% rename from lib/vm/pulsar/pulsar_partitioned_topic_consume_view_model.dart rename to lib/vm/pulsar/pulsar_partitioned_topic_consumer_view_model.dart index 900881b..5ffed29 100644 --- a/lib/vm/pulsar/pulsar_partitioned_topic_consume_view_model.dart +++ b/lib/vm/pulsar/pulsar_partitioned_topic_consumer_view_model.dart @@ -6,16 +6,16 @@ import 'package:paas_dashboard_flutter/module/pulsar/pulsar_topic.dart'; import 'package:paas_dashboard_flutter/persistent/po/pulsar_instance_po.dart'; import 'package:paas_dashboard_flutter/vm/base_load_list_view_model.dart'; -class PulsarPartitionedTopicConsumeViewModel extends BaseLoadListViewModel { +class PulsarPartitionedTopicConsumerViewModel extends BaseLoadListViewModel { final PulsarInstancePo pulsarInstancePo; final TenantResp tenantResp; final NamespaceResp namespaceResp; final TopicResp topicResp; - PulsarPartitionedTopicConsumeViewModel(this.pulsarInstancePo, this.tenantResp, this.namespaceResp, this.topicResp); + PulsarPartitionedTopicConsumerViewModel(this.pulsarInstancePo, this.tenantResp, this.namespaceResp, this.topicResp); - PulsarPartitionedTopicConsumeViewModel deepCopy() { - return new PulsarPartitionedTopicConsumeViewModel( + PulsarPartitionedTopicConsumerViewModel deepCopy() { + return new PulsarPartitionedTopicConsumerViewModel( pulsarInstancePo.deepCopy(), tenantResp.deepCopy(), namespaceResp.deepCopy(), topicResp.deepCopy()); } diff --git a/lib/vm/pulsar/pulsar_topic_consume_view_model.dart b/lib/vm/pulsar/pulsar_topic_consume_view_model.dart index 48db072..bab7dc9 100644 --- a/lib/vm/pulsar/pulsar_topic_consume_view_model.dart +++ b/lib/vm/pulsar/pulsar_topic_consume_view_model.dart @@ -1,3 +1,4 @@ +import 'package:paas_dashboard_flutter/api/pulsar/pulsar_topic_api.dart'; import 'package:paas_dashboard_flutter/module/pulsar/pulsar_consume.dart'; import 'package:paas_dashboard_flutter/module/pulsar/pulsar_namespace.dart'; import 'package:paas_dashboard_flutter/module/pulsar/pulsar_tenant.dart'; @@ -10,14 +11,12 @@ class PulsarTopicConsumeViewModel extends BaseLoadListViewModel { final TenantResp tenantResp; final NamespaceResp namespaceResp; final TopicResp topicResp; + String? message; + String? messageId; + List messageList = []; PulsarTopicConsumeViewModel(this.pulsarInstancePo, this.tenantResp, this.namespaceResp, this.topicResp); - PulsarTopicConsumeViewModel deepCopy() { - return new PulsarTopicConsumeViewModel( - pulsarInstancePo.deepCopy(), tenantResp.deepCopy(), namespaceResp.deepCopy(), topicResp.deepCopy()); - } - int get id { return this.pulsarInstancePo.id; } @@ -45,4 +44,56 @@ class PulsarTopicConsumeViewModel extends BaseLoadListViewModel { String get topic { return this.topicResp.topicName; } + + Future fetchConsumerMessage(String messageId) async { + try { + var messageIdArr = messageId.split(" "); + String data; + message = ""; + messageList = []; + if (messageIdArr.length == 2) { + data = await PulsarTopicApi.fetchConsumerMessage( + host, port, tenant, namespace, topic, messageIdArr[0], messageIdArr[1]); + this.message = data.substring(data.indexOf("@") + 1); + } + if (messageIdArr.length == 3) { + var startEntryId = int.parse(messageIdArr[1]); + var endEntryId = int.parse(messageIdArr[2]); + for (int i = startEntryId; i <= endEntryId; i++) { + data = await PulsarTopicApi.fetchConsumerMessage( + host, port, tenant, namespace, topic, messageIdArr[0], i.toString()); + Message message = new Message(i.toString(), data.substring(data.indexOf("@") + 1)); + messageList.add(message); + } + } + loadSuccess(); + } on Exception catch (e) { + loadException = e; + loading = false; + } + notifyListeners(); + } + + Future fetchMessageId(String timestamp) async { + try { + if (timestamp == "") { + return null; + } + String data; + data = await PulsarTopicApi.fetchMessageId(host, port, tenant, namespace, topic, timestamp); + this.messageId = data; + loadSuccess(); + } on Exception catch (e) { + loadException = e; + loading = false; + } + notifyListeners(); + } +} + +class Message { + String entryId; + String message; + + Message(this.entryId, this.message); }