synchronize P2PDataStorage to avoid race conditions
This commit is contained in:
parent
93a93d757c
commit
870630b381
@ -232,8 +232,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
|
||||
appendOnlyDataStoreService.readFromResources(postFix, () -> appendOnlyDataStoreServiceReady.set(true));
|
||||
protectedDataStoreService.readFromResources(postFix, () -> {
|
||||
map.putAll(protectedDataStoreService.getMap());
|
||||
protectedDataStoreServiceReady.set(true);
|
||||
synchronized (map) {
|
||||
map.putAll(protectedDataStoreService.getMap());
|
||||
protectedDataStoreServiceReady.set(true);
|
||||
}
|
||||
});
|
||||
resourceDataStoreService.readFromResources(postFix, () -> resourceDataStoreServiceReady.set(true));
|
||||
}
|
||||
@ -241,20 +243,24 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
// Uses synchronous execution on the userThread. Only used by tests. The async methods should be used by app code.
|
||||
@VisibleForTesting
|
||||
public void readFromResourcesSync(String postFix) {
|
||||
appendOnlyDataStoreService.readFromResourcesSync(postFix);
|
||||
protectedDataStoreService.readFromResourcesSync(postFix);
|
||||
resourceDataStoreService.readFromResourcesSync(postFix);
|
||||
|
||||
map.putAll(protectedDataStoreService.getMap());
|
||||
synchronized (map) {
|
||||
appendOnlyDataStoreService.readFromResourcesSync(postFix);
|
||||
protectedDataStoreService.readFromResourcesSync(postFix);
|
||||
resourceDataStoreService.readFromResourcesSync(postFix);
|
||||
|
||||
map.putAll(protectedDataStoreService.getMap());
|
||||
}
|
||||
}
|
||||
|
||||
// We get added mailbox message data from MailboxMessageService. We want to add those early so we can get it added
|
||||
// to our excluded keys to reduce initial data response data size.
|
||||
public void addProtectedMailboxStorageEntryToMap(ProtectedStorageEntry protectedStorageEntry) {
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
|
||||
map.put(hashOfPayload, protectedStorageEntry);
|
||||
//log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap());
|
||||
synchronized (map) {
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
|
||||
map.put(hashOfPayload, protectedStorageEntry);
|
||||
//log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap());
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -627,28 +633,30 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
|
||||
@VisibleForTesting
|
||||
void removeExpiredEntries() {
|
||||
// The moment when an object becomes expired will not be synchronous in the network and we could
|
||||
// get add network_messages after the object has expired. To avoid repeated additions of already expired
|
||||
// object when we get it sent from new peers, we don’t remove the sequence number from the map.
|
||||
// That way an ADD message for an already expired data will fail because the sequence number
|
||||
// is equal and not larger as expected.
|
||||
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toRemoveList = map.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().isExpired(this.clock))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
synchronized (map) {
|
||||
// The moment when an object becomes expired will not be synchronous in the network and we could
|
||||
// get add network_messages after the object has expired. To avoid repeated additions of already expired
|
||||
// object when we get it sent from new peers, we don’t remove the sequence number from the map.
|
||||
// That way an ADD message for an already expired data will fail because the sequence number
|
||||
// is equal and not larger as expected.
|
||||
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toRemoveList = map.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().isExpired(this.clock))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
|
||||
// Batch processing can cause performance issues, so do all of the removes first, then update the listeners
|
||||
// to let them know about the removes.
|
||||
if (log.isDebugEnabled()) {
|
||||
toRemoveList.forEach(toRemoveItem -> {
|
||||
log.debug("We found an expired data entry. We remove the protectedData:\n\t{}",
|
||||
Utilities.toTruncatedString(toRemoveItem.getValue()));
|
||||
});
|
||||
}
|
||||
removeFromMapAndDataStore(toRemoveList);
|
||||
// Batch processing can cause performance issues, so do all of the removes first, then update the listeners
|
||||
// to let them know about the removes.
|
||||
if (log.isDebugEnabled()) {
|
||||
toRemoveList.forEach(toRemoveItem -> {
|
||||
log.debug("We found an expired data entry. We remove the protectedData:\n\t{}",
|
||||
Utilities.toTruncatedString(toRemoveItem.getValue()));
|
||||
});
|
||||
}
|
||||
removeFromMapAndDataStore(toRemoveList);
|
||||
|
||||
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) {
|
||||
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
|
||||
requestPersistence();
|
||||
if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) {
|
||||
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
|
||||
requestPersistence();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -699,22 +707,24 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
NodeAddress peersNodeAddress = connection.getPeersNodeAddressOptional().get();
|
||||
|
||||
// Backdate all the eligible payloads based on the node that disconnected
|
||||
map.values().stream()
|
||||
.filter(protectedStorageEntry -> protectedStorageEntry.getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload)
|
||||
.filter(protectedStorageEntry -> ((RequiresOwnerIsOnlinePayload) protectedStorageEntry.getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress))
|
||||
.forEach(protectedStorageEntry -> {
|
||||
// We only set the data back by half of the TTL and remove the data only if is has
|
||||
// expired after that back dating.
|
||||
// We might get connection drops which are not caused by the node going offline, so
|
||||
// we give more tolerance with that approach, giving the node the chance to
|
||||
// refresh the TTL with a refresh message.
|
||||
// We observed those issues during stress tests, but it might have been caused by the
|
||||
// test set up (many nodes/connections over 1 router)
|
||||
// TODO investigate what causes the disconnections.
|
||||
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
|
||||
log.debug("Backdating {} due to closeConnectionReason={}", protectedStorageEntry, closeConnectionReason);
|
||||
protectedStorageEntry.backDate();
|
||||
});
|
||||
synchronized (map) {
|
||||
map.values().stream()
|
||||
.filter(protectedStorageEntry -> protectedStorageEntry.getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload)
|
||||
.filter(protectedStorageEntry -> ((RequiresOwnerIsOnlinePayload) protectedStorageEntry.getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress))
|
||||
.forEach(protectedStorageEntry -> {
|
||||
// We only set the data back by half of the TTL and remove the data only if is has
|
||||
// expired after that back dating.
|
||||
// We might get connection drops which are not caused by the node going offline, so
|
||||
// we give more tolerance with that approach, giving the node the chance to
|
||||
// refresh the TTL with a refresh message.
|
||||
// We observed those issues during stress tests, but it might have been caused by the
|
||||
// test set up (many nodes/connections over 1 router)
|
||||
// TODO investigate what causes the disconnections.
|
||||
// Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException)
|
||||
log.debug("Backdating {} due to closeConnectionReason={}", protectedStorageEntry, closeConnectionReason);
|
||||
protectedStorageEntry.backDate();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -818,81 +828,83 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
@Nullable NodeAddress sender,
|
||||
@Nullable BroadcastHandler.Listener listener,
|
||||
boolean allowBroadcast) {
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
|
||||
|
||||
//log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
|
||||
|
||||
// We do that check early as it is a very common case for returning, so we return early
|
||||
// If we have seen a more recent operation for this payload and we have a payload locally, ignore it
|
||||
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
|
||||
if (storedEntry != null && !hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) {
|
||||
log.trace("## hasSequenceNrIncreased is false. hash={}", hashOfPayload);
|
||||
return false;
|
||||
synchronized (map) {
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
|
||||
|
||||
//log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
|
||||
|
||||
// We do that check early as it is a very common case for returning, so we return early
|
||||
// If we have seen a more recent operation for this payload and we have a payload locally, ignore it
|
||||
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
|
||||
if (storedEntry != null && !hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) {
|
||||
log.trace("## hasSequenceNrIncreased is false. hash={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) {
|
||||
log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " +
|
||||
"We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString());
|
||||
return false;
|
||||
}
|
||||
|
||||
// To avoid that expired data get stored and broadcast we check for expire date.
|
||||
if (protectedStorageEntry.isExpired(clock)) {
|
||||
String peer = sender != null ? sender.getFullAddress() : "sender is null";
|
||||
log.trace("## We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}",
|
||||
peer, protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName());
|
||||
return false;
|
||||
}
|
||||
|
||||
// We want to allow add operations for equal sequence numbers if we don't have the payload locally. This is
|
||||
// the case for non-persistent Payloads that need to be reconstructed from peer and seed nodes each startup.
|
||||
MapValue sequenceNumberMapValue = sequenceNumberMap.get(hashOfPayload);
|
||||
if (sequenceNumberMapValue != null &&
|
||||
protectedStorageEntry.getSequenceNumber() < sequenceNumberMapValue.sequenceNr) {
|
||||
log.trace("## sequenceNr too low hash={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Verify the ProtectedStorageEntry is well formed and valid for the add operation
|
||||
if (!protectedStorageEntry.isValidForAddOperation()) {
|
||||
log.trace("## !isValidForAddOperation hash={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we have already seen an Entry with the same hash, verify the metadata is equal
|
||||
if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) {
|
||||
log.trace("## !matchesRelevantPubKey hash={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Test against filterPredicate set from FilterManager
|
||||
if (filterPredicate != null &&
|
||||
!filterPredicate.test(protectedStorageEntry.getProtectedStoragePayload())) {
|
||||
log.debug("filterPredicate test failed. hashOfPayload={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// This is an updated entry. Record it and signal listeners.
|
||||
map.put(hashOfPayload, protectedStorageEntry);
|
||||
hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry)));
|
||||
|
||||
// Record the updated sequence number and persist it. Higher delay so we can batch more items.
|
||||
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
|
||||
requestPersistence();
|
||||
|
||||
//log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap());
|
||||
|
||||
// Optionally, broadcast the add/update depending on the calling environment
|
||||
if (allowBroadcast) {
|
||||
broadcaster.broadcast(new AddDataMessage(protectedStorageEntry), sender, listener);
|
||||
log.trace("## broadcasted ProtectedStorageEntry. hash={}", hashOfPayload);
|
||||
}
|
||||
// Persist ProtectedStorageEntries carrying PersistablePayload payloads
|
||||
if (protectedStoragePayload instanceof PersistablePayload)
|
||||
protectedDataStoreService.put(hashOfPayload, protectedStorageEntry);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) {
|
||||
log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " +
|
||||
"We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString());
|
||||
return false;
|
||||
}
|
||||
|
||||
// To avoid that expired data get stored and broadcast we check for expire date.
|
||||
if (protectedStorageEntry.isExpired(clock)) {
|
||||
String peer = sender != null ? sender.getFullAddress() : "sender is null";
|
||||
log.trace("## We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}",
|
||||
peer, protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName());
|
||||
return false;
|
||||
}
|
||||
|
||||
// We want to allow add operations for equal sequence numbers if we don't have the payload locally. This is
|
||||
// the case for non-persistent Payloads that need to be reconstructed from peer and seed nodes each startup.
|
||||
MapValue sequenceNumberMapValue = sequenceNumberMap.get(hashOfPayload);
|
||||
if (sequenceNumberMapValue != null &&
|
||||
protectedStorageEntry.getSequenceNumber() < sequenceNumberMapValue.sequenceNr) {
|
||||
log.trace("## sequenceNr too low hash={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Verify the ProtectedStorageEntry is well formed and valid for the add operation
|
||||
if (!protectedStorageEntry.isValidForAddOperation()) {
|
||||
log.trace("## !isValidForAddOperation hash={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we have already seen an Entry with the same hash, verify the metadata is equal
|
||||
if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) {
|
||||
log.trace("## !matchesRelevantPubKey hash={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Test against filterPredicate set from FilterManager
|
||||
if (filterPredicate != null &&
|
||||
!filterPredicate.test(protectedStorageEntry.getProtectedStoragePayload())) {
|
||||
log.debug("filterPredicate test failed. hashOfPayload={}", hashOfPayload);
|
||||
return false;
|
||||
}
|
||||
|
||||
// This is an updated entry. Record it and signal listeners.
|
||||
map.put(hashOfPayload, protectedStorageEntry);
|
||||
hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry)));
|
||||
|
||||
// Record the updated sequence number and persist it. Higher delay so we can batch more items.
|
||||
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
|
||||
requestPersistence();
|
||||
|
||||
//log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap());
|
||||
|
||||
// Optionally, broadcast the add/update depending on the calling environment
|
||||
if (allowBroadcast) {
|
||||
broadcaster.broadcast(new AddDataMessage(protectedStorageEntry), sender, listener);
|
||||
log.trace("## broadcasted ProtectedStorageEntry. hash={}", hashOfPayload);
|
||||
}
|
||||
// Persist ProtectedStorageEntries carrying PersistablePayload payloads
|
||||
if (protectedStoragePayload instanceof PersistablePayload)
|
||||
protectedDataStoreService.put(hashOfPayload, protectedStorageEntry);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -935,50 +947,51 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
*/
|
||||
public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage,
|
||||
@Nullable NodeAddress sender) {
|
||||
|
||||
try {
|
||||
ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload());
|
||||
ProtectedStorageEntry storedData = map.get(hashOfPayload);
|
||||
|
||||
if (storedData == null) {
|
||||
log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing.");
|
||||
|
||||
synchronized (map) {
|
||||
try {
|
||||
ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload());
|
||||
ProtectedStorageEntry storedData = map.get(hashOfPayload);
|
||||
|
||||
if (storedData == null) {
|
||||
log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing.");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
|
||||
ProtectedStorageEntry updatedEntry = new ProtectedStorageEntry(
|
||||
storedEntry.getProtectedStoragePayload(),
|
||||
storedEntry.getOwnerPubKey(),
|
||||
refreshTTLMessage.getSequenceNumber(),
|
||||
refreshTTLMessage.getSignature(),
|
||||
this.clock);
|
||||
|
||||
|
||||
// If we have seen a more recent operation for this payload, we ignore the current one
|
||||
if (!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload))
|
||||
return false;
|
||||
|
||||
// Verify the updated ProtectedStorageEntry is well formed and valid for update
|
||||
if (!updatedEntry.isValidForAddOperation())
|
||||
return false;
|
||||
|
||||
// Update the hash map with the updated entry
|
||||
map.put(hashOfPayload, updatedEntry);
|
||||
|
||||
// Record the latest sequence number and persist it
|
||||
sequenceNumberMap.put(hashOfPayload, new MapValue(updatedEntry.getSequenceNumber(), this.clock.millis()));
|
||||
requestPersistence();
|
||||
|
||||
// Always broadcast refreshes
|
||||
broadcaster.broadcast(refreshTTLMessage, sender);
|
||||
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("refreshTTL failed, missing data: {}", e.toString());
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
|
||||
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
|
||||
ProtectedStorageEntry updatedEntry = new ProtectedStorageEntry(
|
||||
storedEntry.getProtectedStoragePayload(),
|
||||
storedEntry.getOwnerPubKey(),
|
||||
refreshTTLMessage.getSequenceNumber(),
|
||||
refreshTTLMessage.getSignature(),
|
||||
this.clock);
|
||||
|
||||
|
||||
// If we have seen a more recent operation for this payload, we ignore the current one
|
||||
if (!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload))
|
||||
return false;
|
||||
|
||||
// Verify the updated ProtectedStorageEntry is well formed and valid for update
|
||||
if (!updatedEntry.isValidForAddOperation())
|
||||
return false;
|
||||
|
||||
// Update the hash map with the updated entry
|
||||
map.put(hashOfPayload, updatedEntry);
|
||||
|
||||
// Record the latest sequence number and persist it
|
||||
sequenceNumberMap.put(hashOfPayload, new MapValue(updatedEntry.getSequenceNumber(), this.clock.millis()));
|
||||
requestPersistence();
|
||||
|
||||
// Always broadcast refreshes
|
||||
broadcaster.broadcast(refreshTTLMessage, sender);
|
||||
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("refreshTTL failed, missing data: {}", e.toString());
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -991,48 +1004,50 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
*/
|
||||
public boolean remove(ProtectedStorageEntry protectedStorageEntry,
|
||||
@Nullable NodeAddress sender) {
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
|
||||
|
||||
// If we have seen a more recent operation for this payload, ignore this one
|
||||
if (!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload))
|
||||
return false;
|
||||
|
||||
// Verify the ProtectedStorageEntry is well formed and valid for the remove operation
|
||||
if (!protectedStorageEntry.isValidForRemoveOperation())
|
||||
return false;
|
||||
|
||||
// If we have already seen an Entry with the same hash, verify the metadata is the same
|
||||
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
|
||||
if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry))
|
||||
return false;
|
||||
|
||||
// Record the latest sequence number and persist it
|
||||
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
|
||||
requestPersistence();
|
||||
|
||||
// Update that we have seen this AddOncePayload so the next time it is seen it fails verification
|
||||
if (protectedStoragePayload instanceof AddOncePayload) {
|
||||
removedPayloadsService.addHash(hashOfPayload);
|
||||
synchronized (map) {
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
|
||||
|
||||
// If we have seen a more recent operation for this payload, ignore this one
|
||||
if (!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload))
|
||||
return false;
|
||||
|
||||
// Verify the ProtectedStorageEntry is well formed and valid for the remove operation
|
||||
if (!protectedStorageEntry.isValidForRemoveOperation())
|
||||
return false;
|
||||
|
||||
// If we have already seen an Entry with the same hash, verify the metadata is the same
|
||||
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
|
||||
if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry))
|
||||
return false;
|
||||
|
||||
// Record the latest sequence number and persist it
|
||||
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
|
||||
requestPersistence();
|
||||
|
||||
// Update that we have seen this AddOncePayload so the next time it is seen it fails verification
|
||||
if (protectedStoragePayload instanceof AddOncePayload) {
|
||||
removedPayloadsService.addHash(hashOfPayload);
|
||||
}
|
||||
|
||||
if (storedEntry != null) {
|
||||
// Valid remove entry, do the remove and signal listeners
|
||||
removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload);
|
||||
} /* else {
|
||||
// This means the RemoveData or RemoveMailboxData was seen prior to the AddData. We have already updated
|
||||
// the SequenceNumberMap appropriately so the stale Add will not pass validation, but we still want to
|
||||
// broadcast the remove to peers so they can update their state appropriately
|
||||
} */
|
||||
printData("after remove");
|
||||
|
||||
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) {
|
||||
broadcaster.broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender);
|
||||
} else {
|
||||
broadcaster.broadcast(new RemoveDataMessage(protectedStorageEntry), sender);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (storedEntry != null) {
|
||||
// Valid remove entry, do the remove and signal listeners
|
||||
removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload);
|
||||
} /* else {
|
||||
// This means the RemoveData or RemoveMailboxData was seen prior to the AddData. We have already updated
|
||||
// the SequenceNumberMap appropriately so the stale Add will not pass validation, but we still want to
|
||||
// broadcast the remove to peers so they can update their state appropriately
|
||||
} */
|
||||
printData("after remove");
|
||||
|
||||
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) {
|
||||
broadcaster.broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender);
|
||||
} else {
|
||||
broadcaster.broadcast(new RemoveDataMessage(protectedStorageEntry), sender);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload,
|
||||
@ -1107,30 +1122,32 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
}
|
||||
|
||||
private void removeFromMapAndDataStore(Collection<Map.Entry<ByteArray, ProtectedStorageEntry>> entriesToRemove) {
|
||||
if (entriesToRemove.isEmpty())
|
||||
return;
|
||||
synchronized (map) {
|
||||
if (entriesToRemove.isEmpty())
|
||||
return;
|
||||
|
||||
List<ProtectedStorageEntry> removedProtectedStorageEntries = new ArrayList<>(entriesToRemove.size());
|
||||
entriesToRemove.forEach(entry -> {
|
||||
ByteArray hashOfPayload = entry.getKey();
|
||||
ProtectedStorageEntry protectedStorageEntry = entry.getValue();
|
||||
List<ProtectedStorageEntry> removedProtectedStorageEntries = new ArrayList<>(entriesToRemove.size());
|
||||
entriesToRemove.forEach(entry -> {
|
||||
ByteArray hashOfPayload = entry.getKey();
|
||||
ProtectedStorageEntry protectedStorageEntry = entry.getValue();
|
||||
|
||||
//log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap());
|
||||
map.remove(hashOfPayload);
|
||||
//log.trace("## removeFromMapAndDataStore: map after remove={}", printMap());
|
||||
//log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap());
|
||||
map.remove(hashOfPayload);
|
||||
//log.trace("## removeFromMapAndDataStore: map after remove={}", printMap());
|
||||
|
||||
// We inform listeners even the entry was not found in our map
|
||||
removedProtectedStorageEntries.add(protectedStorageEntry);
|
||||
// We inform listeners even the entry was not found in our map
|
||||
removedProtectedStorageEntries.add(protectedStorageEntry);
|
||||
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
if (protectedStoragePayload instanceof PersistablePayload) {
|
||||
ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry);
|
||||
if (previous == null)
|
||||
log.warn("We cannot remove the protectedStorageEntry from the protectedDataStoreService as it does not exist.");
|
||||
}
|
||||
});
|
||||
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
|
||||
if (protectedStoragePayload instanceof PersistablePayload) {
|
||||
ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry);
|
||||
if (previous == null)
|
||||
log.warn("We cannot remove the protectedStorageEntry from the protectedDataStoreService as it does not exist.");
|
||||
}
|
||||
});
|
||||
|
||||
hashMapChangedListeners.forEach(e -> e.onRemoved(removedProtectedStorageEntries));
|
||||
hashMapChangedListeners.forEach(e -> e.onRemoved(removedProtectedStorageEntries));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean hasSequenceNrIncreased(int newSequenceNumber, ByteArray hashOfData) {
|
||||
|
Loading…
Reference in New Issue
Block a user