Skip to content

Commit

Permalink
Remove the nativeHeader conversion logic (#24669)
Browse files Browse the repository at this point in the history
Remove the nativeHeader conversion logic in EventHub message converter
  • Loading branch information
Moary Chen authored Oct 12, 2021
1 parent cf5c1c2 commit d37ed4c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,52 +138,4 @@ private <U> Message<?> internalToMessage(I azureMessage, Map<String, Object> hea

return MessageBuilder.withPayload(fromPayload(payload, targetPayloadClass)).copyHeaders(headers).build();
}

/**
* Convert the json string to class targetType instance.
* @param value json string
* @param targetType target class to convert
* @param <M> Target class type
* @return Return the corresponding class instance
* @throws ConversionException When fail to convert.
*/
protected <M> M readValue(String value, Class<M> targetType) {
try {
return getObjectMapper().readValue(value, targetType);
} catch (IOException e) {
throw new ConversionException("Failed to read JSON: " + value, e);
}
}

/**
* Check value is valid json string.
* @param value json string to check
* @return true if it's json string.
*/
protected boolean isValidJson(Object value) {
try {
if (value instanceof String) {
getObjectMapper().readTree((String) value);
return true;
}
LOGGER.warn("Not a valid json string: " + value);
return false;
} catch (IOException e) {
return false;
}
}

/**
* Convert the object to json string
* @param value object to be converted
* @return json string
* @throws ConversionException When fail to convert.
*/
protected String toJson(Object value) {
try {
return getObjectMapper().writeValueAsString(value);
} catch (IOException e) {
throw new ConversionException("Failed to convert to JSON: " + value.toString(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.LinkedMultiValueMap;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
Expand All @@ -20,8 +19,6 @@
import java.util.Map;
import java.util.Set;

import static org.springframework.messaging.support.NativeMessageHeaderAccessor.NATIVE_HEADERS;

/**
* A converter to turn a {@link Message} to {@link EventData} and vice versa.
*
Expand Down Expand Up @@ -56,15 +53,11 @@ protected EventData fromByte(byte[] payload) {
protected void setCustomHeaders(MessageHeaders headers, EventData azureMessage) {
super.setCustomHeaders(headers, azureMessage);
headers.forEach((key, value) -> {
if (key.equals(NATIVE_HEADERS) && value instanceof LinkedMultiValueMap) {
azureMessage.getProperties().put(key, toJson(value));
if (SYSTEM_HEADERS.contains(key)) {
LOGGER.warn("System property {}({}) is not allowed to be defined and will be ignored.",
key, value);
} else {
if (SYSTEM_HEADERS.contains(key)) {
LOGGER.warn("System property {}({}) is not allowed to be defined and will be ignored.",
key, value);
} else {
azureMessage.getProperties().put(key, value.toString());
}
azureMessage.getProperties().put(key, value.toString());
}
});
}
Expand All @@ -75,11 +68,6 @@ protected Map<String, Object> buildCustomHeaders(EventData azureMessage) {

headers.putAll(getSystemProperties(azureMessage));

Map<String, Object> properties = azureMessage.getProperties();
if (properties.containsKey(NATIVE_HEADERS) && isValidJson(properties.get(NATIVE_HEADERS))) {
String nativeHeader = (String) properties.remove(NATIVE_HEADERS);
properties.put(NATIVE_HEADERS, readValue(nativeHeader, LinkedMultiValueMap.class));
}
headers.putAll(azureMessage.getProperties());
return headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.LinkedMultiValueMap;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -59,22 +57,6 @@ public void testNonUtf8DecodingPayload() {
assertNotEquals(payload, azureMessage.getBodyAsString());
}

private static class MyEventHubMessageConverter extends EventHubMessageConverter {

public void setCustomHeaders(MessageHeaders headers, EventData azureMessage) {
super.setCustomHeaders(headers, azureMessage);
}

public Map<String, Object> buildCustomHeaders(EventData azureMessage) {
return super.buildCustomHeaders(azureMessage);
}

@Override
protected String toJson(Object value) {
return super.toJson(value);
}
}

@Test
public void testConvertCustomHeadersToEventData() {
Map<String, Object> headerMap = new HashMap<>();
Expand All @@ -83,37 +65,19 @@ public void testConvertCustomHeadersToEventData() {

EventData eventData = new EventData(EVENT_DATA);

MyEventHubMessageConverter converter = new MyEventHubMessageConverter();
EventHubMessageConverter converter = new EventHubMessageConverter();
converter.setCustomHeaders(headers, eventData);

assertEquals(eventData.getProperties().get("fake-header"), "fake-value");
assertEquals(eventData.getBodyAsString(), EVENT_DATA);
}

@Test
public void testConvertNativeHeadersToEventData() {
Map<String, Object> headerMap = new HashMap<>();
LinkedMultiValueMap<String, String> nativeHeaders = new LinkedMultiValueMap<>();
nativeHeaders.put("spanId", Arrays.asList("spanId-1", "spanId-2"));
nativeHeaders.put("spanTraceId", Arrays.asList("spanTraceId-1", "spanTraceId-2"));
headerMap.put(NATIVE_HEADERS, nativeHeaders);
MessageHeaders headers = new MessageHeaders(headerMap);

EventData eventData = new EventData(EVENT_DATA);

MyEventHubMessageConverter converter = new MyEventHubMessageConverter();
converter.setCustomHeaders(headers, eventData);

assertEquals(eventData.getProperties().get(NATIVE_HEADERS).getClass(), String.class);
assertEquals(eventData.getProperties().get(NATIVE_HEADERS), converter.toJson(nativeHeaders));
}

@Test
public void testCustomHeadersFromEventData() {
EventData eventData = new EventData(EVENT_DATA);
eventData.getProperties().put("fake-header", "fake-value");

MyEventHubMessageConverter converter = new MyEventHubMessageConverter();
EventHubMessageConverter converter = new EventHubMessageConverter();
Map<String, Object> headerHeadersMap = converter.buildCustomHeaders(eventData);
assertEquals(headerHeadersMap.get("fake-header"), "fake-value");
assertEquals(eventData.getBodyAsString(), EVENT_DATA);
Expand All @@ -125,9 +89,9 @@ public void testNativeHeadersFromEventData() {
String nativeHeadersString = "{\"spanId\":[\"spanId-1\", \"spanId-2\"],\"spanTraceId\":[\"spanTraceId-1\", \"spanTraceId-2\"]}";
eventData.getProperties().put(NATIVE_HEADERS, nativeHeadersString);

MyEventHubMessageConverter converter = new MyEventHubMessageConverter();
EventHubMessageConverter converter = new EventHubMessageConverter();
Map<String, Object> headerHeadersMap = converter.buildCustomHeaders(eventData);
assertEquals(headerHeadersMap.get(NATIVE_HEADERS).getClass(), LinkedMultiValueMap.class);
assertEquals(headerHeadersMap.get(NATIVE_HEADERS).getClass(), String.class);
}

@Test
Expand All @@ -141,7 +105,7 @@ public void testSystemPropertiesScreenedOut() {

EventData eventData = new EventData(EVENT_DATA);

MyEventHubMessageConverter converter = new MyEventHubMessageConverter();
EventHubMessageConverter converter = new EventHubMessageConverter();
converter.setCustomHeaders(headers, eventData);

assertFalse(eventData.getProperties().containsKey(EventHubHeaders.PARTITION_KEY));
Expand All @@ -154,7 +118,7 @@ public void testSystemPropertiesScreenedOut() {
public void testSystemPropertiesConvertedFromEventData() {
EventData eventData = new EventData(EVENT_DATA);

MyEventHubMessageConverter converter = new MyEventHubMessageConverter();
EventHubMessageConverter converter = new EventHubMessageConverter();
Map<String, Object> headerHeadersMap = converter.buildCustomHeaders(eventData);

assertTrue(headerHeadersMap.containsKey(EventHubHeaders.ENQUEUED_TIME));
Expand Down

0 comments on commit d37ed4c

Please sign in to comment.