Skip to content

Commit

Permalink
Merge pull request #89 from usdot-jpo-ode/spat-timestamp-fix
Browse files Browse the repository at this point in the history
SPAT Timestamp - Latest
  • Loading branch information
John-Wiens authored May 31, 2024
2 parents 941a7f6 + 2bcea48 commit 6e9cbd1
Show file tree
Hide file tree
Showing 13 changed files with 45 additions and 67 deletions.
2 changes: 1 addition & 1 deletion jpo-conflictmonitor
Submodule jpo-conflictmonitor updated from 14f1cb to 61d147
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import us.dot.its.jpo.ode.api.models.IntersectionReferenceData;

public interface ProcessedMapRepository extends DataLoader<ProcessedMap<LineString>>{
Query getQuery(Integer intersectionID, Long startTime, Long endTime,boolean latest);
Query getQuery(Integer intersectionID, Long startTime, Long endTime,boolean latest, boolean compact);

long getQueryResultCount(Query query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ProcessedMapRepositoryImpl implements ProcessedMapRepository {
private ObjectMapper mapper = DateJsonMapper.getInstance();
private Logger logger = LoggerFactory.getLogger(ProcessedMapRepositoryImpl.class);

public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest) {
public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest, boolean compact) {
Query query = new Query();

if (intersectionID != null) {
Expand All @@ -78,8 +78,13 @@ public Query getQuery(Integer intersectionID, Long startTime, Long endTime, bool
query.limit(props.getMaximumResponseSize());
}

if (compact){
query.fields().exclude("recordGeneratedAt", "properties.validationMessages");
}else{
query.fields().exclude("recordGeneratedAt");
}

query.addCriteria(Criteria.where("properties.timeStamp").gte(startTimeString).lte(endTimeString));
query.fields().exclude("recordGeneratedAt");
return query;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.springframework.data.mongodb.core.query.Query;

public interface ProcessedSpatRepository extends DataLoader<ProcessedSpat>{
Query getQuery(Integer intersectionID, Long startTime, Long endTime);
Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest, boolean compact);

long getQueryResultCount(Query query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
Expand Down Expand Up @@ -40,7 +41,7 @@ public class ProcessedSpatRepositoryImpl implements ProcessedSpatRepository {
private DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private ObjectMapper mapper = DateJsonMapper.getInstance();

public Query getQuery(Integer intersectionID, Long startTime, Long endTime) {
public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest, boolean compact) {
Query query = new Query();

if (intersectionID != null) {
Expand All @@ -56,9 +57,21 @@ public Query getQuery(Integer intersectionID, Long startTime, Long endTime) {
if (endTime != null) {
endTimeString = Instant.ofEpochMilli(endTime).toString();
}
query.limit(props.getMaximumResponseSize());
query.addCriteria(Criteria.where("odeReceivedAt").gte(startTimeString).lte(endTimeString));
query.fields().exclude("recordGeneratedAt");

if (latest) {
query.with(Sort.by(Sort.Direction.DESC, "utcTimeStamp"));
query.limit(1);
}else{
query.limit(props.getMaximumResponseSize());
}

if (compact){
query.fields().exclude("recordGeneratedAt", "validationMessages");
}else{
query.fields().exclude("recordGeneratedAt");
}

query.addCriteria(Criteria.where("utcTimeStamp").gte(startTimeString).lte(endTimeString));
return query;
}

Expand All @@ -72,26 +85,20 @@ public List<ProcessedSpat> findProcessedSpats(Query query) {
}

public List<IDCount> getSpatBroadcastRates(int intersectionID, Long startTime, Long endTime){
Query query = getQuery(intersectionID, startTime, endTime);
Query query = getQuery(intersectionID, startTime, endTime, false, true);

query.fields().include("utcTimeStamp");
List<Map> times = mongoTemplate.find(query, Map.class, collectionName);

//List<ZonedDateTime> spats = findProcessedSpats(query);
System.out.println("Retreived Spat List" + times.size());
Map<String, IDCount> results = new HashMap<>();

for(Map doc: times){
//ZonedDateTime time = spat.getUtcTimeStamp();
System.out.println(doc);
ZonedDateTime time = mapper.convertValue(doc.get("utcTimeStamp"), ZonedDateTime.class);
String key = time.format(formatter);

//String key = map.getProperties().getTimeStamp().substring(0,10) + map.getProperties().getTimeStamp().substring(11,13);
if(results.containsKey(key)){
IDCount count = results.get(key);
count.setCount(count.getCount() +1);
//results.put(key, count);
}
else{
IDCount count = new IDCount();
Expand All @@ -101,46 +108,12 @@ public List<IDCount> getSpatBroadcastRates(int intersectionID, Long startTime, L
}
}

System.out.println("Finished Message Parsing");

//AggregationResults<IDCount> result = mongoTemplate.aggregate(aggregation, collectionName, IDCount.class);
//List<IDCount> results = result.getMappedResults();
//results = new ArrayList<IDCount>(results);

List<IDCount> outputCounts = new ArrayList<>(results.values());
for (IDCount r : outputCounts) {
r.setCount((double) r.getCount() / 3600.0);
}
return outputCounts;
// String startTimeString = Instant.ofEpochMilli(0).toString();
// String endTimeString = Instant.now().toString();

// if (startTime != null) {
// startTimeString = Instant.ofEpochMilli(startTime).toString();
// }
// if (endTime != null) {
// endTimeString = Instant.ofEpochMilli(endTime).toString();
// }

// AggregationOptions options = AggregationOptions.builder().allowDiskUse(true).build();

// Aggregation aggregation = Aggregation.newAggregation(
// Aggregation.match(Criteria.where("intersectionId").is(intersectionID)),
// Aggregation.match(Criteria.where("utcTimeStamp").gte(startTimeString).lte(endTimeString)),
// Aggregation.project("utcTimeStamp"),
// Aggregation.project()
// .and(DateOperators.DateFromString.fromStringOf("utcTimeStamp")).as("date"),
// Aggregation.project()
// .and(DateOperators.DateToString.dateOf("date").toString("%Y-%m-%d-%H")).as("dateStr"),
// Aggregation.group("dateStr").count().as("count"),
// Aggregation.sort(Sort.Direction.ASC, "_id")
// ).withOptions(options);

// AggregationResults<IDCount> result = mongoTemplate.aggregate(aggregation, collectionName, IDCount.class);
// List<IDCount> results = result.getMappedResults();
// for (IDCount r: results){
// r.setCount((float)r.getCount() / 3600.0);
// }


}

Expand Down Expand Up @@ -187,5 +160,4 @@ public List<IDCount> getSpatBroadcastRateDistribution(int intersectionID, Long s
public void add(ProcessedSpat item) {
mongoTemplate.save(item, collectionName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public ResponseEntity<List<ProcessedMap>> findMaps(
@RequestParam(name = "intersection_id", required = false) Integer intersectionID,
@RequestParam(name = "start_time_utc_millis", required = false) Long startTime,
@RequestParam(name = "end_time_utc_millis", required = false) Long endTime,
@RequestParam(name= "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData) {
@RequestParam(name = "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData,
@RequestParam(name = "compact", required = false, defaultValue = "false") boolean compact) {

if (testData) {
return ResponseEntity.ok(MockMapGenerator.getProcessedMaps());
} else {
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, latest);
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, latest, compact);
long count = processedMapRepo.getQueryResultCount(query);

logger.info("Returning ProcessedMap Response with Size: " + count);
Expand All @@ -71,7 +72,7 @@ public ResponseEntity<Long> countMaps(
if (testData) {
return ResponseEntity.ok(5L);
} else {
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, false);
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, false, true);
long count = processedMapRepo.getQueryResultCount(query);

logger.info("Found: " + count + "Processed Map Messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ public ResponseEntity<List<ProcessedSpat>> findSpats(
@RequestParam(name = "intersection_id", required = false) Integer intersectionID,
@RequestParam(name = "start_time_utc_millis", required = false) Long startTime,
@RequestParam(name = "end_time_utc_millis", required = false) Long endTime,
@RequestParam(name = "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "compact", required = false, defaultValue = "false") boolean compact,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData) {

if (testData) {
return ResponseEntity.ok(MockSpatGenerator.getProcessedSpats());
} else {
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime);
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime, latest, compact);
long count = processedSpatRepo.getQueryResultCount(query);
logger.info("Returning Processed Spat Response with Size: " + count);
return ResponseEntity.ok(processedSpatRepo.findProcessedSpats(query));
Expand All @@ -70,7 +72,7 @@ public ResponseEntity<Long> countSpats(
if (testData) {
return ResponseEntity.ok(80L);
} else {
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime);
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime,false, true);
long count = processedSpatRepo.getQueryResultCount(query);
logger.info("Found: " + count + "Processed Spat Messages");
return ResponseEntity.ok(count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.ConnectionOfTravelAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.LaneDirectionOfTravelAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.SignalStateAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.StopLinePassageAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.StopLineStopAssessment;
import us.dot.its.jpo.ode.api.accessors.assessments.ConnectionOfTravelAssessment.ConnectionOfTravelAssessmentRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public void testProcessedMap() {

List<ProcessedMap> list = MockMapGenerator.getProcessedMaps();

Query query = processedMapRepo.getQuery(null, null, null, false);
Query query = processedMapRepo.getQuery(null, null, null, false, false);
when(processedMapRepo.findProcessedMaps(query)).thenReturn(list);

ResponseEntity<List<ProcessedMap>> result = controller.findMaps(null, null, null, false, false);
ResponseEntity<List<ProcessedMap>> result = controller.findMaps(null, null, null, false, false, false);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo(list);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.Collection;
import java.util.Set;


import org.mockito.Mockito;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public void testProcessedSpat() {

List<ProcessedSpat> list = MockSpatGenerator.getProcessedSpats();

Query query = processedSpatRepo.getQuery(null, null, null);
Query query = processedSpatRepo.getQuery(null, null, null, false, false);
when(processedSpatRepo.findProcessedSpats(query)).thenReturn(list);

ResponseEntity<List<ProcessedSpat>> result = controller.findSpats(null, null, null, false);
ResponseEntity<List<ProcessedSpat>> result = controller.findSpats(null, null, null, false,false,false);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
// assertThat(result.getHeaders().getContentType()).isEqualTo(MediaType.APPLICATION_JSON);
assertThat(result.getBody()).isEqualTo(list);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testGetQuery() {

boolean latest = true;

Query query = repository.getQuery(intersectionID, startTime, endTime, latest);
Query query = repository.getQuery(intersectionID, startTime, endTime, latest, false);


// Assert IntersectionID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void setUp() {
@Test
public void testGetQuery() {

Query query = repository.getQuery(intersectionID, startTime, endTime);
Query query = repository.getQuery(intersectionID, startTime, endTime, false, false);



Expand All @@ -60,7 +60,7 @@ public void testGetQuery() {


// Assert Start and End Time
Document queryTimeDocument = (Document)query.getQueryObject().get("odeReceivedAt");
Document queryTimeDocument = (Document)query.getQueryObject().get("utcTimeStamp");
assertThat(queryTimeDocument.getString("$gte")).isEqualTo(Instant.ofEpochMilli(startTime).toString());
assertThat(queryTimeDocument.getString("$lte")).isEqualTo(Instant.ofEpochMilli(endTime).toString());

Expand Down

0 comments on commit 6e9cbd1

Please sign in to comment.