process payout tx if not confirmed, send deposit responses once

This commit is contained in:
woodser 2024-05-08 17:56:36 -04:00
parent f1b8cd1e2e
commit f4de560764
3 changed files with 34 additions and 21 deletions

View File

@ -121,7 +121,7 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handleError(String errorMessage) { public void handleError(String errorMessage) {
// set trade state to send deposit responses with nack // set trade state to send deposit responses with nack
if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) { if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) {
trade.setState(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
} }
super.handleError(errorMessage); super.handleError(errorMessage);
} }

View File

@ -44,6 +44,7 @@ import java.util.UUID;
public class ArbitratorProcessDepositRequest extends TradeTask { public class ArbitratorProcessDepositRequest extends TradeTask {
private Throwable error; private Throwable error;
private boolean depositResponsesSent;
@SuppressWarnings({"unused"}) @SuppressWarnings({"unused"})
public ArbitratorProcessDepositRequest(TaskRunner taskHandler, Trade trade) { public ArbitratorProcessDepositRequest(TaskRunner taskHandler, Trade trade) {
@ -68,7 +69,7 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
} catch (Throwable t) { } catch (Throwable t) {
this.error = t; this.error = t;
t.printStackTrace(); t.printStackTrace();
trade.setState(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
failed(t); failed(t);
} }
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
@ -137,32 +138,33 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
if (isTimedOut()) throw new RuntimeException("Trade protocol has timed out before relaying deposit txs for {} {}" + trade.getClass().getSimpleName() + " " + trade.getShortId()); if (isTimedOut()) throw new RuntimeException("Trade protocol has timed out before relaying deposit txs for {} {}" + trade.getClass().getSimpleName() + " " + trade.getShortId());
trade.addInitProgressStep(); trade.addInitProgressStep();
boolean depositTxsRelayed = false;
try { try {
// submit txs to pool but do not relay // submit txs to pool but do not relay
MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true); MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true);
if (!makerResult.isGood()) throw new RuntimeException("Error submitting maker deposit tx: " + JsonUtils.serialize(makerResult));
MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true); MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true);
if (!makerResult.isGood()) throw new RuntimeException("Error submitting maker deposit tx: " + JsonUtils.serialize(makerResult));
if (!takerResult.isGood()) throw new RuntimeException("Error submitting taker deposit tx: " + JsonUtils.serialize(takerResult)); if (!takerResult.isGood()) throw new RuntimeException("Error submitting taker deposit tx: " + JsonUtils.serialize(takerResult));
// relay txs // relay txs
daemon.relayTxsByHash(Arrays.asList(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash())); daemon.relayTxsByHash(Arrays.asList(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash()));
depositTxsRelayed = true;
// update trade state // update trade state
log.info("Arbitrator published deposit txs for trade " + trade.getId()); log.info("Arbitrator published deposit txs for trade " + trade.getId());
trade.setState(Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS); trade.setStateIfValidTransitionTo(Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS);
} catch (Exception e) { } catch (Exception e) {
log.warn("Arbitrator error publishing deposit txs for trade {} {}: {}", trade.getClass().getSimpleName(), trade.getShortId(), e.getMessage());
e.printStackTrace();
if (!depositTxsRelayed) {
// flush txs from pool // flush txs from pool
try { try {
daemon.flushTxPool(processModel.getMaker().getDepositTxHash()); daemon.flushTxPool(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash());
} catch (Exception e2) { } catch (Exception e2) {
e2.printStackTrace(); e2.printStackTrace();
} }
try {
daemon.flushTxPool(processModel.getTaker().getDepositTxHash());
} catch (Exception e2) {
e2.printStackTrace();
} }
throw e; throw e;
} }
@ -170,10 +172,11 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
// subscribe to trade state once to send responses with ack or nack // subscribe to trade state once to send responses with ack or nack
trade.stateProperty().addListener((obs, oldState, newState) -> { trade.stateProperty().addListener((obs, oldState, newState) -> {
if (oldState == newState) return;
if (newState == Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED) { if (newState == Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED) {
sendDepositResponses(error == null ? "Arbitrator failed to publish deposit txs within timeout for trade " + trade.getId() : error.getMessage()); sendDepositResponsesOnce(error == null ? "Arbitrator failed to publish deposit txs within timeout for trade " + trade.getId() : error.getMessage());
} else if (newState == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS) { } else if (newState.ordinal() >= Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS.ordinal()) {
sendDepositResponses(null); sendDepositResponsesOnce(null);
} }
}); });
@ -186,7 +189,17 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
return !processModel.getTradeManager().hasOpenTrade(trade); return !processModel.getTradeManager().hasOpenTrade(trade);
} }
private void sendDepositResponses(String errorMessage) { private synchronized void sendDepositResponsesOnce(String errorMessage) {
// skip if sent
if (depositResponsesSent) return;
depositResponsesSent = true;
// log error
if (errorMessage != null) {
log.warn("Sending deposit responses with error={}", errorMessage);
Thread.dumpStack();
}
// create deposit response // create deposit response
DepositResponse response = new DepositResponse( DepositResponse response = new DepositResponse(
@ -204,7 +217,7 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
} }
private void sendDepositResponse(NodeAddress nodeAddress, PubKeyRing pubKeyRing, DepositResponse response) { private void sendDepositResponse(NodeAddress nodeAddress, PubKeyRing pubKeyRing, DepositResponse response) {
log.info("Sending deposit response to trader={}; offerId={}", nodeAddress, trade.getId()); log.info("Sending deposit response to trader={}; offerId={}, error={}", nodeAddress, trade.getId(), error);
processModel.getP2PService().sendEncryptedDirectMessage(nodeAddress, pubKeyRing, response, new SendDirectMessageListener() { processModel.getP2PService().sendEncryptedDirectMessage(nodeAddress, pubKeyRing, response, new SendDirectMessageListener() {
@Override @Override
public void onArrived() { public void onArrived() {

View File

@ -94,8 +94,8 @@ public class ProcessPaymentReceivedMessage extends TradeTask {
} }
trade.requestPersistence(); trade.requestPersistence();
// process payout tx unless already published // process payout tx if not confirmed
if (!trade.isPayoutPublished()) processPayoutTx(message); if (!trade.isPayoutConfirmed()) processPayoutTx(message);
// close open disputes // close open disputes
if (trade.isPayoutPublished() && trade.getDisputeState().ordinal() >= Trade.DisputeState.DISPUTE_REQUESTED.ordinal()) { if (trade.isPayoutPublished() && trade.getDisputeState().ordinal() >= Trade.DisputeState.DISPUTE_REQUESTED.ordinal()) {