Skip to content

Commit

Permalink
Added data compaction flag
Browse files Browse the repository at this point in the history
  • Loading branch information
John-Wiens committed May 17, 2024
1 parent eb45e44 commit afc9574
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import us.dot.its.jpo.ode.api.models.IntersectionReferenceData;

public interface ProcessedMapRepository extends DataLoader<ProcessedMap<LineString>>{
Query getQuery(Integer intersectionID, Long startTime, Long endTime,boolean latest);
Query getQuery(Integer intersectionID, Long startTime, Long endTime,boolean latest, boolean compact);

long getQueryResultCount(Query query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ProcessedMapRepositoryImpl implements ProcessedMapRepository {
private ObjectMapper mapper = DateJsonMapper.getInstance();
private Logger logger = LoggerFactory.getLogger(ProcessedMapRepositoryImpl.class);

public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest) {
public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest, boolean compact) {
Query query = new Query();

if (intersectionID != null) {
Expand All @@ -78,8 +78,13 @@ public Query getQuery(Integer intersectionID, Long startTime, Long endTime, bool
query.limit(props.getMaximumResponseSize());
}

if (compact){
query.fields().exclude("recordGeneratedAt", "properties.validationMessages");
}else{
query.fields().exclude("recordGeneratedAt");
}

query.addCriteria(Criteria.where("properties.timeStamp").gte(startTimeString).lte(endTimeString));
query.fields().exclude("recordGeneratedAt");
return query;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.springframework.data.mongodb.core.query.Query;

public interface ProcessedSpatRepository extends DataLoader<ProcessedSpat>{
Query getQuery(Integer intersectionID, Long startTime, Long endTime);
Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean compact);

long getQueryResultCount(Query query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.ode.api.ConflictMonitorApiProperties;
import us.dot.its.jpo.ode.api.models.IDCount;
import org.springframework.data.domain.Sort;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -29,9 +27,6 @@
import org.springframework.data.mongodb.core.aggregation.ConvertOperators;
import org.springframework.data.mongodb.core.aggregation.DateOperators;

import java.time.format.DateTimeFormatter;
import java.time.ZonedDateTime;

@Component
public class ProcessedSpatRepositoryImpl implements ProcessedSpatRepository {

Expand All @@ -45,7 +40,7 @@ public class ProcessedSpatRepositoryImpl implements ProcessedSpatRepository {
private DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private ObjectMapper mapper = DateJsonMapper.getInstance();

public Query getQuery(Integer intersectionID, Long startTime, Long endTime) {
public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean compact) {
Query query = new Query();

if (intersectionID != null) {
Expand All @@ -61,9 +56,16 @@ public Query getQuery(Integer intersectionID, Long startTime, Long endTime) {
if (endTime != null) {
endTimeString = Instant.ofEpochMilli(endTime).toString();
}

if (compact){
query.fields().exclude("recordGeneratedAt", "validationMessages");
}else{
query.fields().exclude("recordGeneratedAt");
}

query.limit(props.getMaximumResponseSize());
query.addCriteria(Criteria.where("odeReceivedAt").gte(startTimeString).lte(endTimeString));
query.fields().exclude("recordGeneratedAt");

return query;
}

Expand All @@ -76,26 +78,20 @@ public List<ProcessedSpat> findProcessedSpats(Query query) {
}

public List<IDCount> getSpatBroadcastRates(int intersectionID, Long startTime, Long endTime){
Query query = getQuery(intersectionID, startTime, endTime);
Query query = getQuery(intersectionID, startTime, endTime, true);

query.fields().include("utcTimeStamp");
List<Map> times = mongoTemplate.find(query, Map.class, collectionName);

//List<ZonedDateTime> spats = findProcessedSpats(query);
System.out.println("Retreived Spat List" + times.size());
Map<String, IDCount> results = new HashMap<>();

for(Map doc: times){
//ZonedDateTime time = spat.getUtcTimeStamp();
System.out.println(doc);
ZonedDateTime time = mapper.convertValue(doc.get("utcTimeStamp"), ZonedDateTime.class);
String key = time.format(formatter);

//String key = map.getProperties().getTimeStamp().substring(0,10) + map.getProperties().getTimeStamp().substring(11,13);
if(results.containsKey(key)){
IDCount count = results.get(key);
count.setCount(count.getCount() +1);
//results.put(key, count);
}
else{
IDCount count = new IDCount();
Expand All @@ -105,46 +101,12 @@ public List<IDCount> getSpatBroadcastRates(int intersectionID, Long startTime, L
}
}

System.out.println("Finished Message Parsing");

//AggregationResults<IDCount> result = mongoTemplate.aggregate(aggregation, collectionName, IDCount.class);
//List<IDCount> results = result.getMappedResults();
//results = new ArrayList<IDCount>(results);

List<IDCount> outputCounts = new ArrayList<>(results.values());
for (IDCount r : outputCounts) {
r.setCount((double) r.getCount() / 3600.0);
}
return outputCounts;
// String startTimeString = Instant.ofEpochMilli(0).toString();
// String endTimeString = Instant.now().toString();

// if (startTime != null) {
// startTimeString = Instant.ofEpochMilli(startTime).toString();
// }
// if (endTime != null) {
// endTimeString = Instant.ofEpochMilli(endTime).toString();
// }

// AggregationOptions options = AggregationOptions.builder().allowDiskUse(true).build();

// Aggregation aggregation = Aggregation.newAggregation(
// Aggregation.match(Criteria.where("intersectionId").is(intersectionID)),
// Aggregation.match(Criteria.where("utcTimeStamp").gte(startTimeString).lte(endTimeString)),
// Aggregation.project("utcTimeStamp"),
// Aggregation.project()
// .and(DateOperators.DateFromString.fromStringOf("utcTimeStamp")).as("date"),
// Aggregation.project()
// .and(DateOperators.DateToString.dateOf("date").toString("%Y-%m-%d-%H")).as("dateStr"),
// Aggregation.group("dateStr").count().as("count"),
// Aggregation.sort(Sort.Direction.ASC, "_id")
// ).withOptions(options);

// AggregationResults<IDCount> result = mongoTemplate.aggregate(aggregation, collectionName, IDCount.class);
// List<IDCount> results = result.getMappedResults();
// for (IDCount r: results){
// r.setCount((float)r.getCount() / 3600.0);
// }


}

Expand Down Expand Up @@ -191,5 +153,4 @@ public List<IDCount> getSpatBroadcastRateDistribution(int intersectionID, Long s
public void add(ProcessedSpat item) {
mongoTemplate.save(item, collectionName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public ResponseEntity<List<ProcessedMap>> findMaps(
@RequestParam(name = "intersection_id", required = false) Integer intersectionID,
@RequestParam(name = "start_time_utc_millis", required = false) Long startTime,
@RequestParam(name = "end_time_utc_millis", required = false) Long endTime,
@RequestParam(name= "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData) {
@RequestParam(name = "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData,
@RequestParam(name = "compact", required = false, defaultValue = "true") boolean compact) {

if (testData) {
return ResponseEntity.ok(MockMapGenerator.getProcessedMaps());
} else {
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, latest);
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, latest, compact);
long count = processedMapRepo.getQueryResultCount(query);

logger.info("Returning ProcessedMap Response with Size: " + count);
Expand All @@ -71,7 +72,7 @@ public ResponseEntity<Long> countMaps(
if (testData) {
return ResponseEntity.ok(5L);
} else {
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, false);
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, false, true);
long count = processedMapRepo.getQueryResultCount(query);

logger.info("Found: " + count + "Processed Map Messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ public ResponseEntity<List<ProcessedSpat>> findSpats(
@RequestParam(name = "intersection_id", required = false) Integer intersectionID,
@RequestParam(name = "start_time_utc_millis", required = false) Long startTime,
@RequestParam(name = "end_time_utc_millis", required = false) Long endTime,
@RequestParam(name = "compact", required = false, defaultValue = "true") boolean compact,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData) {

if (testData) {
return ResponseEntity.ok(MockSpatGenerator.getProcessedSpats());
} else {
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime);
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime, compact);
long count = processedSpatRepo.getQueryResultCount(query);
logger.info("Returning Processed Spat Response with Size: " + count);
return ResponseEntity.ok(processedSpatRepo.findProcessedSpats(query));
Expand All @@ -70,7 +71,7 @@ public ResponseEntity<Long> countSpats(
if (testData) {
return ResponseEntity.ok(80L);
} else {
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime);
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime, true);
long count = processedSpatRepo.getQueryResultCount(query);
logger.info("Found: " + count + "Processed Spat Messages");
return ResponseEntity.ok(count);
Expand Down

0 comments on commit afc9574

Please sign in to comment.