-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
90 lines (80 loc) · 2.25 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
'use strict'
const {Writable} = require('stream')
const createEntitiesStore = require('./lib/entities-store')
const tripSignature = (u) => {
if (u.trip.trip_id) return u.trip.trip_id
if (u.trip.route_id && u.vehicle.id) {
return u.trip.route_id + '-' + u.vehicle.id
}
// todo: u.trip.route_id + slugg(u.vehicle.label) ?
return null
}
const gtfsRtAsDump = (opt = {}) => {
const {
ttl,
timestamp,
tripUpdateSignature,
vehiclePositionSignature,
} = {
ttl: 5 * 60 * 1000, // 5 minutes
timestamp: () => Date.now() / 1000 | 0,
tripUpdateSignature: (u) => {
const tripSig = tripSignature(u)
return tripSig ? 'trip_update-' + tripSig : null
},
vehiclePositionSignature: (p) => {
const vehicleSig = p.vehicle && p.vehicle.id
if (vehicleSig) return 'vehicle_position-' + vehicleSig
const tripSig = tripSignature(p)
return tripSig ? 'vehicle_position-' + tripSig : null
},
...opt
}
const entitiesStore = createEntitiesStore(ttl, timestamp)
const write = (entity) => {
// If the entity is not being deleted, exactly one of 'trip_update', 'vehicle' and 'alert' fields should be populated.
// https://developers.google.com/transit/gtfs-realtime/reference#message-feedentity
let sig = null
if (entity.trip_update) {
sig = tripUpdateSignature(entity.trip_update)
} else if (entity.vehicle) {
sig = vehiclePositionSignature(entity.vehicle)
}
// todo: alert
if (sig !== null) {
entitiesStore.put(sig, entity)
return;
}
const err = new Error('invalid/unsupported kind of FeedEntity')
err.feedEntity = entity
throw err
}
let feedMessage = null
const asFeedMessage = () => {
return feedMessage || entitiesStore.asFeedMessage()
}
const out = new Writable({
objectMode: true,
write: (entity, _, cb) => {
write(entity)
out.emit('change')
cb(null)
},
writev: (chunks, cb) => {
for (const {chunk: entity} of chunks) write(entity)
out.emit('change')
cb(null)
},
final: (cb) => {
feedMessage = entitiesStore.asFeedMessage()
entitiesStore.flush()
cb(null)
},
})
out.asFeedMessage = asFeedMessage
// todo: let asFeedMessage return this
out.timeModified = () => entitiesStore.getTimestamp()
out.nrOfEntities = entitiesStore.nrOfEntities
return out
}
module.exports = gtfsRtAsDump