diff --git a/Cargo.lock b/Cargo.lock index 51074a73..fe00578f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,26 +91,25 @@ dependencies = [ [[package]] name = "async-nats" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1174495e436c928905018f10a36160f7a8a6786450f50f4ce7fba05d1539704c" +checksum = "94e3e851ddf3b62be8a8085e1e453968df9cdbf990a37bbb589b5b4f587c68d7" dependencies = [ - "async-nats-tokio-rustls-deps", - "base64 0.13.1", - "base64-url", + "base64 0.21.2", "bytes", "futures", "http", "itoa 1.0.6", "memchr", "nkeys", - "nuid", + "nuid 0.3.2", "once_cell", "rand", "regex", "ring", "rustls-native-certs", "rustls-pemfile", + "rustls-webpki", "serde", "serde_json", "serde_nanos", @@ -119,21 +118,11 @@ dependencies = [ "time 0.3.21", "tokio", "tokio-retry", + "tokio-rustls 0.24.1", "tracing", "url 2.4.0", ] -[[package]] -name = "async-nats-tokio-rustls-deps" -version = "0.24.0-ALPHA.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdefe54cd7867d937c0a507d2a3a830af410044282cd3e4002b5b7860e1892e" -dependencies = [ - "rustls 0.21.1", - "tokio", - "webpki 0.22.0", -] - [[package]] name = "async-trait" version = "0.1.68" @@ -243,15 +232,6 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" -[[package]] -name = "base64-url" -version = "1.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" -dependencies = [ - "base64 0.13.1", -] - [[package]] name = "base64ct" version = "1.1.1" @@ -389,6 +369,9 @@ name = "bytes" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -584,29 +567,6 @@ dependencies = [ "const-oid", ] -[[package]] -name = "diesel" -version = "1.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b28135ecf6b7d446b43e27e225622a038cc4e2930a1022f51cdb97ada19b8e4d" -dependencies = [ - "bitflags", - "byteorder", - "diesel_derives", - "pq-sys", -] - -[[package]] -name = "diesel_derives" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3" -dependencies = [ - "proc-macro2", - "quote 1.0.28", - "syn 1.0.109", -] - [[package]] name = "difflib" version = "0.4.0" @@ -692,7 +652,7 @@ dependencies = [ "svc-agent", "svc-authn", "svc-authz", - "svc-error", + "svc-error 0.6.0", "svc-events", "svc-nats-client", "svc-utils", @@ -1534,9 +1494,9 @@ dependencies = [ [[package]] name = "nkeys" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594" +checksum = "c2d151f6ece2f3d1077f6c779268de2516653d8344ddde65addd785cce764fe5" dependencies = [ "byteorder", "data-encoding", @@ -1583,6 +1543,16 @@ dependencies = [ "rand", ] +[[package]] +name = "nuid" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b61b1710432e483e6a67b20b6c60c6afe0e2fad67aabba3bdb912f3f70ff6ae" +dependencies = [ + "once_cell", + "rand", +] + [[package]] name = "num-bigint" version = "0.2.6" @@ -1870,15 +1840,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pq-sys" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" -dependencies = [ - "vcpkg", -] - [[package]] name = "predicates" version = "2.1.5" @@ -2257,8 +2218,8 @@ dependencies = [ "pollster", "thiserror", "tokio", - "tokio-rustls", - "webpki 0.21.4", + "tokio-rustls 0.22.0", + "webpki", ] [[package]] @@ -2318,14 +2279,14 @@ dependencies = [ "log", "ring", "sct 0.6.1", - "webpki 0.21.4", + "webpki", ] [[package]] name = "rustls" -version = "0.21.1" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c911ba11bc8433e811ce56fde130ccf32f5127cab0e0194e9c68c5a5b671791e" +checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" dependencies = [ "log", "ring", @@ -2356,9 +2317,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.100.1" +version = "0.101.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" dependencies = [ "ring", "untrusted", @@ -2956,9 +2917,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "svc-agent" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c90205083c87c6bc25990d5cdd26f98d8c431441e86dbd4e7421b28c428e04d" +checksum = "bef9c610b65f24bab61b52f5a7264983d0e014858e8496282aec7d688baf7869" dependencies = [ "async-channel", "base64 0.21.2", @@ -2981,7 +2942,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb5cf659f78c8fff863c17ac4e674829517919716eeecab602e8d2941e89c111" dependencies = [ "chrono", - "diesel", "http", "jsonwebtoken", "serde", @@ -3015,6 +2975,19 @@ name = "svc-error" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f841d7fd45d6f179e9f3765491fcb5eea100a5bbe50ea47faf3f262031966d9" +dependencies = [ + "anyhow", + "crossbeam-channel", + "http", + "serde", + "serde_derive", +] + +[[package]] +name = "svc-error" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad220c6bc89bc2e7b8af01db6dcfa4a513e18d78e7cf2f778e623ac22577eadf" dependencies = [ "anyhow", "crossbeam-channel", @@ -3032,9 +3005,9 @@ dependencies = [ [[package]] name = "svc-events" -version = "0.9.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a30b9e3f342270165a9e7543f9ba85a45bff586cc04f4d495ffd8b4a44447974" +checksum = "3ad84bd15a598b693df7dd08ca832c3414d59d6847f134f479ff547264669735" dependencies = [ "serde", "serde_json", @@ -3046,9 +3019,9 @@ dependencies = [ [[package]] name = "svc-nats-client" -version = "0.5.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c35276c77ae7ff528a5fb243f16bf95065a3ac8c91837356f2ee797c9a697203" +checksum = "cf7705838936003cae1b79e726be255ea9702b8aad516dec9c998c7c93ef6f8d" dependencies = [ "anyhow", "async-nats", @@ -3056,10 +3029,11 @@ dependencies = [ "futures", "futures-util", "humantime-serde", + "nuid 0.4.1", "reqwest", "serde", "svc-agent", - "svc-error", + "svc-error 0.6.0", "svc-events", "thiserror", "tokio", @@ -3069,9 +3043,9 @@ dependencies = [ [[package]] name = "svc-utils" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b8fd4d41bfda7aec14a80746dd1e6f5ae6c04643a7e9c18065f38388b6a4477" +checksum = "b8443737f4d2444dc9e6a140a83d720685290d1ad763e75b59421958fa4a1a96" dependencies = [ "axum", "futures", @@ -3081,11 +3055,12 @@ dependencies = [ "prometheus", "svc-agent", "svc-authn", - "svc-error", + "svc-error 0.5.0", "tokio", "tower", "tower-http", "tracing", + "url 2.4.0", ] [[package]] @@ -3303,7 +3278,17 @@ checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", "tokio", - "webpki 0.21.4", + "webpki", +] + +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.5", + "tokio", ] [[package]] @@ -3746,16 +3731,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "whoami" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index 82314014..f9076619 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,19 +46,19 @@ sqlx = { version = "0.6", features = [ "bigdecimal", "runtime-tokio-native-tls", ] } -svc-agent = { version = "0.20", features = ["sqlx"] } +svc-agent = { version = "0.21", features = ["sqlx"] } svc-authn = { version = "0.8", features = ["jose", "sqlx"] } svc-authz = "0.12" -svc-error = { version = "0.5", features = [ +svc-error = { version = "0.6", features = [ "svc-agent", "svc-authn", "svc-authz", "sentry-extension", "sqlx", ] } -svc-events = "0.9" -svc-nats-client = "0.5" -svc-utils = { version = "0.7", features = ["log-middleware", "metrics-middleware", "cors-middleware", "authn-extractor"] } +svc-events = "0.11" +svc-nats-client = "0.8" +svc-utils = { version = "0.8", features = ["log-middleware", "metrics-middleware", "cors-middleware", "authn-extractor"] } tokio = { version = "1.28", features = ["full"] } tower = { version = "0.4", features = [ "timeout" ] } tracing = "0.1" diff --git a/sqlx-data.json b/sqlx-data.json index 8aeb70c0..af4f610f 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -1249,6 +1249,135 @@ }, "query": "\n UPDATE recording\n SET modified_segments =\n CASE\n WHEN created_by = $3 THEN $2\n ELSE segments\n END,\n adjusted_at = NOW()\n WHERE class_id = $1 AND deleted_at IS NULL\n RETURNING\n id,\n class_id,\n rtc_id,\n stream_uri,\n segments AS \"segments!: Option\",\n started_at,\n modified_segments AS \"modified_segments!: Option\",\n created_at,\n adjusted_at,\n transcoded_at,\n created_by AS \"created_by: AgentId\",\n deleted_at\n " }, + "7027e1415842b3bd10d9c5efb3e691341a7dfd7b51705cbf2dbed9cba3544a03": { + "describe": { + "columns": [ + { + "name": "user_account: _", + "ordinal": 0, + "type_info": { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + } + }, + { + "name": "last_op_id", + "ordinal": 1, + "type_info": "Int8" + }, + { + "name": "is_video_streaming_banned", + "ordinal": 2, + "type_info": "Bool" + }, + { + "name": "is_collaboration_banned", + "ordinal": 3, + "type_info": "Bool" + } + ], + "nullable": [ + false, + false, + false, + false + ], + "parameters": { + "Left": [ + { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + }, + "Int8", + "Int8" + ] + } + }, + "query": "\n INSERT INTO ban_account_op (user_account, last_op_id)\n VALUES ($1, $2)\n ON CONFLICT (user_account) DO UPDATE\n SET\n -- to reset sub-operation trackers only if op_id has changed\n is_video_streaming_banned = ($2 = $3 AND ban_account_op.is_video_streaming_banned),\n is_collaboration_banned = ($2 = $3 AND ban_account_op.is_collaboration_banned),\n last_op_id = EXCLUDED.last_op_id\n WHERE\n -- allow to 'complete' operation if there's no change in last_op_id\n -- or allow to do upsert without real changes so we can process\n -- the same message twice\n ban_account_op.last_op_id = EXCLUDED.last_op_id OR\n -- allow change op id if the previous operation is completed\n (\n ban_account_op.last_op_id = $3 AND\n ban_account_op.is_video_streaming_banned = true AND\n ban_account_op.is_collaboration_banned = true\n )\n RETURNING\n user_account AS \"user_account: _\",\n last_op_id,\n is_video_streaming_banned,\n is_collaboration_banned\n " + }, + "7174ed47333151e6ef71b45f2f2fe9d15d70ba860e5dc52e1dcff3aea12c4c1d": { + "describe": { + "columns": [ + { + "name": "user_account: _", + "ordinal": 0, + "type_info": { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + } + }, + { + "name": "last_op_id", + "ordinal": 1, + "type_info": "Int8" + }, + { + "name": "is_video_streaming_banned", + "ordinal": 2, + "type_info": "Bool" + }, + { + "name": "is_collaboration_banned", + "ordinal": 3, + "type_info": "Bool" + } + ], + "nullable": [ + false, + false, + false, + false + ], + "parameters": { + "Left": [ + "Record", + "Bool", + "Bool", + "Int8" + ] + } + }, + "query": "\n UPDATE ban_account_op\n SET\n is_video_streaming_banned = COALESCE($2, ban_account_op.is_video_streaming_banned),\n is_collaboration_banned = COALESCE($3, ban_account_op.is_collaboration_banned)\n WHERE\n ban_account_op.last_op_id = $4\n AND ban_account_op.user_account = $1\n RETURNING\n user_account AS \"user_account: _\",\n last_op_id,\n is_video_streaming_banned,\n is_collaboration_banned\n " + }, "73a64697ca7ad2d67c7b2a77cc1fd6c0fb7a2b262a30d8386e83f8cbd8be5a54": { "describe": { "columns": [ @@ -1635,90 +1764,6 @@ }, "query": "\n INSERT INTO recording (class_id, rtc_id, segments, modified_segments, stream_uri, started_at, adjusted_at, transcoded_at, created_by)\n VALUES ($1, $2, $3, $4, $5, NOW(), NOW(), NOW(), $6)\n RETURNING\n id,\n class_id,\n rtc_id,\n stream_uri,\n segments AS \"segments!: Option\",\n started_at,\n modified_segments AS \"modified_segments!: Option\",\n created_at,\n adjusted_at,\n transcoded_at,\n created_by AS \"created_by: AgentId\",\n deleted_at\n " }, - "b333f6270cc34a0b5d2cd2d23d39c705c219704ba7094dd078cbc2978146e534": { - "describe": { - "columns": [ - { - "name": "target_account: AccountId", - "ordinal": 0, - "type_info": { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - } - }, - { - "name": "class_id", - "ordinal": 1, - "type_info": "Uuid" - }, - { - "name": "banned_at", - "ordinal": 2, - "type_info": "Timestamptz" - }, - { - "name": "banned_operation_id", - "ordinal": 3, - "type_info": "Int8" - }, - { - "name": "unbanned_at", - "ordinal": 4, - "type_info": "Timestamptz" - }, - { - "name": "unbanned_operation_id", - "ordinal": 5, - "type_info": "Int8" - } - ], - "nullable": [ - false, - false, - false, - false, - true, - true - ], - "parameters": { - "Left": [ - "Uuid", - { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - }, - "Int8" - ] - } - }, - "query": "\n INSERT INTO ban_history (class_id, target_account, banned_operation_id)\n VALUES ($1, $2, $3)\n -- this allows us to run this query many times idempotently\n ON CONFLICT (banned_operation_id) DO NOTHING\n RETURNING\n target_account AS \"target_account: AccountId\",\n class_id,\n banned_at,\n banned_operation_id,\n unbanned_at,\n unbanned_operation_id\n " - }, "c67188dde7672c71f7e14a0ef09047934fbf808e5541e1b35c88004f36c16c8b": { "describe": { "columns": [ @@ -1841,80 +1886,6 @@ }, "query": "\n INSERT INTO class (\n scope, audience, time, tags, preserve_history, kind,\n conference_room_id, event_room_id,\n original_event_room_id, modified_event_room_id, reserve, room_events_uri,\n established, properties, original_class_id\n )\n VALUES ($1, $2, $3, $4, $5, $6::class_type, $7, $8, $9, $10, $11, $12, $13, $14, $15)\n ON CONFLICT (scope, audience)\n DO UPDATE\n SET time = EXCLUDED.time,\n tags = EXCLUDED.tags,\n preserve_history = EXCLUDED.preserve_history,\n reserve = EXCLUDED.reserve,\n properties = EXCLUDED.properties\n WHERE class.established = 'f'\n RETURNING\n id,\n kind AS \"kind!: ClassType\",\n scope,\n time AS \"time!: Time\",\n audience,\n created_at,\n tags,\n preserve_history,\n reserve,\n properties AS \"properties: _\",\n original_class_id,\n content_id\n " }, - "c6ce697aaad621b2230880c9d46251dffd92dc490f5473cd2ca32a8627dae130": { - "describe": { - "columns": [ - { - "name": "user_account: _", - "ordinal": 0, - "type_info": { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - } - }, - { - "name": "last_op_id", - "ordinal": 1, - "type_info": "Int8" - }, - { - "name": "is_video_streaming_banned", - "ordinal": 2, - "type_info": "Bool" - }, - { - "name": "is_collaboration_banned", - "ordinal": 3, - "type_info": "Bool" - } - ], - "nullable": [ - false, - false, - false, - false - ], - "parameters": { - "Left": [ - { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - }, - "Int8", - "Bool", - "Bool", - "Int8" - ] - } - }, - "query": "\n INSERT INTO ban_account_op (user_account, last_op_id, is_video_streaming_banned, is_collaboration_banned)\n VALUES ($1, $2, COALESCE($3, false), COALESCE($4, false))\n ON CONFLICT (user_account) DO UPDATE\n SET\n is_video_streaming_banned = COALESCE($3, ban_account_op.is_video_streaming_banned),\n is_collaboration_banned = COALESCE($4, ban_account_op.is_collaboration_banned),\n last_op_id = EXCLUDED.last_op_id\n WHERE\n -- allow to 'complete' operation if there's no change in last_op_id\n -- or allow to do upsert without real changes so we can process\n -- the same message twice\n ban_account_op.last_op_id = EXCLUDED.last_op_id OR\n -- allow change op id if the previous operation is completed\n (\n ban_account_op.last_op_id = $5 AND\n ban_account_op.is_video_streaming_banned = true AND\n ban_account_op.is_collaboration_banned = true\n )\n RETURNING\n user_account AS \"user_account: _\",\n last_op_id,\n is_video_streaming_banned,\n is_collaboration_banned\n " - }, "c7386d4717720730d7762fbcf2d24c5d0f508b246e987f4666684bf37b27ddc2": { "describe": { "columns": [ @@ -3401,6 +3372,90 @@ }, "query": "\n INSERT INTO class (\n scope, audience, time, tags, preserve_history, kind,\n conference_room_id, event_room_id, properties\n )\n VALUES ($1, $2, $3, $4, $5, $6::class_type, $7, $8, $9)\n RETURNING\n id,\n scope,\n kind AS \"kind!: ClassType\",\n audience,\n time AS \"time!: Time\",\n tags,\n properties AS \"properties: _\",\n preserve_history,\n created_at,\n event_room_id AS \"event_room_id!: Uuid\",\n conference_room_id AS \"conference_room_id!: Uuid\",\n original_event_room_id,\n modified_event_room_id,\n reserve,\n room_events_uri,\n host AS \"host: AgentId\",\n timed_out,\n original_class_id,\n content_id\n " }, + "f9aeac4c3a1e1747314db755f8e0790b01a0d2d603f20cdd069b7b57268ccdbe": { + "describe": { + "columns": [ + { + "name": "target_account!: AccountId", + "ordinal": 0, + "type_info": { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + } + }, + { + "name": "class_id!: _", + "ordinal": 1, + "type_info": "Uuid" + }, + { + "name": "banned_at!: _", + "ordinal": 2, + "type_info": "Timestamptz" + }, + { + "name": "banned_operation_id!: _", + "ordinal": 3, + "type_info": "Int8" + }, + { + "name": "unbanned_at", + "ordinal": 4, + "type_info": "Timestamptz" + }, + { + "name": "unbanned_operation_id", + "ordinal": 5, + "type_info": "Int8" + } + ], + "nullable": [ + null, + null, + null, + null, + null, + null + ], + "parameters": { + "Left": [ + "Uuid", + { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + }, + "Int8" + ] + } + }, + "query": "\n WITH i AS (\n INSERT INTO ban_history (class_id, target_account, banned_operation_id)\n VALUES ($1, $2, $3)\n -- this allows us to run this query many times idempotently\n ON CONFLICT (banned_operation_id) DO NOTHING\n RETURNING *\n )\n -- gets row if there was no conflict\n SELECT\n target_account AS \"target_account!: AccountId\",\n class_id AS \"class_id!: _\",\n banned_at AS \"banned_at!: _\",\n banned_operation_id AS \"banned_operation_id!: _\",\n unbanned_at,\n unbanned_operation_id\n FROM i\n -- or selects original row if there was a conflict\n UNION\n SELECT\n target_account AS \"target_account: AccountId\",\n class_id,\n banned_at,\n banned_operation_id,\n unbanned_at,\n unbanned_operation_id\n FROM ban_history\n WHERE\n banned_operation_id = $3\n " + }, "fe7779aca18f7e0fe8dcee465db39f3669a6b1c80064bab63e82f8f5aa99d7d1": { "describe": { "columns": [ diff --git a/src/app/api/v1/account/mod.rs b/src/app/api/v1/account/mod.rs index fb774e6c..f0f7f1d4 100644 --- a/src/app/api/v1/account/mod.rs +++ b/src/app/api/v1/account/mod.rs @@ -126,11 +126,11 @@ mod tests { use super::*; use crate::test_helpers::prelude::*; - #[tokio::test] - async fn update_property() { + #[sqlx::test] + async fn update_property(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let authz = TestAuthz::new(); let state = TestState::new_with_pool(db_pool, authz); let state = Arc::new(state); @@ -151,11 +151,11 @@ mod tests { assert!(props.contains_key(&property_id)); } - #[tokio::test] - async fn read_property() { + #[sqlx::test] + async fn read_property(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user2", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let authz = TestAuthz::new(); let state = TestState::new_with_pool(db_pool, authz); let state = Arc::new(state); @@ -185,11 +185,11 @@ mod tests { assert_eq!(property_value, expected_property_value); } - #[tokio::test] - async fn merge_properties() { + #[sqlx::test] + async fn merge_properties(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user3", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let authz = TestAuthz::new(); let state = TestState::new_with_pool(db_pool, authz); let state = Arc::new(state); @@ -243,11 +243,11 @@ mod tests { assert_eq!(property_value, second_prop_value); } - #[tokio::test] - async fn overwrite_property() { + #[sqlx::test] + async fn overwrite_property(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user4", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let authz = TestAuthz::new(); let state = TestState::new_with_pool(db_pool, authz); let state = Arc::new(state); @@ -300,11 +300,11 @@ mod tests { assert_eq!(property_value, second_prop_value); } - #[tokio::test] - async fn overwrite_property_complex() { + #[sqlx::test] + async fn overwrite_property_complex(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user5", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let authz = TestAuthz::new(); let state = TestState::new_with_pool(db_pool, authz); let state = Arc::new(state); diff --git a/src/app/api/v1/authz/mod.rs b/src/app/api/v1/authz/mod.rs index 5fe038af..a7d5f0eb 100644 --- a/src/app/api/v1/authz/mod.rs +++ b/src/app/api/v1/authz/mod.rs @@ -489,12 +489,12 @@ fn test_extract_rtc_id() { assert_eq!(r, "14aa9730-26e1-487c-9153-bc8cb28d8eb0"); } -#[tokio::test] -async fn test_transform_tq_authz_request() { +#[sqlx::test] +async fn test_transform_tq_authz_request(pool: sqlx::PgPool) { use crate::test_helpers::prelude::TestAuthz; use crate::test_helpers::state::TestState; - let test_state = TestState::new(TestAuthz::new()).await; + let test_state = TestState::new(pool, TestAuthz::new()).await; // ["classrooms", CLASSROOM_ID, "priorities", priority]::* // becomes ["classrooms", CLASSROOM_ID]::* let mut authz_req: AuthzRequest = serde_json::from_str( @@ -617,12 +617,12 @@ fn test_transform_nats_gatekeeper_authz_request() { ); } -#[tokio::test] -async fn test_transform_storage_v1_authz_request() { +#[sqlx::test] +async fn test_transform_storage_v1_authz_request(pool: sqlx::PgPool) { use crate::test_helpers::prelude::TestAuthz; use crate::test_helpers::state::TestState; - let test_state = TestState::new(TestAuthz::new()).await; + let test_state = TestState::new(pool, TestAuthz::new()).await; let mut authz_req: AuthzRequest = serde_json::from_str( r#" diff --git a/src/app/api/v1/class/create_timestamp.rs b/src/app/api/v1/class/create_timestamp.rs index 4d228bfa..3746855c 100644 --- a/src/app/api/v1/class/create_timestamp.rs +++ b/src/app/api/v1/class/create_timestamp.rs @@ -74,11 +74,10 @@ mod create_timestamp_tests { use std::ops::Bound; use uuid::Uuid; - #[tokio::test] - async fn create_timestamp_unauthorized() { + #[sqlx::test] + async fn create_timestamp_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; @@ -111,11 +110,10 @@ mod create_timestamp_tests { .expect_err("Unexpected success, should fail due to authz"); } - #[tokio::test] - async fn create_webinar_timestamp() { + #[sqlx::test] + async fn create_webinar_timestamp(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; diff --git a/src/app/api/v1/class/properties.rs b/src/app/api/v1/class/properties.rs index e8535d8c..ec4a1c18 100644 --- a/src/app/api/v1/class/properties.rs +++ b/src/app/api/v1/class/properties.rs @@ -164,13 +164,13 @@ mod tests { use mockall::predicate as pred; use uuid::Uuid; - #[tokio::test] - async fn read_property_unauthorized() { + #[sqlx::test] + async fn read_property_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let webinar = { let mut conn = state.get_conn().await.expect("Failed to fetch connection"); @@ -204,13 +204,13 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn read_property() { + #[sqlx::test] + async fn read_property(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; @@ -256,13 +256,13 @@ mod tests { assert_eq!(property_value, serde_json::json!("test2")); } - #[tokio::test] - async fn update_property_unauthorized() { + #[sqlx::test] + async fn update_property_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let webinar = { let mut conn = state.get_conn().await.expect("Failed to fetch connection"); @@ -297,13 +297,13 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn update_property() { + #[sqlx::test] + async fn update_property(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; diff --git a/src/app/api/v1/class/read.rs b/src/app/api/v1/class/read.rs index 5b8c941c..ccd394e9 100644 --- a/src/app/api/v1/class/read.rs +++ b/src/app/api/v1/class/read.rs @@ -219,11 +219,10 @@ mod tests { }; use serde_json::Value; - #[tokio::test] - async fn read_webinar_unauthorized() { + #[sqlx::test] + async fn read_webinar_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let state = Arc::new(state); @@ -237,10 +236,10 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn read_webinar() { + #[sqlx::test] + async fn read_webinar(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; @@ -281,10 +280,10 @@ mod tests { assert_eq!(v.get("turn_host").unwrap().as_str(), Some("turn0")); } - #[tokio::test] - async fn read_p2p() { + #[sqlx::test] + async fn read_p2p(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let p2p = { let mut conn = db_pool.get_conn().await; @@ -329,10 +328,10 @@ mod tests { assert_eq!(turns.into_iter().collect::>().len(), 1); } - #[tokio::test] - async fn read_class_properties() { + #[sqlx::test] + async fn read_class_properties(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let mut properties = KeyValueProperties::new(); properties.insert("test".to_owned(), serde_json::json!("test")); @@ -410,10 +409,10 @@ mod tests { ) } - #[tokio::test] - async fn read_account_properties() { + #[sqlx::test] + async fn read_account_properties(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let mut properties = KeyValueProperties::new(); properties.insert("test".to_owned(), serde_json::json!("test")); diff --git a/src/app/api/v1/class/recreate.rs b/src/app/api/v1/class/recreate.rs index b76d3c61..b1f423d9 100644 --- a/src/app/api/v1/class/recreate.rs +++ b/src/app/api/v1/class/recreate.rs @@ -140,11 +140,11 @@ mod tests { use mockall::predicate as pred; use uuid::Uuid; - #[tokio::test] - async fn recreate_webinar_unauthorized() { + #[sqlx::test] + async fn recreate_webinar_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; @@ -172,14 +172,14 @@ mod tests { .expect_err("Unexpected success, should fail due to authz"); } - #[tokio::test] - async fn recreate_webinar() { + #[sqlx::test] + async fn recreate_webinar(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let recreated_event_room_id = Uuid::new_v4(); let recreated_conference_room_id = Uuid::new_v4(); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; diff --git a/src/app/api/v1/class/update.rs b/src/app/api/v1/class/update.rs index da11c6b3..ca8e4fed 100644 --- a/src/app/api/v1/class/update.rs +++ b/src/app/api/v1/class/update.rs @@ -185,13 +185,13 @@ mod tests { let _update: ClassUpdate = serde_json::from_str(update).unwrap(); } - #[tokio::test] - async fn update_webinar_unauthorized() { + #[sqlx::test] + async fn update_webinar_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let webinar = { let mut conn = state.get_conn().await.expect("Failed to fetch connection"); factory::Webinar::new( @@ -220,13 +220,13 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn update_webinar() { + #[sqlx::test] + async fn update_webinar(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; diff --git a/src/app/api/v1/minigroup/mod.rs b/src/app/api/v1/minigroup/mod.rs index 3551b76e..c5ae2da8 100644 --- a/src/app/api/v1/minigroup/mod.rs +++ b/src/app/api/v1/minigroup/mod.rs @@ -247,14 +247,14 @@ mod tests { use mockall::predicate as pred; use uuid::Uuid; - #[tokio::test] - async fn create_minigroup_no_time() { + #[sqlx::test] + async fn create_minigroup_no_time(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); @@ -289,14 +289,14 @@ mod tests { assert_eq!(new_minigroup.reserve(), Some(10),); } - #[tokio::test] - async fn create_minigroup_with_time() { + #[sqlx::test] + async fn create_minigroup_with_time(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); @@ -337,11 +337,11 @@ mod tests { assert_eq!(new_minigroup.reserve(), Some(10),); } - #[tokio::test] - async fn create_minigroup_unauthorized() { + #[sqlx::test] + async fn create_minigroup_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let scope = random_string(); @@ -362,14 +362,14 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn create_minigroup_with_properties() { + #[sqlx::test] + async fn create_minigroup_with_properties(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); diff --git a/src/app/api/v1/p2p/mod.rs b/src/app/api/v1/p2p/mod.rs index 3c9ae843..a05607f9 100644 --- a/src/app/api/v1/p2p/mod.rs +++ b/src/app/api/v1/p2p/mod.rs @@ -242,14 +242,14 @@ mod tests { use mockall::predicate as pred; use uuid::Uuid; - #[tokio::test] - async fn create_p2p() { + #[sqlx::test] + async fn create_p2p(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); @@ -279,11 +279,11 @@ mod tests { .expect("p2p not found"); } - #[tokio::test] - async fn create_p2p_unauthorized() { + #[sqlx::test] + async fn create_p2p_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let scope = random_string(); @@ -301,14 +301,14 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn create_p2p_with_properties() { + #[sqlx::test] + async fn create_p2p_with_properties(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); diff --git a/src/app/api/v1/tests/mod.rs b/src/app/api/v1/tests/mod.rs index f189daf6..43addf16 100644 --- a/src/app/api/v1/tests/mod.rs +++ b/src/app/api/v1/tests/mod.rs @@ -8,9 +8,9 @@ use super::*; use crate::app::http; use crate::test_helpers::prelude::*; -#[tokio::test] -async fn test_healthz() { - let state = TestState::new(TestAuthz::new()).await; +#[sqlx::test] +async fn test_healthz(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; let state = Arc::new(state) as Arc; let app = http::router(state, HashMap::new()); @@ -29,15 +29,15 @@ async fn test_healthz() { assert_eq!(&body[..], b"Ok"); } -#[tokio::test] -async fn test_api_rollback() { +#[sqlx::test] +async fn test_api_rollback(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user123", USR_AUDIENCE); let token = agent.token(); let mut authz = TestAuthz::new(); authz.set_audience(SVC_AUDIENCE); authz.allow(agent.account_id(), vec!["scopes"], "rollback"); - let state = TestState::new(authz).await; + let state = TestState::new(pool, authz).await; let state = Arc::new(state) as Arc; let app = crate::app::http::router(state.clone(), make_authn()); diff --git a/src/app/api/v1/webinar/convert.rs b/src/app/api/v1/webinar/convert.rs index c6042abd..d3bcaaad 100644 --- a/src/app/api/v1/webinar/convert.rs +++ b/src/app/api/v1/webinar/convert.rs @@ -240,11 +240,11 @@ mod tests { use mockall::predicate as pred; use serde_json::{json, Value}; - #[tokio::test] - async fn convert_webinar_unauthorized() { + #[sqlx::test] + async fn convert_webinar_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); @@ -269,14 +269,14 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn convert_webinar() { + #[sqlx::test] + async fn convert_webinar(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "convert"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); convert_webinar_mocks(&mut state, event_room_id, conference_room_id); @@ -313,14 +313,14 @@ mod tests { ); } - #[tokio::test] - async fn convert_webinar_with_recording() { + #[sqlx::test] + async fn convert_webinar_with_recording(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "convert"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); convert_webinar_mocks(&mut state, event_room_id, conference_room_id); @@ -365,14 +365,14 @@ mod tests { ); } - #[tokio::test] - async fn convert_webinar_unspecified_time() { + #[sqlx::test] + async fn convert_webinar_unspecified_time(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "convert"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); convert_unspecified_time_webinar_mocks(&mut state, event_room_id, conference_room_id); diff --git a/src/app/api/v1/webinar/create/replica.rs b/src/app/api/v1/webinar/create/replica.rs index eeeed083..cdf354a4 100644 --- a/src/app/api/v1/webinar/create/replica.rs +++ b/src/app/api/v1/webinar/create/replica.rs @@ -171,10 +171,10 @@ mod tests { use mockall::predicate as pred; use uuid::Uuid; - #[tokio::test] - async fn create_replica_unauthorized() { + #[sqlx::test] + async fn create_replica_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let scope = random_string(); let class_id = Uuid::new_v4(); @@ -189,14 +189,14 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn create_replica_original_webinar_not_found() { + #[sqlx::test] + async fn create_replica_original_webinar_not_found(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); let class_id = Uuid::new_v4(); @@ -216,14 +216,14 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn create_replica_from_original_webinar() { + #[sqlx::test] + async fn create_replica_from_original_webinar(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); diff --git a/src/app/api/v1/webinar/create/webinar.rs b/src/app/api/v1/webinar/create/webinar.rs index 35af28d5..1b08315a 100644 --- a/src/app/api/v1/webinar/create/webinar.rs +++ b/src/app/api/v1/webinar/create/webinar.rs @@ -178,14 +178,14 @@ mod tests { use mockall::predicate as pred; use uuid::Uuid; - #[tokio::test] - async fn create_webinar_no_time() { + #[sqlx::test] + async fn create_webinar_no_time(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); @@ -220,14 +220,14 @@ mod tests { assert_eq!(new_webinar.reserve(), Some(10)); } - #[tokio::test] - async fn create_webinar_with_time() { + #[sqlx::test] + async fn create_webinar_with_time(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); @@ -268,11 +268,10 @@ mod tests { assert_eq!(new_webinar.reserve(), Some(10)); } - #[tokio::test] - async fn create_webinar_unauthorized() { + #[sqlx::test] + async fn create_webinar_unauthorized(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let scope = random_string(); @@ -293,14 +292,14 @@ mod tests { .expect_err("Unexpectedly succeeded"); } - #[tokio::test] - async fn create_webinar_with_properties() { + #[sqlx::test] + async fn create_webinar_with_properties(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); let mut authz = TestAuthz::new(); authz.allow(agent.account_id(), vec!["classrooms"], "create"); - let mut state = TestState::new(authz).await; + let mut state = TestState::new(pool, authz).await; let event_room_id = Uuid::new_v4(); let conference_room_id = Uuid::new_v4(); diff --git a/src/app/api/v1/webinar/download.rs b/src/app/api/v1/webinar/download.rs index 249e59a8..eb3ddcc5 100644 --- a/src/app/api/v1/webinar/download.rs +++ b/src/app/api/v1/webinar/download.rs @@ -79,11 +79,11 @@ mod tests { use chrono::{Duration, Utc}; use hyper::body::to_bytes; - #[tokio::test] - async fn create_webinar_timestamp() { + #[sqlx::test] + async fn create_webinar_timestamp(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let db_pool = TestDb::new().await; + let db_pool = TestDb::new(pool); let webinar = { let mut conn = db_pool.get_conn().await; diff --git a/src/app/error.rs b/src/app/error.rs index 339d13db..2e8da560 100644 --- a/src/app/error.rs +++ b/src/app/error.rs @@ -16,7 +16,7 @@ struct ErrorKindProperties { is_notify_sentry: bool, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ErrorKind { AccessDenied, AuthorizationFailed, @@ -276,6 +276,11 @@ impl Error { } } } + + #[cfg(test)] + pub fn kind(&self) -> ErrorKind { + self.kind + } } impl IntoResponse for Error { diff --git a/src/app/postprocessing_strategy/minigroup/tests.rs b/src/app/postprocessing_strategy/minigroup/tests.rs index 4bfc660b..2209a7af 100644 --- a/src/app/postprocessing_strategy/minigroup/tests.rs +++ b/src/app/postprocessing_strategy/minigroup/tests.rs @@ -14,10 +14,10 @@ mod handle_upload { use super::super::super::PostprocessingStrategy; use super::super::*; - #[tokio::test] - async fn handle_upload_stream() { + #[sqlx::test] + async fn handle_upload_stream(pool: sqlx::PgPool) { let now = Utc::now(); - let mut state = TestState::new(TestAuthz::new()).await; + let mut state = TestState::new(pool, TestAuthz::new()).await; let conference_room_id = Uuid::new_v4(); let event_room_id = Uuid::new_v4(); let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); @@ -221,10 +221,10 @@ mod handle_upload { assert_eq!(&recording2.created_by, agent2.agent_id()); } - #[tokio::test] - async fn handle_upload_mjr() { + #[sqlx::test] + async fn handle_upload_mjr(pool: sqlx::PgPool) { let now = Utc::now(); - let mut state = TestState::new(TestAuthz::new()).await; + let mut state = TestState::new(pool, TestAuthz::new()).await; let conference_room_id = Uuid::new_v4(); let event_room_id = Uuid::new_v4(); @@ -342,12 +342,12 @@ mod handle_adjust { use super::super::super::PostprocessingStrategy; use super::super::*; - #[tokio::test] - async fn handle_adjust() { + #[sqlx::test] + async fn handle_adjust(pool: sqlx::PgPool) { let now = Utc::now(); let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); - let mut state = TestState::new(TestAuthz::new()).await; + let mut state = TestState::new(pool, TestAuthz::new()).await; let event_room_id = Uuid::new_v4(); let original_event_room_id = Uuid::new_v4(); let modified_event_room_id = Uuid::new_v4(); @@ -582,12 +582,12 @@ mod handle_adjust { } } - #[tokio::test] - async fn handle_adjust_with_pin_and_unpin() { + #[sqlx::test] + async fn handle_adjust_with_pin_and_unpin(pool: sqlx::PgPool) { let now = Utc::now(); let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); - let mut state = TestState::new(TestAuthz::new()).await; + let mut state = TestState::new(pool, TestAuthz::new()).await; let event_room_id = Uuid::new_v4(); let original_event_room_id = Uuid::new_v4(); let modified_event_room_id = Uuid::new_v4(); @@ -822,12 +822,12 @@ mod handle_transcoding_completion { use super::super::super::PostprocessingStrategy; use super::super::*; - #[tokio::test] - async fn handle_transcoding_completion() { + #[sqlx::test] + async fn handle_transcoding_completion(pool: sqlx::PgPool) { let now = Utc::now(); let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; // Insert a minigroup with recordings. let (minigroup, recording1, recording2) = { diff --git a/src/app/stage/ban.rs b/src/app/stage/ban.rs index de696a19..6735b49b 100644 --- a/src/app/stage/ban.rs +++ b/src/app/stage/ban.rs @@ -198,7 +198,7 @@ pub async fn handle_video_streaming_banned( .error(AppErrorKind::DbConnAcquisitionFailed) .transient()?; - let op = ban_account_op::UpsertQuery::new_video_streaming_banned( + let op = ban_account_op::UpdateQuery::new_video_streaming_banned( video_streaming_banned.target_account.clone(), video_streaming_banned.operation_id, ) @@ -229,7 +229,7 @@ pub async fn handle_collaboration_banned( .error(AppErrorKind::DbConnAcquisitionFailed) .transient()?; - let op = ban_account_op::UpsertQuery::new_collaboration_banned( + let op = ban_account_op::UpdateQuery::new_collaboration_banned( collaboration_banned.target_account.clone(), collaboration_banned.operation_id, ) @@ -271,3 +271,364 @@ async fn finish( ) .await } + +#[cfg(test)] +mod tests { + use svc_events::{Event, EventV1}; + + use super::*; + + use crate::app::AppContext; + use crate::test_helpers::prelude::*; + + #[sqlx::test] + async fn handles_intents(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + + let minigroup = { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await + }; + + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + + let intent = BanIntentV1 { + classroom_id: minigroup.id(), + ban: true, + sender: agent1.agent_id().clone(), + last_operation_id: 0, + target_account: agent1.account_id().clone(), + }; + let intent_id: EventId = ("ban".to_string(), "intent".to_string(), 0).into(); + + handle_intent(&state, intent.clone(), intent_id) + .await + .expect("failed to handle intent"); + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 1); + + let payload = serde_json::from_slice::(&pub_reqs[0].payload) + .expect("failed to parse event"); + assert!(matches!(payload, Event::V1(EventV1::BanAccepted(..)))); + } + + // should fail b/c we already started an operation with another sequence id + let intent_id: EventId = ("ban".to_string(), "intent".to_string(), 1).into(); + + handle_intent(&state, intent.clone(), intent_id) + .await + .expect("failed to handle intent"); + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 2); + + let payload = serde_json::from_slice::(&pub_reqs[1].payload) + .expect("failed to parse event"); + assert!(matches!(payload, Event::V1(EventV1::BanRejected(..)))); + } + } + + #[sqlx::test] + async fn fails_to_handle_video_streaming_banned_if_there_was_no_intent(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + + let video_streaming_banned = BanVideoStreamingCompletedV1 { + ban: true, + classroom_id: Uuid::new_v4(), + target_account: agent1.account_id().clone(), + operation_id: 0, + parent: ("ban".to_owned(), "accepted".to_owned(), 0).into(), + }; + + let r = handle_video_streaming_banned(&state, video_streaming_banned).await; + assert!(matches!( + r, + Err(HandleMessageFailure::Permanent( + e @ crate::app::error::Error { .. } + )) if e.kind() == AppErrorKind::OperationFailed + )); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + let r = db::ban_account_op::ReadQuery::by_account_id(agent1.account_id()) + .execute(&mut conn) + .await + .expect("failed to fetch ban account op entry"); + assert!(r.is_none()); + } + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 0); + } + } + + #[sqlx::test] + async fn fails_to_handle_collaboration_banned_if_there_was_no_intent(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + + let collaboration_banned = BanCollaborationCompletedV1 { + ban: true, + classroom_id: Uuid::new_v4(), + target_account: agent1.account_id().clone(), + operation_id: 0, + parent: ("ban".to_owned(), "accepted".to_owned(), 0).into(), + }; + + let r = handle_collaboration_banned(&state, collaboration_banned).await; + assert!(matches!( + r, + Err(HandleMessageFailure::Permanent( + e @ crate::app::error::Error { .. } + )) if e.kind() == AppErrorKind::OperationFailed + )); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + let r = db::ban_account_op::ReadQuery::by_account_id(agent1.account_id()) + .execute(&mut conn) + .await + .expect("failed to fetch ban account op entry"); + assert!(r.is_none()); + } + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 0); + } + } + + #[sqlx::test] + async fn handles_video_streaming_banned(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + + let minigroup = { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await + }; + + let agent1 = TestAgent::new("web", "user-video", USR_AUDIENCE); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + db::ban_account_op::UpsertQuery::new_operation(agent1.account_id().clone(), 0, 0) + .execute(&mut conn) + .await + .expect("failed to run upsert query"); + }; + + let video_streaming_banned = BanVideoStreamingCompletedV1 { + ban: true, + classroom_id: minigroup.id(), + target_account: agent1.account_id().clone(), + operation_id: 0, + parent: ("ban".to_owned(), "accepted".to_owned(), 0).into(), + }; + + handle_video_streaming_banned(&state, video_streaming_banned) + .await + .expect("failed to handle video streaming banned"); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + let r = db::ban_account_op::ReadQuery::by_account_id(agent1.account_id()) + .execute(&mut conn) + .await + .expect("failed to fetch ban account op entry"); + assert!(matches!( + r, + Some(db::ban_account_op::Object { + last_op_id: 0, + is_video_streaming_banned: true, + .. + }) + )); + } + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 0); + } + } + + #[sqlx::test] + async fn handles_collaboration_banned(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + + let minigroup = { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await + }; + + let agent1 = TestAgent::new("web", "user-collab", USR_AUDIENCE); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + db::ban_account_op::UpsertQuery::new_operation(agent1.account_id().clone(), 0, 0) + .execute(&mut conn) + .await + .expect("failed to run upsert query"); + }; + + let collaboration_banned = BanCollaborationCompletedV1 { + ban: true, + classroom_id: minigroup.id(), + target_account: agent1.account_id().clone(), + operation_id: 0, + parent: ("ban".to_owned(), "accepted".to_owned(), 0).into(), + }; + + handle_collaboration_banned(&state, collaboration_banned) + .await + .expect("failed to handle collaboration banned"); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + let r = db::ban_account_op::ReadQuery::by_account_id(agent1.account_id()) + .execute(&mut conn) + .await + .expect("failed to fetch ban account op entry"); + assert!(matches!( + r, + Some(db::ban_account_op::Object { + last_op_id: 0, + is_collaboration_banned: true, + .. + }) + )); + } + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 0); + } + } + + #[sqlx::test] + async fn finishes_operation(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + + let minigroup = { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await + }; + + let agent1 = TestAgent::new("web", "user-finish", USR_AUDIENCE); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + db::ban_account_op::UpsertQuery::new_operation(agent1.account_id().clone(), 0, 0) + .execute(&mut conn) + .await + .expect("failed to run upsert query"); + }; + + let video_streaming_banned = BanVideoStreamingCompletedV1 { + ban: true, + classroom_id: minigroup.id(), + target_account: agent1.account_id().clone(), + operation_id: 0, + parent: ("ban".to_owned(), "accepted".to_owned(), 0).into(), + }; + + handle_video_streaming_banned(&state, video_streaming_banned) + .await + .expect("failed to handle video streaming banned"); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + let r = db::ban_account_op::ReadQuery::by_account_id(agent1.account_id()) + .execute(&mut conn) + .await + .expect("failed to fetch ban account op entry"); + assert!(matches!( + r, + Some(db::ban_account_op::Object { + last_op_id: 0, + is_video_streaming_banned: true, + .. + }) + )); + } + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 0); + } + + let collaboration_banned = BanCollaborationCompletedV1 { + ban: true, + classroom_id: minigroup.id(), + target_account: agent1.account_id().clone(), + operation_id: 0, + parent: ("ban".to_owned(), "accepted".to_owned(), 0).into(), + }; + + handle_collaboration_banned(&state, collaboration_banned) + .await + .expect("failed to handle collaboration banned"); + + { + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + let r = db::ban_account_op::ReadQuery::by_account_id(agent1.account_id()) + .execute(&mut conn) + .await + .expect("failed to fetch ban account op entry"); + assert!(matches!( + r, + Some(db::ban_account_op::Object { + last_op_id: 0, + is_collaboration_banned: true, + is_video_streaming_banned: true, + .. + }) + )); + } + + { + let pub_reqs = state.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 1); + + let payload = serde_json::from_slice::(&pub_reqs[0].payload) + .expect("failed to parse event"); + assert!(matches!(payload, Event::V1(EventV1::BanCompleted(..)))); + } + } +} diff --git a/src/app/tide_state/mod.rs b/src/app/tide_state/mod.rs index 867a9bac..52ece2fa 100644 --- a/src/app/tide_state/mod.rs +++ b/src/app/tide_state/mod.rs @@ -33,7 +33,7 @@ pub trait AppContext: Sync + Send { fn config(&self) -> &Config; fn agent(&self) -> Option<&Agent>; fn turn_host_selector(&self) -> &TurnHostSelector; - fn nats_client(&self) -> Option<&dyn NatsClient>; + fn nats_client(&self) -> Option>; fn get_preroll_offset(&self, audience: &str) -> i64 { self.config() @@ -158,8 +158,8 @@ impl AppContext for TideState { &self.turn_host_selector } - fn nats_client(&self) -> Option<&dyn NatsClient> { - self.nats_client.as_deref() + fn nats_client(&self) -> Option> { + self.nats_client.clone() } } diff --git a/src/db/ban_account_op.rs b/src/db/ban_account_op.rs index 08d2b54e..bebe4254 100644 --- a/src/db/ban_account_op.rs +++ b/src/db/ban_account_op.rs @@ -1,6 +1,7 @@ use sqlx::PgConnection; use svc_authn::AccountId; +#[derive(Debug)] pub struct Object { pub user_account: AccountId, pub last_op_id: i64, @@ -62,8 +63,6 @@ pub async fn get_next_seq_id(conn: &mut PgConnection) -> sqlx::Result /// Returns `None` if `last_op_id` in database differs. pub struct UpsertQuery { user_account: AccountId, - is_video_streaming_banned: Option, - is_collaboration_banned: Option, last_op_id: i64, new_op_id: i64, } @@ -72,20 +71,63 @@ impl UpsertQuery { pub fn new_operation(user_account: AccountId, last_op_id: i64, new_op_id: i64) -> Self { Self { user_account, - is_video_streaming_banned: None, - is_collaboration_banned: None, last_op_id, new_op_id, } } + pub async fn execute(self, conn: &mut PgConnection) -> sqlx::Result> { + sqlx::query_as!( + Object, + r#" + INSERT INTO ban_account_op (user_account, last_op_id) + VALUES ($1, $2) + ON CONFLICT (user_account) DO UPDATE + SET + -- to reset sub-operation trackers only if op_id has changed + is_video_streaming_banned = ($2 = $3 AND ban_account_op.is_video_streaming_banned), + is_collaboration_banned = ($2 = $3 AND ban_account_op.is_collaboration_banned), + last_op_id = EXCLUDED.last_op_id + WHERE + -- allow to 'complete' operation if there's no change in last_op_id + -- or allow to do upsert without real changes so we can process + -- the same message twice + ban_account_op.last_op_id = EXCLUDED.last_op_id OR + -- allow change op id if the previous operation is completed + ( + ban_account_op.last_op_id = $3 AND + ban_account_op.is_video_streaming_banned = true AND + ban_account_op.is_collaboration_banned = true + ) + RETURNING + user_account AS "user_account: _", + last_op_id, + is_video_streaming_banned, + is_collaboration_banned + "#, + self.user_account as AccountId, + self.new_op_id, + self.last_op_id + ) + .fetch_optional(conn) + .await + } +} + +pub struct UpdateQuery { + user_account: AccountId, + is_video_streaming_banned: Option, + is_collaboration_banned: Option, + last_op_id: i64, +} + +impl UpdateQuery { pub fn new_video_streaming_banned(user_account: AccountId, op_id: i64) -> Self { Self { user_account, is_video_streaming_banned: Some(true), is_collaboration_banned: None, last_op_id: op_id, - new_op_id: op_id, } } @@ -95,7 +137,6 @@ impl UpsertQuery { is_video_streaming_banned: None, is_collaboration_banned: Some(true), last_op_id: op_id, - new_op_id: op_id, } } @@ -103,24 +144,13 @@ impl UpsertQuery { sqlx::query_as!( Object, r#" - INSERT INTO ban_account_op (user_account, last_op_id, is_video_streaming_banned, is_collaboration_banned) - VALUES ($1, $2, COALESCE($3, false), COALESCE($4, false)) - ON CONFLICT (user_account) DO UPDATE + UPDATE ban_account_op SET - is_video_streaming_banned = COALESCE($3, ban_account_op.is_video_streaming_banned), - is_collaboration_banned = COALESCE($4, ban_account_op.is_collaboration_banned), - last_op_id = EXCLUDED.last_op_id + is_video_streaming_banned = COALESCE($2, ban_account_op.is_video_streaming_banned), + is_collaboration_banned = COALESCE($3, ban_account_op.is_collaboration_banned) WHERE - -- allow to 'complete' operation if there's no change in last_op_id - -- or allow to do upsert without real changes so we can process - -- the same message twice - ban_account_op.last_op_id = EXCLUDED.last_op_id OR - -- allow change op id if the previous operation is completed - ( - ban_account_op.last_op_id = $5 AND - ban_account_op.is_video_streaming_banned = true AND - ban_account_op.is_collaboration_banned = true - ) + ban_account_op.last_op_id = $4 + AND ban_account_op.user_account = $1 RETURNING user_account AS "user_account: _", last_op_id, @@ -128,7 +158,6 @@ impl UpsertQuery { is_collaboration_banned "#, self.user_account as AccountId, - self.new_op_id, self.is_video_streaming_banned, self.is_collaboration_banned, self.last_op_id @@ -137,3 +166,130 @@ impl UpsertQuery { .await } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::app::AppContext; + use crate::test_helpers::prelude::*; + + #[sqlx::test] + async fn doesnt_allow_to_change_op_id_until_operation_is_completed(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + + let r = UpsertQuery::new_operation(agent1.account_id().clone(), 0, 0) + .execute(&mut conn) + .await + .expect("failed to start new ban operation"); + assert!(r.is_some()); + + async fn test_possible_failure_states(agent: &TestAgent, conn: &mut sqlx::PgConnection) { + // should fail b/c previous operation is not completed yet + let r = UpsertQuery::new_operation(agent.account_id().clone(), 0, 100) + .execute(conn) + .await + .expect("failed to start new ban operation"); + assert!(r.is_none()); + + // should fail b/c we passed the wrong previous operation id + let r = UpsertQuery::new_operation(agent.account_id().clone(), 50, 100) + .execute(conn) + .await + .expect("failed to start new ban operation"); + assert!(r.is_none()); + + // should be ok since we didn't change anything (idempotency) + let r = UpsertQuery::new_operation(agent.account_id().clone(), 0, 0) + .execute(conn) + .await + .expect("failed to start new ban operation"); + assert!(r.is_some()); + } + + test_possible_failure_states(&agent1, &mut conn).await; + + // should fail b/c we passed to wrong operation id + let r = UpdateQuery::new_collaboration_banned(agent1.account_id().clone(), 10) + .execute(&mut conn) + .await + .expect("failed to complete collaboration ban"); + assert!(r.is_none()); + + let r = UpdateQuery::new_collaboration_banned(agent1.account_id().clone(), 0) + .execute(&mut conn) + .await + .expect("failed to complete collaboration ban"); + assert!(r.is_some()); + assert!(matches!( + r, + Some(Object { + is_collaboration_banned: true, + last_op_id: 0, + .. + }) + )); + + // should be ok to run twice (idempotency) + let r = UpdateQuery::new_collaboration_banned(agent1.account_id().clone(), 0) + .execute(&mut conn) + .await + .expect("failed to complete collaboration ban"); + assert!(r.is_some()); + + test_possible_failure_states(&agent1, &mut conn).await; + + // should fail b/c we passed to wrong operation id + let r = UpdateQuery::new_video_streaming_banned(agent1.account_id().clone(), 10) + .execute(&mut conn) + .await + .expect("failed to complete video streaming ban"); + assert!(r.is_none()); + + let r = UpdateQuery::new_video_streaming_banned(agent1.account_id().clone(), 0) + .execute(&mut conn) + .await + .expect("failed to complete video streaming ban"); + assert!(r.is_some()); + assert!(matches!( + r, + Some(Object { + is_video_streaming_banned: true, + is_collaboration_banned: true, + last_op_id: 0, + .. + }) + )); + + // should be ok to run twice (idempotency) + let r = UpdateQuery::new_video_streaming_banned(agent1.account_id().clone(), 0) + .execute(&mut conn) + .await + .expect("failed to complete video streaming ban"); + assert!(r.is_some()); + + // should fail b/c we passed the wrong previous operation id + let r = UpsertQuery::new_operation(agent1.account_id().clone(), 50, 100) + .execute(&mut conn) + .await + .expect("failed to start new ban operation"); + assert!(r.is_none()); + + // should be ok since we didn't change anything (idempotency) + let r = UpsertQuery::new_operation(agent1.account_id().clone(), 0, 0) + .execute(&mut conn) + .await + .expect("failed to start new ban operation"); + assert!(r.is_some()); + + // should be ok to start new operation afterwards + let r = UpsertQuery::new_operation(agent1.account_id().clone(), 0, 1) + .execute(&mut conn) + .await + .expect("failed to complete video streaming ban"); + assert!(r.is_some()); + } +} diff --git a/src/db/ban_history.rs b/src/db/ban_history.rs index 36accf4c..13cab186 100644 --- a/src/db/ban_history.rs +++ b/src/db/ban_history.rs @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc}; use svc_authn::AccountId; use uuid::Uuid; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct Object { #[allow(unused)] target_account: AccountId, @@ -37,17 +37,34 @@ impl<'a> InsertQuery<'a> { sqlx::query_as!( Object, r#" - INSERT INTO ban_history (class_id, target_account, banned_operation_id) - VALUES ($1, $2, $3) - -- this allows us to run this query many times idempotently - ON CONFLICT (banned_operation_id) DO NOTHING - RETURNING - target_account AS "target_account: AccountId", - class_id, - banned_at, - banned_operation_id, + WITH i AS ( + INSERT INTO ban_history (class_id, target_account, banned_operation_id) + VALUES ($1, $2, $3) + -- this allows us to run this query many times idempotently + ON CONFLICT (banned_operation_id) DO NOTHING + RETURNING * + ) + -- gets row if there was no conflict + SELECT + target_account AS "target_account!: AccountId", + class_id AS "class_id!: _", + banned_at AS "banned_at!: _", + banned_operation_id AS "banned_operation_id!: _", unbanned_at, unbanned_operation_id + FROM i + -- or selects original row if there was a conflict + UNION + SELECT + target_account AS "target_account: AccountId", + class_id, + banned_at, + banned_operation_id, + unbanned_at, + unbanned_operation_id + FROM ban_history + WHERE + banned_operation_id = $3 "#, self.class_id, self.target_account as &AccountId, @@ -101,3 +118,90 @@ impl<'a> FinishBanQuery<'a> { .await } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::app::AppContext; + use crate::test_helpers::prelude::*; + + #[sqlx::test] + async fn finishes_ban(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + + let minigroup = factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await; + + let entry_start = InsertQuery::new(minigroup.id(), &AccountId::new("test", "test"), 0) + .execute(&mut conn) + .await + .expect("failed to insert ban history entry"); + + let entry_finish = FinishBanQuery::new(minigroup.id(), &entry_start.target_account, 1) + .execute(&mut conn) + .await + .expect("failed to run finish ban query") + .expect("failed to find entry"); + + assert_eq!(entry_start.target_account, entry_finish.target_account); + assert_eq!(entry_start.class_id, entry_finish.class_id); + assert_eq!(entry_start.banned_at, entry_finish.banned_at); + assert_eq!( + entry_start.banned_operation_id, + entry_finish.banned_operation_id + ); + assert_eq!(entry_finish.unbanned_operation_id, Some(1)); + } + + #[sqlx::test] + async fn inserts_idempotently(pool: sqlx::PgPool) { + let state = TestState::new(pool, TestAuthz::new()).await; + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + + let minigroup1 = factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await; + + let entry1 = InsertQuery::new(minigroup1.id(), &AccountId::new("test", "test"), 0) + .execute(&mut conn) + .await + .expect("failed to insert ban history entry"); + + let minigroup2 = factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await; + + let entry2 = InsertQuery::new( + minigroup2.id(), + &AccountId::new("test-another", "test-another"), + 0, + ) + .execute(&mut conn) + .await + .expect("failed to insert ban history entry"); + + // so we just got original entry back + assert_eq!(entry1, entry2); + } +} diff --git a/src/db/class/insert_query.rs b/src/db/class/insert_query.rs index 47829646..28f97b53 100644 --- a/src/db/class/insert_query.rs +++ b/src/db/class/insert_query.rs @@ -214,9 +214,9 @@ mod tests { use super::*; use crate::test_helpers::prelude::*; - #[tokio::test] - async fn insert_already_established_webinar() { - let db = TestDb::new().await; + #[sqlx::test] + async fn insert_already_established_webinar(pool: sqlx::PgPool) { + let db = TestDb::new(pool); let mut conn = db.get_conn().await; let webinar = { @@ -255,9 +255,9 @@ mod tests { assert_eq!(time.0, Bound::Unbounded); } - #[tokio::test] - async fn insert_not_established_webinar() { - let db = TestDb::new().await; + #[sqlx::test] + async fn insert_not_established_webinar(pool: sqlx::PgPool) { + let db = TestDb::new(pool); let mut conn = db.get_conn().await; let dummy = InsertQuery::new( diff --git a/src/db/recording.rs b/src/db/recording.rs index 61afbf7f..660c7eaf 100644 --- a/src/db/recording.rs +++ b/src/db/recording.rs @@ -10,7 +10,7 @@ use serde_derive::{Deserialize, Serialize}; //////////////////////////////////////////////////////////////////////////////// #[allow(dead_code)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Object { id: Uuid, class_id: Uuid, @@ -573,11 +573,11 @@ pub mod tests { use crate::app::AppContext; use crate::test_helpers::prelude::*; - #[tokio::test] - async fn test_minigroup_adjust_not_using_deleted_recordings() { + #[sqlx::test] + async fn test_minigroup_adjust_not_using_deleted_recordings(pool: sqlx::PgPool) { let agent = TestAgent::new("web", "user1", USR_AUDIENCE); - let state = TestState::new(TestAuthz::new()).await; + let state = TestState::new(pool, TestAuthz::new()).await; let mut conn = state.get_conn().await.expect("Failed to fetch connection"); let minigroup = factory::Minigroup::new( random_string(), @@ -614,6 +614,49 @@ pub mod tests { assert_eq!(recordings[0].rtc_id(), recording.rtc_id()); } + #[sqlx::test] + async fn list_recording_skips_records_from_banned_users(pool: sqlx::PgPool) { + let agent1 = TestAgent::new("web", "user1", USR_AUDIENCE); + let agent2 = TestAgent::new("web", "user2", USR_AUDIENCE); + + let state = TestState::new(pool, TestAuthz::new()).await; + let mut conn = state.get_conn().await.expect("Failed to fetch connection"); + let minigroup = factory::Minigroup::new( + random_string(), + USR_AUDIENCE.to_string(), + (Bound::Unbounded, Bound::Unbounded).into(), + Uuid::new_v4(), + Uuid::new_v4(), + ) + .insert(&mut conn) + .await; + + // Recording of banned agent + let _rec1 = + factory::Recording::new(minigroup.id(), Uuid::new_v4(), agent1.agent_id().to_owned()) + .insert(&mut conn) + .await; + + crate::db::ban_history::InsertQuery::new(minigroup.id(), agent1.account_id(), 0) + .execute(&mut conn) + .await + .expect("failed to insert ban history entry"); + + // Recording of good agent + let rec2 = + factory::Recording::new(minigroup.id(), Uuid::new_v4(), agent2.agent_id().to_owned()) + .insert(&mut conn) + .await; + + let recs = RecordingListQuery::new(minigroup.id()) + .execute(&mut conn) + .await + .expect("failed to run list query"); + + assert_eq!(recs.len(), 1); + assert_eq!(recs[0], rec2); + } + pub struct RecordingInsertQuery { class_id: Uuid, rtc_id: Uuid, diff --git a/src/test_helpers/db.rs b/src/test_helpers/db.rs index 94aad054..98384e1a 100644 --- a/src/test_helpers/db.rs +++ b/src/test_helpers/db.rs @@ -1,41 +1,13 @@ -use std::env::var; +use sqlx::pool::PoolConnection; +use sqlx::postgres::{PgPool, Postgres}; -use sqlx::postgres::{PgPool, PgPoolOptions, Postgres}; -use sqlx::{pool::PoolConnection, Executor}; -use tokio::sync::OnceCell; - -static DB_TRUNCATE: OnceCell = OnceCell::const_new(); #[derive(Clone)] pub struct TestDb { pool: PgPool, } impl TestDb { - pub async fn new() -> Self { - #[cfg(feature = "dotenv")] - dotenv::dotenv().ok(); - - let url = var("DATABASE_URL").expect("DATABASE_URL must be specified"); - - let pool = PgPoolOptions::new() - .min_connections(1) - .max_connections(1) - .connect(&url) - .await - .expect("Failed to connect to the DB"); - - // todo: we should actually run every test in transaction, but thats not possible for now, maybe in sqlx 0.6 - DB_TRUNCATE - .get_or_init(|| async { - let mut conn = pool.acquire().await.expect("Failed to get DB connection"); - - conn.execute("TRUNCATE class CASCADE;") - .await - .expect("Failed to truncate class table"); - - true - }) - .await; + pub fn new(pool: PgPool) -> Self { Self { pool } } diff --git a/src/test_helpers/state.rs b/src/test_helpers/state.rs index 603ca3f4..45e2743a 100644 --- a/src/test_helpers/state.rs +++ b/src/test_helpers/state.rs @@ -1,5 +1,6 @@ use parking_lot::Mutex; use std::sync::Arc; +use svc_nats_client::test_helpers::TestNatsClient; use anyhow::Result; use async_trait::async_trait; @@ -12,10 +13,7 @@ use svc_agent::{ AgentId, }; use svc_authz::ClientMap as Authz; -use svc_nats_client::{ - AckPolicy, DeliverPolicy, Event, Message, MessageStream, Messages, NatsClient, PublishError, - Subject, SubscribeError, TermMessageError, -}; +use svc_nats_client::NatsClient; use url::Url; use vec1::{vec1, Vec1}; @@ -45,7 +43,7 @@ pub struct TestState { tq_client: Arc, authz: Authz, turn_host_selector: TurnHostSelector, - nats_client: Option>, + nats_client: Arc, } fn build_config() -> Config { @@ -102,34 +100,8 @@ fn build_config() -> Config { serde_json::from_value::(config).expect("Failed to parse test config") } -struct TestNatsClient; - -#[async_trait] -impl NatsClient for TestNatsClient { - async fn publish(&self, _event: &Event) -> Result<(), PublishError> { - Ok(()) - } - - async fn subscribe_durable(&self) -> Result { - unimplemented!() - } - - async fn subscribe_ephemeral( - &self, - _subject: Subject, - _deliver_policy: DeliverPolicy, - _ack_policy: AckPolicy, - ) -> Result { - unimplemented!() - } - - async fn terminate(&self, _message: &Message) -> Result<(), TermMessageError> { - unimplemented!() - } -} - impl TestState { - pub async fn new(authz: TestAuthz) -> Self { + pub async fn new(db: sqlx::PgPool, authz: TestAuthz) -> Self { let config = build_config(); let agent = TestAgent::new(&config.agent_label, config.id.label(), config.id.audience()); @@ -137,7 +109,7 @@ impl TestState { let address = agent.address().to_owned(); Self { - db_pool: TestDb::new().await, + db_pool: TestDb::new(db), turn_host_selector: TurnHostSelector::new(&config.turn_hosts), config, agent, @@ -146,7 +118,7 @@ impl TestState { event_client: Arc::new(MockEventClient::new()), tq_client: Arc::new(MockTqClient::new()), authz: authz.into(), - nats_client: Some(Arc::new(TestNatsClient {}) as Arc), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -167,7 +139,7 @@ impl TestState { tq_client: Arc::new(MockTqClient::new()), authz: authz.into(), turn_host_selector: TurnHostSelector::new(&vec1!["turn.example.org".into()]), - nats_client: Some(Arc::new(TestNatsClient {}) as Arc), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -188,6 +160,10 @@ impl TestState { let hosts = Vec1::try_from_vec(hosts).unwrap(); self.turn_host_selector = TurnHostSelector::new(&hosts); } + + pub fn inspect_nats_client(&self) -> &TestNatsClient { + &self.nats_client + } } impl TestState { @@ -263,8 +239,8 @@ impl AppContext for TestState { &self.turn_host_selector } - fn nats_client(&self) -> Option<&dyn NatsClient> { - self.nats_client.as_deref() + fn nats_client(&self) -> Option> { + Some(self.nats_client.clone()) } }