From 1fdb02bd1f7e6c7e4f99cd56476a2da94c834a98 Mon Sep 17 00:00:00 2001 From: woodser Date: Sat, 29 Apr 2023 07:40:02 -0400 Subject: [PATCH] re-send deposits confirmed messages until acked --- .../haveno/core/offer/OfferFilterService.java | 4 +- .../main/java/haveno/core/trade/Trade.java | 11 ---- .../java/haveno/core/trade/TradeManager.java | 1 - .../core/trade/protocol/ProcessModel.java | 8 +-- .../haveno/core/trade/protocol/TradePeer.java | 5 ++ .../core/trade/protocol/TradeProtocol.java | 37 ++++++++---- .../tasks/SendDepositsConfirmedMessage.java | 59 ++++++++++++++++++- proto/src/main/proto/pb.proto | 2 +- 8 files changed, 92 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/haveno/core/offer/OfferFilterService.java b/core/src/main/java/haveno/core/offer/OfferFilterService.java index f47d3ad5a0..8210ab4220 100644 --- a/core/src/main/java/haveno/core/offer/OfferFilterService.java +++ b/core/src/main/java/haveno/core/offer/OfferFilterService.java @@ -220,9 +220,9 @@ public class OfferFilterService { public boolean hasValidArbitrator(Offer offer) { Arbitrator arbitrator = user.getAcceptedArbitratorByAddress(offer.getOfferPayload().getArbitratorSigner()); - if (arbitrator == null) { + if (arbitrator == null && offer.getOfferPayload().getArbitratorSigner() != null) { List arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList()); - log.warn("No arbitrator registered with offer's signer. offerId={}. Accepted arbitrators={}", offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses); + log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses); } return arbitrator != null; } diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index ea589c38ff..ccfd0d5cd3 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -658,17 +658,6 @@ public abstract class Trade implements Tradable, Model { xmrWalletService.addWalletListener(idlePayoutSyncer); } - // send deposit confirmed message on startup or event - if (isDepositsConfirmed()) { - new Thread(() -> getProtocol().maybeSendDepositsConfirmedMessages()).start(); - } else { - EasyBind.subscribe(stateProperty(), state -> { - if (isDepositsConfirmed()) { - new Thread(() -> getProtocol().maybeSendDepositsConfirmedMessages()).start(); - } - }); - } - // reprocess pending payout messages this.getProtocol().maybeReprocessPaymentReceivedMessage(false); HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false); diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 3766627d88..44651244a3 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -487,7 +487,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi if (getTradeProtocol(trade) != null) return; initTradeAndProtocol(trade, createTradeProtocol(trade)); requestPersistence(); - listenForCleanup(trade); } private void initTradeAndProtocol(Trade trade, TradeProtocol tradeProtocol) { diff --git a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java index ee0b30b89c..152e34ba47 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java +++ b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java @@ -148,10 +148,6 @@ public class ProcessModel implements Model, PersistablePayload { @Setter private String multisigAddress; @Nullable - @Getter - @Setter - private boolean isDepositsConfirmedMessagesDelivered; - @Nullable @Setter @Getter private PaymentSentMessage paymentSentMessage; @@ -207,8 +203,7 @@ public class ProcessModel implements Model, PersistablePayload { .setFundsNeededForTrade(fundsNeededForTrade) .setPaymentSentMessageState(paymentSentMessageStateProperty.get().name()) .setBuyerPayoutAmountFromMediation(buyerPayoutAmountFromMediation) - .setSellerPayoutAmountFromMediation(sellerPayoutAmountFromMediation) - .setDepositsConfirmedMessagesDelivered(isDepositsConfirmedMessagesDelivered); + .setSellerPayoutAmountFromMediation(sellerPayoutAmountFromMediation); Optional.ofNullable(maker).ifPresent(e -> builder.setMaker((protobuf.TradePeer) maker.toProtoMessage())); Optional.ofNullable(taker).ifPresent(e -> builder.setTaker((protobuf.TradePeer) taker.toProtoMessage())); Optional.ofNullable(arbitrator).ifPresent(e -> builder.setArbitrator((protobuf.TradePeer) arbitrator.toProtoMessage())); @@ -234,7 +229,6 @@ public class ProcessModel implements Model, PersistablePayload { processModel.setFundsNeededForTrade(proto.getFundsNeededForTrade()); processModel.setBuyerPayoutAmountFromMediation(proto.getBuyerPayoutAmountFromMediation()); processModel.setSellerPayoutAmountFromMediation(proto.getSellerPayoutAmountFromMediation()); - processModel.setDepositsConfirmedMessagesDelivered(proto.getDepositsConfirmedMessagesDelivered()); // nullable processModel.setTakeOfferFeeTxId(ProtoUtil.stringOrNullFromProto(proto.getTakeOfferFeeTxId())); diff --git a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java index 5b603501e0..be8b1d8fe7 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java @@ -119,6 +119,9 @@ public final class TradePeer implements PersistablePayload { private long securityDeposit; @Nullable private String updatedMultisigHex; + @Getter + @Setter + boolean depositsConfirmedMessageAcked; public TradePeer() { } @@ -163,6 +166,7 @@ public final class TradePeer implements PersistablePayload { Optional.ofNullable(depositTxKey).ifPresent(e -> builder.setDepositTxKey(depositTxKey)); Optional.ofNullable(securityDeposit).ifPresent(e -> builder.setSecurityDeposit(securityDeposit)); Optional.ofNullable(updatedMultisigHex).ifPresent(e -> builder.setUpdatedMultisigHex(updatedMultisigHex)); + builder.setDepositsConfirmedMessageAcked(depositsConfirmedMessageAcked); builder.setCurrentDate(currentDate); return builder.build(); @@ -204,6 +208,7 @@ public final class TradePeer implements PersistablePayload { tradePeer.setDepositTxKey(ProtoUtil.stringOrNullFromProto(proto.getDepositTxKey())); tradePeer.setSecurityDeposit(BigInteger.valueOf(proto.getSecurityDeposit())); tradePeer.setUpdatedMultisigHex(ProtoUtil.stringOrNullFromProto(proto.getUpdatedMultisigHex())); + tradePeer.setDepositsConfirmedMessageAcked(proto.getDepositsConfirmedMessageAcked()); return tradePeer; } } diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 6e4ecf978f..c932c5c43d 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -75,7 +75,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; @Slf4j public abstract class TradeProtocol implements DecryptedDirectMessageListener, DecryptedMailboxListener { @@ -255,6 +254,21 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService(); if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this); handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages()); + + // send deposits confirmed message if applicable + maybeSendDepositsConfirmedMessage(); + } + + private void maybeSendDepositsConfirmedMessage() { + if (trade.isDepositsConfirmed()) { + new Thread(() -> maybeSendDepositsConfirmedMessages()).start(); + } else { + EasyBind.subscribe(trade.stateProperty(), state -> { + if (trade.isDepositsConfirmed()) { + new Thread(() -> maybeSendDepositsConfirmedMessages()).start(); + } + }); + } } public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { @@ -617,6 +631,13 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D if (ackMessage.isSuccess()) { log.info("Received AckMessage for {} from {} with tradeId {} and uid {}", ackMessage.getSourceMsgClassName(), peer, trade.getId(), ackMessage.getSourceUid()); + + // handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time + if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) { + if (trade.getTradePeer(peer) != null) { + trade.getTradePeer(peer).setDepositsConfirmedMessageAcked(true); + } + } } else { String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ peer + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage(); log.warn(err); @@ -834,8 +855,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D return tradeMessage.getTradeId().equals(trade.getId()); } else if (message instanceof AckMessage) { AckMessage ackMessage = (AckMessage) message; - return ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE && - ackMessage.getSourceId().equals(trade.getId()); + return ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE && ackMessage.getSourceId().equals(trade.getId()); } else { return false; } @@ -845,22 +865,15 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D if (trade.isShutDownStarted()) return; synchronized (trade) { if (!trade.isInitialized()) return; // skip if shutting down - if (trade.getProcessModel().isDepositsConfirmedMessagesDelivered()) return; // skip if already delivered latchTrade(); expect(new Condition(trade)) .setup(tasks(getDepositsConfirmedTasks()) .using(new TradeTaskRunner(trade, () -> { - trade.getProcessModel().setDepositsConfirmedMessagesDelivered(true); - handleTaskRunnerSuccess(null, null, "SendDepositsConfirmedMessages"); + handleTaskRunnerSuccess(null, null, "maybeSendDepositsConfirmedMessages"); }, (errorMessage) -> { - - // retry in 15 minutes - UserThread.runAfter(() -> { - maybeSendDepositsConfirmedMessages(); - }, 15, TimeUnit.MINUTES); - handleTaskRunnerFault(null, null, "SendDepositsConfirmedMessages", errorMessage); + handleTaskRunnerFault(null, null, "maybeSendDepositsConfirmedMessages", errorMessage); }))) .executeTasks(true); awaitTradeLatch(); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java index 5ac1555c55..ae26f17b03 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java @@ -17,12 +17,17 @@ package haveno.core.trade.protocol.tasks; +import java.util.concurrent.TimeUnit; + +import haveno.common.Timer; +import haveno.common.UserThread; import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.HavenoUtils; import haveno.core.trade.Trade; import haveno.core.trade.messages.DepositsConfirmedMessage; import haveno.core.trade.messages.TradeMailboxMessage; +import haveno.core.trade.protocol.TradePeer; import haveno.network.p2p.NodeAddress; import lombok.extern.slf4j.Slf4j; @@ -31,6 +36,11 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTask { + private Timer timer; + private static final int MAX_RESEND_ATTEMPTS = 10; + private int delayInMin = 10; + private int resendCounter = 0; + private DepositsConfirmedMessage message; public SendDepositsConfirmedMessage(TaskRunner taskHandler, Trade trade) { @@ -41,6 +51,13 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas protected void run() { try { runInterceptHook(); + + // skip if already acked by receiver + if (ackedByReceiver()) { + complete(); + return; + } + super.run(); } catch (Throwable t) { failed(t); @@ -81,7 +98,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas @Override protected void setStateSent() { - // no additional handling + tryToSendAgainLater(); + processModel.getTradeManager().requestPersistence(); } @Override @@ -98,4 +116,43 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas protected void setStateFault() { // no additional handling } + + private void cleanup() { + if (timer != null) { + timer.stop(); + } + } + + private void tryToSendAgainLater() { + + // skip if already acked + if (ackedByReceiver()) return; + + if (resendCounter >= MAX_RESEND_ATTEMPTS) { + cleanup(); + log.warn("We never received an ACK message when sending the DepositsConfirmedMessage to the peer. We stop trying to send the message."); + return; + } + + if (timer != null) { + timer.stop(); + } + + // first re-send is after 2 minutes, then double the delay each iteration + if (resendCounter == 0) { + int shortDelay = 2; + log.info("We will send the message again to the peer after a delay of {} min.", shortDelay); + timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES); + } else { + log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); + timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + delayInMin = delayInMin * 2; + } + resendCounter++; + } + + private boolean ackedByReceiver() { + TradePeer peer = trade.getTradePeer(getReceiverNodeAddress()); + return peer.isDepositsConfirmedMessageAcked(); + } } diff --git a/proto/src/main/proto/pb.proto b/proto/src/main/proto/pb.proto index 2101719686..775d379f64 100644 --- a/proto/src/main/proto/pb.proto +++ b/proto/src/main/proto/pb.proto @@ -1530,7 +1530,6 @@ message ProcessModel { bool use_savings_wallet = 6; int64 funds_needed_for_trade = 7; string payment_sent_message_state = 8; - bool deposits_confirmed_messages_delivered = 9; bytes maker_signature = 10; TradePeer maker = 11; TradePeer taker = 12; @@ -1576,6 +1575,7 @@ message TradePeer { string deposit_tx_key = 1010; int64 security_deposit = 1011; string updated_multisig_hex = 1012; + bool deposits_confirmed_message_acked = 1013; } ///////////////////////////////////////////////////////////////////////////////////////////