Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce initial data request #4586

Merged
merged 30 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2e50e4c
Add PersistableNetworkPayloadStore
chimp1984 Oct 1, 2020
240f0b9
Use PersistableNetworkPayloadStore as base class for stores which had…
chimp1984 Oct 1, 2020
431debe
Unrelated to PR topic fix: Only log warnings if banned object is not …
chimp1984 Oct 1, 2020
b90fd39
Unrelated to PR topic: Improve logs for windows: \n is not recognized…
chimp1984 Oct 1, 2020
7122ef0
Refactoring: Rename variables
chimp1984 Oct 1, 2020
df90b24
Add new methods and refactor APIs. Needed later for new classes....
chimp1984 Oct 1, 2020
9446f28
Split read store and get store so it can be reused from new class (fu…
chimp1984 Oct 1, 2020
384152f
Add version field to data requests classes
chimp1984 Oct 1, 2020
c79504c
Add getter (needed later)
chimp1984 Oct 1, 2020
3fb73ac
Add HistoricalDataStoreService
chimp1984 Oct 1, 2020
3df2f7e
Add HISTORY field. Make isNewVersion public
chimp1984 Oct 1, 2020
e44fdbd
Refactoring: Rename variable
chimp1984 Oct 1, 2020
62836d7
Add support for HistoricalDataStoreService
chimp1984 Oct 1, 2020
58efb62
Refactoring: Rearrange method (moved method)
chimp1984 Oct 1, 2020
65de106
Use HistoricalDataStoreService for TradeStatistics2StorageService
chimp1984 Oct 1, 2020
8ea6da0
Apply changes to test classes
chimp1984 Oct 1, 2020
6f5bfde
Deactivate usage of HistoricalDataStoreService for now.
chimp1984 Oct 1, 2020
a1debd8
Fix grammar
chimp1984 Oct 1, 2020
5027c86
Fix proto field index
chimp1984 Oct 1, 2020
3ee60d5
Add license
chimp1984 Oct 1, 2020
c308791
Add license
chimp1984 Oct 1, 2020
25a7979
Merge branch 'master_upstream' into reduce-initial-date-request
chimp1984 Oct 2, 2020
ef7f5a7
Revert nonce index at PreliminaryGetDataRequest protobuf entry to 21 …
chimp1984 Oct 2, 2020
d3384e6
Update p2p/src/main/java/bisq/network/p2p/storage/persistence/Histori…
chimp1984 Oct 2, 2020
9d51714
Update p2p/src/main/java/bisq/network/p2p/storage/persistence/Histori…
chimp1984 Oct 2, 2020
35d13fb
Make putIfAbsent method more clear
chimp1984 Oct 2, 2020
611bcef
Update p2p/src/main/java/bisq/network/p2p/storage/persistence/Histori…
chimp1984 Oct 2, 2020
e9c57b1
Update p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
chimp1984 Oct 2, 2020
308aa16
Update p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
chimp1984 Oct 2, 2020
807b9fc
Remove unused method, fix typo, remove unneeded annotations
chimp1984 Oct 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions common/src/main/java/bisq/common/storage/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,17 @@ public Storage(@Named(Config.STORAGE_DIR) File dir,
this.corruptedDatabaseFilesHandler = corruptedDatabaseFilesHandler;
}

@Nullable
public T getPersisted(String fileName) {
return getPersisted(new File(dir, fileName));
}

@Nullable
public T initAndGetPersistedWithFileName(String fileName, long delay) {
this.fileName = fileName;
storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, delay, persistenceProtoResolver);
return getPersisted();
return getPersisted(storageFile);
}

@Nullable
Expand All @@ -96,7 +101,7 @@ public T initAndGetPersisted(T persistable, String fileName, long delay) {
this.fileName = fileName;
storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, delay, persistenceProtoResolver);
return getPersisted();
return getPersisted(storageFile);
}

public void queueUpForSave() {
Expand Down Expand Up @@ -144,7 +149,7 @@ public void remove(String fileName) {
// We do the file read on the UI thread to avoid problems from multi threading.
// Data are small and read is done only at startup, so it is no performance issue.
@Nullable
private T getPersisted() {
private T getPersisted(File storageFile) {
if (storageFile.exists()) {
long now = System.currentTimeMillis();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,13 @@


import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;

import com.google.protobuf.Message;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -40,9 +35,7 @@
* definition and provide a hashMap for the domain access.
*/
@Slf4j
public class SignedWitnessStore implements ThreadedPersistableEnvelope {
@Getter
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
public class SignedWitnessStore extends PersistableNetworkPayloadStore {

SignedWitnessStore() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@
package bisq.core.account.witness;

import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;

import com.google.protobuf.Message;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -39,9 +34,7 @@
* definition and provide a hashMap for the domain access.
*/
@Slf4j
public class AccountAgeWitnessStore implements ThreadedPersistableEnvelope {
@Getter
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
public class AccountAgeWitnessStore extends PersistableNetworkPayloadStore {

AccountAgeWitnessStore() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@
package bisq.core.dao.governance.blindvote.storage;

import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;

import com.google.protobuf.Message;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -39,9 +34,7 @@
* definition and provide a hashMap for the domain access.
*/
@Slf4j
public class BlindVoteStore implements ThreadedPersistableEnvelope {
@Getter
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
public class BlindVoteStore extends PersistableNetworkPayloadStore {

BlindVoteStore() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@
package bisq.core.dao.governance.proposal.storage.appendonly;

import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;

import com.google.protobuf.Message;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -39,9 +34,7 @@
* definition and provide a hashMap for the domain access.
*/
@Slf4j
public class ProposalStore implements ThreadedPersistableEnvelope {
@Getter
private Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
public class ProposalStore extends PersistableNetworkPayloadStore {

ProposalStore() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ public void applyBannedNodes(@Nullable List<String> bannedNodes) {
fillProviderList();
selectNextProviderBaseUrl();

if (bannedNodes == null)
if (bannedNodes == null) {
log.info("Selected provider baseUrl={}, providerList={}", baseUrl, providerList);
else
} else if (!bannedNodes.isEmpty()) {
log.warn("We have banned provider nodes: bannedNodes={}, selected provider baseUrl={}, providerList={}",
bannedNodes, baseUrl, providerList);
}
}

public void selectNextProviderBaseUrl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ public Map<NodeAddress, T> getDisputeAgents() {
} else {
bannedDisputeAgents = null;
}
if (bannedDisputeAgents != null)

if (bannedDisputeAgents != null && !bannedDisputeAgents.isEmpty()) {
log.warn("bannedDisputeAgents=" + bannedDisputeAgents);
}

Set<T> disputeAgentSet = getDisputeAgentSet(bannedDisputeAgents);

Map<NodeAddress, T> map = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@
package bisq.core.trade.statistics;

import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.proto.persistable.ThreadedPersistableEnvelope;
import bisq.network.p2p.storage.persistence.PersistableNetworkPayloadStore;

import com.google.protobuf.Message;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -38,9 +33,7 @@
* definition and provide a hashMap for the domain access.
*/
@Slf4j
public class TradeStatistics2Store implements ThreadedPersistableEnvelope {
@Getter
private final Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = new ConcurrentHashMap<>();
public class TradeStatistics2Store extends PersistableNetworkPayloadStore {

TradeStatistics2Store() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,22 @@ public void handle(GetDataRequest getDataRequest, final Connection connection) {
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

AtomicBoolean outPersistableNetworkPayloadOutputTruncated = new AtomicBoolean(false);
AtomicBoolean outProtectedStoragePayloadOutputTruncated = new AtomicBoolean(false);
AtomicBoolean wasPersistableNetworkPayloadsTruncated = new AtomicBoolean(false);
AtomicBoolean wasProtectedStorageEntriesTruncated = new AtomicBoolean(false);
GetDataResponse getDataResponse = dataStorage.buildGetDataResponse(
getDataRequest,
MAX_ENTRIES,
outPersistableNetworkPayloadOutputTruncated,
outProtectedStoragePayloadOutputTruncated,
wasPersistableNetworkPayloadsTruncated,
wasProtectedStorageEntriesTruncated,
connection.getCapabilities());

if (outPersistableNetworkPayloadOutputTruncated.get()) {
if (wasPersistableNetworkPayloadsTruncated.get()) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}

if (outProtectedStoragePayloadOutputTruncated.get()) {
if (wasProtectedStorageEntriesTruncated.get()) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,16 @@ private void logContents(NetworkEnvelope networkEnvelope,

// Log different data types
StringBuilder sb = new StringBuilder();
sb.append("\n#################################################################\n");
sb.append("Connected to node: " + peersNodeAddress.getFullAddress() + "\n");
String sep = System.lineSeparator();
sb.append(sep).append("#################################################################").append(sep);
sb.append("Connected to node: ").append(peersNodeAddress.getFullAddress()).append(sep);
int items = dataSet.size() + persistableNetworkPayloadSet.size();
sb.append("Received ").append(items).append(" instances from a ")
.append(getDataRequestType).append("\n");
.append(getDataRequestType).append(sep);
payloadByClassName.forEach((key, value) -> sb.append(key)
.append(": ")
.append(value.size())
.append("\n"));
.append(sep));
sb.append("#################################################################");
log.info(sb.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import lombok.Getter;
import lombok.ToString;

import javax.annotation.Nullable;

@EqualsAndHashCode(callSuper = true)
@Getter
@ToString
Expand All @@ -35,11 +37,18 @@ public abstract class GetDataRequest extends NetworkEnvelope implements Extended
// Keys for ProtectedStorageEntry items to be excluded from the request because the peer has them already
protected final Set<byte[]> excludedKeys;

// Added at v1.4.0
// The version of the requester. Used for response to send potentially missing historical data
@Nullable
protected final String version;

public GetDataRequest(int messageVersion,
int nonce,
Set<byte[]> excludedKeys) {
Set<byte[]> excludedKeys,
@Nullable String version) {
super(messageVersion);
this.nonce = nonce;
this.excludedKeys = excludedKeys;
this.version = version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@

import com.google.protobuf.ByteString;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import static com.google.common.base.Preconditions.checkNotNull;
import javax.annotation.Nullable;

@Slf4j
@EqualsAndHashCode(callSuper = true)
Expand All @@ -48,6 +49,7 @@ public GetUpdatedDataRequest(NodeAddress senderNodeAddress,
this(senderNodeAddress,
nonce,
excludedKeys,
Version.VERSION,
Version.getP2PMessageVersion());
}

Expand All @@ -59,35 +61,41 @@ public GetUpdatedDataRequest(NodeAddress senderNodeAddress,
private GetUpdatedDataRequest(NodeAddress senderNodeAddress,
int nonce,
Set<byte[]> excludedKeys,
@Nullable String version,
int messageVersion) {
super(messageVersion,
nonce,
excludedKeys);
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetUpdatedDataRequest");
excludedKeys,
version);
this.senderNodeAddress = senderNodeAddress;
}

@Override
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
final protobuf.GetUpdatedDataRequest.Builder builder = protobuf.GetUpdatedDataRequest.newBuilder()
protobuf.GetUpdatedDataRequest.Builder builder = protobuf.GetUpdatedDataRequest.newBuilder()
.setSenderNodeAddress(senderNodeAddress.toProtoMessage())
.setNonce(nonce)
.addAllExcludedKeys(excludedKeys.stream()
.map(ByteString::copyFrom)
.collect(Collectors.toList()));

Optional.ofNullable(version).ifPresent(builder::setVersion);
NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetUpdatedDataRequest(builder)
.build();
log.info("Sending a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
log.info("Sending a GetUpdatedDataRequest with {} kB and {} excluded key entries. Requesters version={}",
proto.getSerializedSize() / 1000d, excludedKeys.size(), version);
return proto;
}

public static GetUpdatedDataRequest fromProto(protobuf.GetUpdatedDataRequest proto, int messageVersion) {
log.info("Received a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
Set<byte[]> excludedKeys = ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList());
String requestersVersion = ProtoUtil.stringOrNullFromProto(proto.getVersion());
log.info("Received a GetUpdatedDataRequest with {} kB and {} excluded key entries. Requesters version={}",
proto.getSerializedSize() / 1000d, excludedKeys.size(), requestersVersion);
return new GetUpdatedDataRequest(NodeAddress.fromProto(proto.getSenderNodeAddress()),
proto.getNonce(),
ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList()),
excludedKeys,
requestersVersion,
messageVersion);
}
}
Loading