diff --git a/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java b/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java index 06eb9e2604..77c5b63a89 100644 --- a/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java +++ b/megabus/src/main/java/com/bazaarvoice/megabus/refproducer/MegabusRefProducer.java @@ -59,6 +59,8 @@ public class MegabusRefProducer extends AbstractScheduledService { private final Meter _eventMeter; private final Meter _errorMeter; + private static final Logger logger = LoggerFactory.getLogger(MegabusRefProducer.class); + public MegabusRefProducer(MegabusRefProducerConfiguration config, DatabusEventStore eventStore, RateLimitedLogFactory logFactory, MetricRegistry metricRegistry, Producer producer, ObjectMapper objectMapper, Topic topic, @@ -147,6 +149,9 @@ boolean peekAndAckEvents() { .map(ref -> new MegabusRef(ref.getTable(), ref.getKey(), ref.getChangeId(), _clock.instant(), MegabusRef.RefType.NORMAL)) .collect(Collectors.groupingBy(ref -> { String key = Coordinate.of(ref.getTable(), ref.getKey()).toString(); + if(ref.getTable().contains("apikey")){ + logger.info("debugging mega-bus delay while provisioning apikey in MegabusRefProducer: = {}",key); + } return Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions(); }, Collectors.toList())) .entrySet() diff --git a/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java b/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java index 178038ee3a..6f713d26ee 100644 --- a/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java +++ b/megabus/src/main/java/com/bazaarvoice/megabus/tableevents/TableEventProcessor.java @@ -37,7 +37,7 @@ public class TableEventProcessor extends AbstractScheduledService { private static final int FUTURE_BATCH_SIZE = 10000; - private static final Logger _log = LoggerFactory.getLogger(TableEventProcessor.class); + private static final Logger logger = LoggerFactory.getLogger(TableEventProcessor.class); private final TableEventRegistry _tableEventRegistry; private final MetricRegistry _metricRegistry; @@ -80,11 +80,13 @@ protected void runOneIteration() throws Exception { } private void processTableEvent(String table, String uuid, MegabusRef.RefType refType) { - Iterator> futures = _tableEventTools.getIdsForStorage(table, uuid) .map(key -> new MegabusRef(table, key, TimeUUIDs.minimumUuid(), null, refType)) .map(ref -> { String key = Coordinate.of(ref.getTable(), ref.getKey()).toString(); + if(table.contains("apikey")){ + logger.info("debugging mega-bus delay while provisioning apikey in TableEventProcessor: = {}",key); + } return new ProducerRecord(_topic.getName(), Utils.toPositive(Utils.murmur2(key.getBytes())) % _topic.getPartitions(), TimeUUIDs.newUUID().toString(),_objectMapper.valueToTree(Collections.singletonList(ref)));