Skip to content

Commit

Permalink
Merge pull request #116 from usdot-jpo-ode/compression
Browse files Browse the repository at this point in the history
Compression
John-Wiens authored Oct 30, 2024
2 parents d8c7b58 + 123bea3 commit 8a2cfe2
Showing 26 changed files with 25 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -130,8 +130,6 @@ public APIServiceController(
try {

logger.info("Starting {}", this.getClass().getSimpleName());

System.out.println("Controller is Null" + stompController != null);

SpatSocketForwardTopology spatSocketForwardTopology = new SpatSocketForwardTopology(
"topic.ProcessedSpat",
Original file line number Diff line number Diff line change
@@ -98,7 +98,7 @@ public class ConflictMonitorApiProperties {
private String securitySvcsSignatureEndpoint = "sign";



private int lingerMs = 0;


@Autowired
@@ -185,6 +185,15 @@ public void setKafkaBrokers(String kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
}

@Value("${kafka.linger_ms}")
public void setKafkaLingerMs(int lingerMs) {
this.lingerMs = lingerMs;
}

public int getKafkaLingerMs() {
return lingerMs;
}

public String getKafkaProducerType() {
return kafkaProducerType;
}
@@ -257,6 +266,8 @@ public void setKafkaTopicsDisabledSet(Set<String> kafkaTopicsDisabledSet) {
this.kafkaTopicsDisabledSet = kafkaTopicsDisabledSet;
}




@Bean
public ObjectMapper defaultMapper() {
@@ -380,7 +391,11 @@ public Properties createStreamProperties(String name) {
streamProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, FIVE_MINUTES_MS);

// Disable batching
streamProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
// streamProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);

streamProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getKafkaLingerMs());


if (confluentCloudEnabled) {
streamProps.put("ssl.endpoint.identification.algorithm", "https");
Original file line number Diff line number Diff line change
@@ -299,7 +299,6 @@ public void addSpatBroadcastRateDistribution(List<IDCount> data, Long startTime,
for(IDCount elem : data){
int index = Integer.parseInt(elem.getId()) / 10;
output.get(index).setCount(elem.getCount());
System.out.println(elem);
}

// Convert to Chart Data and generate graph
@@ -348,7 +347,6 @@ public void addMapBroadcastRateDistribution(List<IDCount> data, Long startTime,
for(IDCount elem : data){
int index = Integer.parseInt(elem.getId());
output.get(index).setCount(elem.getCount());
System.out.println(elem);
}

// Convert to Chart Data and generate graph
Original file line number Diff line number Diff line change
@@ -43,7 +43,6 @@ public Query getQuery(Integer intersectionID, Integer roadRegulatorID, String no
}

if(key != null){
System.out.println(key);
query.addCriteria(Criteria.where("key").is(key));
}

Original file line number Diff line number Diff line change
@@ -220,14 +220,14 @@ public static EncodedMessage identifyAsn1(String hexPacket){
public static String decodeXmlWithAcm(String xmlMessage) throws Exception {


System.out.println("Decoding Message: " + xmlMessage);
log.info("Decoding Message: " + xmlMessage);
log.info("Decoding message: {}", xmlMessage);

// Save XML to temp file
String tempDir = FileUtils.getTempDirectoryPath();
String tempFileName = "asn1-codec-java-" + UUID.randomUUID().toString() + ".xml";
log.info("Temp file name: {}", tempFileName);
System.out.println("Temp File Name: " + tempFileName);
log.info("Temp File Name: " + tempFileName);
Path tempFilePath = Path.of(tempDir, tempFileName);
File tempFile = new File(tempFilePath.toString());
FileUtils.writeStringToFile(tempFile, xmlMessage, StandardCharsets.UTF_8);
@@ -239,7 +239,7 @@ public static String decodeXmlWithAcm(String xmlMessage) throws Exception {
Process process = pb.start();
String result = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
log.info("Result: {}", result);
System.out.println("Decode Result: " + result);
log.info("Decode Result: " + result);

// Clean up temp file
tempFile.delete();
Original file line number Diff line number Diff line change
@@ -35,7 +35,6 @@ public ZonedDateTime convert(@NonNull String source) {
// Block of code to handle errors
}
}
System.out.println("Unable to Parse the following source time: "+ source);
return ZonedDateTime.of(0, 0, 0, 0, 0, 0, 0, null);
}
}
Original file line number Diff line number Diff line change
@@ -30,8 +30,6 @@ public class KeycloakNoSecurityConfig {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws Exception {

System.out.println("Running without KeyCloak Authentication");
return httpSecurity
.sessionManagement(sm -> sm.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
.cors(corsConfigurer -> CorsUtil.configureCors(corsConfigurer, properties))
Original file line number Diff line number Diff line change
@@ -35,8 +35,6 @@ public class KeycloakSecurityConfig {
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws Exception {

System.out.println("Running with KeyCloak Authentication");

return httpSecurity
.sessionManagement(sm -> sm.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
.cors(corsConfigurer -> CorsUtil.configureCors(corsConfigurer, properties))
Original file line number Diff line number Diff line change
@@ -34,7 +34,6 @@ class MethodSecurityConfig {
public MethodSecurityConfig(PermissionEvaluator permissionEvaluator, ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.permissionEvaluator = permissionEvaluator;
System.out.println("Method-level security annotations are enabled");
}

@Bean
Original file line number Diff line number Diff line change
@@ -20,11 +20,6 @@ public static AuthorizationDecision checkAccess(Supplier<Authentication> authent

var auth = authentication.get();

System.out.printf("Check access for username=%s path=%s%n", auth.getName(), requestContext.getRequest().getRequestURI());
System.out.printf("Authorities: %s%n", auth.getAuthorities());
System.out.printf("Is authenticated: %s%n", auth.isAuthenticated());
System.out.printf("Details: %s%n", auth.getDetails());

return auth.isAuthenticated() ? GRANTED : DENIED;
}
}
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@ class DefaultPermissionEvaluator implements PermissionEvaluator {

@Override
public boolean hasPermission(Authentication auth, Object targetDomainObject, Object permission) {
System.out.printf("check permission user=%s target=%s permission=%s%n", auth.getName(), targetDomainObject, permission);

return true;
}

Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@ public JwtAuthenticationToken convert(Jwt jwt) {
String username = getUsernameFrom(jwt);

var token = new JwtAuthenticationToken(jwt, authorities, username);
System.out.printf("KeycloakJwtAuthenticationConverter: Converted token: %s%n Authorities: %s%n Username: %s%n", token, authorities, username);
return token;
}

Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ public static EmailSettings fromAttributes(Map<String, List<String>> attributes)
}


System.out.println("No Settings found Returning Default");
logger.info("No Settings found Returning Default");
return new EmailSettings();
}

Original file line number Diff line number Diff line change
@@ -51,7 +51,6 @@ public String toString() {
try {
testReturn = (mapper.writeValueAsString(this));
} catch (JsonProcessingException e) {
System.out.println(e);
}
return testReturn;
}
Original file line number Diff line number Diff line change
@@ -70,9 +70,7 @@ public void sendEmailViaSendGrid(String to, String subject, String text) {
request.setEndpoint("mail/send");
request.setBody(mail.build());
Response response = this.sendGrid.api(request);
System.out.println("Message Sent via Sendgrid to:" + to);
} catch (IOException ex) {
System.out.println(ex);
}
}

@@ -87,9 +85,7 @@ public void sendEmailViaPostmark(String to, String subject, String text){
htmlText);
try {
postmark.deliverMessage(message);
System.out.println("Message Sent via Postmark to:" + to);
} catch (PostmarkException | IOException e) {
System.out.println("Unable to send message" + e);
}
}

@@ -99,13 +95,11 @@ public void sendEmailViaSpringMail(String to, String subject, String text){
message.setSubject(subject);
message.setText(text);
mailSender.send(message);
System.out.println("Message Sent Via SMTP to: " + to);
}



public void sendSimpleMessage(String to, String subject, String text) {
System.out.println("Sending Simple Message");
if(props.getEmailBroker().equals("sendgrid")){
sendEmailViaSendGrid(to, subject, text);
}else if (props.getEmailBroker().equals("postmark")){
@@ -217,8 +211,6 @@ public List<UserRepresentation> getEmailList(List<String> notificationTypes, Lis

}

System.out.println("Returning" + emailList.size() + "Users");

return emailList;

}
Original file line number Diff line number Diff line change
@@ -26,10 +26,8 @@ public static List<OdeBsmData> getJsonBsms(){
OdeBsmData bsm = objectMapper.readValue(bsmString, OdeBsmData.class);
bsms.add(bsm);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked BSM.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked BSM.");
e.printStackTrace();
}
return bsms;
Original file line number Diff line number Diff line change
@@ -32,10 +32,8 @@ public static List<ProcessedMap<LineString>> getProcessedMaps(){
ProcessedMap<LineString> map = objectMapper.readValue(processedMapString, typeReference);
maps.add(map);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked map.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked map.");
e.printStackTrace();
}
return maps;
@@ -51,10 +49,8 @@ public static List<OdeMapData> getJsonMaps(){
OdeMapData map = objectMapper.readValue(mapString, OdeMapData.class);
maps.add(map);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked map.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked map.");
e.printStackTrace();
}
return maps;
Original file line number Diff line number Diff line change
@@ -24,10 +24,8 @@ public static List<ProcessedSpat> getProcessedSpats(){
ProcessedSpat spat = objectMapper.readValue(processedSpatString, ProcessedSpat.class);
spats.add(spat);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked spat.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked spat.");
e.printStackTrace();
}
return spats;
@@ -43,10 +41,8 @@ public static List<OdeSpatData> getJsonSpats(){
OdeSpatData spat = objectMapper.readValue(spatString, OdeSpatData.class);
spats.add(spat);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked spat.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked spat.");
e.printStackTrace();
}
return spats;
Original file line number Diff line number Diff line change
@@ -23,10 +23,8 @@ public static List<OdeSrmData> getJsonSrms(){
OdeSrmData srm = objectMapper.readValue(srmString, OdeSrmData.class);
srms.add(srm);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked srm.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked srm.");
e.printStackTrace();
}
return srms;
Original file line number Diff line number Diff line change
@@ -23,10 +23,8 @@ public static List<OdeSsmData> getJsonSsms(){
OdeSsmData ssm = objectMapper.readValue(ssmString, OdeSsmData.class);
ssms.add(ssm);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked ssm.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked ssm.");
e.printStackTrace();
}
return ssms;
Original file line number Diff line number Diff line change
@@ -23,10 +23,8 @@ public static List<ObjectNode> getJsonTims(){
ObjectNode tim = objectMapper.readValue(timString, ObjectNode.class);
tims.add(tim);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked tim.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked tim.");
e.printStackTrace();
}
return tims;
Original file line number Diff line number Diff line change
@@ -21,6 +21,10 @@ schema.bsm: classpath:schemas/bsm.schema.json
schema.map: classpath:schemas/map.schema.json
schema.spat: classpath:schemas/spat.schema.json


# Amount of time to wait to try and increase batching
kafka.linger_ms: 50

kafka.topics:
autoCreateTopics: false
numPartitions: 1
Original file line number Diff line number Diff line change
@@ -111,9 +111,6 @@

// // List<DefaultConfig> responseMap = responseEntity.getBody();

// // System.out.println(responseMap.size());
// // System.out.println(responseMap.get(0));

// // assertEquals(HttpStatus.OK, responseEntity.getStatusCode());
// // assertEquals(MediaType.APPLICATION_JSON, responseEntity.getHeaders().getContentType());
// // assertEquals(test, responseMap.get(0));
Original file line number Diff line number Diff line change
@@ -54,7 +54,6 @@ public void testSpatGetAsOdeData() {
@Test
public void testSpatGetAsOdeJson() throws XmlUtilsException{
OdeSpatData spat = spatDecoder.getAsOdeJson(odeSpatDecodedXmlReference);
System.out.println("testSpatGetAsOdeJson" + spat);
assertEquals(spat.toJson(), odeSpatDecodedDataReference);
}

Original file line number Diff line number Diff line change
@@ -38,8 +38,6 @@ public void testSsmGetAsOdeData() {
OdeData data = ssmDecoder.getAsOdeData(ssm.getAsn1Text());

OdeMsgMetadata metadata = data.getMetadata();

System.out.println(data);
// Copy over fields that might be different
metadata.setOdeReceivedAt("2024-05-14T23:01:21.516531700Z");
metadata.setSerialId(metadata.getSerialId().setStreamId("fc430f29-b761-4a2c-90fb-dc4c9f5d4e9c"));
@@ -52,7 +50,6 @@ public void testSsmGetAsOdeData() {
public void testSsmGetAsOdeJson() throws XmlUtilsException{
OdeSsmData ssm = ssmDecoder.getAsOdeJson(odeSsmDecodedXmlReference);
assertEquals(ssm.toJson(), odeSsmDecodedDataReference);
System.out.println(ssm);
}

}
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@ public void testTimGetAsOdeData() {

OdeMsgMetadata metadata = data.getMetadata();

System.out.println(data);
// Copy over fields that might be different
metadata.setOdeReceivedAt("2024-05-14T23:01:21.516531700Z");
metadata.setSerialId(metadata.getSerialId().setStreamId("fc430f29-b761-4a2c-90fb-dc4c9f5d4e9c"));

0 comments on commit 8a2cfe2

Please sign in to comment.