@@ -1579,13 +1579,23 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
1579
1579
if (ex .getCause () instanceof BrokerServiceException .TopicMigratedException ) {
1580
1580
Optional <ClusterUrl > clusterURL = getMigratedClusterUrl (service .getPulsar ());
1581
1581
if (clusterURL .isPresent ()) {
1582
- log .info ("[{}] redirect migrated producer to topic {}: producerId={}, {}" , remoteAddress , topicName ,
1583
- producerId , ex .getCause ().getMessage ());
1584
- commandSender .sendTopicMigrated (ResourceType .Producer , producerId ,
1585
- clusterURL .get ().getBrokerServiceUrl (), clusterURL .get ().getBrokerServiceUrlTls ());
1586
- closeProducer (producer );
1587
- return null ;
1588
-
1582
+ if (topic .isReplicationBacklogExist ()) {
1583
+ log .info ("Topic {} is migrated but replication backlog exist: "
1584
+ + "producerId = {}, producerName = {}, {}" , topicName ,
1585
+ producerId , producerName , ex .getCause ().getMessage ());
1586
+ } else {
1587
+ log .info ("[{}] redirect migrated producer to topic {}: "
1588
+ + "producerId={}, producerName = {}, {}" , remoteAddress ,
1589
+ topicName , producerId , producerName , ex .getCause ().getMessage ());
1590
+ boolean msgSent = commandSender .sendTopicMigrated (ResourceType .Producer , producerId ,
1591
+ clusterURL .get ().getBrokerServiceUrl (), clusterURL .get ().getBrokerServiceUrlTls ());
1592
+ if (!msgSent ) {
1593
+ log .info ("client doesn't support topic migration handling {}-{}-{}" , topic ,
1594
+ remoteAddress , producerId );
1595
+ }
1596
+ closeProducer (producer );
1597
+ return null ;
1598
+ }
1589
1599
} else {
1590
1600
log .warn ("[{}] failed producer because migration url not configured topic {}: producerId={}, {}" ,
1591
1601
remoteAddress , topicName , producerId , ex .getCause ().getMessage ());
0 commit comments