Skip to content

Commit

Permalink
Merge pull request #48 from Peergos/fix/peergos
Browse files Browse the repository at this point in the history
for use in peergos
  • Loading branch information
ianopolous authored Jul 26, 2023
2 parents 9be8165 + 630d097 commit cd42a07
Show file tree
Hide file tree
Showing 37 changed files with 320 additions and 183 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
<dependency>
<groupId>com.github.peergos</groupId>
<artifactId>jvm-libp2p</artifactId>
<version>0.14.0</version>
<version>0.16.0</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/peergos/BitswapBlockService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.ipfs.cid.*;
import io.libp2p.core.*;
import org.peergos.protocol.bitswap.*;
import org.peergos.protocol.dht.*;

import java.util.*;
import java.util.stream.*;
Expand All @@ -11,14 +12,22 @@ public class BitswapBlockService implements BlockService {

private final Host us;
private final Bitswap bitswap;
private final Kademlia dht;

public BitswapBlockService(Host us, Bitswap bitswap) {
public BitswapBlockService(Host us, Bitswap bitswap, Kademlia dht) {
this.us = us;
this.bitswap = bitswap;
this.dht = dht;
}

@Override
public List<HashedBlock> get(List<Want> hashes, Set<PeerId> peers, boolean addToBlockstore) {
if (peers.isEmpty()) {
List<PeerAddresses> providers = dht.findProviders(hashes.get(0).cid, us, 5).join();
peers = providers.stream()
.map(p -> PeerId.fromBase58(p.peerId.toBase58()))
.collect(Collectors.toSet());
}
return bitswap.get(hashes, us, peers, addToBlockstore)
.stream()
.map(f -> f.join())
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/peergos/EmbeddedIpfs.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package org.peergos;

import identify.pb.*;
import io.ipfs.multiaddr.*;
import io.ipfs.multihash.*;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.multistream.*;
import io.libp2p.etc.types.*;
import io.libp2p.protocol.*;
import org.peergos.blockstore.*;
import org.peergos.blockstore.s3.S3Blockstore;
import org.peergos.config.*;
import org.peergos.protocol.*;
import org.peergos.protocol.autonat.*;
import org.peergos.protocol.bitswap.*;
import org.peergos.protocol.circuit.*;
Expand Down Expand Up @@ -47,7 +52,7 @@ public EmbeddedIpfs(Host node,
this.bitswap = bitswap;
this.p2pHttp = p2pHttp;
this.bootstrap = bootstrap;
this.blocks = new BitswapBlockService(node, bitswap);
this.blocks = new BitswapBlockService(node, bitswap, dht);
}

public List<HashedBlock> getBlocks(List<Want> wants, Set<PeerId> peers, boolean addToLocal) {
Expand Down Expand Up @@ -75,6 +80,7 @@ public List<HashedBlock> getBlocks(List<Want> wants, Set<PeerId> peers, boolean

public void start() {
node.start().join();
IdentifyBuilder.addIdentifyProtocol(node);
LOG.info("Node started and listening on " + node.listenAddresses());
LOG.info("Starting bootstrap process");
int connections = dht.bootstrapRoutingTable(node, bootstrap, addr -> !addr.contains("/wss/"));
Expand Down Expand Up @@ -131,7 +137,7 @@ public static EmbeddedIpfs build(Path ipfsPath,
}
Multihash ourPeerId = Multihash.deserialize(builder.getPeerId().getBytes());

Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records), false);
Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blockstore), false);
CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding();
CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.getPrivateKey(), ourPeerId, 5);
Bitswap bitswap = new Bitswap(new BitswapEngine(blockstore, authoriser));
Expand Down
43 changes: 25 additions & 18 deletions src/main/java/org/peergos/HostBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.peergos.protocol.circuit.*;
import org.peergos.protocol.dht.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;

public class HostBuilder {
Expand Down Expand Up @@ -112,7 +114,7 @@ public static HostBuilder create(int listenPort,
.generateIdentity()
.listen(List.of(new MultiAddress("/ip4/0.0.0.0/tcp/" + listenPort)));
Multihash ourPeerId = Multihash.deserialize(builder.peerId.getBytes());
Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records), false);
Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blocks), false);
CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding();
CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.privKey, ourPeerId, 5);
return builder.addProtocols(List.of(
Expand Down Expand Up @@ -145,32 +147,37 @@ public static Host build(PrivKey privKey,
Host host = BuilderJKt.hostJ(Builder.Defaults.None, b -> {
b.getIdentity().setFactory(() -> privKey);
b.getTransports().add(TcpTransport::new);
b.getSecureChannels().add((k, m) -> new NoiseXXSecureChannel(k, m));
b.getSecureChannels().add((k, m) -> new TlsSecureChannel(k, m));
b.getSecureChannels().add(NoiseXXSecureChannel::new);
b.getSecureChannels().add(TlsSecureChannel::new);

b.getMuxers().addAll(muxers);
b.getAddressBook().setImpl(new RamAddressBook());
RamAddressBook addrs = new RamAddressBook();
b.getAddressBook().setImpl(addrs);
// Uncomment to add mux debug logging
// b.getDebug().getMuxFramesHandler().addLogger(LogLevel.INFO, "MUX");

for (ProtocolBinding<?> protocol : protocols) {
b.getProtocols().add(protocol);
if (protocol instanceof AddressBookConsumer)
((AddressBookConsumer) protocol).setAddressBook(b.getAddressBook().getImpl());
((AddressBookConsumer) protocol).setAddressBook(addrs);
}

IdentifyOuterClass.Identify.Builder identifyBuilder = IdentifyOuterClass.Identify.newBuilder()
.setProtocolVersion("ipfs/0.1.0")
.setAgentVersion("nabu/v0.1.0")
.setPublicKey(ByteArrayExtKt.toProtobuf(privKey.publicKey().bytes()))
.addAllListenAddrs(listenAddrs.stream()
.map(Multiaddr::fromString)
.map(Multiaddr::serialize)
.map(ByteArrayExtKt::toProtobuf)
.collect(Collectors.toList()));
for (ProtocolBinding<?> protocol : protocols) {
identifyBuilder = identifyBuilder.addAllProtocols(protocol.getProtocolDescriptor().getAnnounceProtocols());
}
b.getProtocols().add(new Identify(identifyBuilder.build()));
// Send an identify req on all new incoming connections
b.getConnectionHandlers().add(connection -> {
PeerId remotePeer = connection.secureSession().getRemoteId();
Multiaddr remote = connection.remoteAddress().withP2P(remotePeer);
addrs.addAddrs(remotePeer, 0, remote);
if (connection.isInitiator())
return;
StreamPromise<IdentifyController> stream = connection.muxerSession()
.createStream(new IdentifyBinding(new IdentifyProtocol()));
stream.getController()
.thenCompose(IdentifyController::id)
.thenApply(remoteId -> addrs.addAddrs(remotePeer, 0, remoteId.getListenAddrsList()
.stream()
.map(bytes -> Multiaddr.deserialize(bytes.toByteArray()))
.toArray(Multiaddr[]::new)));
});

for (String listenAddr : listenAddrs) {
b.getNetwork().listen(listenAddr);
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/org/peergos/HttpProxyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,22 @@ public HttpProxyService(Host node, HttpProtocol.Binding p2pHttpBinding, Kademlia
public ProxyResponse proxyRequest(Multihash targetNodeId, ProxyRequest request) throws IOException, ProxyException {

AddressBook addressBook = node.getAddressBook();
Optional<Multiaddr> targetAddressesOpt = addressBook.get(PeerId.fromBase58(targetNodeId.bareMultihash().toBase58())).join().stream().findFirst();
PeerId peerId = PeerId.fromBase58(targetNodeId.bareMultihash().toBase58());
Optional<Multiaddr> targetAddressesOpt = addressBook.get(peerId).join().stream().findFirst();
Multiaddr[] allAddresses = null;
if (targetAddressesOpt.isEmpty()) {
List<PeerAddresses> closestPeers = dht.findClosestPeers(targetNodeId, 20, node);
Optional<PeerAddresses> matching = closestPeers.stream().filter(p -> p.peerId.equals(targetNodeId)).findFirst();
if (matching.isEmpty()) {
throw new ProxyException("Target not found: " + targetNodeId);
}
PeerAddresses peer = matching.get();
Multiaddr[] addrs = peer.getPublicAddresses().stream().map(a -> Multiaddr.fromString(a.toString())).toArray(Multiaddr[]::new);
targetAddressesOpt = Optional.of(addrs[0]);
allAddresses = peer.getPublicAddresses().stream().map(a -> Multiaddr.fromString(a.toString())).toArray(Multiaddr[]::new);
}
HttpProtocol.HttpController proxier = p2pHttpBinding.dial(node, targetAddressesOpt.get()).getController().join();
Multiaddr[] addressesToDial = targetAddressesOpt.isPresent() ?
Arrays.asList(targetAddressesOpt.get()).toArray(Multiaddr[]::new)
: allAddresses;
HttpProtocol.HttpController proxier = p2pHttpBinding.dial(node, peerId, addressesToDial).getController().join();
String urlParams = constructQueryParamString(request.queryParams);
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.valueOf(request.method.name()),
Expand All @@ -66,9 +70,14 @@ public ProxyResponse proxyRequest(Multihash targetNodeId, ProxyRequest request)
resp.content().readBytes(bout, contentLength);
Map<String, String> headers = new HashMap<>();
for (Map.Entry<String, String> entry: resp.headers().entries()) {
headers.put(entry.getKey(), entry.getValue());
String key = entry.getKey();
if (key != null) {
headers.put(key, entry.getValue());
}
}
return new ProxyResponse(bout.toByteArray(), headers, resp.status().code());
int code = resp.status().code();
resp.release();
return new ProxyResponse(bout.toByteArray(), headers, code);
}
private String constructQueryParamString(Map<String, List<String>> queryParams) {
StringBuilder sb = new StringBuilder();
Expand Down
10 changes: 1 addition & 9 deletions src/main/java/org/peergos/Nabu.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,7 @@ public class Nabu {
private static final Logger LOG = Logger.getLogger(Nabu.class.getName());

private static HttpProtocol.HttpRequestProcessor proxyHandler(MultiAddress target) {
return (s, req, h) -> {
try {
FullHttpResponse reply = RequestSender.proxy(target, (FullHttpRequest) req);
h.accept(reply.retain());
} catch (IOException ioe) {
FullHttpResponse exceptionReply = HttpUtil.replyError(ioe);
h.accept(exceptionReply.retain());
}
};
return (s, req, h) -> HttpProtocol.proxyRequest(req, new InetSocketAddress(target.getHost(), target.getPort()), h);
}

public Nabu(Args args) throws Exception {
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/peergos/Want.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

public class Want {
public final Cid cid;
public final Optional<String> auth;
public Want(Cid cid, Optional<String> auth) {
public final Optional<String> authHex;
public Want(Cid cid, Optional<String> authHex) {
this.cid = cid;
this.auth = auth;
this.authHex = authHex.flatMap(a -> a.isEmpty() ? Optional.empty() : Optional.of(a));
}

public Want(Cid h) {
Expand All @@ -21,11 +21,11 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Want want = (Want) o;
return cid.equals(want.cid) && auth.equals(want.auth);
return cid.equals(want.cid) && authHex.equals(want.authHex);
}

@Override
public int hashCode() {
return Objects.hash(cid, auth);
return Objects.hash(cid, authHex);
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/peergos/blockstore/Blockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ default Cid keyToHash(String key) {

CompletableFuture<Boolean> has(Cid c);

CompletableFuture<Boolean> hasAny(Multihash h);

CompletableFuture<Optional<byte[]>> get(Cid c);

CompletableFuture<Cid> put(byte[] block, Cid.Codec codec);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/peergos/blockstore/FileBlockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.ipfs.cid.Cid;
import io.ipfs.multihash.Multihash;
import org.peergos.Hash;
import org.peergos.util.*;

import java.io.*;
import java.nio.file.Files;
Expand Down Expand Up @@ -60,6 +61,12 @@ public CompletableFuture<Boolean> has(Cid cid) {
return CompletableFuture.completedFuture(file.exists());
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid cid) {
try {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/peergos/blockstore/FilteredBlockstore.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.peergos.blockstore;

import io.ipfs.cid.Cid;
import io.ipfs.multihash.*;
import org.peergos.util.*;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.*;

public class FilteredBlockstore implements Blockstore {

Expand All @@ -28,6 +31,12 @@ public CompletableFuture<Boolean> has(Cid c) {
return CompletableFuture.completedFuture(false);
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid c) {
if (filter.has(c))
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/peergos/blockstore/ProvidingBlockstore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.peergos.blockstore;

import io.ipfs.cid.*;
import io.ipfs.multihash.*;

import java.util.*;
import java.util.concurrent.*;
Expand All @@ -19,6 +20,11 @@ public CompletableFuture<Boolean> has(Cid c) {
return target.has(c);
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return target.hasAny(h);
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid c) {
return target.get(c);
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/peergos/blockstore/RamBlockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import io.ipfs.cid.*;
import io.ipfs.multihash.*;
import org.peergos.*;
import org.peergos.util.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class RamBlockstore implements Blockstore {

Expand All @@ -16,6 +18,12 @@ public CompletableFuture<Boolean> has(Cid c) {
return CompletableFuture.completedFuture(blocks.containsKey(c));
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

@Override
public CompletableFuture<Optional<byte[]>> get(Cid c) {
return CompletableFuture.completedFuture(Optional.ofNullable(blocks.get(c)));
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/peergos/blockstore/TypeLimitedBlockstore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.peergos.blockstore;

import io.ipfs.cid.Cid;
import io.ipfs.multihash.*;
import org.peergos.util.*;

import java.util.List;
import java.util.Optional;
Expand All @@ -24,6 +26,15 @@ public CompletableFuture<Boolean> bloomAdd(Cid cid) {
throw new IllegalArgumentException("Unsupported codec: " + cid.codec);
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
for (Cid.Codec codec : allowedCodecs) {
if (has(new Cid(1, codec, h.getType(), h.getHash())).join())
return Futures.of(true);
}
return Futures.of(false);
}

@Override
public CompletableFuture<Boolean> has(Cid cid) {
if (allowedCodecs.contains(cid.codec)) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/peergos/blockstore/s3/S3Blockstore.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.*;

public class S3Blockstore implements Blockstore {

Expand Down Expand Up @@ -164,6 +165,12 @@ public CompletableFuture<Boolean> has(Cid cid) {
return getWithBackoff(() -> getSizeWithoutRetry(cid)).thenApply(optSize -> optSize.isPresent());
}

@Override
public CompletableFuture<Boolean> hasAny(Multihash h) {
return Futures.of(Stream.of(Cid.Codec.DagCbor, Cid.Codec.Raw, Cid.Codec.DagProtobuf)
.anyMatch(c -> has(new Cid(1, c, h.getType(), h.getHash())).join()));
}

private CompletableFuture<Optional<Integer>> getSizeWithoutRetry(Cid cid) {
try {
PresignedUrl headUrl = S3Request.preSignHead(folder + hashToKey(cid), Optional.of(60),
Expand Down
Loading

0 comments on commit cd42a07

Please sign in to comment.