diff --git a/Cargo.lock b/Cargo.lock index 1d5206d34b2..15f3faaa2ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,7 +9,7 @@ dependencies = [ "byteorder", "bytes 0.4.12", "env_logger 0.7.1", - "futures 0.3.1", + "futures 0.3.4", "integer-encoding", "log", "protobuf", @@ -67,9 +67,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.6" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d" +checksum = "d5e63fd144e18ba274ae7095c0197a870a7b9468abc801dd62f190d80817d2ec" dependencies = [ "memchr", ] @@ -103,14 +103,26 @@ checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" [[package]] name = "assert-json-diff" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9881d306dee755eccf052d652b774a6b2861e86b4772f555262130e58e4f81d2" +checksum = "9c356497fd3417158bcb318266ac83c391219ca3a5fa659049f42e0041ab57d6" dependencies = [ + "extend", "serde", "serde_json", ] +[[package]] +name = "async-trait" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "750b1c38a1dfadd108da0f01c08f4cdc7ff1bb39b325f9c82cc972361780a6e1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atty" version = "0.2.14" @@ -136,9 +148,9 @@ checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" [[package]] name = "backtrace" -version = "0.3.42" +version = "0.3.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4b1549d804b6c73f4817df2ba073709e96e426f12987127c48e6745568c350b" +checksum = "e4036b9bf40f3cf16aba72a3d65e8a520fc4bafcdc7079aea8f848c58c5b5536" dependencies = [ "backtrace-sys", "cfg-if", @@ -256,9 +268,9 @@ checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" [[package]] name = "byteorder" -version = "1.3.2" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" [[package]] name = "bytes" @@ -357,9 +369,9 @@ dependencies = [ [[package]] name = "colored" -version = "1.9.2" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8815e2ab78f3a59928fc32e141fbeece88320a240e43f47b2fd64ea3a88a5b3d" +checksum = "f4ffc801dacf156c5854b9df4f425a626539c3a6ef7893cc0c5084a23f0b6c59" dependencies = [ "atty", "lazy_static", @@ -436,54 +448,47 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca" +checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" dependencies = [ "crossbeam-epoch", - "crossbeam-utils 0.7.0", + "crossbeam-utils", + "maybe-uninit", ] [[package]] name = "crossbeam-epoch" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ - "autocfg 0.1.7", + "autocfg 1.0.0", "cfg-if", - "crossbeam-utils 0.7.0", + "crossbeam-utils", "lazy_static", + "maybe-uninit", "memoffset", "scopeguard", ] [[package]] name = "crossbeam-queue" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" -dependencies = [ - "crossbeam-utils 0.6.6", -] - -[[package]] -name = "crossbeam-utils" -version = "0.6.6" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" +checksum = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" dependencies = [ "cfg-if", - "lazy_static", + "crossbeam-utils", ] [[package]] name = "crossbeam-utils" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ - "autocfg 0.1.7", + "autocfg 1.0.0", "cfg-if", "lazy_static", ] @@ -591,9 +596,9 @@ checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" [[package]] name = "dtoa" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" +checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3" [[package]] name = "either" @@ -638,11 +643,23 @@ dependencies = [ [[package]] name = "error-chain" -version = "0.12.1" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d371106cc88ffdfb1eabd7111e432da544f16f3e2d7bf1dfe8bf575f1df045cd" +dependencies = [ + "version_check", +] + +[[package]] +name = "extend" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ab49e9dcb602294bc42f9a7dfc9bc6e936fca4418ea300dbfb84fe16de0b7d9" +checksum = "fe9db393664b0e6c6230a14115e7e798f80b70f54038dc21165db24c6b7f28fc" dependencies = [ - "version_check 0.1.5", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -736,9 +753,9 @@ checksum = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" [[package]] name = "futures" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6f16056ecbb57525ff698bb955162d0cd03bee84e6241c27ff75c08d8ca5987" +checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" dependencies = [ "futures-channel", "futures-core", @@ -751,9 +768,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcae98ca17d102fd8a3603727b9259fcf7fa4239b603d2142926189bc8999b86" +checksum = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" dependencies = [ "futures-core", "futures-sink", @@ -761,9 +778,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79564c427afefab1dfb3298535b21eda083ef7935b4f0ecbfcb121f0aec10866" +checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" [[package]] name = "futures-cpupool" @@ -777,9 +794,9 @@ dependencies = [ [[package]] name = "futures-executor" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e274736563f686a837a0568b478bdabfeaec2dca794b5649b04e2fe1627c231" +checksum = "f674f3e1bcb15b37284a90cedf55afdba482ab061c407a9c0ebbd0f3109741ba" dependencies = [ "futures-core", "futures-task", @@ -788,15 +805,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e676577d229e70952ab25f3945795ba5b16d63ca794ca9d2c860e5595d20b5ff" +checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6" [[package]] name = "futures-macro" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764" +checksum = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -806,21 +823,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171be33efae63c2d59e6dbba34186fe0d6394fb378069a76dfd80fdcffd43c16" +checksum = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6" [[package]] name = "futures-task" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bae52d6b29cf440e298856fec3965ee6fa71b06aa7495178615953fd669e5f9" +checksum = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" [[package]] name = "futures-util" -version = "0.3.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76" +checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" dependencies = [ "futures-channel", "futures-core", @@ -907,7 +924,7 @@ dependencies = [ "addressing", "crypto", "directory-client", - "futures 0.3.1", + "futures 0.3.4", "itertools", "log", "mix-client", @@ -919,15 +936,15 @@ dependencies = [ "serde_derive", "sfw-provider-requests", "sphinx", - "tokio 0.2.10", + "tokio 0.2.12", "topology", ] [[package]] name = "hermit-abi" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff2656d88f158ce120947499e971d743c05dbcbed62e5bd2f38f1698bbc3772" +checksum = "1010591b26bbfe835e9faeabeb11866061cc7dcebffd56ad7d0942d0e61aefd8" dependencies = [ "libc", ] @@ -1068,9 +1085,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b54058f0a6ff80b6803da8faf8997cde53872b38f4023728f6830b06cd3c0dc" +checksum = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292" dependencies = [ "autocfg 1.0.0", ] @@ -1086,9 +1103,12 @@ dependencies = [ [[package]] name = "integer-encoding" -version = "1.0.7" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aec89c15e2cfa0f0eae8ca60e03cb10b30d25ea2c0ad7d6be60a95e32729994" +checksum = "553a280155f89ff1c798520ccf877d1bc9571314164e998f8a7b5fe2d84cb45c" +dependencies = [ + "async-trait", +] [[package]] name = "iovec" @@ -1116,9 +1136,9 @@ checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" [[package]] name = "jobserver" -version = "0.1.19" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b06c1b455f1cf4269a8cfc320ab930a810e2375a42af5075eb8a8b36405ce0" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" dependencies = [ "libc", ] @@ -1147,9 +1167,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.66" +version = "0.2.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558" +checksum = "eb147597cdf94ed43ab7a9038716637d2d1bf2bc571da995d0028dec06bd3018" [[package]] name = "libgit2-sys" @@ -1225,9 +1245,9 @@ checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" [[package]] name = "memchr" -version = "2.3.0" +version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3197e20c7edb283f87c071ddfc7a2cca8f8e0b888c242959846a6fce03c72223" +checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" [[package]] name = "memoffset" @@ -1256,9 +1276,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f3f74f726ae935c3f514300cc6773a0c9492abc5e972d42ba0c0ebb88757625" +checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" dependencies = [ "adler32", ] @@ -1337,15 +1357,15 @@ dependencies = [ "rand 0.7.3", "rand_distr", "sphinx", - "tokio 0.2.10", + "tokio 0.2.12", "topology", ] [[package]] name = "mockito" -version = "0.22.0" +version = "0.23.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e524e85ea7c80559354217a6470c14abc2411802a9996aed1821559b9e28e3c" +checksum = "ae82e6bad452dd42b0f4437414eae3c8c27b958a55dc6c198e351042c4e3024e" dependencies = [ "assert-json-diff", "colored", @@ -1353,7 +1373,7 @@ dependencies = [ "httparse", "lazy_static", "log", - "percent-encoding 1.0.1", + "percent-encoding 2.1.0", "rand 0.7.3", "regex", "serde_json", @@ -1363,9 +1383,9 @@ dependencies = [ name = "multi-tcp-client" version = "0.1.0" dependencies = [ - "futures 0.3.1", + "futures 0.3.4", "log", - "tokio 0.2.10", + "tokio 0.2.12", ] [[package]] @@ -1440,7 +1460,7 @@ dependencies = [ "directory-client", "dirs", "dotenv", - "futures 0.3.1", + "futures 0.3.4", "healthcheck", "log", "mix-client", @@ -1454,7 +1474,7 @@ dependencies = [ "sfw-provider-requests", "sphinx", "tempfile", - "tokio 0.2.10", + "tokio 0.2.12", "tokio-tungstenite", "topology", ] @@ -1473,7 +1493,7 @@ dependencies = [ "directory-client", "dirs", "dotenv", - "futures 0.3.1", + "futures 0.3.4", "log", "multi-tcp-client", "pemstore", @@ -1481,7 +1501,7 @@ dependencies = [ "serde", "sphinx", "tempfile", - "tokio 0.2.10", + "tokio 0.2.12", ] [[package]] @@ -1497,7 +1517,7 @@ dependencies = [ "directory-client", "dirs", "dotenv", - "futures 0.3.1", + "futures 0.3.4", "hmac", "log", "pemstore", @@ -1509,7 +1529,7 @@ dependencies = [ "sha2", "sphinx", "tempfile", - "tokio 0.2.10", + "tokio 0.2.12", ] [[package]] @@ -1525,13 +1545,13 @@ dependencies = [ "directory-client", "dirs", "dotenv", - "futures 0.3.1", + "futures 0.3.4", "healthcheck", "log", "pretty_env_logger", "serde", "tempfile", - "tokio 0.2.10", + "tokio 0.2.12", "topology", ] @@ -1543,9 +1563,9 @@ checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" [[package]] name = "openssl" -version = "0.10.26" +version = "0.10.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a3cc5799d98e1088141b8e01ff760112bbd9f19d850c124500566ca6901a585" +checksum = "973293749822d7dd6370d6da1e523b0d1db19f06c459134c658b2a4261378b52" dependencies = [ "bitflags", "cfg-if", @@ -1563,11 +1583,11 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" [[package]] name = "openssl-sys" -version = "0.9.53" +version = "0.9.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465d16ae7fc0e313318f7de5cecf57b2fbe7511fd213978b457e1c96ff46736f" +checksum = "1024c0a59774200a555087a6da3f253a9095a5f344e353b212ac4c8b8e450986" dependencies = [ - "autocfg 0.1.7", + "autocfg 1.0.0", "cc", "libc", "pkg-config", @@ -1635,9 +1655,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pest" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e4fb201c5c22a55d8b24fef95f78be52738e5e1361129be1b5e862ecdb6894a" +checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53" dependencies = [ "ucd-trie", ] @@ -1654,9 +1674,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b9fcf299b5712d06ee128a556c94709aaa04512c4dffb8ead07c5c998447fc0" +checksum = "27e5277315f6b4f27e0e6744feb5d5ba1891e7164871033d3c8344c6783b349a" dependencies = [ "pest", "pest_meta", @@ -1667,9 +1687,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df43fd99896fd72c485fe47542c7b500e4ac1e8700bf995544d1317a60ded547" +checksum = "54be6e404f5317079812fc8f9f5279de376d8856929e21c184ecf6bbd692a11d" dependencies = [ "maplit", "pest", @@ -1678,18 +1698,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75fca1c4ff21f60ca2d37b80d72b63dab823a9d19d3cda3a81d18bc03f0ba8c5" +checksum = "7804a463a8d9572f13453c516a5faea534a2403d7ced2f0c7e100eeff072772c" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6544cd4e4ecace61075a6ec78074beeef98d58aa9a3d07d053d993b2946a90d6" +checksum = "385322a45f2ecf3410c68d2a549a4a2685e8051d0f278e39743ff4e451cb9b3f" dependencies = [ "proc-macro2", "quote", @@ -1731,6 +1751,32 @@ dependencies = [ "log", ] +[[package]] +name = "proc-macro-error" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "052b3c9af39c7e5e94245f820530487d19eb285faedcb40e0c3275132293f242" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + +[[package]] +name = "proc-macro-error-attr" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d175bef481c7902e63e3165627123fff3502f06ac043d3ef42d08c1246da9253" +dependencies = [ + "proc-macro2", + "quote", + "rustversion", + "syn", + "syn-mid", +] + [[package]] name = "proc-macro-hack" version = "0.5.11" @@ -1750,9 +1796,9 @@ checksum = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" [[package]] name = "proc-macro2" -version = "1.0.8" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acb317c6ff86a4e579dfa00fc5e6cca91ecbb4e7eb2df0468805b674eb88548" +checksum = "6c09721c6781493a2a492a96b5a5bf19b65917fe6728884e7c44dd0c60ca3435" dependencies = [ "unicode-xid", ] @@ -1786,12 +1832,12 @@ dependencies = [ name = "provider-client" version = "0.1.0" dependencies = [ - "futures 0.3.1", + "futures 0.3.4", "log", "pretty_env_logger", "sfw-provider-requests", "sphinx", - "tokio 0.2.10", + "tokio 0.2.12", ] [[package]] @@ -2006,9 +2052,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.3.3" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5508c1941e4e7cb19965abef075d35a9a8b5cdf0846f30b4050e9b55dc55e87" +checksum = "322cf97724bea3ee221b78fe25ac9c46114ebb51747ad5babd51a2fc6a8235a8" dependencies = [ "aho-corasick", "memchr", @@ -2018,9 +2064,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.13" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e734e891f5b408a29efbf8309e656876276f49ab6a6ac208600b4419bd893d90" +checksum = "b28dfe3fe9badec5dbf0a79a9cccad2cfc2ab5484bdb3e44cbd1ae8b3ba2be06" [[package]] name = "remove_dir_all" @@ -2074,7 +2120,7 @@ dependencies = [ "base64 0.11.0", "blake2b_simd", "constant_time_eq", - "crossbeam-utils 0.7.0", + "crossbeam-utils", ] [[package]] @@ -2092,6 +2138,17 @@ dependencies = [ "semver", ] +[[package]] +name = "rustversion" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "ryu" version = "1.0.2" @@ -2100,9 +2157,9 @@ checksum = "bfa8506c1de11c9c4e4c38863ccbe02a305c8188e85a05a784c9e11e1c3910c8" [[package]] name = "schannel" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f550b06b6cba9c8b8be3ee73f391990116bf527450d2556e9b9ce263b9a021" +checksum = "507a9e6e8ffe0a4e0ebb9a10293e62fdf7657c06f1b8bb07a8fcf697d2abf295" dependencies = [ "lazy_static", "winapi 0.3.8", @@ -2110,9 +2167,9 @@ dependencies = [ [[package]] name = "scopeguard" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "security-framework" @@ -2172,9 +2229,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.45" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab8f15f15d6c41a154c1b128a22f2dfabe350ef53c40953d84e36155c91192b" +checksum = "9371ade75d4c2d6cb154141b9752cf3781ec9c05e0e5cf35060e1e70ee7b9c25" dependencies = [ "itoa", "ryu", @@ -2252,9 +2309,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44e59e0c9fa00817912ae6e4e6e3c4fe04455e75699d06eedc7d85917ed8e8f4" +checksum = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc" [[package]] name = "socket2" @@ -2271,7 +2328,7 @@ dependencies = [ [[package]] name = "sphinx" version = "0.1.0" -source = "git+https://github.com/nymtech/sphinx?rev=4e51ed38a5de6fcb454c8903f1fe5e6b52720c77#4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" +source = "git+https://github.com/nymtech/sphinx?rev=23f9c89b257ee0936e70afd682e9ed6a62e89eee#23f9c89b257ee0936e70afd682e9ed6a62e89eee" dependencies = [ "aes-ctr", "arrayref", @@ -2328,15 +2385,26 @@ checksum = "7c65d530b10ccaeac294f349038a597e435b18fb456aadd0840a623f83b9e941" [[package]] name = "syn" -version = "1.0.14" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5" +checksum = "123bd9499cfb380418d509322d7a6d52e5315f064fe4b3ad18a53d6b92c07859" dependencies = [ "proc-macro2", "quote", "unicode-xid", ] +[[package]] +name = "syn-mid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "synstructure" version = "0.12.3" @@ -2423,9 +2491,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.10" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1fc73332507b971a5010664991a441b5ee0de92017f5a0e8b00fd684573045b" +checksum = "b34bee1facdc352fba10c9c58b654e6ecb6a2250167772bf86071f7c5f2f5061" dependencies = [ "bytes 0.5.4", "fnv", @@ -2458,9 +2526,9 @@ dependencies = [ [[package]] name = "tokio-codec" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" +checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" dependencies = [ "bytes 0.4.12", "futures 0.1.29", @@ -2469,9 +2537,9 @@ dependencies = [ [[package]] name = "tokio-current-thread" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443" +checksum = "b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e" dependencies = [ "futures 0.1.29", "tokio-executor", @@ -2479,19 +2547,19 @@ dependencies = [ [[package]] name = "tokio-executor" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6df436c42b0c3330a82d855d2ef017cd793090ad550a6bc2184f4b933532ab" +checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" dependencies = [ - "crossbeam-utils 0.6.6", + "crossbeam-utils", "futures 0.1.29", ] [[package]] name = "tokio-io" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" dependencies = [ "bytes 0.4.12", "futures 0.1.29", @@ -2500,21 +2568,22 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "0.2.3" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50a61f268a3db2acee8dcab514efc813dc6dbe8a00e86076f935f94304b59a7a" +checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ + "proc-macro2", "quote", "syn", ] [[package]] name = "tokio-reactor" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6732fe6b53c8d11178dcb77ac6d9682af27fc6d4cb87789449152e5377377146" +checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" dependencies = [ - "crossbeam-utils 0.6.6", + "crossbeam-utils", "futures 0.1.29", "lazy_static", "log", @@ -2529,9 +2598,9 @@ dependencies = [ [[package]] name = "tokio-sync" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d06554cce1ae4a50f42fba8023918afa931413aded705b560e29600ccf7c6d76" +checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee" dependencies = [ "fnv", "futures 0.1.29", @@ -2539,9 +2608,9 @@ dependencies = [ [[package]] name = "tokio-tcp" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" +checksum = "98df18ed66e3b72e742f185882a9e201892407957e45fbff8da17ae7a7c51f72" dependencies = [ "bytes 0.4.12", "futures 0.1.29", @@ -2553,13 +2622,13 @@ dependencies = [ [[package]] name = "tokio-threadpool" -version = "0.1.17" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c32ffea4827978e9aa392d2f743d973c1dfa3730a2ed3f22ce1e6984da848c" +checksum = "df720b6581784c118f0eb4310796b12b1d242a7eb95f716a8367855325c25f89" dependencies = [ "crossbeam-deque", "crossbeam-queue", - "crossbeam-utils 0.6.6", + "crossbeam-utils", "futures 0.1.29", "lazy_static", "log", @@ -2570,11 +2639,11 @@ dependencies = [ [[package]] name = "tokio-timer" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1739638e364e558128461fc1ad84d997702c8e31c2e6b18fb99842268199e827" +checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" dependencies = [ - "crossbeam-utils 0.6.6", + "crossbeam-utils", "futures 0.1.29", "slab", "tokio-executor", @@ -2586,10 +2655,10 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8b8fe88007ebc363512449868d7da4389c9400072a3f666f212c7280082882a" dependencies = [ - "futures 0.3.1", + "futures 0.3.4", "log", "pin-project", - "tokio 0.2.10", + "tokio 0.2.12", "tungstenite", ] @@ -2669,7 +2738,7 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" dependencies = [ - "version_check 0.9.1", + "version_check", ] [[package]] @@ -2687,7 +2756,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" dependencies = [ - "smallvec 1.1.0", + "smallvec 1.2.0", ] [[package]] @@ -2766,12 +2835,6 @@ dependencies = [ "semver", ] -[[package]] -name = "version_check" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" - [[package]] name = "version_check" version = "0.9.1" diff --git a/common/clients/directory-client/Cargo.toml b/common/clients/directory-client/Cargo.toml index a9e30636e00..a38459dcd2f 100644 --- a/common/clients/directory-client/Cargo.toml +++ b/common/clients/directory-client/Cargo.toml @@ -16,4 +16,4 @@ serde = { version = "1.0.104", features = ["derive"] } topology = {path = "../../topology"} [dev-dependencies] -mockito = "0.22.0" +mockito = "0.23.0" diff --git a/common/clients/mix-client/Cargo.toml b/common/clients/mix-client/Cargo.toml index b269af2d022..40ddb9306d1 100644 --- a/common/clients/mix-client/Cargo.toml +++ b/common/clients/mix-client/Cargo.toml @@ -18,6 +18,6 @@ addressing = {path = "../../addressing"} topology = {path = "../../topology"} ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } # sphinx = { path = "../../../../sphinx"} diff --git a/common/clients/provider-client/Cargo.toml b/common/clients/provider-client/Cargo.toml index 7897a94c7ef..f5ecafe9cc4 100644 --- a/common/clients/provider-client/Cargo.toml +++ b/common/clients/provider-client/Cargo.toml @@ -16,4 +16,4 @@ tokio = { version = "0.2", features = ["full"] } sfw-provider-requests = { path = "../../../sfw-provider/sfw-provider-requests" } ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } diff --git a/common/clients/provider-client/src/lib.rs b/common/clients/provider-client/src/lib.rs index 72bca9c682d..7191941a93e 100644 --- a/common/clients/provider-client/src/lib.rs +++ b/common/clients/provider-client/src/lib.rs @@ -89,7 +89,7 @@ impl ProviderClient { } }; - let pull_request = PullRequest::new(self.our_address, auth_token); + let pull_request = PullRequest::new(self.our_address.clone(), auth_token); let bytes = pull_request.to_bytes(); let response = self.send_request(bytes).await?; @@ -103,7 +103,7 @@ impl ProviderClient { return Err(ProviderClientError::ClientAlreadyRegisteredError); } - let register_request = RegisterRequest::new(self.our_address); + let register_request = RegisterRequest::new(self.our_address.clone()); let bytes = register_request.to_bytes(); let response = self.send_request(bytes).await?; diff --git a/common/crypto/Cargo.toml b/common/crypto/Cargo.toml index 7044e14cc95..f2a29d0ec2d 100644 --- a/common/crypto/Cargo.toml +++ b/common/crypto/Cargo.toml @@ -15,4 +15,4 @@ rand = "0.7.2" rand_os = "0.1" ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } \ No newline at end of file +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } \ No newline at end of file diff --git a/common/crypto/src/identity/mod.rs b/common/crypto/src/identity/mod.rs index 8f627e14aac..192b4911df9 100644 --- a/common/crypto/src/identity/mod.rs +++ b/common/crypto/src/identity/mod.rs @@ -68,7 +68,7 @@ impl MixIdentityPublicKey { let public_key_bytes = self.to_bytes(); temporary_address.copy_from_slice(&public_key_bytes[..]); - temporary_address + DestinationAddressBytes::from_bytes(temporary_address) } pub fn to_bytes(&self) -> Vec { diff --git a/common/healthcheck/Cargo.toml b/common/healthcheck/Cargo.toml index 0eb6b3c147d..b8f72127927 100644 --- a/common/healthcheck/Cargo.toml +++ b/common/healthcheck/Cargo.toml @@ -27,7 +27,7 @@ sfw-provider-requests = { path = "../../sfw-provider/sfw-provider-requests" } topology = {path = "../topology" } ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } # sphinx = { path = "../../../sphinx"} [dev-dependencies] diff --git a/common/healthcheck/src/path_check.rs b/common/healthcheck/src/path_check.rs index 3a64043706e..3833dd5c8ec 100644 --- a/common/healthcheck/src/path_check.rs +++ b/common/healthcheck/src/path_check.rs @@ -37,7 +37,8 @@ impl PathChecker { let address = identity_keys.public_key().derive_address(); for provider in providers { - let mut provider_client = ProviderClient::new(provider.client_listener, address, None); + let mut provider_client = + ProviderClient::new(provider.client_listener, address.clone(), None); let insertion_result = match provider_client.register().await { Ok(token) => { debug!("registered at provider {}", provider.pub_key); diff --git a/common/topology/Cargo.toml b/common/topology/Cargo.toml index c3ee8e72e1f..b414460c823 100644 --- a/common/topology/Cargo.toml +++ b/common/topology/Cargo.toml @@ -19,5 +19,5 @@ addressing = {path = "../addressing"} version-checker = {path = "../version-checker" } ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } # sphinx = { path = "../../../sphinx"} diff --git a/mixnode/Cargo.toml b/mixnode/Cargo.toml index 277138b2ad9..e7ec30adbaf 100644 --- a/mixnode/Cargo.toml +++ b/mixnode/Cargo.toml @@ -28,7 +28,7 @@ multi-tcp-client = { path = "../common/clients/multi-tcp-client" } pemstore = {path = "../common/pemstore"} ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } [build-dependencies] built = "0.3.2" diff --git a/mixnode/src/node/metrics.rs b/mixnode/src/node/metrics.rs index 3bb5b97c820..cf8e52e9428 100644 --- a/mixnode/src/node/metrics.rs +++ b/mixnode/src/node/metrics.rs @@ -17,6 +17,8 @@ pub(crate) enum MetricEvent { } #[derive(Debug, Clone)] +// Note: you should NEVER create more than a single instance of this using 'new()'. +// You should always use .clone() to create additional instances struct MixMetrics { inner: Arc>, } diff --git a/mixnode/src/node/mod.rs b/mixnode/src/node/mod.rs index 35167905df8..100ac7d1666 100644 --- a/mixnode/src/node/mod.rs +++ b/mixnode/src/node/mod.rs @@ -43,7 +43,7 @@ impl MixNode { } } - pub fn start_presence_notifier(&self) { + fn start_presence_notifier(&self) { info!("Starting presence notifier..."); let notifier_config = presence::NotifierConfig::new( self.config.get_presence_directory_server(), @@ -55,7 +55,7 @@ impl MixNode { presence::Notifier::new(notifier_config).start(self.runtime.handle()); } - pub fn start_metrics_reporter(&self) -> metrics::MetricsReporter { + fn start_metrics_reporter(&self) -> metrics::MetricsReporter { info!("Starting metrics reporter..."); metrics::MetricsController::new( self.config.get_metrics_directory_server(), @@ -65,7 +65,7 @@ impl MixNode { .start(self.runtime.handle()) } - pub fn start_socket_listener( + fn start_socket_listener( &self, metrics_reporter: metrics::MetricsReporter, forwarding_channel: mpsc::UnboundedSender<(SocketAddr, Vec)>, @@ -84,7 +84,7 @@ impl MixNode { ); } - pub fn start_packet_forwarder(&mut self) -> mpsc::UnboundedSender<(SocketAddr, Vec)> { + fn start_packet_forwarder(&mut self) -> mpsc::UnboundedSender<(SocketAddr, Vec)> { info!("Starting packet forwarder..."); // this can later be replaced with topology information diff --git a/nym-client/Cargo.toml b/nym-client/Cargo.toml index 4d32bdba48c..87ba8de7330 100644 --- a/nym-client/Cargo.toml +++ b/nym-client/Cargo.toml @@ -40,7 +40,7 @@ sfw-provider-requests = { path = "../sfw-provider/sfw-provider-requests" } topology = {path = "../common/topology" } ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } # sphinx = { path = "../../sphinx"} [build-dependencies] diff --git a/nym-client/src/client/cover_traffic_stream.rs b/nym-client/src/client/cover_traffic_stream.rs index ac012a9a07a..d23ca59747e 100644 --- a/nym-client/src/client/cover_traffic_stream.rs +++ b/nym-client/src/client/cover_traffic_stream.rs @@ -29,7 +29,7 @@ pub(crate) async fn start_loop_cover_traffic_stream( }; let cover_message = match mix_client::packet::loop_cover_message( - our_info.address, + our_info.address.clone(), our_info.identifier, topology, average_packet_delay_duration, diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index e950d806ff5..e8783f44b45 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -119,7 +119,7 @@ impl NymClient { let mut provider_poller = provider_poller::ProviderPoller::new( poller_input_tx, provider_client_listener_address, - self_address, + self_address.clone(), self.config .get_provider_auth_token() .map(|str_token| AuthToken::try_from_base58_string(str_token).ok()) @@ -149,7 +149,7 @@ impl NymClient { let loop_cover_traffic_future = rt.spawn(cover_traffic_stream::start_loop_cover_traffic_stream( mix_tx.clone(), - Destination::new(self_address, Default::default()), + Destination::new(self_address.clone(), Default::default()), topology_controller.get_inner_ref(), self.config.get_loop_cover_traffic_average_delay(), self.config.get_average_packet_delay(), @@ -160,7 +160,7 @@ impl NymClient { let topology_ref = topology_controller.get_inner_ref(); let average_packet_delay = self.config.get_average_packet_delay(); let message_sending_average_delay = self.config.get_message_sending_average_delay(); - + let self_address_clone = self_address.clone(); // future constantly pumping traffic at some specified average rate // if a real message is available on 'input_rx' that might have been received from say // the websocket, the real message is used, otherwise a loop cover message is generated @@ -169,7 +169,7 @@ impl NymClient { real_traffic_stream::OutQueueControl::new( mix_tx, input_rx, - Destination::new(self_address, Default::default()), + Destination::new(self_address_clone, Default::default()), topology_ref, average_packet_delay, message_sending_average_delay, diff --git a/nym-client/src/client/real_traffic_stream.rs b/nym-client/src/client/real_traffic_stream.rs index 5da904a7fe4..0fb9a198fe7 100644 --- a/nym-client/src/client/real_traffic_stream.rs +++ b/nym-client/src/client/real_traffic_stream.rs @@ -102,7 +102,7 @@ impl OutQueueControl { let next_packet = match next_message { StreamMessage::Cover => mix_client::packet::loop_cover_message( - self.our_info.address, + self.our_info.address.clone(), self.our_info.identifier, topology, self.average_packet_delay, diff --git a/nym-client/src/commands/init.rs b/nym-client/src/commands/init.rs index c9f47790c4e..586b63df00f 100644 --- a/nym-client/src/commands/init.rs +++ b/nym-client/src/commands/init.rs @@ -49,8 +49,11 @@ async fn try_provider_registrations( ) -> Option<(String, AuthToken)> { // since the order of providers is non-deterministic we can just try to get a first 'working' provider for provider in providers { - let provider_client = - provider_client::ProviderClient::new(provider.client_listener, our_address, None); + let provider_client = provider_client::ProviderClient::new( + provider.client_listener, + our_address.clone(), + None, + ); let auth_token = provider_client.register().await; if let Ok(token) = auth_token { return Some((provider.pub_key, token)); diff --git a/nym-client/src/sockets/tcp.rs b/nym-client/src/sockets/tcp.rs index ad6d7ec463a..c8b65c0df4c 100644 --- a/nym-client/src/sockets/tcp.rs +++ b/nym-client/src/sockets/tcp.rs @@ -84,7 +84,7 @@ fn parse_send_request(data: &[u8]) -> Result { Ok(ClientRequest::Send { message, - recipient_address, + recipient_address: DestinationAddressBytes::from_bytes(recipient_address), }) } @@ -153,7 +153,7 @@ impl ClientRequest { async fn handle_own_details(self_address_bytes: DestinationAddressBytes) -> ServerResponse { ServerResponse::OwnDetails { - address: self_address_bytes.to_vec(), + address: self_address_bytes.to_bytes().to_vec(), } } } @@ -274,7 +274,7 @@ async fn accept_connection( topology: topology.clone(), msg_input: msg_input.clone(), msg_query: msg_query.clone(), - self_address, + self_address: self_address.clone(), }; match handle_connection(&buf[..n], request_handling_data).await { Ok(res) => res, @@ -313,7 +313,7 @@ pub async fn start_tcpsocket( stream, message_tx.clone(), received_messages_query_tx.clone(), - self_address, + self_address.clone(), topology.clone(), )); } diff --git a/nym-client/src/sockets/ws.rs b/nym-client/src/sockets/ws.rs index c9ff1ab2bea..845c4971c97 100644 --- a/nym-client/src/sockets/ws.rs +++ b/nym-client/src/sockets/ws.rs @@ -50,7 +50,9 @@ impl Connection { } ClientRequest::Fetch => ClientRequest::handle_fetch(self.msg_query.clone()).await, ClientRequest::GetClients => ClientRequest::handle_get_clients(&self.topology).await, - ClientRequest::OwnDetails => ClientRequest::handle_own_details(self.self_address).await, + ClientRequest::OwnDetails => { + ClientRequest::handle_own_details(self.self_address.clone()).await + } } } @@ -218,7 +220,10 @@ impl ClientRequest { let dummy_surb = [0; 16]; - let input_msg = InputMessage(Destination::new(address, dummy_surb), message_bytes); + let input_msg = InputMessage( + Destination::new(DestinationAddressBytes::from_bytes(address), dummy_surb), + message_bytes, + ); input_tx.send(input_msg).await.unwrap(); ServerResponse::Send @@ -279,7 +284,7 @@ impl ClientRequest { } async fn handle_own_details(self_address_bytes: DestinationAddressBytes) -> ServerResponse { - let self_address = bs58::encode(&self_address_bytes).into_string(); + let self_address = self_address_bytes.to_base58_string(); ServerResponse::OwnDetails { address: self_address, } @@ -404,7 +409,7 @@ pub async fn start_websocket( stream, message_tx.clone(), received_messages_query_tx.clone(), - self_address, + self_address.clone(), topology.clone(), )); } diff --git a/sfw-provider/Cargo.toml b/sfw-provider/Cargo.toml index 62b3f15bef7..f32a530a818 100644 --- a/sfw-provider/Cargo.toml +++ b/sfw-provider/Cargo.toml @@ -31,7 +31,7 @@ pemstore = {path = "../common/pemstore"} sfw-provider-requests = { path = "./sfw-provider-requests" } ## will be moved to proper dependencies once released -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } [build-dependencies] built = "0.3.2" diff --git a/sfw-provider/sfw-provider-requests/Cargo.toml b/sfw-provider/sfw-provider-requests/Cargo.toml index 9d0cb7e241b..a34bae654e4 100644 --- a/sfw-provider/sfw-provider-requests/Cargo.toml +++ b/sfw-provider/sfw-provider-requests/Cargo.toml @@ -8,4 +8,4 @@ edition = "2018" [dependencies] bs58 = "0.3.0" -sphinx = { git = "https://github.com/nymtech/sphinx", rev="4e51ed38a5de6fcb454c8903f1fe5e6b52720c77" } +sphinx = { git = "https://github.com/nymtech/sphinx", rev="23f9c89b257ee0936e70afd682e9ed6a62e89eee" } diff --git a/sfw-provider/sfw-provider-requests/src/lib.rs b/sfw-provider/sfw-provider-requests/src/lib.rs index 6d307aaf425..7a4656e4172 100644 --- a/sfw-provider/sfw-provider-requests/src/lib.rs +++ b/sfw-provider/sfw-provider-requests/src/lib.rs @@ -6,7 +6,7 @@ pub const DUMMY_MESSAGE_CONTENT: &[u8] = // To be renamed to 'AuthToken' once it is safe to replace it #[derive(Debug, PartialEq, Eq, Hash, Clone)] -pub struct AuthToken(pub [u8; 32]); +pub struct AuthToken([u8; 32]); #[derive(Debug)] pub enum AuthTokenConversionError { @@ -15,6 +15,18 @@ pub enum AuthTokenConversionError { } impl AuthToken { + pub fn from_bytes(bytes: [u8; 32]) -> Self { + AuthToken(bytes) + } + + pub fn to_bytes(&self) -> [u8; 32] { + self.0 + } + + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } + pub fn try_from_base58_string>( val: S, ) -> Result { diff --git a/sfw-provider/sfw-provider-requests/src/requests.rs b/sfw-provider/sfw-provider-requests/src/requests.rs index c8a15f44dc4..06ab0a87414 100644 --- a/sfw-provider/sfw-provider-requests/src/requests.rs +++ b/sfw-provider/sfw-provider-requests/src/requests.rs @@ -79,7 +79,7 @@ impl ProviderRequest for PullRequest { Self::get_prefix() .to_vec() .into_iter() - .chain(self.destination_address.iter().cloned()) + .chain(self.destination_address.to_bytes().iter().cloned()) .chain(self.auth_token.0.iter().cloned()) .collect() } @@ -102,8 +102,8 @@ impl ProviderRequest for PullRequest { auth_token.copy_from_slice(&bytes[34..]); Ok(PullRequest { - auth_token: AuthToken(auth_token), - destination_address, + auth_token: AuthToken::from_bytes(auth_token), + destination_address: DestinationAddressBytes::from_bytes(destination_address), }) } } @@ -130,7 +130,7 @@ impl ProviderRequest for RegisterRequest { Self::get_prefix() .to_vec() .into_iter() - .chain(self.destination_address.iter().cloned()) + .chain(self.destination_address.to_bytes().iter().cloned()) .collect() } @@ -149,7 +149,7 @@ impl ProviderRequest for RegisterRequest { destination_address.copy_from_slice(&bytes[2..]); Ok(RegisterRequest { - destination_address, + destination_address: DestinationAddressBytes::from_bytes(destination_address), }) } } @@ -160,12 +160,12 @@ mod creating_pull_request { #[test] fn it_is_possible_to_recover_it_from_bytes() { - let address = [ + let address = DestinationAddressBytes::from_bytes([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, - ]; + ]); let auth_token = [1u8; 32]; - let pull_request = PullRequest::new(address, AuthToken(auth_token)); + let pull_request = PullRequest::new(address.clone(), AuthToken(auth_token)); let bytes = pull_request.to_bytes(); let recovered = PullRequest::from_bytes(&bytes).unwrap(); @@ -175,12 +175,12 @@ mod creating_pull_request { #[test] fn it_is_possible_to_recover_it_from_bytes_with_enum_wrapper() { - let address = [ + let address = DestinationAddressBytes::from_bytes([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, - ]; + ]); let auth_token = [1u8; 32]; - let pull_request = PullRequest::new(address, AuthToken(auth_token)); + let pull_request = PullRequest::new(address.clone(), AuthToken(auth_token)); let bytes = pull_request.to_bytes(); let recovered = ProviderRequests::from_bytes(&bytes).unwrap(); @@ -200,11 +200,11 @@ mod creating_register_request { #[test] fn it_is_possible_to_recover_it_from_bytes() { - let address = [ + let address = DestinationAddressBytes::from_bytes([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, - ]; - let register_request = RegisterRequest::new(address); + ]); + let register_request = RegisterRequest::new(address.clone()); let bytes = register_request.to_bytes(); let recovered = RegisterRequest::from_bytes(&bytes).unwrap(); @@ -213,11 +213,11 @@ mod creating_register_request { #[test] fn it_is_possible_to_recover_it_from_bytes_with_enum_wrapper() { - let address = [ + let address = DestinationAddressBytes::from_bytes([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, - ]; - let register_request = RegisterRequest::new(address); + ]); + let register_request = RegisterRequest::new(address.clone()); let bytes = register_request.to_bytes(); let recovered = ProviderRequests::from_bytes(&bytes).unwrap(); diff --git a/sfw-provider/sfw-provider-requests/src/responses.rs b/sfw-provider/sfw-provider-requests/src/responses.rs index 4135ed06e48..47163ef0225 100644 --- a/sfw-provider/sfw-provider-requests/src/responses.rs +++ b/sfw-provider/sfw-provider-requests/src/responses.rs @@ -26,6 +26,10 @@ pub struct RegisterResponse { pub auth_token: AuthToken, } +pub struct ErrorResponse { + pub message: String, +} + impl PullResponse { pub fn new(messages: Vec>) -> Self { PullResponse { messages } @@ -38,6 +42,14 @@ impl RegisterResponse { } } +impl ErrorResponse { + pub fn new>(message: S) -> Self { + ErrorResponse { + message: message.into(), + } + } +} + // TODO: This should go into some kind of utils module/crate fn read_be_u16(input: &mut &[u8]) -> u16 { let (int_bytes, rest) = input.split_at(std::mem::size_of::()); @@ -126,6 +138,19 @@ impl ProviderResponse for RegisterResponse { } } +impl ProviderResponse for ErrorResponse { + fn to_bytes(&self) -> Vec { + self.message.clone().into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + match String::from_utf8(bytes.to_vec()) { + Err(_) => Err(ProviderResponseError::UnmarshalError), + Ok(message) => Ok(ErrorResponse { message }), + } + } +} + #[cfg(test)] mod creating_pull_response { use super::*; diff --git a/sfw-provider/src/commands/run.rs b/sfw-provider/src/commands/run.rs index cabb7bcfd16..123398e6db4 100644 --- a/sfw-provider/src/commands/run.rs +++ b/sfw-provider/src/commands/run.rs @@ -157,6 +157,5 @@ pub fn execute(matches: &ArgMatches) { config.get_clients_ledger_path() ); - let provider = ServiceProvider::new(config); - provider.start().unwrap() + ServiceProvider::new(config).run(); } diff --git a/sfw-provider/src/provider/client_handling/ledger.rs b/sfw-provider/src/provider/client_handling/ledger.rs new file mode 100644 index 00000000000..a5dda44f365 --- /dev/null +++ b/sfw-provider/src/provider/client_handling/ledger.rs @@ -0,0 +1,74 @@ +use directory_client::presence::providers::MixProviderClient; +use futures::lock::Mutex; +use sfw_provider_requests::AuthToken; +use sphinx::route::DestinationAddressBytes; +use std::collections::HashMap; +use std::io; +use std::path::PathBuf; +use std::sync::Arc; + +#[derive(Debug, Clone)] +// Note: you should NEVER create more than a single instance of this using 'new()'. +// You should always use .clone() to create additional instances +pub struct ClientLedger { + inner: Arc>, +} + +impl ClientLedger { + pub(crate) fn new() -> Self { + ClientLedger { + inner: Arc::new(Mutex::new(ClientLedgerInner(HashMap::new()))), + } + } + + pub(crate) async fn verify_token( + &self, + auth_token: &AuthToken, + client_address: &DestinationAddressBytes, + ) -> bool { + match self.inner.lock().await.0.get(client_address) { + None => false, + Some(expected_token) => expected_token == auth_token, + } + } + + pub(crate) async fn insert_token( + &mut self, + auth_token: AuthToken, + client_address: DestinationAddressBytes, + ) -> Option { + self.inner.lock().await.0.insert(client_address, auth_token) + } + + pub(crate) async fn remove_token( + &mut self, + client_address: &DestinationAddressBytes, + ) -> Option { + self.inner.lock().await.0.remove(client_address) + } + + pub(crate) async fn current_clients(&self) -> Vec { + self.inner + .lock() + .await + .0 + .keys() + .map(|client_address| client_address.to_base58_string()) + .map(|pub_key| MixProviderClient { pub_key }) + .collect() + } + + #[allow(dead_code)] + pub(crate) fn load(_file: PathBuf) -> Self { + // TODO: actual loading, + // temporarily just create a new one + Self::new() + } + + #[allow(dead_code)] + pub(crate) async fn save(&self, _file: PathBuf) -> io::Result<()> { + unimplemented!() + } +} + +struct ClientLedgerInner(HashMap); diff --git a/sfw-provider/src/provider/client_handling/listener.rs b/sfw-provider/src/provider/client_handling/listener.rs new file mode 100644 index 00000000000..f3cee1bf5ee --- /dev/null +++ b/sfw-provider/src/provider/client_handling/listener.rs @@ -0,0 +1,95 @@ +use crate::provider::client_handling::request_processing::{ + ClientProcessingResult, RequestProcessor, +}; +use log::*; +use sfw_provider_requests::responses::{ + ErrorResponse, ProviderResponse, PullResponse, RegisterResponse, +}; +use std::io; +use std::net::SocketAddr; +use tokio::prelude::*; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; + +async fn process_request( + socket: &mut tokio::net::TcpStream, + packet_data: &[u8], + request_processor: &mut RequestProcessor, +) { + match request_processor.process_client_request(packet_data).await { + Err(e) => { + warn!("We failed to process client request - {:?}", e); + let response_bytes = ErrorResponse::new(format!("{:?}", e)).to_bytes(); + if let Err(e) = socket.write_all(&response_bytes).await { + debug!("Failed to write response to the client - {:?}", e); + } + } + Ok(res) => match res { + ClientProcessingResult::RegisterResponse(auth_token) => { + let response_bytes = RegisterResponse::new(auth_token).to_bytes(); + if let Err(e) = socket.write_all(&response_bytes).await { + debug!("Failed to write response to the client - {:?}", e); + } + } + ClientProcessingResult::PullResponse(retrieved_messages) => { + let (messages, paths): (Vec<_>, Vec<_>) = retrieved_messages + .into_iter() + .map(|c| c.into_tuple()) + .unzip(); + let response_bytes = PullResponse::new(messages).to_bytes(); + if let Ok(_) = socket.write_all(&response_bytes).await { + // only delete stored messages if we managed to actually send the response + if let Err(e) = request_processor.delete_sent_messages(paths).await { + error!("Somehow failed to delete stored messages! - {:?}", e); + } + } + } + }, + } +} + +async fn process_socket_connection( + mut socket: tokio::net::TcpStream, + mut request_processor: RequestProcessor, +) { + let mut buf = [0u8; 1024]; + loop { + match socket.read(&mut buf).await { + // socket closed + Ok(n) if n == 0 => { + trace!("Remote connection closed."); + return; + } + // in here we do not really want to process multiple requests from the same + // client concurrently as then we might end up with really weird race conditions + // plus realistically it wouldn't really introduce any speed up + Ok(n) => process_request(&mut socket, &buf[0..n], &mut request_processor).await, + Err(e) => { + warn!( + "failed to read from socket. Closing the connection; err = {:?}", + e + ); + return; + } + }; + } +} + +pub(crate) fn run_client_socket_listener( + handle: &Handle, + addr: SocketAddr, + request_processor: RequestProcessor, +) -> JoinHandle> { + let handle_clone = handle.clone(); + handle.spawn(async move { + let mut listener = tokio::net::TcpListener::bind(addr).await?; + loop { + let (socket, _) = listener.accept().await?; + + let thread_request_processor = request_processor.clone(); + handle_clone.spawn(async move { + process_socket_connection(socket, thread_request_processor).await; + }); + } + }) +} diff --git a/sfw-provider/src/provider/client_handling/mod.rs b/sfw-provider/src/provider/client_handling/mod.rs index 38903165487..c7fdd30b237 100644 --- a/sfw-provider/src/provider/client_handling/mod.rs +++ b/sfw-provider/src/provider/client_handling/mod.rs @@ -1,278 +1,3 @@ -use crate::provider::storage::{ClientStorage, StoreError}; -use crate::provider::ClientLedger; -use crypto::encryption; -use futures::lock::Mutex as FMutex; -use hmac::{Hmac, Mac}; -use log::*; -use sfw_provider_requests::requests::{ - ProviderRequestError, ProviderRequests, PullRequest, RegisterRequest, -}; -use sfw_provider_requests::responses::{ProviderResponse, PullResponse, RegisterResponse}; -use sfw_provider_requests::AuthToken; -use sha2::Sha256; -use std::io; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -type HmacSha256 = Hmac; - -#[derive(Debug)] -pub enum ClientProcessingError { - ClientDoesntExistError, - StoreError, - InvalidRequest, - WrongToken, - IOError, -} - -impl From for ClientProcessingError { - fn from(_: ProviderRequestError) -> Self { - use ClientProcessingError::*; - - InvalidRequest - } -} - -impl From for ClientProcessingError { - fn from(e: StoreError) -> Self { - match e { - StoreError::ClientDoesntExistError => ClientProcessingError::ClientDoesntExistError, - _ => ClientProcessingError::StoreError, - } - } -} - -impl From for ClientProcessingError { - fn from(_: io::Error) -> Self { - use ClientProcessingError::*; - - IOError - } -} - -#[derive(Debug)] -pub(crate) struct ClientProcessingData { - store_dir: PathBuf, - registered_clients_ledger: Arc>, - secret_key: encryption::PrivateKey, - message_retrieval_limit: u16, -} - -impl ClientProcessingData { - pub(crate) fn new( - store_dir: PathBuf, - registered_clients_ledger: Arc>, - secret_key: encryption::PrivateKey, - message_retrieval_limit: u16, - ) -> Self { - ClientProcessingData { - store_dir, - registered_clients_ledger, - secret_key, - message_retrieval_limit, - } - } - - pub(crate) fn add_arc(self) -> Arc { - Arc::new(self) - } -} - -pub(crate) struct ClientRequestProcessor; - -impl ClientRequestProcessor { - pub(crate) async fn process_client_request( - data: &[u8], - processing_data: Arc, - ) -> Result, ClientProcessingError> { - let client_request = ProviderRequests::from_bytes(&data)?; - trace!("Received the following request: {:?}", client_request); - match client_request { - ProviderRequests::Register(req) => Ok(ClientRequestProcessor::register_new_client( - req, - processing_data, - ) - .await? - .to_bytes()), - ProviderRequests::PullMessages(req) => Ok( - ClientRequestProcessor::process_pull_messages_request(req, processing_data) - .await? - .to_bytes(), - ), - } - } - - async fn process_pull_messages_request( - req: PullRequest, - processing_data: Arc, - ) -> Result { - // TODO: this lock is completely unnecessary as we're only reading the data. - // Wait for https://github.com/nymtech/nym-sfw-provider/issues/19 to resolve. - let unlocked_ledger = processing_data.registered_clients_ledger.lock().await; - - if unlocked_ledger.has_token(&req.auth_token) { - // drop the mutex so that we could do IO without blocking others wanting to get the lock - drop(unlocked_ledger); - let retrieved_messages = ClientStorage::retrieve_client_files( - req.destination_address, - processing_data.store_dir.as_path(), - processing_data.message_retrieval_limit, - )?; - Ok(PullResponse::new(retrieved_messages)) - } else { - Err(ClientProcessingError::WrongToken) - } - } - - async fn register_new_client( - req: RegisterRequest, - processing_data: Arc, - ) -> Result { - debug!( - "Processing register new client request: {:?}", - req.destination_address - ); - let mut unlocked_ledger = processing_data.registered_clients_ledger.lock().await; - - let auth_token = ClientRequestProcessor::generate_new_auth_token( - req.destination_address.to_vec(), - &processing_data.secret_key, - ); - if !unlocked_ledger.has_token(&auth_token) { - unlocked_ledger.insert_token(auth_token.clone(), req.destination_address); - ClientRequestProcessor::create_storage_dir( - req.destination_address, - processing_data.store_dir.as_path(), - )?; - } - Ok(RegisterResponse::new(auth_token)) - } - - fn create_storage_dir( - client_address: sphinx::route::DestinationAddressBytes, - store_dir: &Path, - ) -> io::Result<()> { - let client_dir_name = bs58::encode(client_address).into_string(); - let full_store_dir = store_dir.join(client_dir_name); - std::fs::create_dir_all(full_store_dir) - } - - fn generate_new_auth_token(data: Vec, key: &encryption::PrivateKey) -> AuthToken { - // also note that `new_varkey` doesn't even have an execution branch returning an error - let mut auth_token_raw = HmacSha256::new_varkey(&key.to_bytes()) - .expect("HMAC should be able take key of any size"); - auth_token_raw.input(&data); - let mut auth_token = [0u8; 32]; - auth_token.copy_from_slice(&auth_token_raw.result().code().to_vec()); - AuthToken(auth_token) - } -} - -#[cfg(test)] -mod register_new_client { - // use super::*; - - // TODO: those tests require being called in async context. we need to research how to test this stuff... - // #[test] - // fn registers_new_auth_token_for_each_new_client() { - // let req1 = RegisterRequest { - // destination_address: [1u8; 32], - // }; - // let registered_client_ledger = ClientLedger::new(); - // let store_dir = PathBuf::from("./foo/"); - // let key = Scalar::from_bytes_mod_order([1u8; 32]); - // let client_processing_data = ClientProcessingData::new(store_dir, registered_client_ledger, key).add_arc_futures_mutex(); - // - // - // // need to do async.... - // client_processing_data.lock().await; - // assert_eq!(0, registered_client_ledger.0.len()); - // ClientRequestProcessor::register_new_client( - // req1, - // client_processing_data.clone(), - // ); - // - // assert_eq!(1, registered_client_ledger.0.len()); - // - // let req2 = RegisterRequest { - // destination_address: [2u8; 32], - // }; - // ClientRequestProcessor::register_new_client( - // req2, - // client_processing_data, - // ); - // assert_eq!(2, registered_client_ledger.0.len()); - // } - // - // #[test] - // fn registers_given_token_only_once() { - // let req1 = RegisterRequest { - // destination_address: [1u8; 32], - // }; - // let registered_client_ledger = ClientLedger::new(); - // let store_dir = PathBuf::from("./foo/"); - // let key = Scalar::from_bytes_mod_order([1u8; 32]); - // let client_processing_data = ClientProcessingData::new(store_dir, registered_client_ledger, key).add_arc_futures_mutex(); - // - // ClientRequestProcessor::register_new_client( - // req1, - // client_processing_data.clone(), - // ); - // let req2 = RegisterRequest { - // destination_address: [1u8; 32], - // }; - // ClientRequestProcessor::register_new_client( - // req2, - // client_processing_data.clone(), - // ); - // - // client_processing_data.lock().await; - // - // assert_eq!(1, registered_client_ledger.0.len()) - // } -} - -#[cfg(test)] -mod create_storage_dir { - use super::*; - use sphinx::route::DestinationAddressBytes; - - #[test] - fn it_creates_a_correct_storage_directory() { - let client_address: DestinationAddressBytes = [1u8; 32]; - let store_dir = Path::new("/tmp/"); - ClientRequestProcessor::create_storage_dir(client_address, store_dir).unwrap(); - } -} - -#[cfg(test)] -mod generating_new_auth_token { - use super::*; - - #[test] - fn for_the_same_input_generates_the_same_auth_token() { - let data1 = vec![1u8; 55]; - let data2 = vec![1u8; 55]; - let key = encryption::PrivateKey::from_bytes(&[1u8; 32]); - let token1 = ClientRequestProcessor::generate_new_auth_token(data1, &key); - let token2 = ClientRequestProcessor::generate_new_auth_token(data2, &key); - assert_eq!(token1, token2); - } - - #[test] - fn for_different_inputs_generates_different_auth_tokens() { - let data1 = vec![1u8; 55]; - let data2 = vec![2u8; 55]; - let key = encryption::PrivateKey::from_bytes(&[1u8; 32]); - let token1 = ClientRequestProcessor::generate_new_auth_token(data1, &key); - let token2 = ClientRequestProcessor::generate_new_auth_token(data2, &key); - assert_ne!(token1, token2); - - let data1 = vec![1u8; 50]; - let data2 = vec![2u8; 55]; - let key = encryption::PrivateKey::from_bytes(&[1u8; 32]); - let token1 = ClientRequestProcessor::generate_new_auth_token(data1, &key); - let token2 = ClientRequestProcessor::generate_new_auth_token(data2, &key); - assert_ne!(token1, token2); - } -} +pub(crate) mod ledger; +pub(crate) mod listener; +pub(crate) mod request_processing; diff --git a/sfw-provider/src/provider/client_handling/request_processing.rs b/sfw-provider/src/provider/client_handling/request_processing.rs new file mode 100644 index 00000000000..6e7a36bb08d --- /dev/null +++ b/sfw-provider/src/provider/client_handling/request_processing.rs @@ -0,0 +1,203 @@ +use crate::provider::client_handling::ledger::ClientLedger; +use crate::provider::storage::{ClientFile, ClientStorage}; +use crypto::encryption; +use hmac::{Hmac, Mac}; +use log::*; +use sfw_provider_requests::requests::{ + ProviderRequestError, ProviderRequests, PullRequest, RegisterRequest, +}; +use sfw_provider_requests::AuthToken; +use sha2::Sha256; +use sphinx::route::DestinationAddressBytes; +use std::io; +use std::path::PathBuf; +use std::sync::Arc; + +#[derive(Debug)] +pub enum ClientProcessingResult { + PullResponse(Vec), + RegisterResponse(AuthToken), +} + +#[derive(Debug)] +pub enum ClientProcessingError { + InvalidRequest, + InvalidToken, + IOError(String), +} + +impl From for ClientProcessingError { + fn from(_: ProviderRequestError) -> Self { + use ClientProcessingError::*; + + InvalidRequest + } +} + +impl From for ClientProcessingError { + fn from(e: io::Error) -> Self { + use ClientProcessingError::*; + + IOError(e.to_string()) + } +} + +// PacketProcessor contains all data required to correctly process client requests +#[derive(Clone)] +pub struct RequestProcessor { + secret_key: Arc, + client_storage: ClientStorage, + client_ledger: ClientLedger, +} + +impl RequestProcessor { + pub(crate) fn new( + secret_key: encryption::PrivateKey, + client_storage: ClientStorage, + client_ledger: ClientLedger, + ) -> Self { + RequestProcessor { + secret_key: Arc::new(secret_key), + client_storage, + client_ledger, + } + } + + pub(crate) async fn process_client_request( + &mut self, + request_bytes: &[u8], + ) -> Result { + let client_request = ProviderRequests::from_bytes(request_bytes)?; + debug!("Received the following request: {:?}", client_request); + match client_request { + ProviderRequests::Register(req) => self.process_register_request(req).await, + ProviderRequests::PullMessages(req) => self.process_pull_request(req).await, + } + } + + pub(crate) async fn process_register_request( + &mut self, + req: RegisterRequest, + ) -> Result { + debug!( + "Processing register new client request: {:?}", + req.destination_address.to_base58_string() + ); + + let auth_token = self.generate_new_auth_token(req.destination_address.clone()); + if let Some(_) = self + .client_ledger + .insert_token(auth_token.clone(), req.destination_address.clone()) + .await + { + info!( + "Client {:?} was already registered before!", + req.destination_address.to_base58_string() + ) + } else { + if let Err(e) = self + .client_storage + .create_storage_dir(req.destination_address.clone()) + .await + { + error!("We failed to create inbox directory for the client -{:?}\nReverting issued token...", e); + // we must revert our changes if this operation failed + self.client_ledger + .remove_token(&req.destination_address) + .await; + } + } + + Ok(ClientProcessingResult::RegisterResponse(auth_token)) + } + + fn generate_new_auth_token(&self, client_address: DestinationAddressBytes) -> AuthToken { + type HmacSha256 = Hmac; + + // note that `new_varkey` doesn't even have an execution branch returning an error + // (true as of hmac 0.7.1) + let mut auth_token_raw = HmacSha256::new_varkey(&self.secret_key.to_bytes()).unwrap(); + auth_token_raw.input(client_address.as_bytes()); + let mut auth_token = [0u8; 32]; + auth_token.copy_from_slice(auth_token_raw.result().code().as_slice()); + AuthToken::from_bytes(auth_token) + } + + pub(crate) async fn process_pull_request( + &self, + req: PullRequest, + ) -> Result { + if self + .client_ledger + .verify_token(&req.auth_token, &req.destination_address) + .await + { + let retrieved_messages = self + .client_storage + .retrieve_client_files(req.destination_address) + .await?; + return Ok(ClientProcessingResult::PullResponse(retrieved_messages)); + } + + Err(ClientProcessingError::InvalidToken) + } + + pub(crate) async fn delete_sent_messages(&self, file_paths: Vec) -> io::Result<()> { + self.client_storage.delete_files(file_paths).await + } +} + +#[cfg(test)] +mod generating_new_auth_token { + use super::*; + + #[test] + fn for_the_same_input_generates_the_same_auth_token() { + let client_address1 = DestinationAddressBytes::from_bytes([1; 32]); + let client_address2 = DestinationAddressBytes::from_bytes([1; 32]); + let key = encryption::PrivateKey::from_bytes(&[2u8; 32]); + + let request_processor = RequestProcessor { + secret_key: Arc::new(key), + client_storage: ClientStorage::new(3, 16, Default::default()), + client_ledger: ClientLedger::new(), + }; + + let token1 = request_processor.generate_new_auth_token(client_address1); + let token2 = request_processor.generate_new_auth_token(client_address2); + assert_eq!(token1, token2); + } + + #[test] + fn for_different_inputs_generates_different_auth_tokens() { + let client_address1 = DestinationAddressBytes::from_bytes([1; 32]); + let client_address2 = DestinationAddressBytes::from_bytes([2; 32]); + let key1 = encryption::PrivateKey::from_bytes(&[3u8; 32]); + let key2 = encryption::PrivateKey::from_bytes(&[4u8; 32]); + + let request_processor1 = RequestProcessor { + secret_key: Arc::new(key1), + client_storage: ClientStorage::new(3, 16, Default::default()), + client_ledger: ClientLedger::new(), + }; + + let request_processor2 = RequestProcessor { + secret_key: Arc::new(key2), + client_storage: ClientStorage::new(3, 16, Default::default()), + client_ledger: ClientLedger::new(), + }; + + let token1 = request_processor1.generate_new_auth_token(client_address1.clone()); + let token2 = request_processor1.generate_new_auth_token(client_address2.clone()); + + let token3 = request_processor2.generate_new_auth_token(client_address1); + let token4 = request_processor2.generate_new_auth_token(client_address2); + + assert_ne!(token1, token2); + assert_ne!(token1, token3); + assert_ne!(token1, token4); + assert_ne!(token2, token3); + assert_ne!(token2, token4); + assert_ne!(token3, token4); + } +} diff --git a/sfw-provider/src/provider/mix_handling/listener.rs b/sfw-provider/src/provider/mix_handling/listener.rs new file mode 100644 index 00000000000..00a6b970622 --- /dev/null +++ b/sfw-provider/src/provider/mix_handling/listener.rs @@ -0,0 +1,78 @@ +use crate::provider::mix_handling::packet_processing::{MixProcessingResult, PacketProcessor}; +use log::*; +use std::io; +use std::net::SocketAddr; +use tokio::prelude::*; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; + +async fn process_received_packet( + packet_data: [u8; sphinx::PACKET_SIZE], + packet_processor: PacketProcessor, +) { + match packet_processor.process_sphinx_packet(packet_data).await { + Err(e) => debug!("We failed to process received sphinx packet - {:?}", e), + Ok(res) => match res { + MixProcessingResult::ForwardHop => { + error!("Somehow processed a forward hop message - those are not implemented for providers!") + } + MixProcessingResult::FinalHop => { + trace!("successfully processed [and stored] a final hop packet") + } + }, + } +} + +async fn process_socket_connection( + mut socket: tokio::net::TcpStream, + packet_processor: PacketProcessor, +) { + let mut buf = [0u8; sphinx::PACKET_SIZE]; + loop { + match socket.read(&mut buf).await { + // socket closed + Ok(n) if n == 0 => { + trace!("Remote connection closed."); + return; + } + Ok(n) => { + if n != sphinx::PACKET_SIZE { + warn!("read data of different length than expected sphinx packet size - {} (expected {})", n, sphinx::PACKET_SIZE); + continue; + } + + // we must be able to handle multiple packets from same connection independently + tokio::spawn(process_received_packet( + buf.clone(), + packet_processor.clone(), + )) + } + Err(e) => { + warn!( + "failed to read from socket. Closing the connection; err = {:?}", + e + ); + return; + } + }; + } +} + +pub(crate) fn run_mix_socket_listener( + handle: &Handle, + addr: SocketAddr, + packet_processor: PacketProcessor, +) -> JoinHandle> { + let handle_clone = handle.clone(); + handle.spawn(async move { + let mut listener = tokio::net::TcpListener::bind(addr).await?; + loop { + let (socket, _) = listener.accept().await?; + + let thread_packet_processor = packet_processor.clone(); + handle_clone.spawn(async move { + process_socket_connection(socket, thread_packet_processor).await; + }); + } + }) +} diff --git a/sfw-provider/src/provider/mix_handling/mod.rs b/sfw-provider/src/provider/mix_handling/mod.rs index b50303331c0..236fd73481d 100644 --- a/sfw-provider/src/provider/mix_handling/mod.rs +++ b/sfw-provider/src/provider/mix_handling/mod.rs @@ -1,106 +1,2 @@ -use crate::provider::storage::StoreData; -use crypto::encryption; -use log::{error, warn}; -use sphinx::{ProcessedPacket, SphinxPacket}; -use std::path::PathBuf; -use std::sync::{Arc, RwLock}; - -// TODO: this will probably need to be moved elsewhere I imagine -// DUPLICATE WITH MIXNODE CODE!!! -#[derive(Debug)] -pub enum MixProcessingError { - FileIOFailure, - InvalidPayload, - NonMatchingRecipient, - ReceivedForwardHopError, - SphinxRecoveryError, - SphinxProcessingError, -} - -impl From for MixProcessingError { - // for time being just have a single error instance for all possible results of sphinx::ProcessingError - fn from(_: sphinx::ProcessingError) -> Self { - use MixProcessingError::*; - - SphinxRecoveryError - } -} - -impl From for MixProcessingError { - fn from(_: std::io::Error) -> Self { - use MixProcessingError::*; - - FileIOFailure - } -} - -// ProcessingData defines all data required to correctly unwrap sphinx packets -#[derive(Debug, Clone)] -pub(crate) struct MixProcessingData { - secret_key: encryption::PrivateKey, - pub(crate) store_dir: PathBuf, - new_messages_filename_length: u16, -} - -impl MixProcessingData { - pub(crate) fn new( - secret_key: encryption::PrivateKey, - store_dir: PathBuf, - new_messages_filename_length: u16, - ) -> Self { - MixProcessingData { - secret_key, - store_dir, - new_messages_filename_length, - } - } - - pub(crate) fn add_arc_rwlock(self) -> Arc> { - Arc::new(RwLock::new(self)) - } -} - -pub(crate) struct MixPacketProcessor(()); - -impl MixPacketProcessor { - pub fn process_sphinx_data_packet( - packet_data: &[u8], - processing_data: &RwLock, - ) -> Result { - let packet = SphinxPacket::from_bytes(packet_data)?; - let read_processing_data = match processing_data.read() { - Ok(guard) => guard, - Err(e) => { - error!("processing data lock was poisoned! - {:?}", e); - std::process::exit(1) - } - }; - let (client_address, client_surb_id, payload) = - match packet.process(read_processing_data.secret_key.inner()) { - Ok(ProcessedPacket::ProcessedPacketFinalHop(client_address, surb_id, payload)) => { - (client_address, surb_id, payload) - } - Ok(_) => return Err(MixProcessingError::ReceivedForwardHopError), - Err(e) => { - warn!("Error unwrapping Sphinx packet: {:?}", e); - return Err(MixProcessingError::SphinxProcessingError); - } - }; - - // TODO: should provider try to be recovering plaintext? this would potentially make client retrieve messages of non-constant length, - // perhaps provider should be re-padding them on retrieval or storing full data? - let (payload_destination, message) = payload - .try_recover_destination_and_plaintext() - .ok_or_else(|| MixProcessingError::InvalidPayload)?; - if client_address != payload_destination { - return Err(MixProcessingError::NonMatchingRecipient); - } - - Ok(StoreData::new( - client_address, - client_surb_id, - message, - read_processing_data.new_messages_filename_length, - )) - } -} +pub(crate) mod listener; +pub(crate) mod packet_processing; diff --git a/sfw-provider/src/provider/mix_handling/packet_processing.rs b/sfw-provider/src/provider/mix_handling/packet_processing.rs new file mode 100644 index 00000000000..12d50c322c0 --- /dev/null +++ b/sfw-provider/src/provider/mix_handling/packet_processing.rs @@ -0,0 +1,100 @@ +use crate::provider::storage::{ClientStorage, StoreData}; +use crypto::encryption; +use log::*; +use sphinx::payload::Payload; +use sphinx::route::{DestinationAddressBytes, SURBIdentifier}; +use sphinx::{ProcessedPacket, SphinxPacket}; +use std::io; +use std::ops::Deref; +use std::sync::Arc; + +#[derive(Debug)] +pub enum MixProcessingError { + ReceivedForwardHopError, + NonMatchingRecipient, + InvalidPayload, + SphinxProcessingError, + IOError(String), +} + +pub enum MixProcessingResult { + #[allow(dead_code)] + ForwardHop, + FinalHop, +} + +impl From for MixProcessingError { + // for time being just have a single error instance for all possible results of sphinx::ProcessingError + fn from(_: sphinx::ProcessingError) -> Self { + use MixProcessingError::*; + + SphinxProcessingError + } +} + +impl From for MixProcessingError { + fn from(e: io::Error) -> Self { + use MixProcessingError::*; + + IOError(e.to_string()) + } +} + +// PacketProcessor contains all data required to correctly unwrap and store sphinx packets +#[derive(Clone)] +pub struct PacketProcessor { + secret_key: Arc, + client_store: ClientStorage, +} + +impl PacketProcessor { + pub(crate) fn new(secret_key: encryption::PrivateKey, client_store: ClientStorage) -> Self { + PacketProcessor { + secret_key: Arc::new(secret_key), + client_store, + } + } + + async fn process_final_hop( + &self, + client_address: DestinationAddressBytes, + surb_id: SURBIdentifier, + payload: Payload, + ) -> Result { + // TODO: should provider try to be recovering plaintext? this would potentially make client retrieve messages of non-constant length, + // perhaps provider should be re-padding them on retrieval or storing full data? + let (payload_destination, message) = payload + .try_recover_destination_and_plaintext() + .ok_or_else(|| MixProcessingError::InvalidPayload)?; + if client_address != payload_destination { + return Err(MixProcessingError::NonMatchingRecipient); + } + + let store_data = StoreData::new(client_address, surb_id, message); + self.client_store.store_processed_data(store_data).await?; + + Ok(MixProcessingResult::FinalHop) + } + + pub(crate) async fn process_sphinx_packet( + &self, + raw_packet_data: [u8; sphinx::PACKET_SIZE], + ) -> Result { + let packet = SphinxPacket::from_bytes(&raw_packet_data)?; + + match packet.process(self.secret_key.deref().inner()) { + Ok(ProcessedPacket::ProcessedPacketForwardHop(_, _, _)) => { + warn!("Received a forward hop message - those are not implemented for providers"); + Err(MixProcessingError::ReceivedForwardHopError) + } + Ok(ProcessedPacket::ProcessedPacketFinalHop(client_address, surb_id, payload)) => { + self.process_final_hop(client_address, surb_id, payload) + .await + } + Err(e) => { + warn!("Failed to unwrap Sphinx packet: {:?}", e); + Err(MixProcessingError::SphinxProcessingError) + } + } + } +} diff --git a/sfw-provider/src/provider/mod.rs b/sfw-provider/src/provider/mod.rs index fa6518e1180..6b40f3ed831 100644 --- a/sfw-provider/src/provider/mod.rs +++ b/sfw-provider/src/provider/mod.rs @@ -1,22 +1,10 @@ use crate::config::persistance::pathfinder::ProviderPathfinder; use crate::config::Config; -use crate::provider::client_handling::{ClientProcessingData, ClientRequestProcessor}; -use crate::provider::mix_handling::{MixPacketProcessor, MixProcessingData}; +use crate::provider::client_handling::ledger::ClientLedger; use crate::provider::storage::ClientStorage; use crypto::encryption; -use directory_client::presence::providers::MixProviderClient; -use futures::io::Error; -use futures::lock::Mutex as FMutex; use log::*; use pemstore::pemstore::PemStore; -use sfw_provider_requests::AuthToken; -use sphinx::route::DestinationAddressBytes; -use std::collections::HashMap; -use std::net::{Shutdown, SocketAddr}; -use std::path::PathBuf; -use std::sync::Arc; -use std::sync::RwLock; -use tokio::prelude::*; use tokio::runtime::Runtime; mod client_handling; @@ -24,88 +12,14 @@ mod mix_handling; pub mod presence; mod storage; -#[derive(Debug)] -pub enum ProviderError { - TcpListenerBindingError, - TcpListenerConnectionError, - TcpListenerUnexpectedEof, - - TcpListenerUnknownError, -} - -impl From for ProviderError { - fn from(err: Error) -> Self { - use ProviderError::*; - match err.kind() { - io::ErrorKind::ConnectionRefused => TcpListenerConnectionError, - io::ErrorKind::ConnectionReset => TcpListenerConnectionError, - io::ErrorKind::ConnectionAborted => TcpListenerConnectionError, - io::ErrorKind::NotConnected => TcpListenerConnectionError, - - io::ErrorKind::AddrInUse => TcpListenerBindingError, - io::ErrorKind::AddrNotAvailable => TcpListenerBindingError, - io::ErrorKind::UnexpectedEof => TcpListenerUnexpectedEof, - _ => TcpListenerUnknownError, - } - } -} - -#[derive(Debug)] -pub struct ClientLedger(HashMap); - -impl ClientLedger { - fn new() -> Self { - ClientLedger(HashMap::new()) - } - - fn add_arc_futures_mutex(self) -> Arc> { - Arc::new(FMutex::new(self)) - } - - fn has_token(&self, auth_token: &AuthToken) -> bool { - self.0.contains_key(auth_token) - } - - fn insert_token( - &mut self, - auth_token: AuthToken, - client_address: DestinationAddressBytes, - ) -> Option { - self.0.insert(auth_token, client_address) - } - - fn current_clients(&self) -> Vec { - self.0 - .iter() - .map(|(_, v)| bs58::encode(v).into_string()) - .map(|pub_key| MixProviderClient { pub_key }) - .collect() - } - - #[allow(dead_code)] - fn load(_file: PathBuf) -> Self { - unimplemented!() - } -} - pub struct ServiceProvider { + runtime: Runtime, config: Config, sphinx_keypair: encryption::KeyPair, registered_clients_ledger: ClientLedger, } impl ServiceProvider { - pub fn new(config: Config) -> Self { - let sphinx_keypair = Self::load_sphinx_keys(&config); - - ServiceProvider { - config, - sphinx_keypair, - // TODO: load initial ledger from file - registered_clients_ledger: ClientLedger::new(), - } - } - fn load_sphinx_keys(config_file: &Config) -> encryption::KeyPair { let sphinx_keypair = PemStore::new(ProviderPathfinder::new_from_config(&config_file)) .read_encryption() @@ -117,177 +31,19 @@ impl ServiceProvider { sphinx_keypair } - async fn process_mixnet_socket_connection( - mut socket: tokio::net::TcpStream, - processing_data: Arc>, - ) { - let mut buf = [0u8; sphinx::PACKET_SIZE]; - - // In a loop, read data from the socket and write the data back. - loop { - match socket.read(&mut buf).await { - // socket closed - Ok(n) if n == 0 => { - trace!("Remote connection closed."); - return; - } - Ok(_) => { - let store_data = match MixPacketProcessor::process_sphinx_data_packet( - buf.as_ref(), - processing_data.as_ref(), - ) { - Ok(sd) => sd, - Err(e) => { - warn!("failed to process sphinx packet; err = {:?}", e); - return; - } - }; - let processing_data_lock = match processing_data.read() { - Ok(guard) => guard, - Err(e) => { - error!("processing data lock was poisoned! - {:?}", e); - std::process::exit(1) - } - }; - ClientStorage::store_processed_data( - store_data, - processing_data_lock.store_dir.as_path(), - ) - .unwrap_or_else(|e| { - error!("failed to store processed sphinx message; err = {:?}", e); - }); - } - Err(e) => { - warn!("failed to read from socket; err = {:?}", e); - return; - } - }; - - // Write the some data back - if let Err(e) = socket.write_all(b"foomp").await { - warn!("failed to write reply to socket; err = {:?}", e); - return; - } - } - } - - async fn send_response(mut socket: tokio::net::TcpStream, data: &[u8]) { - if let Err(e) = socket.write_all(data).await { - warn!("failed to write reply to socket; err = {:?}", e) - } - if let Err(e) = socket.shutdown(Shutdown::Write) { - warn!("failed to close write part of the socket; err = {:?}", e) - } - } - - // TODO: FIGURE OUT HOW TO SET READ_DEADLINES IN TOKIO - async fn process_client_socket_connection( - mut socket: tokio::net::TcpStream, - processing_data: Arc, - ) { - let mut buf = [0; 1024]; - - // TODO: restore the for loop once we go back to persistent tcp socket connection - let response = match socket.read(&mut buf).await { - // socket closed - Ok(n) if n == 0 => { - trace!("Remote connection closed."); - Err(()) - } - Ok(n) => { - match ClientRequestProcessor::process_client_request( - buf[..n].as_ref(), - processing_data, - ) - .await - { - Err(e) => { - warn!("failed to process client request; err = {:?}", e); - Err(()) - } - Ok(res) => Ok(res), - } - } - Err(e) => { - warn!("failed to read from socket; err = {:?}", e); - Err(()) - } - }; - - if let Err(e) = socket.shutdown(Shutdown::Read) { - warn!("failed to close read part of the socket; err = {:?}", e) - } - - match response { - Ok(res) => { - ServiceProvider::send_response(socket, &res).await; - } - _ => { - ServiceProvider::send_response(socket, b"bad foomp").await; - } - } - } - - async fn start_mixnet_listening( - address: SocketAddr, - secret_key: encryption::PrivateKey, - store_dir: PathBuf, - new_messages_filename_length: u16, - ) -> Result<(), ProviderError> { - let mut listener = tokio::net::TcpListener::bind(address).await?; - let processing_data = - MixProcessingData::new(secret_key, store_dir, new_messages_filename_length) - .add_arc_rwlock(); - - loop { - let (socket, _) = listener.accept().await?; - // do note that the underlying data is NOT copied here; arc is incremented and lock is shared - // (if I understand it all correctly) - let thread_processing_data = processing_data.clone(); - tokio::spawn(async move { - ServiceProvider::process_mixnet_socket_connection(socket, thread_processing_data) - .await - }); - } - } - - async fn start_client_listening( - address: SocketAddr, - store_dir: PathBuf, - client_ledger: Arc>, - secret_key: encryption::PrivateKey, - message_retrieval_limit: u16, - ) -> Result<(), ProviderError> { - let mut listener = tokio::net::TcpListener::bind(address).await?; - let processing_data = ClientProcessingData::new( - store_dir, - client_ledger, - secret_key, - message_retrieval_limit, - ) - .add_arc(); - - loop { - let (socket, _) = listener.accept().await?; - // do note that the underlying data is NOT copied here; arc is incremented and lock is shared - // (if I understand it all correctly) - let thread_processing_data = processing_data.clone(); - tokio::spawn(async move { - ServiceProvider::process_client_socket_connection(socket, thread_processing_data) - .await - }); + pub fn new(config: Config) -> Self { + let sphinx_keypair = Self::load_sphinx_keys(&config); + let registered_clients_ledger = ClientLedger::load(config.get_clients_ledger_path()); + ServiceProvider { + runtime: Runtime::new().unwrap(), + config, + sphinx_keypair, + registered_clients_ledger, } } - pub fn start(self) -> Result<(), Box> { - // Create the runtime, probably later move it to Provider struct itself? - // TODO: figure out the difference between Runtime and Handle - let mut rt = Runtime::new()?; - // let mut h = rt.handle(); - - let initial_client_ledger = self.registered_clients_ledger; - let thread_shareable_ledger = initial_client_ledger.add_arc_futures_mutex(); - + fn start_presence_notifier(&self) { + info!("Starting presence notifier..."); let notifier_config = presence::NotifierConfig::new( self.config.get_presence_directory_server(), self.config.get_mix_announce_address(), @@ -295,35 +51,64 @@ impl ServiceProvider { self.sphinx_keypair.public_key().to_base58_string(), self.config.get_presence_sending_delay(), ); + presence::Notifier::new(notifier_config, self.registered_clients_ledger.clone()) + .start(self.runtime.handle()); + } - let presence_future = rt.spawn({ - let presence_notifier = - presence::Notifier::new(notifier_config, thread_shareable_ledger.clone()); - presence_notifier.run() - }); + fn start_mix_socket_listener(&self, client_storage: ClientStorage) { + info!("Starting mix socket listener..."); + let packet_processor = mix_handling::packet_processing::PacketProcessor::new( + self.sphinx_keypair.private_key().clone(), + client_storage, + ); - let mix_future = rt.spawn(ServiceProvider::start_mixnet_listening( + mix_handling::listener::run_mix_socket_listener( + self.runtime.handle(), self.config.get_mix_listening_address(), - self.sphinx_keypair.private_key().clone(), // CLONE IS DONE TEMPORARILY UNTIL PROVIDER IS REFACTORED THE MIXNODE STYLE - self.config.get_clients_inboxes_dir(), - self.config.get_stored_messages_filename_length(), - )); - let client_future = rt.spawn(ServiceProvider::start_client_listening( + packet_processor, + ); + } + + fn start_client_socket_listener(&self, client_storage: ClientStorage) { + info!("Starting client socket listener..."); + let packet_processor = client_handling::request_processing::RequestProcessor::new( + self.sphinx_keypair.private_key().clone(), + client_storage, + self.registered_clients_ledger.clone(), + ); + + client_handling::listener::run_client_socket_listener( + self.runtime.handle(), self.config.get_clients_listening_address(), + packet_processor, + ); + } + + pub fn run(&mut self) { + // A possible future optimisation, depending on bottlenecks and resource usage: + // considering, presumably, there will be more mix packets received than client requests: + // create 2 separate runtimes - one with bigger threadpool dedicated solely for + // the mix handling and the other one for the rest of tasks + + let client_storage = ClientStorage::new( + self.config.get_message_retrieval_limit() as usize, + self.config.get_stored_messages_filename_length(), self.config.get_clients_inboxes_dir(), - thread_shareable_ledger, - self.sphinx_keypair.private_key().clone(), // CLONE IS DONE TEMPORARILY UNTIL PROVIDER IS REFACTORED THE MIXNODE STYLE - self.config.get_message_retrieval_limit(), - )); - // Spawn the root task - rt.block_on(async { - let future_results = - futures::future::join3(mix_future, client_future, presence_future).await; - assert!(future_results.0.is_ok() && future_results.1.is_ok()); - }); + ); + + self.start_presence_notifier(); + self.start_mix_socket_listener(client_storage.clone()); + self.start_client_socket_listener(client_storage); - // this line in theory should never be reached as the runtime should be permanently blocked on listeners - error!("The server went kaput..."); - Ok(()) + if let Err(e) = self.runtime.block_on(tokio::signal::ctrl_c()) { + error!( + "There was an error while capturing SIGINT - {:?}. We will terminate regardless", + e + ); + } + + println!( + "Received SIGINT - the provider will terminate now (threads are not YET nicely stopped)" + ); } } diff --git a/sfw-provider/src/provider/presence.rs b/sfw-provider/src/provider/presence.rs index 67df2bba081..babdeacca65 100644 --- a/sfw-provider/src/provider/presence.rs +++ b/sfw-provider/src/provider/presence.rs @@ -3,10 +3,10 @@ use crate::provider::ClientLedger; use directory_client::presence::providers::MixProviderPresence; use directory_client::requests::presence_providers_post::PresenceMixProviderPoster; use directory_client::DirectoryClient; -use futures::lock::Mutex as FMutex; use log::{debug, error}; -use std::sync::Arc; use std::time::Duration; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; pub struct NotifierConfig { directory_server: String, @@ -36,7 +36,7 @@ impl NotifierConfig { pub struct Notifier { net_client: directory_client::Client, - client_ledger: Arc>, + client_ledger: ClientLedger, sending_delay: Duration, client_listener: String, mixnet_listener: String, @@ -44,7 +44,7 @@ pub struct Notifier { } impl Notifier { - pub fn new(config: NotifierConfig, client_ledger: Arc>) -> Notifier { + pub fn new(config: NotifierConfig, client_ledger: ClientLedger) -> Notifier { let directory_client_cfg = directory_client::Config { base_url: config.directory_server, }; @@ -61,13 +61,11 @@ impl Notifier { } async fn make_presence(&self) -> MixProviderPresence { - let unlocked_ledger = self.client_ledger.lock().await; - MixProviderPresence { client_listener: self.client_listener.clone(), mixnet_listener: self.mixnet_listener.clone(), pub_key: self.pub_key_string.clone(), - registered_clients: unlocked_ledger.current_clients(), + registered_clients: self.client_ledger.current_clients().await, last_seen: 0, version: built_info::PKG_VERSION.to_string(), } @@ -80,11 +78,13 @@ impl Notifier { } } - pub async fn run(self) { - loop { - let presence = self.make_presence().await; - self.notify(presence); - tokio::time::delay_for(self.sending_delay).await; - } + pub fn start(self, handle: &Handle) -> JoinHandle<()> { + handle.spawn(async move { + loop { + let presence = self.make_presence().await; + self.notify(presence); + tokio::time::delay_for(self.sending_delay).await; + } + }) } } diff --git a/sfw-provider/src/provider/storage/mod.rs b/sfw-provider/src/provider/storage/mod.rs index 6123d5c6861..9df6ff440ad 100644 --- a/sfw-provider/src/provider/storage/mod.rs +++ b/sfw-provider/src/provider/storage/mod.rs @@ -1,22 +1,36 @@ +use futures::lock::Mutex; +use futures::StreamExt; use log::*; use rand::Rng; use sfw_provider_requests::DUMMY_MESSAGE_CONTENT; use sphinx::route::{DestinationAddressBytes, SURBIdentifier}; -use std::fs::File; use std::io; -use std::io::Write; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::fs; +use tokio::fs::File; +use tokio::prelude::*; + +fn dummy_message() -> ClientFile { + ClientFile { + content: DUMMY_MESSAGE_CONTENT.to_vec(), + path: Default::default(), + } +} -pub enum StoreError { - ClientDoesntExistError, - FileIOFailure, +#[derive(Clone, Debug)] +pub struct ClientFile { + content: Vec, + path: PathBuf, } -impl From for StoreError { - fn from(_: std::io::Error) -> Self { - use StoreError::*; +impl ClientFile { + fn new(content: Vec, path: PathBuf) -> Self { + ClientFile { content, path } + } - FileIOFailure + pub(crate) fn into_tuple(self) -> (Vec, PathBuf) { + (self.content, self.path) } } @@ -25,7 +39,6 @@ pub struct StoreData { #[allow(dead_code)] client_surb_id: SURBIdentifier, message: Vec, - filename_length: u16, } impl StoreData { @@ -33,22 +46,58 @@ impl StoreData { client_address: DestinationAddressBytes, client_surb_id: SURBIdentifier, message: Vec, - filename_length: u16, ) -> Self { StoreData { client_address, client_surb_id, message, - filename_length, } } } -// TODO: replace with database -pub struct ClientStorage(()); +// TODO: replace with proper database... +// Note: you should NEVER create more than a single instance of this using 'new()'. +// You should always use .clone() to create additional instances +#[derive(Clone, Debug)] +pub struct ClientStorage { + inner: Arc>, +} + +// even though the data inside is extremely cheap to copy, we have to have a single mutex, +// so might as well store the data behind it +pub struct ClientStorageInner { + message_retrieval_limit: usize, + filename_length: u16, + main_store_path_dir: PathBuf, +} -// TODO: change it to some generic implementation to inject fs +// TODO: change it to some generic implementation to inject fs (or even better - proper database) impl ClientStorage { + pub(crate) fn new(message_limit: usize, filename_len: u16, main_store_dir: PathBuf) -> Self { + ClientStorage { + inner: Arc::new(Mutex::new(ClientStorageInner { + message_retrieval_limit: message_limit, + filename_length: filename_len, + main_store_path_dir: main_store_dir, + })), + } + } + + // TODO: does this method really require locking? + // The worst that can happen is client sending 2 requests: to pull messages and register + // if register does not lock, then under specific timing pull messages will fail, + // but can simply be retried with no issues + pub(crate) async fn create_storage_dir( + &self, + client_address: DestinationAddressBytes, + ) -> io::Result<()> { + let inner_data = self.inner.lock().await; + + let client_dir_name = client_address.to_base58_string(); + let full_store_dir = inner_data.main_store_path_dir.join(client_dir_name); + fs::create_dir_all(full_store_dir).await + } + pub(crate) fn generate_random_file_name(length: usize) -> String { rand::thread_rng() .sample_iter(&rand::distributions::Alphanumeric) @@ -56,16 +105,13 @@ impl ClientStorage { .collect::() } - fn dummy_message() -> Vec { - // TODO: should it be padded to constant length? - DUMMY_MESSAGE_CONTENT.to_vec() - } + pub(crate) async fn store_processed_data(&self, store_data: StoreData) -> io::Result<()> { + let inner_data = self.inner.lock().await; - pub fn store_processed_data(store_data: StoreData, store_dir: &Path) -> io::Result<()> { - let client_dir_name = bs58::encode(store_data.client_address).into_string(); - let full_store_dir = store_dir.join(client_dir_name); - let full_store_path = full_store_dir.join(ClientStorage::generate_random_file_name( - store_data.filename_length as usize, + let client_dir_name = store_data.client_address.to_base58_string(); + let full_store_dir = inner_data.main_store_path_dir.join(client_dir_name); + let full_store_path = full_store_dir.join(Self::generate_random_file_name( + inner_data.filename_length as usize, )); debug!( "going to store: {:?} in file: {:?}", @@ -74,44 +120,62 @@ impl ClientStorage { // TODO: what to do with surbIDs?? - // we can use normal io here, no need for tokio as it's all happening in one thread per connection - let mut file = File::create(full_store_path)?; - file.write_all(store_data.message.as_ref())?; - - Ok(()) + let mut file = File::create(full_store_path).await?; + file.write_all(store_data.message.as_ref()).await } - pub fn retrieve_client_files( + pub(crate) async fn retrieve_client_files( + &self, client_address: DestinationAddressBytes, - store_dir: &Path, - message_retrieval_limit: u16, - ) -> Result>, StoreError> { - let client_dir_name = bs58::encode(client_address).into_string(); - let full_store_dir = store_dir.join(client_dir_name); + ) -> io::Result> { + let inner_data = self.inner.lock().await; + + let client_dir_name = client_address.to_base58_string(); + let full_store_dir = inner_data.main_store_path_dir.join(client_dir_name); trace!("going to lookup: {:?}!", full_store_dir); if !full_store_dir.exists() { - return Err(StoreError::ClientDoesntExistError); + return Err(io::Error::new( + io::ErrorKind::NotFound, + "Target client does not exist", + )); + } + + let mut msgs = Vec::new(); + let mut read_dir = fs::read_dir(full_store_dir).await?; + + while let Some(dir_entry) = read_dir.next().await { + if let Ok(dir_entry) = dir_entry { + if !Self::is_valid_file(&dir_entry).await { + continue; + } + // Do not delete the file itself here! + // Only do it after client has received it + let client_file = + ClientFile::new(fs::read(dir_entry.path()).await?, dir_entry.path()); + msgs.push(client_file) + } + if msgs.len() == inner_data.message_retrieval_limit { + break; + } } - let msgs: Vec<_> = std::fs::read_dir(full_store_dir)? - .filter_map(|entry| entry.ok()) - .filter(|entry| ClientStorage::is_valid_file(entry)) - .map(|entry| { - // Not yet sure how to exactly get rid of those unwraps - let content = std::fs::read(entry.path()).unwrap(); - ClientStorage::delete_file(entry.path()).unwrap(); - content - }) // TODO: THIS MAP IS UNSAFE (BOTH FOR READING AND DELETING)!! - in the case where there are multiple requests from the same client being processed in parallel - .chain(std::iter::repeat(ClientStorage::dummy_message())) - .take(message_retrieval_limit as usize) - .collect(); + let dummy_message = dummy_message(); + + // make sure we always return as many messages as we need + if msgs.len() != inner_data.message_retrieval_limit as usize { + msgs = msgs + .into_iter() + .chain(std::iter::repeat(dummy_message)) + .take(inner_data.message_retrieval_limit) + .collect(); + } Ok(msgs) } - fn is_valid_file(entry: &std::fs::DirEntry) -> bool { - let metadata = match entry.metadata() { + async fn is_valid_file(entry: &fs::DirEntry) -> bool { + let metadata = match entry.metadata().await { Ok(meta) => meta, Err(e) => { error!( @@ -134,11 +198,18 @@ impl ClientStorage { is_file } - // TODO: THIS NEEDS A LOCKING MECHANISM!!! (or a db layer on top - basically 'ClientStorage' on steroids) - // TODO 2: This should only be called AFTER we sent the reply. Because if client's connection failed after sending request - // the messages would be deleted but he wouldn't have received them - fn delete_file(path: PathBuf) -> io::Result<()> { - trace!("Here {:?} will be deleted!", path); - std::fs::remove_file(path) // another argument for db layer -> remove_file is NOT guaranteed to immediately get rid of the file + pub(crate) async fn delete_files(&self, file_paths: Vec) -> io::Result<()> { + let dummy_message = dummy_message(); + let _guard = self.inner.lock().await; + + for file_path in file_paths { + if file_path == dummy_message.path { + continue; + } + if let Err(e) = fs::remove_file(file_path).await { + error!("Failed to delete client message! - {:?}", e) + } + } + Ok(()) } }