From a4e2a94e7b9acd7a94ec16282c012f86cbdad84f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 8 Dec 2021 18:35:56 +0400 Subject: [PATCH] move the peerstoremanager to the host --- examples/go.mod | 2 +- examples/go.sum | 6 +- examples/ipfs-camp-2019/go.mod | 2 +- examples/ipfs-camp-2019/go.sum | 6 +- examples/pubsub/chat/go.mod | 2 +- examples/pubsub/chat/go.sum | 6 +- go.mod | 5 +- go.sum | 6 +- p2p/host/basic/basic_host.go | 13 +- p2p/host/pstoremanager/mock_peerstore_test.go | 456 ++++++++++++++++++ p2p/host/pstoremanager/pstoremanager.go | 130 +++++ p2p/host/pstoremanager/pstoremanager_test.go | 98 ++++ 12 files changed, 717 insertions(+), 15 deletions(-) create mode 100644 p2p/host/pstoremanager/mock_peerstore_test.go create mode 100644 p2p/host/pstoremanager/pstoremanager.go create mode 100644 p2p/host/pstoremanager/pstoremanager_test.go diff --git a/examples/go.mod b/examples/go.mod index 4938fc3671..132a48b81d 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -9,7 +9,7 @@ require ( github.com/ipfs/go-log/v2 v2.3.0 github.com/libp2p/go-libp2p v0.14.4 github.com/libp2p/go-libp2p-connmgr v0.2.4 - github.com/libp2p/go-libp2p-core v0.11.0 + github.com/libp2p/go-libp2p-core v0.12.0 github.com/libp2p/go-libp2p-discovery v0.6.0 github.com/libp2p/go-libp2p-kad-dht v0.15.0 github.com/libp2p/go-libp2p-noise v0.3.0 diff --git a/examples/go.sum b/examples/go.sum index 4258a1f2c2..0a492df2b2 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -461,8 +461,9 @@ github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= github.com/libp2p/go-libp2p-core v0.9.0/go.mod h1:ESsbz31oC3C1AvMJoGx26RTuCkNhmkSRCqZ0kQtJ2/8= github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= -github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI= github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= +github.com/libp2p/go-libp2p-core v0.12.0 h1:S9bO2lhSJtOvAKo8QAdW9Zp1FEo0XkfXymqvrW6l/I8= +github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= github.com/libp2p/go-libp2p-discovery v0.6.0 h1:1XdPmhMJr8Tmj/yUfkJMIi8mgwWrLUsCB3bMxdT+DSo= github.com/libp2p/go-libp2p-discovery v0.6.0/go.mod h1:/u1voHt0tKIe5oIA1RHBKQLVCWPna2dXmPNHc2zR9S8= github.com/libp2p/go-libp2p-kad-dht v0.15.0 h1:Ke+Oj78gX5UDXnA6HBdrgvi+fStJxgYTDa51U0TsCLo= @@ -484,8 +485,9 @@ github.com/libp2p/go-libp2p-noise v0.3.0/go.mod h1:JNjHbociDJKHD64KTkzGnzqJ0FEV5 github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.8/go.mod h1:gGiPlXdz7mIHd2vfAsHzBNAMqSDkt2UBFwgcITgw1lA= -github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA= github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0= +github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A= +github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ= diff --git a/examples/ipfs-camp-2019/go.mod b/examples/ipfs-camp-2019/go.mod index 50b24bc2d5..dbef9e8b03 100644 --- a/examples/ipfs-camp-2019/go.mod +++ b/examples/ipfs-camp-2019/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/gogo/protobuf v1.3.2 github.com/libp2p/go-libp2p v0.14.4 - github.com/libp2p/go-libp2p-core v0.11.0 + github.com/libp2p/go-libp2p-core v0.12.0 github.com/libp2p/go-libp2p-discovery v0.6.0 github.com/libp2p/go-libp2p-kad-dht v0.15.0 github.com/libp2p/go-libp2p-mplex v0.4.1 diff --git a/examples/ipfs-camp-2019/go.sum b/examples/ipfs-camp-2019/go.sum index 501e3a46aa..94484e8678 100644 --- a/examples/ipfs-camp-2019/go.sum +++ b/examples/ipfs-camp-2019/go.sum @@ -464,8 +464,9 @@ github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= github.com/libp2p/go-libp2p-core v0.9.0/go.mod h1:ESsbz31oC3C1AvMJoGx26RTuCkNhmkSRCqZ0kQtJ2/8= github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= -github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI= github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= +github.com/libp2p/go-libp2p-core v0.12.0 h1:S9bO2lhSJtOvAKo8QAdW9Zp1FEo0XkfXymqvrW6l/I8= +github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= github.com/libp2p/go-libp2p-discovery v0.5.1/go.mod h1:+srtPIU9gDaBNu//UHvcdliKBIcr4SfDcm0/PfPJLug= github.com/libp2p/go-libp2p-discovery v0.6.0 h1:1XdPmhMJr8Tmj/yUfkJMIi8mgwWrLUsCB3bMxdT+DSo= github.com/libp2p/go-libp2p-discovery v0.6.0/go.mod h1:/u1voHt0tKIe5oIA1RHBKQLVCWPna2dXmPNHc2zR9S8= @@ -488,8 +489,9 @@ github.com/libp2p/go-libp2p-noise v0.3.0/go.mod h1:JNjHbociDJKHD64KTkzGnzqJ0FEV5 github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= github.com/libp2p/go-libp2p-peerstore v0.2.8/go.mod h1:gGiPlXdz7mIHd2vfAsHzBNAMqSDkt2UBFwgcITgw1lA= -github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA= github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0= +github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A= +github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-pubsub v0.5.3 h1:XCn5xvgA/AKpbbaeqbomfKtQCbT9QsU39tYsVj0IndQ= diff --git a/examples/pubsub/chat/go.mod b/examples/pubsub/chat/go.mod index fff00ac50f..ee30039e49 100644 --- a/examples/pubsub/chat/go.mod +++ b/examples/pubsub/chat/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/gdamore/tcell/v2 v2.1.0 github.com/libp2p/go-libp2p v0.14.1 - github.com/libp2p/go-libp2p-core v0.11.0 + github.com/libp2p/go-libp2p-core v0.12.0 github.com/libp2p/go-libp2p-pubsub v0.6.0 github.com/rivo/tview v0.0.0-20210125085121-dbc1f32bb1d0 ) diff --git a/examples/pubsub/chat/go.sum b/examples/pubsub/chat/go.sum index c4e3b5bbe3..ef36e7807a 100644 --- a/examples/pubsub/chat/go.sum +++ b/examples/pubsub/chat/go.sum @@ -437,8 +437,9 @@ github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= github.com/libp2p/go-libp2p-core v0.9.0/go.mod h1:ESsbz31oC3C1AvMJoGx26RTuCkNhmkSRCqZ0kQtJ2/8= github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= -github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI= github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= +github.com/libp2p/go-libp2p-core v0.12.0 h1:S9bO2lhSJtOvAKo8QAdW9Zp1FEo0XkfXymqvrW6l/I8= +github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= github.com/libp2p/go-libp2p-discovery v0.6.0 h1:1XdPmhMJr8Tmj/yUfkJMIi8mgwWrLUsCB3bMxdT+DSo= github.com/libp2p/go-libp2p-discovery v0.6.0/go.mod h1:/u1voHt0tKIe5oIA1RHBKQLVCWPna2dXmPNHc2zR9S8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= @@ -453,8 +454,9 @@ github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCTh github.com/libp2p/go-libp2p-noise v0.3.0 h1:NCVH7evhVt9njbTQshzT7N1S3Q6fjj9M11FCgfH5+cA= github.com/libp2p/go-libp2p-noise v0.3.0/go.mod h1:JNjHbociDJKHD64KTkzGnzqJ0FEV5gHJa6AB00kbCNQ= github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= -github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA= github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0= +github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A= +github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-pubsub v0.6.0 h1:98+RXuEWW17U6cAijK1yaTf6mw/B+n5yPA421z+dlo0= diff --git a/go.mod b/go.mod index 6de3930015..a0eb2b18e4 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/gogo/protobuf v1.3.2 + github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 // indirect github.com/huin/goupnp v1.0.2 // indirect github.com/ipfs/go-cid v0.0.7 @@ -23,13 +24,13 @@ require ( github.com/libp2p/go-libp2p-autonat v0.6.0 github.com/libp2p/go-libp2p-blankhost v0.2.0 github.com/libp2p/go-libp2p-circuit v0.4.0 - github.com/libp2p/go-libp2p-core v0.11.0 + github.com/libp2p/go-libp2p-core v0.12.0 github.com/libp2p/go-libp2p-discovery v0.6.0 github.com/libp2p/go-libp2p-mplex v0.4.1 github.com/libp2p/go-libp2p-nat v0.1.0 github.com/libp2p/go-libp2p-netutil v0.1.0 github.com/libp2p/go-libp2p-noise v0.3.0 - github.com/libp2p/go-libp2p-peerstore v0.4.0 + github.com/libp2p/go-libp2p-peerstore v0.6.0 github.com/libp2p/go-libp2p-quic-transport v0.15.0 github.com/libp2p/go-libp2p-swarm v0.8.0 github.com/libp2p/go-libp2p-testing v0.5.0 diff --git a/go.sum b/go.sum index de60c9afb5..cb9002e339 100644 --- a/go.sum +++ b/go.sum @@ -428,8 +428,9 @@ github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= github.com/libp2p/go-libp2p-core v0.9.0/go.mod h1:ESsbz31oC3C1AvMJoGx26RTuCkNhmkSRCqZ0kQtJ2/8= github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= -github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI= github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= +github.com/libp2p/go-libp2p-core v0.12.0 h1:S9bO2lhSJtOvAKo8QAdW9Zp1FEo0XkfXymqvrW6l/I8= +github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= github.com/libp2p/go-libp2p-discovery v0.6.0 h1:1XdPmhMJr8Tmj/yUfkJMIi8mgwWrLUsCB3bMxdT+DSo= github.com/libp2p/go-libp2p-discovery v0.6.0/go.mod h1:/u1voHt0tKIe5oIA1RHBKQLVCWPna2dXmPNHc2zR9S8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= @@ -444,8 +445,9 @@ github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCTh github.com/libp2p/go-libp2p-noise v0.3.0 h1:NCVH7evhVt9njbTQshzT7N1S3Q6fjj9M11FCgfH5+cA= github.com/libp2p/go-libp2p-noise v0.3.0/go.mod h1:JNjHbociDJKHD64KTkzGnzqJ0FEV5gHJa6AB00kbCNQ= github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s= -github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA= github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0= +github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A= +github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-quic-transport v0.13.0/go.mod h1:39/ZWJ1TW/jx1iFkKzzUg00W6tDJh73FC0xYudjr7Hc= diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 7393100826..c50d65837d 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" "github.com/libp2p/go-libp2p/p2p/host/relaysvc" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" @@ -74,6 +75,7 @@ type BasicHost struct { refCount sync.WaitGroup network network.Network + psManager *pstoremanager.PeerstoreManager mux *msmux.MultistreamMuxer ids identify.IDService hps *holepunch.Service @@ -156,6 +158,11 @@ type HostOpts struct { // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { + eventBus := eventbus.NewBus() + psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus) + if err != nil { + return nil, err + } hostCtx, cancel := context.WithCancel(context.Background()) if opts == nil { opts = &HostOpts{} @@ -163,11 +170,12 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { h := &BasicHost{ network: n, + psManager: psManager, mux: msmux.NewMultistreamMuxer(), negtimeout: DefaultNegotiationTimeout, AddrsFactory: DefaultAddrsFactory, maResolver: madns.DefaultResolver, - eventbus: eventbus.NewBus(), + eventbus: eventBus, addrChangeChan: make(chan struct{}, 1), ctx: hostCtx, ctxCancel: cancel, @@ -176,7 +184,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { h.updateLocalIpAddr() - var err error if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { return nil, err } @@ -352,6 +359,7 @@ func (h *BasicHost) updateLocalIpAddr() { // Start starts background tasks in the host func (h *BasicHost) Start() { + h.psManager.Start() h.refCount.Add(1) go h.background() } @@ -1036,6 +1044,7 @@ func (h *BasicHost) Close() error { _ = h.emitters.evtLocalAddrsUpdated.Close() h.Network().Close() + h.psManager.Close() if h.Peerstore() != nil { h.Peerstore().Close() } diff --git a/p2p/host/pstoremanager/mock_peerstore_test.go b/p2p/host/pstoremanager/mock_peerstore_test.go new file mode 100644 index 0000000000..c1414cce52 --- /dev/null +++ b/p2p/host/pstoremanager/mock_peerstore_test.go @@ -0,0 +1,456 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/libp2p/go-libp2p-core/peerstore (interfaces: Peerstore) + +// Package pstoremanager_test is a generated GoMock package. +package pstoremanager_test + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + crypto "github.com/libp2p/go-libp2p-core/crypto" + peer "github.com/libp2p/go-libp2p-core/peer" + multiaddr "github.com/multiformats/go-multiaddr" +) + +// MockPeerstore is a mock of Peerstore interface. +type MockPeerstore struct { + ctrl *gomock.Controller + recorder *MockPeerstoreMockRecorder +} + +// MockPeerstoreMockRecorder is the mock recorder for MockPeerstore. +type MockPeerstoreMockRecorder struct { + mock *MockPeerstore +} + +// NewMockPeerstore creates a new mock instance. +func NewMockPeerstore(ctrl *gomock.Controller) *MockPeerstore { + mock := &MockPeerstore{ctrl: ctrl} + mock.recorder = &MockPeerstoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPeerstore) EXPECT() *MockPeerstoreMockRecorder { + return m.recorder +} + +// AddAddr mocks base method. +func (m *MockPeerstore) AddAddr(arg0 peer.ID, arg1 multiaddr.Multiaddr, arg2 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddAddr", arg0, arg1, arg2) +} + +// AddAddr indicates an expected call of AddAddr. +func (mr *MockPeerstoreMockRecorder) AddAddr(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAddr", reflect.TypeOf((*MockPeerstore)(nil).AddAddr), arg0, arg1, arg2) +} + +// AddAddrs mocks base method. +func (m *MockPeerstore) AddAddrs(arg0 peer.ID, arg1 []multiaddr.Multiaddr, arg2 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddAddrs", arg0, arg1, arg2) +} + +// AddAddrs indicates an expected call of AddAddrs. +func (mr *MockPeerstoreMockRecorder) AddAddrs(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAddrs", reflect.TypeOf((*MockPeerstore)(nil).AddAddrs), arg0, arg1, arg2) +} + +// AddPrivKey mocks base method. +func (m *MockPeerstore) AddPrivKey(arg0 peer.ID, arg1 crypto.PrivKey) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddPrivKey", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddPrivKey indicates an expected call of AddPrivKey. +func (mr *MockPeerstoreMockRecorder) AddPrivKey(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPrivKey", reflect.TypeOf((*MockPeerstore)(nil).AddPrivKey), arg0, arg1) +} + +// AddProtocols mocks base method. +func (m *MockPeerstore) AddProtocols(arg0 peer.ID, arg1 ...string) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "AddProtocols", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddProtocols indicates an expected call of AddProtocols. +func (mr *MockPeerstoreMockRecorder) AddProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddProtocols", reflect.TypeOf((*MockPeerstore)(nil).AddProtocols), varargs...) +} + +// AddPubKey mocks base method. +func (m *MockPeerstore) AddPubKey(arg0 peer.ID, arg1 crypto.PubKey) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddPubKey", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddPubKey indicates an expected call of AddPubKey. +func (mr *MockPeerstoreMockRecorder) AddPubKey(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPubKey", reflect.TypeOf((*MockPeerstore)(nil).AddPubKey), arg0, arg1) +} + +// AddrStream mocks base method. +func (m *MockPeerstore) AddrStream(arg0 context.Context, arg1 peer.ID) <-chan multiaddr.Multiaddr { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddrStream", arg0, arg1) + ret0, _ := ret[0].(<-chan multiaddr.Multiaddr) + return ret0 +} + +// AddrStream indicates an expected call of AddrStream. +func (mr *MockPeerstoreMockRecorder) AddrStream(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrStream", reflect.TypeOf((*MockPeerstore)(nil).AddrStream), arg0, arg1) +} + +// Addrs mocks base method. +func (m *MockPeerstore) Addrs(arg0 peer.ID) []multiaddr.Multiaddr { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Addrs", arg0) + ret0, _ := ret[0].([]multiaddr.Multiaddr) + return ret0 +} + +// Addrs indicates an expected call of Addrs. +func (mr *MockPeerstoreMockRecorder) Addrs(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addrs", reflect.TypeOf((*MockPeerstore)(nil).Addrs), arg0) +} + +// ClearAddrs mocks base method. +func (m *MockPeerstore) ClearAddrs(arg0 peer.ID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ClearAddrs", arg0) +} + +// ClearAddrs indicates an expected call of ClearAddrs. +func (mr *MockPeerstoreMockRecorder) ClearAddrs(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearAddrs", reflect.TypeOf((*MockPeerstore)(nil).ClearAddrs), arg0) +} + +// Close mocks base method. +func (m *MockPeerstore) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockPeerstoreMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPeerstore)(nil).Close)) +} + +// FirstSupportedProtocol mocks base method. +func (m *MockPeerstore) FirstSupportedProtocol(arg0 peer.ID, arg1 ...string) (string, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "FirstSupportedProtocol", varargs...) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FirstSupportedProtocol indicates an expected call of FirstSupportedProtocol. +func (mr *MockPeerstoreMockRecorder) FirstSupportedProtocol(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FirstSupportedProtocol", reflect.TypeOf((*MockPeerstore)(nil).FirstSupportedProtocol), varargs...) +} + +// Get mocks base method. +func (m *MockPeerstore) Get(arg0 peer.ID, arg1 string) (interface{}, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0, arg1) + ret0, _ := ret[0].(interface{}) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockPeerstoreMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPeerstore)(nil).Get), arg0, arg1) +} + +// GetProtocols mocks base method. +func (m *MockPeerstore) GetProtocols(arg0 peer.ID) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetProtocols", arg0) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetProtocols indicates an expected call of GetProtocols. +func (mr *MockPeerstoreMockRecorder) GetProtocols(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProtocols", reflect.TypeOf((*MockPeerstore)(nil).GetProtocols), arg0) +} + +// LatencyEWMA mocks base method. +func (m *MockPeerstore) LatencyEWMA(arg0 peer.ID) time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LatencyEWMA", arg0) + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// LatencyEWMA indicates an expected call of LatencyEWMA. +func (mr *MockPeerstoreMockRecorder) LatencyEWMA(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatencyEWMA", reflect.TypeOf((*MockPeerstore)(nil).LatencyEWMA), arg0) +} + +// PeerInfo mocks base method. +func (m *MockPeerstore) PeerInfo(arg0 peer.ID) peer.AddrInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PeerInfo", arg0) + ret0, _ := ret[0].(peer.AddrInfo) + return ret0 +} + +// PeerInfo indicates an expected call of PeerInfo. +func (mr *MockPeerstoreMockRecorder) PeerInfo(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerInfo", reflect.TypeOf((*MockPeerstore)(nil).PeerInfo), arg0) +} + +// Peers mocks base method. +func (m *MockPeerstore) Peers() peer.IDSlice { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Peers") + ret0, _ := ret[0].(peer.IDSlice) + return ret0 +} + +// Peers indicates an expected call of Peers. +func (mr *MockPeerstoreMockRecorder) Peers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockPeerstore)(nil).Peers)) +} + +// PeersWithAddrs mocks base method. +func (m *MockPeerstore) PeersWithAddrs() peer.IDSlice { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PeersWithAddrs") + ret0, _ := ret[0].(peer.IDSlice) + return ret0 +} + +// PeersWithAddrs indicates an expected call of PeersWithAddrs. +func (mr *MockPeerstoreMockRecorder) PeersWithAddrs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeersWithAddrs", reflect.TypeOf((*MockPeerstore)(nil).PeersWithAddrs)) +} + +// PeersWithKeys mocks base method. +func (m *MockPeerstore) PeersWithKeys() peer.IDSlice { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PeersWithKeys") + ret0, _ := ret[0].(peer.IDSlice) + return ret0 +} + +// PeersWithKeys indicates an expected call of PeersWithKeys. +func (mr *MockPeerstoreMockRecorder) PeersWithKeys() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeersWithKeys", reflect.TypeOf((*MockPeerstore)(nil).PeersWithKeys)) +} + +// PrivKey mocks base method. +func (m *MockPeerstore) PrivKey(arg0 peer.ID) crypto.PrivKey { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PrivKey", arg0) + ret0, _ := ret[0].(crypto.PrivKey) + return ret0 +} + +// PrivKey indicates an expected call of PrivKey. +func (mr *MockPeerstoreMockRecorder) PrivKey(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrivKey", reflect.TypeOf((*MockPeerstore)(nil).PrivKey), arg0) +} + +// PubKey mocks base method. +func (m *MockPeerstore) PubKey(arg0 peer.ID) crypto.PubKey { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PubKey", arg0) + ret0, _ := ret[0].(crypto.PubKey) + return ret0 +} + +// PubKey indicates an expected call of PubKey. +func (mr *MockPeerstoreMockRecorder) PubKey(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PubKey", reflect.TypeOf((*MockPeerstore)(nil).PubKey), arg0) +} + +// Put mocks base method. +func (m *MockPeerstore) Put(arg0 peer.ID, arg1 string, arg2 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Put indicates an expected call of Put. +func (mr *MockPeerstoreMockRecorder) Put(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockPeerstore)(nil).Put), arg0, arg1, arg2) +} + +// RecordLatency mocks base method. +func (m *MockPeerstore) RecordLatency(arg0 peer.ID, arg1 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RecordLatency", arg0, arg1) +} + +// RecordLatency indicates an expected call of RecordLatency. +func (mr *MockPeerstoreMockRecorder) RecordLatency(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordLatency", reflect.TypeOf((*MockPeerstore)(nil).RecordLatency), arg0, arg1) +} + +// RemovePeer mocks base method. +func (m *MockPeerstore) RemovePeer(arg0 peer.ID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemovePeer", arg0) +} + +// RemovePeer indicates an expected call of RemovePeer. +func (mr *MockPeerstoreMockRecorder) RemovePeer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemovePeer", reflect.TypeOf((*MockPeerstore)(nil).RemovePeer), arg0) +} + +// RemoveProtocols mocks base method. +func (m *MockPeerstore) RemoveProtocols(arg0 peer.ID, arg1 ...string) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RemoveProtocols", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveProtocols indicates an expected call of RemoveProtocols. +func (mr *MockPeerstoreMockRecorder) RemoveProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveProtocols", reflect.TypeOf((*MockPeerstore)(nil).RemoveProtocols), varargs...) +} + +// SetAddr mocks base method. +func (m *MockPeerstore) SetAddr(arg0 peer.ID, arg1 multiaddr.Multiaddr, arg2 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetAddr", arg0, arg1, arg2) +} + +// SetAddr indicates an expected call of SetAddr. +func (mr *MockPeerstoreMockRecorder) SetAddr(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAddr", reflect.TypeOf((*MockPeerstore)(nil).SetAddr), arg0, arg1, arg2) +} + +// SetAddrs mocks base method. +func (m *MockPeerstore) SetAddrs(arg0 peer.ID, arg1 []multiaddr.Multiaddr, arg2 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetAddrs", arg0, arg1, arg2) +} + +// SetAddrs indicates an expected call of SetAddrs. +func (mr *MockPeerstoreMockRecorder) SetAddrs(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAddrs", reflect.TypeOf((*MockPeerstore)(nil).SetAddrs), arg0, arg1, arg2) +} + +// SetProtocols mocks base method. +func (m *MockPeerstore) SetProtocols(arg0 peer.ID, arg1 ...string) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SetProtocols", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetProtocols indicates an expected call of SetProtocols. +func (mr *MockPeerstoreMockRecorder) SetProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetProtocols", reflect.TypeOf((*MockPeerstore)(nil).SetProtocols), varargs...) +} + +// Start mocks base method. +func (m *MockPeerstore) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockPeerstoreMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockPeerstore)(nil).Start)) +} + +// SupportsProtocols mocks base method. +func (m *MockPeerstore) SupportsProtocols(arg0 peer.ID, arg1 ...string) ([]string, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SupportsProtocols", varargs...) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SupportsProtocols indicates an expected call of SupportsProtocols. +func (mr *MockPeerstoreMockRecorder) SupportsProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SupportsProtocols", reflect.TypeOf((*MockPeerstore)(nil).SupportsProtocols), varargs...) +} + +// UpdateAddrs mocks base method. +func (m *MockPeerstore) UpdateAddrs(arg0 peer.ID, arg1, arg2 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateAddrs", arg0, arg1, arg2) +} + +// UpdateAddrs indicates an expected call of UpdateAddrs. +func (mr *MockPeerstoreMockRecorder) UpdateAddrs(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAddrs", reflect.TypeOf((*MockPeerstore)(nil).UpdateAddrs), arg0, arg1, arg2) +} diff --git a/p2p/host/pstoremanager/pstoremanager.go b/p2p/host/pstoremanager/pstoremanager.go new file mode 100644 index 0000000000..311ca0c98b --- /dev/null +++ b/p2p/host/pstoremanager/pstoremanager.go @@ -0,0 +1,130 @@ +package pstoremanager + +import ( + "context" + "sync" + "time" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("pstoremanager") + +type Option func(*PeerstoreManager) error + +// WithGracePeriod sets the grace period. +// If a peer doesn't reconnect during the grace period, its data is removed. +// Default: 1 minute. +func WithGracePeriod(p time.Duration) Option { + return func(m *PeerstoreManager) error { + m.gracePeriod = p + return nil + } +} + +// WithCleanupInterval set the clean up interval. +// During a clean up run peers that disconnected before the grace period are removed. +// If unset, the interval is set to half the grace period. +func WithCleanupInterval(t time.Duration) Option { + return func(m *PeerstoreManager) error { + m.cleanupInterval = t + return nil + } +} + +type PeerstoreManager struct { + pstore peerstore.Peerstore + eventBus event.Bus + + cancel context.CancelFunc + refCount sync.WaitGroup + + gracePeriod time.Duration + cleanupInterval time.Duration +} + +func NewPeerstoreManager(pstore peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*PeerstoreManager, error) { + m := &PeerstoreManager{ + pstore: pstore, + gracePeriod: time.Minute, + eventBus: eventBus, + } + for _, opt := range opts { + if err := opt(m); err != nil { + return nil, err + } + } + if m.cleanupInterval == 0 { + m.cleanupInterval = m.gracePeriod / 2 + } + return m, nil +} + +func (m *PeerstoreManager) Start() { + ctx, cancel := context.WithCancel(context.Background()) + m.cancel = cancel + sub, err := m.eventBus.Subscribe(&event.EvtPeerConnectednessChanged{}) + if err != nil { + log.Warnf("subscription failed. Peerstore manager not activated. Error: %s", err) + return + } + m.refCount.Add(1) + go m.background(ctx, sub) +} + +func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscription) { + defer m.refCount.Done() + defer sub.Close() + disconnected := make(map[peer.ID]time.Time) + + ticker := time.NewTicker(m.cleanupInterval) + defer ticker.Stop() + + defer func() { + for p := range disconnected { + m.pstore.RemovePeer(p) + } + }() + + for { + select { + case e, ok := <-sub.Out(): + if !ok { + return + } + ev := e.(event.EvtPeerConnectednessChanged) + p := ev.Peer + switch ev.Connectedness { + case network.NotConnected: + if _, ok := disconnected[p]; !ok { + disconnected[p] = time.Now() + } + case network.Connected: + // If we reconnect to the peer before we've cleared the information, keep it. + delete(disconnected, p) + } + case now := <-ticker.C: + for p, disconnectTime := range disconnected { + if disconnectTime.Add(m.gracePeriod).Before(now) { + m.pstore.RemovePeer(p) + delete(disconnected, p) + } + } + case <-ctx.Done(): + return + } + } +} + +func (m *PeerstoreManager) Close() error { + if m.cancel != nil { + m.cancel() + } + m.refCount.Wait() + return nil +} diff --git a/p2p/host/pstoremanager/pstoremanager_test.go b/p2p/host/pstoremanager/pstoremanager_test.go new file mode 100644 index 0000000000..a535d1bde6 --- /dev/null +++ b/p2p/host/pstoremanager/pstoremanager_test.go @@ -0,0 +1,98 @@ +package pstoremanager_test + +import ( + "testing" + "time" + + "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/libp2p/go-eventbus" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +//go:generate sh -c "mockgen -package pstoremanager_test -destination mock_peerstore_test.go github.com/libp2p/go-libp2p-core/peerstore Peerstore" + +func TestGracePeriod(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + eventBus := eventbus.NewBus() + pstore := NewMockPeerstore(ctrl) + const gracePeriod = 250 * time.Millisecond + man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod)) + require.NoError(t, err) + defer man.Close() + man.Start() + + emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) + require.NoError(t, err) + start := time.Now() + removed := make(chan struct{}) + pstore.EXPECT().RemovePeer(peer.ID("foobar")).DoAndReturn(func(p peer.ID) { + defer close(removed) + // make sure the call happened after the grace period + require.GreaterOrEqual(t, time.Since(start), gracePeriod) + require.LessOrEqual(t, time.Since(start), 3*gracePeriod) + }) + require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: "foobar", + Connectedness: network.NotConnected, + })) + <-removed +} + +func TestReconnect(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + eventBus := eventbus.NewBus() + pstore := NewMockPeerstore(ctrl) + const gracePeriod = 200 * time.Millisecond + man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod)) + require.NoError(t, err) + defer man.Close() + man.Start() + + emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) + require.NoError(t, err) + require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: "foobar", + Connectedness: network.NotConnected, + })) + require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: "foobar", + Connectedness: network.Connected, + })) + time.Sleep(gracePeriod * 3 / 2) + // There should have been no calls to RemovePeer. + ctrl.Finish() +} + +func TestClose(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + eventBus := eventbus.NewBus() + pstore := NewMockPeerstore(ctrl) + const gracePeriod = time.Hour + man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod)) + require.NoError(t, err) + man.Start() + + emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) + require.NoError(t, err) + require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: "foobar", + Connectedness: network.NotConnected, + })) + time.Sleep(10 * time.Millisecond) // make sure the event is sent before we close + done := make(chan struct{}) + pstore.EXPECT().RemovePeer(peer.ID("foobar")).Do(func(peer.ID) { close(done) }) + require.NoError(t, man.Close()) + <-done +}