From 0a3736bba0548d9fcb0180329a3f5b99dca2cfde Mon Sep 17 00:00:00 2001 From: woodser Date: Mon, 1 Jan 2024 09:04:18 -0500 Subject: [PATCH] reprocess trade messages off UserThread --- .../arbitration/ArbitrationManager.java | 18 ++--- .../core/trade/protocol/TradeProtocol.java | 66 +++++++++---------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java index 0dcc35b77c..9af214c8cc 100644 --- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java +++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java @@ -325,16 +325,18 @@ public final class ArbitrationManager extends DisputeManager { + synchronized (trade) { - // skip if no need to reprocess - if (trade.isArbitrator() || trade.getArbitrator().getDisputeClosedMessage() == null || trade.getArbitrator().getDisputeClosedMessage().getUnsignedPayoutTxHex() == null || trade.getDisputeState().ordinal() >= Trade.DisputeState.DISPUTE_CLOSED.ordinal()) { - return; + // skip if no need to reprocess + if (trade.isArbitrator() || trade.getArbitrator().getDisputeClosedMessage() == null || trade.getArbitrator().getDisputeClosedMessage().getUnsignedPayoutTxHex() == null || trade.getDisputeState().ordinal() >= Trade.DisputeState.DISPUTE_CLOSED.ordinal()) { + return; + } + + log.warn("Reprocessing dispute closed message for {} {}", trade.getClass().getSimpleName(), trade.getId()); + handleDisputeClosedMessage(trade.getArbitrator().getDisputeClosedMessage(), reprocessOnError); } - - log.warn("Reprocessing dispute closed message for {} {}", trade.getClass().getSimpleName(), trade.getId()); - new Thread(() -> handleDisputeClosedMessage(trade.getArbitrator().getDisputeClosedMessage(), reprocessOnError)).start(); - } + }).start(); } private MoneroTxSet signAndPublishDisputePayoutTx(Trade trade) { diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 63eda10281..d9ca7a0a40 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -259,29 +259,46 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } // send deposits confirmed message if applicable - maybeSendDepositsConfirmedMessage(); + maybeSendDepositsConfirmedMessages(); + EasyBind.subscribe(trade.stateProperty(), state -> maybeSendDepositsConfirmedMessages()); } - private void maybeSendDepositsConfirmedMessage() { - HavenoUtils.submitToThread(() -> maybeSendDepositsConfirmedMessages(), trade.getId()); - EasyBind.subscribe(trade.stateProperty(), state -> { - HavenoUtils.submitToThread(() -> maybeSendDepositsConfirmedMessages(), trade.getId()); - }); + public void maybeSendDepositsConfirmedMessages() { + HavenoUtils.submitToThread(() -> { + if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return; + synchronized (trade) { + if (!trade.isInitialized() || trade.isShutDownStarted()) return; // skip if shutting down + latchTrade(); + expect(new Condition(trade)) + .setup(tasks(getDepositsConfirmedTasks()) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(null, null, "maybeSendDepositsConfirmedMessages"); + }, + (errorMessage) -> { + handleTaskRunnerFault(null, null, "maybeSendDepositsConfirmedMessages", errorMessage); + }))) + .executeTasks(true); + awaitTradeLatch(); + } + }, trade.getId()); } public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { - synchronized (trade) { + HavenoUtils.submitToThread(() -> { + synchronized (trade) { - // skip if no need to reprocess - if (trade.isSeller() || trade.getSeller().getPaymentReceivedMessage() == null || trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal()) { - return; + // skip if no need to reprocess + if (trade.isSeller() || trade.getSeller().getPaymentReceivedMessage() == null || trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal()) { + return; + } + + log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId()); + handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError); } - log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId()); - HavenoUtils.submitToThread(() -> { - handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError); - }, trade.getId()); - } + handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError); + }, trade.getId()); } public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { @@ -844,23 +861,4 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D return false; } } - - public void maybeSendDepositsConfirmedMessages() { - if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return; - synchronized (trade) { - if (!trade.isInitialized() || trade.isShutDownStarted()) return; // skip if shutting down - latchTrade(); - expect(new Condition(trade)) - .setup(tasks(getDepositsConfirmedTasks()) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(null, null, "maybeSendDepositsConfirmedMessages"); - }, - (errorMessage) -> { - handleTaskRunnerFault(null, null, "maybeSendDepositsConfirmedMessages", errorMessage); - }))) - .executeTasks(true); - awaitTradeLatch(); - } - } }