diff --git a/CHANGELOG.md b/CHANGELOG.md index 26e39720665..541992759cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +**Internal**: + +- Add BufferService with SQLite backend. ([#1920](https://github.com/getsentry/relay/pull/1920)) + **Breaking Changes**: This release contains major changes to the web layer, including TCP and HTTP handling as well as all web endpoint handlers. Due to these changes, some functionality was retired and Relay responds differently in specific cases. @@ -36,6 +40,9 @@ Metrics: - Indicate if OS-version may be frozen with '>=' prefix. ([#1945](https://github.com/getsentry/relay/pull/1945)) - Normalize monitor slug parameters into slugs. ([#1913](https://github.com/getsentry/relay/pull/1913)) - Smart trim loggers for Java platforms. ([#1941](https://github.com/getsentry/relay/pull/1941)) + +**Internal**: + - PII scrub `span.data` by default. ([#1953](https://github.com/getsentry/relay/pull/1953)) - Scrub sensitive cookies. ([#1951](https://github.com/getsentry/relay/pull/1951))) diff --git a/Cargo.lock b/Cargo.lock index fc82e0a499c..80aed5147bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.3" @@ -125,13 +136,22 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.64" +version = "0.1.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" +checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.11", +] + +[[package]] +name = "atoi" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" +dependencies = [ + "num-traits", ] [[package]] @@ -162,9 +182,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.11" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13d8068b6ccb8b34db9de397c7043f91db8b4c66414952c6db944f238c4d3db3" +checksum = "349f8ccfd9221ee7d1f3d4b33e1f8319b3a81ed8f61f2ea40b37b859794b4491" dependencies = [ "async-trait", "axum-core", @@ -214,14 +234,14 @@ dependencies = [ [[package]] name = "axum-macros" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39bcef27b56d5cad8912d735d5ed1286f073f7bcb88cc31b38a15b514fcf8600" +checksum = "2bb524613be645939e280b7279f7b017f98cf7f5ef084ec374df373530e73277" dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.11", ] [[package]] @@ -295,7 +315,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn", + "syn 1.0.107", "which", ] @@ -550,7 +570,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -673,6 +693,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" + [[package]] name = "crc16" version = "0.4.0" @@ -758,6 +793,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.14" @@ -824,7 +869,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn", + "syn 1.0.107", ] [[package]] @@ -841,7 +886,7 @@ checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -864,7 +909,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -875,7 +920,7 @@ checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685" dependencies = [ "darling_core", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -904,7 +949,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 1.0.107", ] [[package]] @@ -949,9 +994,15 @@ dependencies = [ "serde", "serde_json", "serde_yaml 0.9.17", - "syn", + "syn 1.0.107", ] +[[package]] +name = "dotenvy" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0" + [[package]] name = "dyn-clone" version = "1.0.10" @@ -1087,7 +1138,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1098,7 +1149,7 @@ checksum = "c375b9c5eadb68d0a6efee2999fef292f45854c3444c86f09d8ab086ba942b0e" dependencies = [ "num-traits", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1119,7 +1170,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1165,6 +1216,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "failure" version = "0.1.8" @@ -1214,6 +1271,18 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce81f49ae8a0482e4c55ea62ebbd7e5a686af544c00b9d090bba3ff9be97b3d" +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1298,6 +1367,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.11.2", +] + [[package]] name = "futures-io" version = "0.3.26" @@ -1312,7 +1392,7 @@ checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 
1.0.107", ] [[package]] @@ -1450,6 +1530,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.6", +] [[package]] name = "hashbrown" @@ -1457,7 +1540,16 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash", + "ahash 0.8.3", +] + +[[package]] +name = "hashlink" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +dependencies = [ + "hashbrown 0.12.3", ] [[package]] @@ -1490,6 +1582,9 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "hermit-abi" @@ -1851,6 +1946,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "libsqlite3-sys" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.8" @@ -2152,7 +2258,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2205,7 +2311,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2254,6 +2360,17 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -2261,7 +2378,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.7", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", ] [[package]] @@ -2325,7 +2456,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2365,7 +2496,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2451,7 +2582,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "version_check", ] @@ -2468,9 +2599,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.51" +version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +checksum = 
"e472a104799c74b514a57226160104aa483546de37e839ec50e3c2e41dd87534" dependencies = [ "unicode-ident", ] @@ -2492,9 +2623,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.23" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ "proc-macro2", ] @@ -2506,7 +2637,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" dependencies = [ "log", - "parking_lot", + "parking_lot 0.12.1", "scheduled-thread-pool", ] @@ -2848,7 +2979,7 @@ dependencies = [ "globset", "lru", "once_cell", - "parking_lot", + "parking_lot 0.12.1", "regex", "schemars", "sentry-types", @@ -2917,7 +3048,7 @@ name = "relay-ffi-macros" version = "23.3.1" dependencies = [ "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2981,7 +3112,7 @@ version = "23.3.1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "synstructure", ] @@ -3145,7 +3276,7 @@ dependencies = [ "minidump", "multer", "once_cell", - "parking_lot", + "parking_lot 0.12.1", "regex", "relay-auth", "relay-aws-extension", @@ -3172,6 +3303,7 @@ dependencies = [ "serde_json", "similar-asserts", "smallvec", + "sqlx", "symbolic-common", "symbolic-unreal", "tempfile", @@ -3189,7 +3321,7 @@ name = "relay-statsd" version = "23.3.1" dependencies = [ "cadence", - "parking_lot", + "parking_lot 0.12.1", "rand 0.8.5", "relay-log", ] @@ -3378,7 +3510,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" dependencies = [ - "parking_lot", + "parking_lot 0.12.1", ] [[package]] @@ -3404,7 +3536,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn", + "syn 1.0.107", ] [[package]] @@ -3436,7 +3568,7 @@ checksum = "bdbda6ac5cd1321e724fa9cee216f3a61885889b896f073b8f82322789c5250e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3632,7 +3764,7 @@ checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3643,7 +3775,7 @@ checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3659,9 +3791,9 @@ dependencies = [ [[package]] name = "serde_path_to_error" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db0969fff533976baadd92e08b1d102c5a3d8a8049eadfd69d4d1e3c5b2ed189" +checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0" dependencies = [ "serde", ] @@ -3825,7 +3957,7 @@ checksum = "133659a15339456eeeb07572eb02a91c91e9815e9cbc89566944d2c8d3efdbf6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -3840,9 +3972,107 @@ dependencies = [ [[package]] name = "spin" -version = "0.9.6" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5d6e0250b93c8427a177b849d144a96d5acc57006149479403d7861ab721e34" +checksum = "7dccf47db1b41fa1573ed27ccf5e08e3ca771cb994f776668c5ebda893b248fc" +dependencies = [ + "lock_api", +] + +[[package]] +name = "sqlformat" +version = "0.2.1" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" +dependencies = [ + "itertools", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9249290c05928352f71c077cc44a464d880c63f26f7534728cca008e135c0428" +dependencies = [ + "sqlx-core", + "sqlx-macros", +] + +[[package]] +name = "sqlx-core" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105" +dependencies = [ + "ahash 0.7.6", + "atoi", + "bitflags", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "dotenvy", + "either", + "event-listener", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "hashlink", + "hex", + "indexmap", + "itoa", + "libc", + "libsqlite3-sys", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "sha2 0.10.6", + "smallvec", + "sqlformat", + "sqlx-rt", + "stringprep", + "thiserror", + "tokio-stream", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b850fa514dc11f2ee85be9d055c512aa866746adfacd1cb42d867d68e6a5b0d9" +dependencies = [ + "dotenvy", + "either", + "heck", + "once_cell", + "proc-macro2", + "quote", + "sha2 0.10.6", + "sqlx-core", + "sqlx-rt", + "syn 1.0.107", + "url", +] + +[[package]] +name = "sqlx-rt" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396" +dependencies = [ + "native-tls", + "once_cell", + "tokio", + "tokio-native-tls", +] [[package]] name = "stable_deref_trait" @@ -3858,12 +4088,22 @@ checksum = "213494b7a2b503146286049378ce02b482200519accc31872ee8be91fa820a08" dependencies = [ "new_debug_unreachable", "once_cell", - "parking_lot", + "parking_lot 0.12.1", "phf_shared", "precomputed-hash", "serde", ] +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.10.0" @@ -3924,6 +4164,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21e3787bb71465627110e7d87ed4faaa36c1f61042ee67badb9e2ef173accc40" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -3938,7 +4189,7 @@ checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "unicode-xid", ] @@ -3998,7 +4249,7 @@ checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -4091,7 +4342,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -4104,6 +4355,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.4" @@ -4206,7 +4468,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -4254,7 +4516,7 @@ dependencies = [ "ipconfig", "lazy_static", "lru-cache", - "parking_lot", + "parking_lot 0.12.1", "resolv-conf", "smallvec", "thiserror", @@ -4352,6 +4614,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "unsafe-libyaml" version = "0.2.5" @@ -4469,7 +4737,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-shared", ] @@ -4503,7 +4771,7 @@ checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/migrations/20230311081917_envelopes_table.sql b/migrations/20230311081917_envelopes_table.sql new file mode 100644 index 00000000000..e223e55cef4 --- /dev/null +++ b/migrations/20230311081917_envelopes_table.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS envelopes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + own_key TEXT, + sampling_key TEXT, + envelope BLOB +); + +CREATE INDEX IF NOT EXISTS project_keys ON envelopes (own_key, sampling_key); diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 14522a63038..5ba27baa2e9 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -726,6 +726,33 @@ impl Default for Http { } } +fn buffer_max_connections() -> u32 { + 30 +} + +fn buffer_min_connections() -> u32 { + 10 +} + +/// Controls internal caching behavior. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PersistentBuffer { + /// The path to the persistent buffer file. + path: PathBuf, + /// Maximum number of connections, which will be maintained by the pool. + #[serde(default = "buffer_max_connections")] + max_connections: u32, + /// Minimal number of connections, which will be maintained by the pool. + #[serde(default = "buffer_min_connections")] + min_connections: u32, + /// The maximum size of the buffer to keep, in bytes. + /// + /// If not set the befault is 10737418240 bytes or 10 GB. + max_disk_size: Option<usize>, + /// The maximum number of envelopes to keep in the memory buffer before spooling them to disk. + max_memory_size: Option<usize>, +} + /// Controls internal caching behavior. #[derive(Serialize, Deserialize, Debug)] #[serde(default)] @@ -759,6 +786,11 @@ struct Cache { file_interval: u32, /// Interval for evicting outdated project configs from memory. eviction_interval: u32, + /// The settings to configure persistent buffering. + /// + /// When enabled all incoming envelopes will be persisted to the disk instead of keeping + /// them in memory. 
+ persistent_envelope_buffer: Option<PersistentBuffer>, } impl Default for Cache { @@ -774,6 +806,7 @@ impl Default for Cache { batch_size: 500, file_interval: 10, // 10 seconds eviction_interval: 60, // 60 seconds + persistent_envelope_buffer: None, } } } @@ -1666,6 +1699,65 @@ impl Config { Duration::from_secs(self.values.cache.eviction_interval.into()) } + /// Returns `true` if the persistent envelope buffer is enabled, `false` otherwise. + pub fn cache_persistent_buffer_enabled(&self) -> bool { + self.values.cache.persistent_envelope_buffer.is_some() + } + + /// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured. + pub fn cache_persistent_buffer_path(&self) -> Option<PathBuf> { + self.values + .cache + .persistent_envelope_buffer + .as_ref() + .map(|b| b.path.to_owned()) + } + + /// Maximum number of connections to create to buffer file. + pub fn cache_persistent_buffer_max_connections(&self) -> u32 { + self.values + .cache + .persistent_envelope_buffer + .as_ref() + .map(|b| b.max_connections) + .unwrap_or(buffer_max_connections()) + } + + /// Minimum number of connections to create to buffer file. + pub fn cache_persistent_buffer_min_connections(&self) -> u32 { + self.values + .cache + .persistent_envelope_buffer + .as_ref() + .map(|b| b.min_connections) + .unwrap_or(buffer_min_connections()) + } + + /// The maximum size of the buffer, in bytes. + /// + /// Default: 10737418240 bytes or 10 GB. + pub fn cache_persistent_buffer_max_disk_size(&self) -> usize { + self.values + .cache + .persistent_envelope_buffer + .as_ref() + .and_then(|b| b.max_disk_size) + .unwrap_or(10 * 1024 * 1024 * 1024) + } + + /// The maximum size of the memory buffer. + /// + /// Set to smaller between `cache.persistent_envelope_buffer.max_memory_size` and `cache.envelope_buffer_size`. + /// Default set to half of `cache.envelope_buffer_size` + pub fn cache_persistent_buffer_max_memory_size(&self) -> usize { + self.values + .cache + .persistent_envelope_buffer + .as_ref() + .and_then(|b| b.max_memory_size) + .unwrap_or(self.envelope_buffer_size() / 2) + } + /// Returns the maximum size of an event payload in bytes. pub fn max_event_size(&self) -> usize { self.values.limits.max_event_size.as_bytes() diff --git a/relay-log/src/setup.rs b/relay-log/src/setup.rs index c2c4cd4ed96..4b53c1ef76e 100644 --- a/relay-log/src/setup.rs +++ b/relay-log/src/setup.rs @@ -131,7 +131,10 @@ fn set_default_filters(builder: &mut env_logger::Builder) { builder // Configure INFO as default for all third-party crates. .filter_level(LevelFilter::Info) - // Trust DNS is very spammy on INFO, so configure a higher warn level. + // Actix-web has useful information on the debug stream, so allow this. + .filter_module("actix_web::pipeline", LevelFilter::Debug) + // Logs from sqlx are very spammy on INFO level, so configure a higher WARN level. + .filter_module("sqlx", LevelFilter::Warn) .filter_module("trust_dns_proto", LevelFilter::Warn); // Add all internal modules with maximum log-level. 
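Reviewer note: the new settings nest under the existing `cache` section of the Relay configuration file, and leaving `persistent_envelope_buffer` out entirely keeps the old in-memory behavior, since `cache_persistent_buffer_enabled` only checks for the presence of the section. The following is a minimal, self-contained sketch (not part of this diff; it mirrors the `PersistentBuffer` fields added above, and the file path is made up) showing how the serde defaults resolve when only `path` and `max_memory_size` are given:

```rust
// Standalone illustration only; assumes `serde` (derive) and `serde_yaml`, both already in the lockfile.
use serde::Deserialize;
use std::path::PathBuf;

fn buffer_max_connections() -> u32 { 30 }
fn buffer_min_connections() -> u32 { 10 }

#[derive(Deserialize)]
struct PersistentBuffer {
    path: PathBuf,
    #[serde(default = "buffer_max_connections")]
    max_connections: u32,
    #[serde(default = "buffer_min_connections")]
    min_connections: u32,
    max_disk_size: Option<usize>,
    max_memory_size: Option<usize>,
}

fn main() {
    // Hypothetical YAML, as it would appear under `cache.persistent_envelope_buffer`.
    let yaml = "path: /var/lib/relay/buffer.db\nmax_memory_size: 1000\n";
    let buffer: PersistentBuffer = serde_yaml::from_str(yaml).unwrap();

    println!("buffer file: {}", buffer.path.display());
    assert_eq!(buffer.max_connections, 30); // falls back to `buffer_max_connections()`
    assert_eq!(buffer.min_connections, 10); // falls back to `buffer_min_connections()`
    assert_eq!(buffer.max_disk_size, None); // the getter above later defaults this to 10 GB
    assert_eq!(buffer.max_memory_size, Some(1000));
}
```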
diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 11bf250b827..407a3a85822 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -69,6 +69,7 @@ rmp-serde = "1.1.1" serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.55" smallvec = { version = "1.4.0", features = ["serde"] } +sqlx = { version = "0.6.2", features = ["macros", "migrate", "sqlite", "runtime-tokio-native-tls"], default-features=false } symbolic-common = { version = "11.1.0", optional = true, default-features=false } symbolic-unreal = { version = "11.1.0", optional = true, default-features=false, features=["serde"] } thiserror = "1.0.38" diff --git a/relay-server/src/actors/project_buffer.rs b/relay-server/src/actors/project_buffer.rs index 2161d093d3c..dca393f022f 100644 --- a/relay-server/src/actors/project_buffer.rs +++ b/relay-server/src/actors/project_buffer.rs @@ -1,10 +1,53 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::io::{Error, ErrorKind}; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; +use futures::stream::{self, StreamExt}; use relay_common::ProjectKey; -use relay_system::{FromMessage, Interface, Service}; +use relay_config::Config; +use relay_log::LogError; +use relay_system::{Addr, FromMessage, Interface, Service}; +use sqlx::migrate::MigrateError; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteRow}; +use sqlx::{Pool, QueryBuilder, Row, Sqlite}; use tokio::sync::mpsc; -use crate::utils::ManagedEnvelope; +use crate::actors::project_cache::{ProjectCache, UpdateBufferIndex}; +use crate::envelope::{Envelope, EnvelopeError}; +use crate::utils::{BufferGuard, ManagedEnvelope}; + +/// SQLite allocates space to hold all host parameters between 1 and the largest host parameter number used. +/// +/// To prevent excessive memory allocations, the maximum value of a host parameter number is SQLITE_MAX_VARIABLE_NUMBER, +/// which defaults to 999 for SQLite versions prior to 3.32.0 (2020-05-22) or 32766 for SQLite versions after 3.32.0. +/// +/// Keep it on the lower side for now. +const SQLITE_LIMIT_VARIABLE_NUMBER: usize = 999; + +/// The set of errors which can happen while working with the buffer. +#[derive(Debug, thiserror::Error)] +pub enum BufferError { + #[error("failed to store the envelope in the buffer, max size {0} reached")] + Full(u64), + + #[error("failed to move envelope from disk to memory")] + CapacityExceeded(#[from] crate::utils::BufferError), + + #[error("failed to get the size of the buffer on the filesystem")] + DatabaseSizeError(#[from] std::io::Error), + + /// Describes the errors linked with the `Sqlite` backed buffer. + #[error("failed to fetch data from the database")] + DatabaseError(#[from] sqlx::Error), + + #[error(transparent)] + EnvelopeError(#[from] EnvelopeError), + + #[error("failed to run migrations")] + MigrationFailed(#[from] MigrateError), +}
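Reviewer note on the constant above: every spooled envelope binds three host parameters (`own_key`, `sampling_key`, `envelope`), so the batch size used later in `try_spool` is the limit divided by three. A quick illustrative sketch of that arithmetic (plain Rust, no SQLite involved):

```rust
// Illustration of how SQLITE_LIMIT_VARIABLE_NUMBER bounds the batched INSERTs further down.
const SQLITE_LIMIT_VARIABLE_NUMBER: usize = 999;
const BINDS_PER_ROW: usize = 3; // own_key, sampling_key, envelope

fn main() {
    // Mirrors `stream::iter(envelopes).chunks(SQLITE_LIMIT_VARIABLE_NUMBER / 3)` in `try_spool`.
    let rows_per_insert = SQLITE_LIMIT_VARIABLE_NUMBER / BINDS_PER_ROW;
    assert_eq!(rows_per_insert, 333);

    // Spooling, say, 10_000 in-memory envelopes therefore takes ceil(10_000 / 333) = 31 statements.
    let statements = (10_000 + rows_per_insert - 1) / rows_per_insert;
    assert_eq!(statements, 31);
}
```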
/// This key represents the index element in the queue. /// @@ -42,13 +85,22 @@ impl Enqueue { /// Removes messages from the internal buffer and streams them to the sender. #[derive(Debug)] pub struct DequeueMany { + project_key: ProjectKey, keys: Vec<QueueKey>, sender: mpsc::UnboundedSender<ManagedEnvelope>, } impl DequeueMany { - pub fn new(keys: Vec<QueueKey>, sender: mpsc::UnboundedSender<ManagedEnvelope>) -> Self { - Self { keys, sender } + pub fn new( + project_key: ProjectKey, + keys: Vec<QueueKey>, + sender: mpsc::UnboundedSender<ManagedEnvelope>, + ) -> Self { + Self { + project_key, + keys, + sender, + } } } @@ -68,11 +120,14 @@ impl RemoveMany { } } -/// The envelopes [`BufferService`]. +/// The interface for [`BufferService`]. /// -/// Buffer maintaince internal storage (internal buffer) of the envelopes, which keep accumilating +/// The buffer maintains an internal storage (internal buffer) of the envelopes, which keep accumulating /// till the request to dequeue them again comes in. /// +/// The envelopes will first be kept in the memory buffer, and once that hits the configured limit the +/// envelopes will be buffered to the disk. +/// /// To add the envelopes to the buffer use [`Enqueue`] which will persists the envelope in the /// internal storage. To retrie the envelopes one can use [`DequeueMany`], where one expected /// provide the list of [`QueueKey`]s and the [`mpsc::UnboundedSender`] - all the found envelopes @@ -113,66 +168,334 @@ impl FromMessage<RemoveMany> for Buffer { } } -/// In-memory implementation of the [`Buffer`] interface. +#[derive(Debug)] +struct BufferSpoolConfig { + db: Pool<Sqlite>, + max_disk_size: usize, + max_memory_size: usize, +} + +/// [`Buffer`] interface implementation backed by SQLite. #[derive(Debug)] pub struct BufferService { - /// Contains the cache of the incoming envelopes. buffer: BTreeMap<QueueKey, Vec<ManagedEnvelope>>, + buffer_guard: Arc<BufferGuard>, + config: Arc<Config>, + project_cache: Addr<ProjectCache>, + spool_config: Option<BufferSpoolConfig>, + count_mem_envelopes: i64, } impl BufferService { - /// Creates a new [`BufferService`]. - pub fn new() -> Self { - Self { + async fn setup(path: &PathBuf) -> Result<(), BufferError> { + let options = SqliteConnectOptions::new() + .filename(path) + .journal_mode(SqliteJournalMode::Wal) + .create_if_missing(true); + + let db = SqlitePoolOptions::new().connect_with(options).await?; + sqlx::migrate!("../migrations").run(&db).await?; + + Ok(()) + } + + /// Creates a new [`BufferService`] from the provided path to the SQLite database file. + pub async fn create( + buffer_guard: Arc<BufferGuard>, + project_cache: Addr<ProjectCache>, + config: Arc<Config>, + ) -> Result<Self, BufferError> { + let mut service = Self { buffer: BTreeMap::new(), + buffer_guard, + config: config.clone(), + project_cache, + count_mem_envelopes: 0, + spool_config: None, + }; + + // Only if the persistent buffer is enabled do we create the pool and set the config. + if let Some(path) = config.cache_persistent_buffer_path() { + relay_log::info!("Using the buffer file: {}", path.to_string_lossy()); + + Self::setup(&path).await?; + + let options = SqliteConnectOptions::new() + .filename(&path) + // The WAL journaling mode uses a write-ahead log instead of a rollback journal to implement transactions. + // The WAL journaling mode is persistent; after being set it stays in effect + // across multiple database connections and after closing and reopening the database. + // + // 1. WAL is significantly faster in most scenarios. + // 2. WAL provides more concurrency as readers do not block writers and a writer does not block readers. Reading and writing can proceed concurrently. + // 3. 
Disk I/O operations tend to be more sequential using WAL. + // 4. WAL uses many fewer fsync() operations and is thus less vulnerable to problems on systems where the fsync() system call is broken. + .journal_mode(SqliteJournalMode::Wal) + // If shared-cache mode is enabled and a thread establishes multiple + // connections to the same database, the connections share a single data and schema cache. + // This can significantly reduce the quantity of memory and IO required by the system. + .shared_cache(true); + + let db = SqlitePoolOptions::new() + .max_connections(config.cache_persistent_buffer_max_connections()) + .min_connections(config.cache_persistent_buffer_min_connections()) + .connect_with(options) + .await?; + + let spool_config = BufferSpoolConfig { + db, + max_disk_size: config.cache_persistent_buffer_max_disk_size(), + max_memory_size: config.cache_persistent_buffer_max_memory_size(), + }; + + service.spool_config = Some(spool_config); } + + Ok(service) + } + + /// Checks the provided path and gets the size of the file this path points to. + fn estimate_buffer_size(path: Option<PathBuf>) -> Result<u64, BufferError> { + path.and_then(|path| std::fs::metadata(path).ok()) + .map(|m| m.len()) + .ok_or(BufferError::DatabaseSizeError(Error::from( + ErrorKind::NotFound, + ))) + } + + /// Tries to save the in-memory buffer to disk. + /// + /// It will spool to disk only if the persistent storage is enabled in the configuration. + fn try_spool(&mut self) -> Result<(), BufferError> { + let Self { + spool_config, + ref mut buffer, + .. + } = self; + + // Buffer to disk only if the DB and config are provided. + if let Some(BufferSpoolConfig { + db, + max_disk_size, + max_memory_size, + }) = spool_config + { + // And only if the count of in-memory envelopes is over the defined max buffer size. + if self.count_mem_envelopes > *max_memory_size as i64 { + let estimated_db_size = + Self::estimate_buffer_size(self.config.cache_persistent_buffer_path())?; + + // Reject all the enqueue requests if we exceed the max size of the buffer. + if estimated_db_size as usize > *max_disk_size { + return Err(BufferError::Full(estimated_db_size)); + } + + let buf = std::mem::take(buffer); + let db = db.clone(); + self.count_mem_envelopes = 0; + + // Spawn the task to spool the entire buffer to disk. + tokio::spawn(async move { + let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new( + "INSERT INTO envelopes (own_key, sampling_key, envelope) ", + ); + + // Flatten all the envelopes + let envelopes = buf.into_iter().flat_map(|(k, vals)| { + vals.into_iter() + .map(move |v| (k, v.into_envelope().to_vec())) + }); + + // Since we have 3 variables to bind per envelope, we divide the SQLite limit by 3 + // here to prepare the chunks used for the batch inserts. + let mut envelopes = + stream::iter(envelopes).chunks(SQLITE_LIMIT_VARIABLE_NUMBER / 3); + while let Some(chunk) = envelopes.next().await { + query_builder.push_values(chunk.into_iter(), |mut b, v| match v.1 { + Ok(envelope_bytes) => { + b.push_bind(v.0.own_key.to_string()) + .push_bind(v.0.sampling_key.to_string()) + .push_bind(envelope_bytes); + } + Err(err) => { + relay_log::error!( + "failed to serialize the envelope: {}", + LogError(&err) + ) + } + }); + + let query = query_builder.build(); + if let Err(err) = query.execute(&db).await { + relay_log::error!( + "failed to buffer envelopes to disk: {}", + LogError(&err) + ) + } + // Reset the builder to initial state set by `QueryBuilder::new` function, + // so it can be reused for another chunk. + query_builder.reset(); + } + }); + } + } + Ok(()) + } +
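The `QueryBuilder::push_values` / `reset()` pattern used in `try_spool` above can be tried in isolation against a throwaway in-memory database. A hedged sketch (assumes the sqlx 0.6 `sqlite` + `runtime-tokio-native-tls` features enabled by this PR's `Cargo.toml` change; the table layout is copied from the migration, keys and payloads are dummies):

```rust
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::{QueryBuilder, Row, Sqlite};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // One connection only: every pooled connection to `sqlite::memory:` gets its own database.
    let db = SqlitePoolOptions::new()
        .max_connections(1)
        .connect("sqlite::memory:")
        .await?;

    // Same shape as migrations/20230311081917_envelopes_table.sql.
    sqlx::query(
        "CREATE TABLE envelopes (id INTEGER PRIMARY KEY AUTOINCREMENT, own_key TEXT, sampling_key TEXT, envelope BLOB)",
    )
    .execute(&db)
    .await?;

    // Dummy rows standing in for serialized envelopes.
    let rows: Vec<(String, String, Vec<u8>)> = (0..5)
        .map(|i| (format!("own-{i}"), format!("sampling-{i}"), vec![i as u8; 16]))
        .collect();

    // Batched INSERT, as in `try_spool`: three binds per row, builder reused via `reset()`.
    let mut query_builder: QueryBuilder<Sqlite> =
        QueryBuilder::new("INSERT INTO envelopes (own_key, sampling_key, envelope) ");
    query_builder.push_values(rows.into_iter(), |mut b, (own, sampling, bytes)| {
        b.push_bind(own).push_bind(sampling).push_bind(bytes);
    });
    query_builder.build().execute(&db).await?;
    query_builder.reset(); // ready for the next chunk

    let count: i64 = sqlx::query("SELECT COUNT(*) FROM envelopes")
        .fetch_one(&db)
        .await?
        .try_get(0)?;
    assert_eq!(count, 5);
    Ok(())
}
```

Batching this way keeps the number of round trips per spool low while staying under the host-parameter limit discussed earlier.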
+ /// Extracts the envelope from the `SqliteRow`. + /// + /// Reads the bytes and tries to parse them into `Envelope`. + fn extract_envelope(&self, row: SqliteRow) -> Result<ManagedEnvelope, BufferError> { + let envelope_row: Vec<u8> = row.try_get("envelope")?; + let envelope_bytes = bytes::Bytes::from(envelope_row); + let envelope = Envelope::parse_bytes(envelope_bytes)?; + let managed_envelope = self.buffer_guard.enter(envelope)?; + Ok(managed_envelope) } /// Handles the enqueueing messages into the internal buffer. - fn handle_enqueue(&mut self, message: Enqueue) { - self.buffer - .entry(message.key) - .or_default() - .push(message.value); + async fn handle_enqueue(&mut self, message: Enqueue) -> Result<(), BufferError> { + let Enqueue { + key, + value: managed_envelope, + } = message; + + self.try_spool()?; + + // Save to the internal buffer. + self.buffer.entry(key).or_default().push(managed_envelope); + self.count_mem_envelopes += 1; + + Ok(()) + } + + /// Tries to delete the envelopes from the persistent buffer in batches, extract and convert them to + /// managed envelopes, and send them back into the processing pipeline. + /// + /// If the error happens in the deletion/fetching phase, a key is returned to allow retrying later. + async fn fetch_and_delete( + &self, + db: &Pool<Sqlite>, + key: QueueKey, + sender: &mpsc::UnboundedSender<ManagedEnvelope>, + ) -> Result<(), QueueKey> { + loop { + let mut envelopes = sqlx::query( + "DELETE FROM envelopes WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT 100) RETURNING envelope", + ) + .bind(key.own_key.to_string()) + .bind(key.sampling_key.to_string()) + .fetch(db).peekable(); + + // The stream is empty, we can break the loop, since we have read everything by now. + if Pin::new(&mut envelopes).peek().await.is_none() { + return Ok(()); + } + + while let Some(envelope) = envelopes.next().await { + let envelope = match envelope { + Ok(envelope) => envelope, + + // Bail if there are errors in the stream. + Err(err) => { + relay_log::error!( + "failed to read the buffer stream from the disk: {}", + LogError(&err) + ); + return Err(key); + } + }; + + match self.extract_envelope(envelope) { + Ok(managed_envelope) => { + sender.send(managed_envelope).ok(); + } + Err(err) => relay_log::error!( + "failed to extract envelope from the buffer: {}", + LogError(&err) + ), + } + } + } } /// Handles the dequeueing messages from the internal buffer. /// /// This method removes the envelopes from the buffer and stream them to the sender. - fn handle_dequeue(&mut self, message: DequeueMany) { - let DequeueMany { keys, sender } = message; - for key in keys { - for value in self.buffer.remove(&key).unwrap_or_default() { + async fn handle_dequeue(&mut self, message: DequeueMany) -> Result<(), BufferError> { + let DequeueMany { + project_key, + mut keys, + sender, + } = message; + + for key in &keys { + for value in self.buffer.remove(key).unwrap_or_default() { + self.count_mem_envelopes -= 1; sender.send(value).ok(); } } + + // The persistent buffer is configured, so try to get data from the disk. + if let Some(BufferSpoolConfig { db, .. }) = &self.spool_config { + let mut unused_keys = BTreeSet::new(); + + while let Some(key) = keys.pop() { + // If an error with a key is returned we must save it for the next iteration. 
+ if let Err(key) = self.fetch_and_delete(db, key, &sender).await { + unused_keys.insert(key); + } + } + if !unused_keys.is_empty() { + self.project_cache + .send(UpdateBufferIndex::new(project_key, unused_keys)) + } + } + + Ok(()) } /// Handles the remove request. /// - /// This remove all the envelopes from the internal buffer for the provided keys. - /// If any of the provided keys are still have the envelopes, the error will be logged with the + /// This removes all the envelopes from the internal buffer for the provided keys. + /// If any of the provided keys still have the envelopes, the error will be logged with the /// number of envelopes dropped for the specific project key. - fn handle_remove(&mut self, message: RemoveMany) { + async fn handle_remove(&mut self, message: RemoveMany) -> Result<(), BufferError> { let RemoveMany { project_key, keys } = message; - let mut count = 0; - for key in keys { - count += self.buffer.remove(&key).map_or(0, |k| k.len()); + let mut count: u64 = 0; + for key in &keys { + count += self.buffer.remove(key).map_or(0, |k| k.len() as u64); } + + if let Some(BufferSpoolConfig { db, .. }) = &self.spool_config { + for key in keys { + let result = + sqlx::query("DELETE FROM envelopes where own_key = ? AND sampling_key = ?") + .bind(key.own_key.to_string()) + .bind(key.sampling_key.to_string()) + .execute(db) + .await?; + + count += result.rows_affected(); + } + } + if count > 0 { relay_log::with_scope( |scope| scope.set_tag("project_key", project_key), || relay_log::error!("evicted project with {} envelopes", count), ); } + + Ok(()) } /// Handles all the incoming messages from the [`Buffer`] interface. - fn handle_message(&mut self, message: Buffer) { + async fn handle_message(&mut self, message: Buffer) -> Result<(), BufferError> { match message { - Buffer::Enqueue(message) => self.handle_enqueue(message), - Buffer::DequeueMany(message) => self.handle_dequeue(message), - Buffer::RemoveMany(message) => self.handle_remove(message), + Buffer::Enqueue(message) => self.handle_enqueue(message).await, + Buffer::DequeueMany(message) => self.handle_dequeue(message).await, + Buffer::RemoveMany(message) => self.handle_remove(message).await, } } } @@ -183,7 +506,9 @@ impl Service for BufferService { fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) { tokio::spawn(async move { while let Some(message) = rx.recv().await { - self.handle_message(message); + if let Err(err) = self.handle_message(message).await { + relay_log::error!("failed to handle an incoming message: {}", LogError(&err)) + } } }); } @@ -192,8 +517,17 @@ impl Service for BufferService { impl Drop for BufferService { fn drop(&mut self) { let count: usize = self.buffer.values().map(|v| v.len()).sum(); + // We have envelopes in memory, try to buffer them to the disk. if count > 0 { - relay_log::error!("dropped queue with {} envelopes", count); + if let Err(err) = self.try_spool() { + relay_log::error!("failed to spool {} on shutdown: {}", count, LogError(&err)); + } + + // The buffer must be empty by now, report the error otherwise. 
+ let count: usize = self.buffer.values().map(|v| v.len()).sum(); + if count > 0 { + relay_log::error!("dropped {} envelopes", count); + } } } } diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 024dfdab9c1..0114cf23647 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use relay_common::ProjectKey; use relay_config::{Config, RelayMode}; +use relay_log::LogError; use relay_metrics::{self, Aggregator, FlushBuckets, InsertMetrics, MergeBuckets}; use relay_quotas::RateLimits; use relay_redis::RedisPool; @@ -26,7 +27,7 @@ use crate::actors::upstream::UpstreamRelay; use crate::service::REGISTRY; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; -use crate::utils::{self, GarbageDisposal, ManagedEnvelope}; +use crate::utils::{self, BufferGuard, GarbageDisposal, ManagedEnvelope}; /// Requests a refresh of a project state from one of the available sources. /// @@ -164,6 +165,22 @@ impl UpdateRateLimits { } } +/// Updates the buffer index for [`ProjectKey`] with the [`QueueKey`] keys. +/// +/// This message is sent from the project buffer in case of the error while fetching the data from +/// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the +/// persistent storage. +pub struct UpdateBufferIndex { + project_key: ProjectKey, + keys: BTreeSet<QueueKey>, +} + +impl UpdateBufferIndex { + pub fn new(project_key: ProjectKey, keys: BTreeSet<QueueKey>) -> Self { + Self { project_key, keys } + } +} + /// A cache for [`ProjectState`]s. /// /// The project maintains information about organizations, projects, and project keys along with @@ -192,6 +209,7 @@ pub enum ProjectCache { InsertMetrics(InsertMetrics), MergeBuckets(MergeBuckets), FlushBuckets(FlushBuckets), + UpdateBufferIndex(UpdateBufferIndex), } impl ProjectCache { @@ -202,6 +220,14 @@ impl ProjectCache { impl Interface for ProjectCache {} +impl FromMessage<UpdateBufferIndex> for ProjectCache { + type Response = relay_system::NoResponse; + + fn from_message(message: UpdateBufferIndex, _: ()) -> Self { + Self::UpdateBufferIndex(message) + } +} + impl FromMessage<RequestUpdate> for ProjectCache { type Response = relay_system::NoResponse; @@ -381,6 +407,7 @@ struct UpdateProjectState { /// Holds the addresses of all services required for [`ProjectCache`]. #[derive(Debug, Clone)] pub struct Services { + buffer: Arc<BufferGuard>, pub aggregator: Addr<Aggregator>, pub envelope_processor: Addr<EnvelopeProcessor>, pub envelope_manager: Addr<EnvelopeManager>, @@ -392,6 +419,7 @@ pub struct Services { impl Services { /// Creates new [`Services`] context. pub fn new( + buffer: Arc<BufferGuard>, aggregator: Addr<Aggregator>, envelope_processor: Addr<EnvelopeProcessor>, envelope_manager: Addr<EnvelopeManager>, @@ -400,6 +428,7 @@ impl Services { upstream_relay: Addr<UpstreamRelay>, ) -> Self { Self { + buffer, aggregator, envelope_processor, envelope_manager, @@ -437,7 +466,7 @@ impl ProjectCacheBroker { self.buffer.send(Enqueue::new(key, value)); } - /// Sends the message to [`BufferService`] to dequeue the envelopes. + /// Sends the message to the buffer service to dequeue the envelopes. /// /// All the found envelopes will be send back through the `buffer_tx` channel and dirrectly /// forwarded to `handle_processing`. 
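For reference, the claim-and-remove query used by `fetch_and_delete` above relies on SQLite's `DELETE ... RETURNING` (available in the bundled `libsqlite3-sys`). A self-contained sketch of just that pattern, under the same sqlx assumptions as the previous example and with dummy project keys:

```rust
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::Row;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let db = SqlitePoolOptions::new()
        .max_connections(1)
        .connect("sqlite::memory:")
        .await?;

    sqlx::query(
        "CREATE TABLE envelopes (id INTEGER PRIMARY KEY AUTOINCREMENT, own_key TEXT, sampling_key TEXT, envelope BLOB)",
    )
    .execute(&db)
    .await?;

    // Spool 250 dummy envelopes for a single (own_key, sampling_key) pair.
    for i in 0..250u32 {
        sqlx::query("INSERT INTO envelopes (own_key, sampling_key, envelope) VALUES (?, ?, ?)")
            .bind("a94ae32be2584e0bbd7a4cbb95971fee") // dummy project key
            .bind("a94ae32be2584e0bbd7a4cbb95971fee")
            .bind(i.to_le_bytes().to_vec())
            .execute(&db)
            .await?;
    }

    // Claim-and-remove in batches of 100, like `fetch_and_delete`: the DELETE both
    // returns the payloads and makes them invisible to any later reader.
    let mut total = 0;
    loop {
        let rows = sqlx::query(
            "DELETE FROM envelopes WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT 100) RETURNING envelope",
        )
        .bind("a94ae32be2584e0bbd7a4cbb95971fee")
        .bind("a94ae32be2584e0bbd7a4cbb95971fee")
        .fetch_all(&db)
        .await?;

        if rows.is_empty() {
            break;
        }
        for row in &rows {
            let payload: Vec<u8> = row.try_get("envelope")?;
            let _ = payload; // here Relay would hand the bytes to Envelope::parse_bytes
            total += 1;
        }
    }
    assert_eq!(total, 250);
    Ok(())
}
```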
@@ -473,8 +502,11 @@ impl ProjectCacheBroker { } if !result.is_empty() { - self.buffer - .send(DequeueMany::new(result, self.buffer_tx.clone())) + self.buffer.send(DequeueMany::new( + partial_key, + result, + self.buffer_tx.clone(), + )) } } @@ -529,7 +561,7 @@ impl ProjectCacheBroker { /// Updates the [`Project`] with received [`ProjectState`]. /// - /// If the project state is valid we also send the message to [`BufferService`] to dequeue the + /// If the project state is valid we also send the message to the buffer service to dequeue the /// envelopes for this project. fn merge_state(&mut self, message: UpdateProjectState) { let UpdateProjectState { @@ -739,6 +771,10 @@ impl ProjectCacheBroker { .flush_buckets(context, message.partition_key, message.buckets); } + fn handle_buffer_index(&mut self, message: UpdateBufferIndex) { + self.index.insert(message.project_key, message.keys); + } + fn handle_message(&mut self, message: ProjectCache) { match message { ProjectCache::RequestUpdate(message) => self.handle_request_update(message), @@ -754,6 +790,7 @@ impl ProjectCacheBroker { ProjectCache::InsertMetrics(message) => self.handle_insert_metrics(message), ProjectCache::MergeBuckets(message) => self.handle_merge_buckets(message), ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), + ProjectCache::UpdateBufferIndex(message) => self.handle_buffer_index(message), } } } @@ -786,6 +823,8 @@ impl Service for ProjectCacheService { services, redis, } = self; + let buffer_guard = services.buffer.clone(); + let project_cache = services.project_cache.clone(); tokio::spawn(async move { let mut ticker = tokio::time::interval(config.cache_eviction_interval()); @@ -796,6 +835,16 @@ impl Service for ProjectCacheService { // Channel for envelope buffering. let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); + let buffer = + match BufferService::create(buffer_guard, project_cache, config.clone()).await { + Ok(buffer) => buffer.start(), + Err(err) => { + relay_log::error!("failed to start buffer service: {}", LogError(&err)); + // NOTE: The process will exit with error if the buffer file could not be + // opened or the migrations could not be run. + std::process::exit(1); + } + }; // Main broker that serializes public and internal messages, and triggers project state // fetches via the project source. @@ -807,8 +856,8 @@ impl Service for ProjectCacheService { services, state_tx, buffer_tx, - index: Default::default(), - buffer: BufferService::new().start(), + index: BTreeMap::new(), + buffer, }; loop { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 58dc5868264..58c7f69cb3b 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -165,6 +165,7 @@ impl ServiceState { // Keep all the services in one context. let project_cache_services = Services::new( + buffer.clone(), aggregator.clone(), processor.clone(), envelope_manager.clone(), diff --git a/relay-server/src/utils/buffer.rs b/relay-server/src/utils/buffer.rs index f8757c0ec69..6c3075587b9 100644 --- a/relay-server/src/utils/buffer.rs +++ b/relay-server/src/utils/buffer.rs @@ -47,7 +47,7 @@ impl BufferGuard { /// Reserves resources for processing an envelope in Relay. /// - /// Returns `Ok(EnvelopeContext)` on success, which internally holds a handle to the reserved + /// Returns `Ok(ManagedEnvelope)` on success, which internally holds a handle to the reserved /// resources. 
When the managed envelope is dropped, the slot is automatically reclaimed and can /// be reused by a subsequent call to `enter`. /// diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index fb6940ee4d1..b3bf00c717d 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -125,6 +125,16 @@ impl ManagedEnvelope { self.envelope.as_mut() } + /// Consumes itself returning the managed envelope. + /// + /// This also releases the slot with [`SemaphorePermit`] and sets the internal context + /// to done so there is no rejection issued once the [`ManagedEnvelope`] is consumed. + pub fn into_envelope(mut self) -> Box<Envelope> { + self.context.slot.take(); + self.context.done = true; + Box::new(self.envelope.take_items()) + } + /// Take the envelope out of the context and replace it with a dummy. /// /// Note that after taking out the envelope, the envelope summary is incorrect. diff --git a/tests/integration/test_projectconfigs.py b/tests/integration/test_projectconfigs.py index dc2e3f71fa2..9571a21b182 100644 --- a/tests/integration/test_projectconfigs.py +++ b/tests/integration/test_projectconfigs.py @@ -8,6 +8,8 @@ from requests.exceptions import HTTPError import queue from collections import namedtuple +import tempfile +import os from sentry_relay import PublicKey, SecretKey, generate_key_pair @@ -238,12 +240,30 @@ def get_response(relay, packed, signature): return data -def test_unparsable_project_config(mini_sentry, relay): +@pytest.mark.parametrize( + "buffer_config", + [False, True], +) +def test_unparsable_project_config(buffer_config, mini_sentry, relay): project_key = 42 relay_config = { - "cache": {"project_expiry": 2, "project_grace_period": 20, "miss_expiry": 2}, + "cache": { + "project_expiry": 2, + "project_grace_period": 20, + "miss_expiry": 2, + }, "http": {"max_retry_interval": 1}, } + + if buffer_config: + temp = tempfile.mkdtemp() + dbfile = os.path.join(temp, "buffer.db") + # set the buffer to something low to force the spooling + relay_config["cache"]["persistent_envelope_buffer"] = { + "path": dbfile, + "max_memory_size": 1, + } + relay = relay(mini_sentry, relay_config, wait_health_check=True) mini_sentry.add_full_project_config(project_key) public_key = mini_sentry.get_dsn_public_key(project_key) @@ -324,6 +344,9 @@ def test_unparsable_project_config(mini_sentry, relay): } ] + relay.send_event(project_key) + relay.send_event(project_key) + relay.send_event(project_key) # Wait for caches to expire. And we will get into the grace period. time.sleep(3) # The state should be fixed and updated by now, since we keep re-trying to fetch new one all the time.
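Reviewer note: when running the new `buffer_config=True` parametrization locally, the spooled envelopes end up in the temporary `buffer.db` the test configures. A small hedged Rust sketch (not part of the test suite; the path is whatever your test run created) for peeking into that file with the same sqlx setup, confirming that WAL mode persisted and counting envelopes that were spooled but not yet dequeued:

```rust
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::Row;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Point this at the temporary file the test created, e.g. <tmpdir>/buffer.db.
    let options = SqliteConnectOptions::new().filename("/tmp/example/buffer.db");
    let db = SqlitePoolOptions::new().connect_with(options).await?;

    // WAL mode is persistent, so it should still be in effect when reopening the file.
    let mode: String = sqlx::query("PRAGMA journal_mode")
        .fetch_one(&db)
        .await?
        .try_get(0)?;
    assert_eq!(mode, "wal");

    // Envelopes that were spooled to disk but not yet dequeued show up here.
    let pending: i64 = sqlx::query("SELECT COUNT(*) FROM envelopes")
        .fetch_one(&db)
        .await?
        .try_get(0)?;
    println!("pending envelopes: {pending}");
    Ok(())
}
```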