reprocess trade messages off UserThread

This commit is contained in:
woodser 2024-01-01 09:04:18 -05:00
parent 9a6a9ac93e
commit 0a3736bba0
2 changed files with 42 additions and 42 deletions

View File

@ -325,16 +325,18 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
} }
public void maybeReprocessDisputeClosedMessage(Trade trade, boolean reprocessOnError) { public void maybeReprocessDisputeClosedMessage(Trade trade, boolean reprocessOnError) {
synchronized (trade) { new Thread(() -> {
synchronized (trade) {
// skip if no need to reprocess // skip if no need to reprocess
if (trade.isArbitrator() || trade.getArbitrator().getDisputeClosedMessage() == null || trade.getArbitrator().getDisputeClosedMessage().getUnsignedPayoutTxHex() == null || trade.getDisputeState().ordinal() >= Trade.DisputeState.DISPUTE_CLOSED.ordinal()) { if (trade.isArbitrator() || trade.getArbitrator().getDisputeClosedMessage() == null || trade.getArbitrator().getDisputeClosedMessage().getUnsignedPayoutTxHex() == null || trade.getDisputeState().ordinal() >= Trade.DisputeState.DISPUTE_CLOSED.ordinal()) {
return; return;
}
log.warn("Reprocessing dispute closed message for {} {}", trade.getClass().getSimpleName(), trade.getId());
handleDisputeClosedMessage(trade.getArbitrator().getDisputeClosedMessage(), reprocessOnError);
} }
}).start();
log.warn("Reprocessing dispute closed message for {} {}", trade.getClass().getSimpleName(), trade.getId());
new Thread(() -> handleDisputeClosedMessage(trade.getArbitrator().getDisputeClosedMessage(), reprocessOnError)).start();
}
} }
private MoneroTxSet signAndPublishDisputePayoutTx(Trade trade) { private MoneroTxSet signAndPublishDisputePayoutTx(Trade trade) {

View File

@ -259,29 +259,46 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
} }
// send deposits confirmed message if applicable // send deposits confirmed message if applicable
maybeSendDepositsConfirmedMessage(); maybeSendDepositsConfirmedMessages();
EasyBind.subscribe(trade.stateProperty(), state -> maybeSendDepositsConfirmedMessages());
} }
private void maybeSendDepositsConfirmedMessage() { public void maybeSendDepositsConfirmedMessages() {
HavenoUtils.submitToThread(() -> maybeSendDepositsConfirmedMessages(), trade.getId()); HavenoUtils.submitToThread(() -> {
EasyBind.subscribe(trade.stateProperty(), state -> { if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return;
HavenoUtils.submitToThread(() -> maybeSendDepositsConfirmedMessages(), trade.getId()); synchronized (trade) {
}); if (!trade.isInitialized() || trade.isShutDownStarted()) return; // skip if shutting down
latchTrade();
expect(new Condition(trade))
.setup(tasks(getDepositsConfirmedTasks())
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(null, null, "maybeSendDepositsConfirmedMessages");
},
(errorMessage) -> {
handleTaskRunnerFault(null, null, "maybeSendDepositsConfirmedMessages", errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
} }
public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
synchronized (trade) { HavenoUtils.submitToThread(() -> {
synchronized (trade) {
// skip if no need to reprocess // skip if no need to reprocess
if (trade.isSeller() || trade.getSeller().getPaymentReceivedMessage() == null || trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal()) { if (trade.isSeller() || trade.getSeller().getPaymentReceivedMessage() == null || trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal()) {
return; return;
}
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId());
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
} }
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId()); handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
HavenoUtils.submitToThread(() -> { }, trade.getId());
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
}, trade.getId());
}
} }
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
@ -844,23 +861,4 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
return false; return false;
} }
} }
public void maybeSendDepositsConfirmedMessages() {
if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return;
synchronized (trade) {
if (!trade.isInitialized() || trade.isShutDownStarted()) return; // skip if shutting down
latchTrade();
expect(new Condition(trade))
.setup(tasks(getDepositsConfirmedTasks())
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(null, null, "maybeSendDepositsConfirmedMessages");
},
(errorMessage) -> {
handleTaskRunnerFault(null, null, "maybeSendDepositsConfirmedMessages", errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
}
}
} }