Skip to content

Commit

Permalink
Feature/ranking (#98)
Browse files Browse the repository at this point in the history
* feat: init osrm-ranking binary

* feat: interfaces for traffic feeder and cache

* fix: close channel should not called by user

* feat: implement traffic feeder

* fix: force streaming delta running before get all

* feat: define nodebasededge structure; refactor: better package name

* fix: build error

* feat: implement traffic cache indexed by wayid

* feat: improve log

* feat: add monitor proc for cache

* feat: combine target proxy ip,port into one flag

* refactor: move incidentscache as a new package

* feat: load wayid2nodeids mapping

* fix: error print log for scanner

* feat: operate map at the end

* feat: store edge->wayID mapping

* fix: index error

* test: unit test for wayid2nodeids and edge2wayid mapping

* feat: split load to sub function and 3 go routine

* test: parse line unittest

* feat: parse line in parallel

* refactor: split files

* feat: split store tasks

* refactor: review tasks

* refactor: rename

* feat: store nodeIDs set

* feat: thread safe protection for query

* feat: use cached channel to improve performance:
for load na wayID->nodeIDs mapping only, it can reduce load time from about 180 seconds to 110 seconds.

* feat: remove too big edge2way mapping and unncessary nodes set

* refactor: rename package

* feat: implement way2Edges interface

* feat: incident cache indexed by edge
TODO: test

* fix: way2edge supports reverse direction

* feat: traffic cache indexed by edge

* refactor: rename file

* fix: crash

* fix: crash

* feat: feed in parallel

* feat: command line tool trafficcache-parallel-test for cache test

* fix: error instead of fatal

* refactor: simplify dependency

* feat: traffic inquirer and implementation

* feat: use graph.NodeID represents node instead of int64 directly; use int64 represents wayID directly since it has been widely used already

* refactor: simplify dependency; use int64 instead of explicit  NodeID type

* feat: remove indexed by way cache from osrm-ranking, we can test it in trafficcache-parallel-test

* feat: osrm api route v1

* feat: osrm reverse proxy

* refactor: rename query traffic packages; refine ranking interface

* feat: parse coordinates

* fix: continue when can not run more

* feat: osrm route v1 api

* feat: osrm api - route service v1 response

* feat: omit empty for json response

* feat: ranking service TODO: ranking

* chore: ignore debug bin

* refactor: rename pkg name to rankingservice

* feat: always use float64 instead of float32

* feat: rank routes by duration

* feat: update osrm routes with traffic, then ranking

* fix: info log

* feat: post process after ranking

* feat: osrmv1 api provides alternatives number interface

* feat: control backend timeout and alternatives by flags

* feat: consider turn duration/weight

* feat: monitor

* docs: comments

* refactor: remove useless flag

* refactor: rename monitor fields

* test: ut for rank by duration

* test: ut for flows cache

* fix: delete way/edge once it'll not be blocked by incidents anymore

* fix: name

* test: ut for flows cache indexed by edge

* test: ut for incidents cache indexed by edge

* refactor: better func name

* feat: use std lib abs for float64

* docs: update readme for ranking and trafficcache-parallel-test

* docs: add monitor API for ranking
  • Loading branch information
wangyoucao577 authored and CodeBear801 committed Dec 3, 2019
1 parent dea8d0a commit 0050f5a
Show file tree
Hide file tree
Showing 52 changed files with 2,948 additions and 9 deletions.
6 changes: 6 additions & 0 deletions integration/.gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@

# binaries
cmd/__debug_bin
cmd/*/__debug_bin
cmd/osrm-traffic-updater/osrm-traffic-updater
cmd/snappy/snappy
cmd/wayid2nodeid-extractor/wayid2nodeid-extractor
cmd/trafficproxy-cli/trafficproxy-cli
cmd/osrm-ranking/osrm-ranking
cmd/trafficcache-parallel-test/trafficcache-parallel-test

# test files
cmd/trafficproxy-cli/*_flows.csv
cmd/trafficproxy-cli/*_incidents.csv

15 changes: 13 additions & 2 deletions integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ Command line tool [cmd/osrm-traffic-updater](cmd/osrm-traffic-updater/) is desig
Command line tool for extract wayid to nodeids mapping from PBF. Code in [cmd/wayid2nodeid-extractor](cmd/wayid2nodeid-extractor/).

## snappy
Command line tool for [snappy](github.com/golang/snappy) compression. Code in [cmd/snappy](cmd/snappy/).
Command line tool for [snappy](github.com/golang/snappy) compression. Code in [cmd/snappy](cmd/snappy/).

## osrm-ranking
Update `duration/weight` by traffic for many `alternatives`, then pick up best as result.
- design [OSRM with Telenav Traffic Design - Alternatives Ranking](doc/osrm-with-telenav-traffic.md)
- code [cmd/osrm-ranking](cmd/osrm-ranking)
- monitor API: `/monitor`, e.g. `http://localhost:8080/monitor`

## trafficproxy-cli
Command line tool for querying traffic from `trafficproxy`. Code in [cmd/trafficproxy-cli](cmd/trafficproxy-cli/).
Typical usage:
Expand All @@ -35,4 +42,8 @@ $ trafficproxy-cli -log_dir=. ...
# for more options, see help
$ trafficproxy-cli -h

```
```

## trafficcache-parallel-test
Command line tool for traffic cache test. There could be two type of traffic caches, i.e. indexed by wayID and indexed by Edge. This tool possible to run them in parallel and do some comparison test. Code in [cmd/trafficcache-parallel-test](cmd/trafficcache-parallel-test/).

17 changes: 17 additions & 0 deletions integration/cmd/osrm-ranking/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"flag"
)

var flags struct {
listenPort int
wayID2NodeIDsMappingFile string
osrmBackendEndpoint string
}

func init() {
flag.IntVar(&flags.listenPort, "p", 8080, "Listen port.")
flag.StringVar(&flags.wayID2NodeIDsMappingFile, "m", "wayid2nodeids.csv.snappy", "OSRM way id to node ids mapping table, snappy compressed.")
flag.StringVar(&flags.osrmBackendEndpoint, "osrm", "", "Backend OSRM-backend endpoint")
}
80 changes: 80 additions & 0 deletions integration/cmd/osrm-ranking/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"encoding/json"
"flag"
"net/http"
"strconv"
"time"

"github.com/Telenav/osrm-backend/integration/pkg/trafficproxyclient"
"github.com/Telenav/osrm-backend/integration/rankingservice"
"github.com/Telenav/osrm-backend/integration/trafficcache/trafficcacheindexedbyedge"
"github.com/Telenav/osrm-backend/integration/wayid2nodeids"

"github.com/golang/glog"
)

func main() {
flag.Parse()
defer glog.Flush()

// monitor
upClock := time.Now()
monitorContents := newMonitorContents()
monitorContents.TrafficCacheMonitorContents.Name = "traffic cache(indexed by edge)"

// wayid2nodeids mapping
wayID2NodeIDsMapping := wayid2nodeids.NewMappingFrom(flags.wayID2NodeIDsMappingFile)
if err := wayID2NodeIDsMapping.Load(); err != nil {
glog.Error(err)
return
}

// prepare traffic cache
trafficCache := trafficcacheindexedbyedge.New(wayID2NodeIDsMapping)
feeder := trafficproxyclient.NewFeeder()
feeder.RegisterEaters(trafficCache)
go func() {
for {
err := feeder.Run()
if err != nil {
glog.Warning(err)
}
trafficCache.Clear()
time.Sleep(5 * time.Second) // try again later
}
}()

//start http listening
mux := http.NewServeMux()
mux.HandleFunc("/monitor/", func(w http.ResponseWriter, req *http.Request) {
monitorContents.UpTime = jsonDuration(time.Now().Sub(upClock))

// update wayid2nodeids contents
monitorContents.WayID2NodeIDsMonitorContents.IsReady = wayID2NodeIDsMapping.IsReady()
monitorContents.WayID2NodeIDsMonitorContents.Ways = wayID2NodeIDsMapping.WayIDsCount()

// update traffic cache contents
monitorContents.TrafficCacheMonitorContents.Flows = trafficCache.Flows.Count()
monitorContents.TrafficCacheMonitorContents.FlowsAffectedWays = trafficCache.Flows.AffectedWaysCount()
monitorContents.TrafficCacheMonitorContents.Incidents = trafficCache.Incidents.Count()
monitorContents.TrafficCacheMonitorContents.IncidentsAffectedWays = trafficCache.Incidents.AffectedWaysCount()
monitorContents.TrafficCacheMonitorContents.IncidentsAffectedEdges = trafficCache.Incidents.AffectedEdgesCount()
glog.Infof("monitor %s, [flows] %d affectedways %d, [incidents] blocking-only %d, affectedways %d affectededges %d",
monitorContents.TrafficCacheMonitorContents.Name, monitorContents.TrafficCacheMonitorContents.Flows, monitorContents.TrafficCacheMonitorContents.FlowsAffectedWays,
monitorContents.TrafficCacheMonitorContents.Incidents, monitorContents.TrafficCacheMonitorContents.IncidentsAffectedWays, monitorContents.TrafficCacheMonitorContents.IncidentsAffectedEdges)

w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(monitorContents)
})

//start ranking service
rankingService := rankingservice.New(flags.osrmBackendEndpoint, trafficCache)
mux.Handle("/route/v1/driving/", rankingService)

// listen
listening := ":" + strconv.Itoa(flags.listenPort)
glog.Infof("Listening on %s", listening)
glog.Fatal(http.ListenAndServe(listening, mux))
}
38 changes: 38 additions & 0 deletions integration/cmd/osrm-ranking/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"time"
)

type monitorContents struct {
UpTime jsonDuration `json:"uptime"`
TrafficCacheMonitorContents *trafficCacheMonitorContents `json:"traffic_cache"`
WayID2NodeIDsMonitorContents *wayID2NodeIDsMonitorContents `json:"wayid2nodeids"`
}

type trafficCacheMonitorContents struct {
Name string `json:"name"`
Flows int64 `json:"flows"`
FlowsAffectedWays int64 `json:"flows_affected_ways"`
Incidents int `json:"incidents"`
IncidentsAffectedWays int `json:"incidents_affected_ways"`
IncidentsAffectedEdges int `json:"incidents_affected_edges"`
}

type wayID2NodeIDsMonitorContents struct {
IsReady bool `json:"is_ready"`
Ways int `json:"ways"`
}

func newMonitorContents() *monitorContents {
return &monitorContents{
0, &trafficCacheMonitorContents{}, &wayID2NodeIDsMonitorContents{},
}
}

type jsonDuration time.Duration

func (j jsonDuration) MarshalJSON() (b []byte, err error) {
return []byte(fmt.Sprintf(`"%s"`, time.Duration(j).String())), nil
}
20 changes: 20 additions & 0 deletions integration/cmd/trafficcache-parallel-test/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"flag"
"time"
)

var flags struct {
monitorInterval time.Duration
wayID2NodeIDsMappingFile string
indexedByWayID bool
indexedByEdge bool
}

func init() {
flag.DurationVar(&flags.monitorInterval, "monitor-interval", 10*time.Second, "Log for traffic cache status will print out per monitor-interval.")
flag.StringVar(&flags.wayID2NodeIDsMappingFile, "m", "wayid2nodeids.csv.snappy", "OSRM way id to node ids mapping table, snappy compressed.")
flag.BoolVar(&flags.indexedByWayID, "indexed-by-way", true, "Run cache indexed by wayID.")
flag.BoolVar(&flags.indexedByEdge, "indexed-by-edge", true, "Run cache indexed by Edge.")
}
98 changes: 98 additions & 0 deletions integration/cmd/trafficcache-parallel-test/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"flag"
"time"

"github.com/Telenav/osrm-backend/integration/pkg/trafficproxyclient"
"github.com/Telenav/osrm-backend/integration/trafficcache/trafficcache"
"github.com/Telenav/osrm-backend/integration/trafficcache/trafficcacheindexedbyedge"
"github.com/Telenav/osrm-backend/integration/wayid2nodeids"

"github.com/golang/glog"
)

func main() {
flag.Parse()
defer glog.Flush()

var cacheByWay *trafficcache.Cache
var cacheByEdge *trafficcacheindexedbyedge.Cache
if flags.indexedByEdge {
wayID2NodeIDsMapping := wayid2nodeids.NewMappingFrom(flags.wayID2NodeIDsMappingFile)
if err := wayID2NodeIDsMapping.Load(); err != nil {
glog.Error(err)
return
}
cacheByEdge = trafficcacheindexedbyedge.New(wayID2NodeIDsMapping)
}
if flags.indexedByWayID {
cacheByWay = trafficcache.New()
}

// traffic cache feeder
feeder := trafficproxyclient.NewFeeder()
if cacheByWay != nil {
feeder.RegisterEaters(cacheByWay)
}
if cacheByEdge != nil {
feeder.RegisterEaters(cacheByEdge)
}

go func() {
for {
err := feeder.Run()
if err != nil {
glog.Warning(err)
}

if cacheByWay != nil {
cacheByWay.Clear()
}
if cacheByEdge != nil {
cacheByEdge.Clear()
}
time.Sleep(5 * time.Second) // try again later
}
}()

// monitor
startTime := time.Now()
for {
currentTime := time.Now()
if currentTime.Sub(startTime) < flags.monitorInterval {
time.Sleep(time.Second)
continue
}
startTime = currentTime

var cacheByWayFlowsCount, cacheByWayIncidentsCount, cacheByWayIncidentAffectedWaysCount int64
var cacheByEdgeFlowsAffectedWaysCount, cacheByEdgeIncidentsCount, cacheByEdgeIncidentAffectedWaysCount int64
if cacheByWay != nil {
cacheByWayFlowsCount = cacheByWay.Flows.Count()
cacheByWayIncidentsCount = int64(cacheByWay.Incidents.Count())
cacheByWayIncidentAffectedWaysCount = int64(cacheByWay.Incidents.AffectedWaysCount())
glog.Infof("traffic in cache(indexed by wayID), [flows] %d, [incidents] blocking-only %d, affectedways %d",
cacheByWayFlowsCount, cacheByWayIncidentsCount, cacheByWayIncidentAffectedWaysCount)
}
if cacheByEdge != nil {
cacheByEdgeFlowsAffectedWaysCount = cacheByEdge.Flows.AffectedWaysCount()
cacheByEdgeIncidentsCount = int64(cacheByEdge.Incidents.Count())
cacheByEdgeIncidentAffectedWaysCount = int64(cacheByEdge.Incidents.AffectedWaysCount())
glog.Infof("traffic in cache(indexed by Edge), [flows] %d affectedways %d, [incidents] blocking-only %d, affectedways %d affectededges %d",
cacheByEdge.Flows.Count(), cacheByEdgeFlowsAffectedWaysCount,
cacheByEdgeIncidentsCount, cacheByEdgeIncidentAffectedWaysCount, cacheByEdge.Incidents.AffectedEdgesCount())
}
if cacheByWay != nil && cacheByEdge != nil {
warnMismatch("flows", cacheByWayFlowsCount, cacheByEdgeFlowsAffectedWaysCount)
warnMismatch("incidents", cacheByWayIncidentsCount, cacheByEdgeIncidentsCount)
warnMismatch("incidents affected ways", cacheByWayIncidentAffectedWaysCount, cacheByEdgeIncidentAffectedWaysCount)
}
}
}

func warnMismatch(name string, v1, v2 int64) {
if v1 != v2 {
glog.Warningf("%s mismatch: %d != %d, delta %d", name, v1, v2, v1-v2)
}
}
4 changes: 4 additions & 0 deletions integration/cmd/trafficproxy-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,21 @@ func main() {
} else if flags.rpcMode == rpcModeStreamingDelta {

responseChan := make(chan proxy.TrafficResponse)
waitChan := make(chan struct{})

// async startup dumper
go func() {
trafficdumper.DumpStreamingDelta(responseChan)
waitChan <- struct{}{}
}()

// startup streaming delta
err := trafficproxyclient.StreamingDeltaFlowsIncidents(responseChan)
if err != nil {
glog.Error(err)
}
close(responseChan)
<-waitChan
return
}

Expand Down
27 changes: 27 additions & 0 deletions integration/graph/edge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//Package graph defines a Node Based Graph.
//more details refer to https://github.com/Telenav/open-source-spec/blob/master/osrm/doc/understanding_osrm_graph_representation.md#terminology
package graph

//Edge represents NodeBasedEdge structure. It's an directed edge between two nodes.
//https://github.com/Telenav/open-source-spec/blob/master/osrm/doc/understanding_osrm_graph_representation.md#terminology
type Edge struct {
From int64 // use int64 directly to indicate a unique node
To int64 // use int64 directly to indicate a unique node
}

// Reverse returns reverse direction edge from original one.
func (e Edge) Reverse() Edge {
return Edge{From: e.To, To: e.From}
}

// ReverseEdges reverses the edges.
func ReverseEdges(s []Edge) []Edge {
if len(s) == 0 {
return s
}

for i, j := 0, len(s)-1; i <= j; i, j = i+1, j-1 {
s[i], s[j] = s[j].Reverse(), s[i].Reverse()
}
return s
}
Loading

0 comments on commit 0050f5a

Please sign in to comment.