Skip to content

Commit

Permalink
[mds-compliance][mds-db] Fix/compliance handles timestamp (#316)
Browse files Browse the repository at this point in the history
* Query for policy by rule_id.

* Adding ability to see Policies active at a particular time.

* Refactored snapshot logic.

* Adding test to ensure timestamp paramenter works for count endpoint

* minor fixes

* Making policy rules unique and refactoring compliance api logic to reflect that, and renaming `end_date` param to `timestamp`.

* repairing test cases that broke because rules cannot be repeated

* more consistent error handling and better active policies and rule uniqueness checks

* Versioning and fixing the definition of an active policy

* minor fixes

* minor fixes

Co-authored-by: Mark Maxham <max@ellis-and-associates.com>
  • Loading branch information
janedotx and Mark Maxham authored May 15, 2020
1 parent 11f171d commit 86d157c
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 90 deletions.
118 changes: 63 additions & 55 deletions packages/mds-compliance/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,32 @@ import logger from '@mds-core/mds-logger'
import {
isUUID,
now,
days,
pathsFor,
getPolygon,
pointInShape,
isInStatesOrEvents,
ServerError
ServerError,
NotFoundError,
BadParamsError,
AuthorizationError
} from '@mds-core/mds-utils'
import { Geography, Device, UUID, VehicleEvent } from '@mds-core/mds-types'
import { TEST1_PROVIDER_ID, TEST2_PROVIDER_ID, BLUE_SYSTEMS_PROVIDER_ID, providerName } from '@mds-core/mds-providers'
import { Geometry, FeatureCollection } from 'geojson'
import { parseRequest } from '@mds-core/mds-api-helpers'
import * as compliance_engine from './mds-compliance-engine'
import { ComplianceApiRequest, ComplianceApiResponse } from './types'
import {
ComplianceApiRequest,
ComplianceApiResponse,
ComplianceSnapshotApiResponse,
ComplianceCountApiResponse
} from './types'
import { ComplianceApiVersionMiddleware } from './middleware'

const AllowedProviderIDs = [TEST1_PROVIDER_ID, TEST2_PROVIDER_ID, BLUE_SYSTEMS_PROVIDER_ID]

function api(app: express.Express): express.Express {
app.use(ComplianceApiVersionMiddleware)
app.use(async (req: ComplianceApiRequest, res: ComplianceApiResponse, next: express.NextFunction) => {
try {
// verify presence of provider_id
Expand All @@ -48,25 +57,23 @@ function api(app: express.Express): express.Express {
/* istanbul ignore next */
if (!provider_id) {
logger.warn('Missing provider_id in', req.originalUrl)
return res.status(400).send({
result: 'missing provider_id'
})
return res.status(400).send({ error: new BadParamsError('missing provider_id') })
}

/* istanbul ignore next */
if (!isUUID(provider_id)) {
logger.warn(req.originalUrl, 'invalid provider_id is not a UUID', provider_id)
return res.status(400).send({
result: `invalid provider_id ${provider_id} is not a UUID`
})
return res
.status(400)
.send({ error: new BadParamsError(`invalid provider_id ${provider_id} is not a UUID`) })
}

// stash provider_id
res.locals.provider_id = provider_id

logger.info(providerName(provider_id), req.method, req.originalUrl)
} else {
return res.status(401).send('Unauthorized')
return res.status(401).send({ error: new AuthorizationError('Unauthorized') })
}
}
} catch (err) {
Expand All @@ -76,34 +83,29 @@ function api(app: express.Express): express.Express {
next()
})

app.get(pathsFor('/snapshot/:policy_uuid'), async (req: ComplianceApiRequest, res: ComplianceApiResponse) => {
const { provider_id } = res.locals
const { provider_id: queried_provider_id, end_date: query_end_date } = {
app.get(pathsFor('/snapshot/:policy_uuid'), async (req: ComplianceApiRequest, res: ComplianceSnapshotApiResponse) => {
const { provider_id, version } = res.locals
const { provider_id: queried_provider_id, timestamp } = {
...parseRequest(req).query('provider_id'),
...parseRequest(req, { parser: Number }).query('end_date')
...parseRequest(req, { parser: Number }).query('timestamp')
}

/* istanbul ignore next */
async function fail(err: Error) {
logger.error(err.stack || err)
return res.status(500).send(new ServerError())
}
// default to now() if no timestamp supplied
const query_date = timestamp || now()

const { policy_uuid } = req.params

if (!isUUID(policy_uuid)) {
return res.status(400).send({ err: 'bad_param' })
return res.status(400).send({ error: new BadParamsError('Bad policy UUID') })
}
const { start_date, end_date } = query_end_date
? { end_date: query_end_date, start_date: query_end_date - days(365) }
: { end_date: now() + days(365), start_date: now() - days(365) }

try {
const all_policies = await db.readPolicies({ start_date, get_published: null, get_unpublished: null })
const all_policies = await db.readActivePolicies(query_date)
const policy = compliance_engine.filterPolicies(all_policies).find(p => {
return p.policy_id === policy_uuid
})
if (!policy) {
return res.status(404).send({ err: 'not found' })
return res.status(404).send({ error: new NotFoundError('Policy not found') })
}

if (
Expand Down Expand Up @@ -131,10 +133,10 @@ function api(app: express.Express): express.Express {
])
const deviceIdSubset = deviceRecords.map((record: { device_id: UUID; provider_id: UUID }) => record.device_id)
const devices = await cache.readDevices(deviceIdSubset)
const events =
query_end_date && end_date < now()
? await db.readHistoricalEvents({ provider_id: target_provider_id, end_date })
: await cache.readEvents(deviceIdSubset)
// if a timestamp was supplied, the data we want is probably old enough it's going to be in the db
const events = timestamp
? await db.readHistoricalEvents({ provider_id: target_provider_id, end_date: timestamp })
: await cache.readEvents(deviceIdSubset)

const deviceMap = devices.reduce((map: { [d: string]: Device }, device) => {
return device ? Object.assign(map, { [device.device_id]: device }) : map
Expand All @@ -143,40 +145,39 @@ function api(app: express.Express): express.Express {
const filteredEvents = compliance_engine.filterEvents(events)
const result = compliance_engine.processPolicy(policy, filteredEvents, geographies, deviceMap)
if (result === undefined) {
return res.status(400).send({ err: 'bad_param' })
return res.status(400).send({ error: new BadParamsError('Unable to process compliance results') })
}
return res.status(200).send(result)

return res.status(200).send({ ...result, timestamp: query_date, version })
}
} else {
return res.status(401).send({ err: 'Unauthorized' })
return res.status(401).send({ error: new AuthorizationError() })
}
} catch (err) {
if (err.message.includes('not_found')) {
return res.status(400).send({ err: 'bad_param' })
}
await fail(err)
return res.status(500).send({ error: new ServerError() })
}
})

app.get(pathsFor('/count/:rule_id'), async (req: ComplianceApiRequest, res: ComplianceApiResponse) => {
if (!AllowedProviderIDs.includes(res.locals.provider_id)) {
return res.status(401).send({ result: 'unauthorized access' })
app.get(pathsFor('/count/:rule_id'), async (req: ComplianceApiRequest, res: ComplianceCountApiResponse) => {
const { timestamp } = {
...parseRequest(req, { parser: Number }).query('timestamp')
}

async function fail(err: Error) {
logger.error(err.stack || err)
if (err.message.includes('invalid rule_id')) {
return res.status(404).send(err.message)
}
/* istanbul ignore next */
return res
.status(500)
.send({ error: 'server_error', error_description: 'an internal server error has occurred and been logged' })
const query_date = timestamp || now()
if (!AllowedProviderIDs.includes(res.locals.provider_id)) {
return res.status(401).send({ error: new AuthorizationError('Unauthorized') })
}

const { rule_id } = req.params
try {
const rule = await db.readRule(rule_id)
const activePolicies = await db.readActivePolicies(query_date)
const [policy] = activePolicies.filter(activePolicy => {
const matches = activePolicy.rules.filter(policy_rule => policy_rule.rule_id === rule_id)
return matches.length !== 0
})
if (!policy) {
throw new NotFoundError('invalid rule_id')
}
const [rule] = policy.rules.filter(r => r.rule_id === rule_id)
const geography_ids = rule.geographies.reduce((acc: UUID[], geo: UUID) => {
return [...acc, geo]
}, [])
Expand All @@ -195,11 +196,13 @@ function api(app: express.Express): express.Express {
return [...acc, getPolygon(geographies, geography.geography_id)]
}, [])

const events = timestamp ? await db.readHistoricalEvents({ end_date: timestamp }) : await cache.readAllEvents()

// https://stackoverflow.com/a/51577579 to remove nulls in typesafe way
const events = (await cache.readAllEvents()).filter(
const filteredVehicleEvents = events.filter(
(event): event is VehicleEvent => event !== null && isInStatesOrEvents(rule, event)
)
const filteredEvents = compliance_engine.filterEvents(events)
const filteredEvents = compliance_engine.filterEvents(filteredVehicleEvents)

const count = filteredEvents.reduce((count_acc, event) => {
return (
Expand All @@ -213,9 +216,14 @@ function api(app: express.Express): express.Express {
)
}, 0)

return res.status(200).send({ count })
} catch (err) {
await fail(err)
const { version } = res.locals
return res.status(200).send({ policy, count, timestamp: query_date, version })
} catch (error) {
await logger.error(error.stack)
if (error instanceof NotFoundError) {
return res.status(404).send({ error })
}
return res.status(500).send({ error: new ServerError('An internal server error has occurred and been logged') })
}
})
return app
Expand Down
7 changes: 7 additions & 0 deletions packages/mds-compliance/middleware/compliance-api-version.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ApiVersionMiddleware } from '@mds-core/mds-api-server'
import { COMPLIANCE_API_SUPPORTED_VERSIONS, COMPLIANCE_API_DEFAULT_VERSION } from '../types'

export const ComplianceApiVersionMiddleware = ApiVersionMiddleware(
'application/vnd.mds.compliance+json',
COMPLIANCE_API_SUPPORTED_VERSIONS
).withDefaultVersion(COMPLIANCE_API_DEFAULT_VERSION)
1 change: 1 addition & 0 deletions packages/mds-compliance/middleware/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './compliance-api-version'
54 changes: 33 additions & 21 deletions packages/mds-compliance/tests/api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import {
makeTelemetryInArea,
restrictedAreas,
veniceSpecOps,
LA_CITY_BOUNDARY
LA_CITY_BOUNDARY,
START_ONE_MONTH_AGO
} from '@mds-core/mds-test-data'
import test from 'unit.js'
import { api as agency } from '@mds-core/mds-agency'
Expand Down Expand Up @@ -88,6 +89,7 @@ const COUNT_POLICY_JSON: Policy = {
description: 'Mobility caps as described in the One-Year Permit',
policy_id: COUNT_POLICY_UUID,
start_date: 1558389669540,
publish_date: 1558389669540,
end_date: null,
prev_policies: null,
provider_ids: [],
Expand Down Expand Up @@ -132,6 +134,7 @@ const COUNT_POLICY_JSON_2: Policy = {
policy_id: COUNT_POLICY_UUID_2,
start_date: 1558389669540,
end_date: null,
publish_date: 1558389669540,
prev_policies: null,
provider_ids: [],
rules: [
Expand Down Expand Up @@ -173,6 +176,7 @@ const COUNT_POLICY_JSON_4: Policy = {
name: 'LADOT Mobility Caps',
description: 'Mobility caps as described in the One-Year Permit',
policy_id: COUNT_POLICY_UUID_4,
publish_date: 1558389669540,
start_date: 1558389669540,
end_date: null,
prev_policies: null,
Expand Down Expand Up @@ -239,7 +243,7 @@ const TIME_POLICY_JSON: Policy = {
]
}

const APP_JSON = 'application/json; charset=utf-8'
const APP_JSON = 'application/vnd.mds.compliance+json; charset=utf-8; version=0.1'
describe('Tests Compliance API:', () => {
afterEach(async () => {
await Promise.all([db.shutdown(), cache.shutdown(), stream.shutdown()])
Expand Down Expand Up @@ -269,7 +273,6 @@ describe('Tests Compliance API:', () => {
await db.writeGeography(geography)
await db.writePolicy(COUNT_POLICY_JSON)
await db.publishGeography({ geography_id: geography.geography_id })
await db.publishPolicy(COUNT_POLICY_UUID)
done()
})
})
Expand Down Expand Up @@ -329,7 +332,6 @@ describe('Tests Compliance API:', () => {
await db.writeGeography(geography)
await db.publishGeography({ geography_id: geography.geography_id })
await db.writePolicy(COUNT_POLICY_JSON)
await db.publishPolicy(COUNT_POLICY_UUID)
done()
})
})
Expand All @@ -342,6 +344,8 @@ describe('Tests Compliance API:', () => {
.expect(200)
.end((err, result) => {
test.assert.deepEqual(result.body.total_violations, 0)
test.object(result.body).hasProperty('timestamp')
test.value(result.body.policy, COUNT_POLICY_UUID)
test.value(result).hasHeader('content-type', APP_JSON)
done(err)
})
Expand Down Expand Up @@ -432,7 +436,6 @@ describe('Tests Compliance API:', () => {
await db.writeGeography(geography)
await db.publishGeography({ geography_id: geography.geography_id })
await db.writePolicy(COUNT_POLICY_JSON)
await db.publishPolicy(COUNT_POLICY_UUID)
done()
})
})
Expand All @@ -446,6 +449,8 @@ describe('Tests Compliance API:', () => {
.end((err, result) => {
test.assert.deepEqual(result.body.compliance[0].matches[0].measured, 10)
test.assert.deepEqual(result.body.total_violations, 5)
test.object(result.body).hasProperty('timestamp')
test.object(result.body).hasProperty('version')
test.value(result).hasHeader('content-type', APP_JSON)
done(err)
})
Expand Down Expand Up @@ -561,7 +566,6 @@ describe('Tests Compliance API:', () => {
await db.writeGeography(geography)
await db.publishGeography({ geography_id: geography.geography_id })
await db.writePolicy(COUNT_POLICY_JSON_2)
await db.publishPolicy(COUNT_POLICY_UUID_2)
done()
})
})
Expand Down Expand Up @@ -813,15 +817,14 @@ describe('Tests Compliance API:', () => {
await db.writeGeography({ name: 'la', geography_id: GEOGRAPHY_UUID, geography_json: LA_CITY_BOUNDARY })
await db.publishGeography({ geography_id: GEOGRAPHY_UUID })
await db.writePolicy(COUNT_POLICY_JSON_4)
await db.publishPolicy(COUNT_POLICY_JSON_4.policy_id)
done()
})
})
})

it('Historical check reports 5 violations', done => {
request
.get(`/snapshot/${COUNT_POLICY_UUID_4}?end_date=${yesterday + 200}`)
.get(`/snapshot/${COUNT_POLICY_UUID_4}?timestamp=${yesterday + 200}`)
.set('Authorization', ADMIN_AUTH)
.expect(200)
.end((err, result) => {
Expand All @@ -845,7 +848,7 @@ describe('Tests Compliance API:', () => {
})

describe('Tests count endpoint', () => {
before(done => {
before(async () => {
const devices_a: Device[] = makeDevices(15, now())
const events_a = makeEventsWithTelemetry(devices_a, now(), CITY_OF_LA, 'trip_start')
const telemetry_a: Telemetry[] = devices_a.reduce((acc: Telemetry[], device) => {
Expand All @@ -864,26 +867,35 @@ describe('Tests Compliance API:', () => {
events: [...events_a, ...events_b],
telemetry: [...telemetry_a, ...telemetry_b]
}
Promise.all([db.initialize(), cache.initialize()]).then(() => {
Promise.all([cache.seed(seedData), db.seed(seedData)]).then(() => {
db.writePolicy(COUNT_POLICY_JSON).then(() => {
db.writeGeography({ name: 'la', geography_id: GEOGRAPHY_UUID, geography_json: LA_CITY_BOUNDARY }).then(
() => {
done()
}
)
})
})
})
await db.initialize()
await cache.initialize()
await cache.seed(seedData)
await db.seed(seedData)
await db.writeGeography({ name: 'la', geography_id: GEOGRAPHY_UUID, geography_json: LA_CITY_BOUNDARY })
await db.publishGeography({ geography_id: GEOGRAPHY_UUID })
await db.writePolicy(COUNT_POLICY_JSON)
})

it('Test count endpoint success', done => {
it('Test count endpoint, expecting events', done => {
request
.get(`/count/47c8c7d4-14b5-43a3-b9a5-a32ecc2fb2c6`)
.set('Authorization', ADMIN_AUTH)
.expect(200)
.end((err, result) => {
test.assert.deepEqual(result.body.count, 30)
test.object(result.body).hasProperty('policy')
test.value(result).hasHeader('content-type', APP_JSON)
done(err)
})
})

it('Test count endpoint, expecting no events', done => {
request
.get(`/count/47c8c7d4-14b5-43a3-b9a5-a32ecc2fb2c6?timestamp=${START_ONE_MONTH_AGO}`)
.set('Authorization', ADMIN_AUTH)
.expect(200)
.end((err, result) => {
test.assert.deepEqual(result.body.count, 0)
test.value(result).hasHeader('content-type', APP_JSON)
done(err)
})
Expand Down
Loading

0 comments on commit 86d157c

Please sign in to comment.