handle timeout in trade initialization with protocol latch
This commit is contained in:
parent
83b8616f6f
commit
b4fe0f0ee6
@ -98,7 +98,7 @@ public class HavenoHeadlessApp implements HeadlessApp {
|
||||
lastVersion, Version.VERSION));
|
||||
|
||||
corruptedStorageFileHandler.getFiles().ifPresent(files -> log.warn("getCorruptedDatabaseFiles. files={}", files));
|
||||
tradeManager.setTakeOfferRequestErrorMessageHandler(errorMessage -> log.error("onTakeOfferRequestErrorMessageHandler"));
|
||||
tradeManager.setTakeOfferRequestErrorMessageHandler(errorMessage -> log.error("Error taking offer: " + errorMessage));
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
@ -633,7 +633,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
|
||||
latch.countDown();
|
||||
errorMessages.add(errorMessage);
|
||||
});
|
||||
TradeUtils.waitForLatch(latch);
|
||||
TradeUtils.awaitLatch(latch);
|
||||
}
|
||||
requestPersistence();
|
||||
if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString());
|
||||
|
@ -305,7 +305,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Init pending trade
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -478,6 +477,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
|
||||
return;
|
||||
}
|
||||
|
||||
Optional<Trade> tradeOptional = getOpenTrade(request.getTradeId());
|
||||
if (tradeOptional.isPresent()) {
|
||||
log.warn("Maker trade already exists with id " + request.getTradeId() + ". This should never happen.");
|
||||
return;
|
||||
}
|
||||
|
||||
openOfferManager.reserveOpenOffer(openOffer); // TODO (woodser): reserve offer if arbitrator? probably. or, arbitrator does not have open offer?
|
||||
|
||||
Trade trade;
|
||||
@ -751,8 +756,8 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
|
||||
requestPersistence();
|
||||
}, errorMessage -> {
|
||||
log.warn("Taker error during trade initialization: " + errorMessage);
|
||||
removeTrade(trade);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
removeTrade(trade);
|
||||
});
|
||||
requestPersistence();
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ public class TradeUtils {
|
||||
// return new Tuple2<>(multiSigAddress.getAddressString(), payoutAddress);
|
||||
}
|
||||
|
||||
public static void waitForLatch(CountDownLatch latch) {
|
||||
public static void awaitLatch(CountDownLatch latch) {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -2,7 +2,6 @@ package bisq.core.trade.protocol;
|
||||
|
||||
import bisq.core.trade.ArbitratorTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.TradeUtils;
|
||||
import bisq.core.trade.messages.DepositRequest;
|
||||
import bisq.core.trade.messages.InitMultisigRequest;
|
||||
import bisq.core.trade.messages.InitTradeRequest;
|
||||
@ -16,7 +15,6 @@ import bisq.core.trade.protocol.tasks.ProcessInitTradeRequest;
|
||||
import bisq.core.trade.protocol.tasks.ProcessSignContractRequest;
|
||||
import bisq.core.util.Validator;
|
||||
import bisq.network.p2p.NodeAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import bisq.common.handlers.ErrorMessageHandler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -32,14 +30,12 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
// Incoming messages
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void handleInitTradeRequest(InitTradeRequest message,
|
||||
NodeAddress peer,
|
||||
ErrorMessageHandler errorMessageHandler) {
|
||||
public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
|
||||
System.out.println("ArbitratorProtocol.handleInitTradeRequest()");
|
||||
synchronized (trade) {
|
||||
this.errorMessageHandler = errorMessageHandler;
|
||||
processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
//processModel.setTempTradingPeerNodeAddress(peer);
|
||||
latchTrade();
|
||||
expect(phase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(peer))
|
||||
@ -50,17 +46,16 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
ArbitratorSendsInitTradeAndMultisigRequests.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(peer, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(peer, message, errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,7 +65,7 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(request)
|
||||
.from(sender))
|
||||
@ -78,17 +73,16 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
ProcessInitMultisigRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,7 +92,7 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
processModel.setTradeMessage(message); // TODO (woodser): synchronize access since concurrent requests processed
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -107,17 +101,16 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
ProcessSignContractRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,7 +119,7 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(request)
|
||||
.from(sender))
|
||||
@ -134,18 +127,17 @@ public class ArbitratorProtocol extends DisputeProtocol {
|
||||
ArbitratorProcessesDepositRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
stopTimeout();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,6 @@ package bisq.core.trade.protocol;
|
||||
import bisq.core.trade.BuyerAsMakerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.Trade.State;
|
||||
import bisq.core.trade.TradeUtils;
|
||||
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
|
||||
import bisq.core.trade.messages.DepositResponse;
|
||||
import bisq.core.trade.messages.DepositTxAndDelayedPayoutTxMessage;
|
||||
@ -49,7 +48,6 @@ import bisq.core.trade.protocol.tasks.maker.MakerVerifyTakerFeePayment;
|
||||
import bisq.core.util.Validator;
|
||||
|
||||
import bisq.network.p2p.NodeAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import bisq.common.handlers.ErrorMessageHandler;
|
||||
import bisq.common.handlers.ResultHandler;
|
||||
|
||||
@ -80,7 +78,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
|
||||
synchronized (trade) {
|
||||
this.errorMessageHandler = errorMessageHandler;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(phase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(peer))
|
||||
@ -91,17 +89,16 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
MakerSendsInitTradeRequestIfUnreserved.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(peer, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(peer, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -111,7 +108,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(request)
|
||||
.from(sender))
|
||||
@ -120,17 +117,15 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
SendSignContractRequestAfterMultisig.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
})))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,7 +135,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -149,17 +144,15 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
ProcessSignContractRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
})))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,7 +163,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -179,17 +172,16 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
ProcessSignContractResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.withTimeout(TRADE_TIMEOUT)) // extend timeout
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state == State.CONTRACT_SIGNATURE_REQUESTED) handleSignContractResponse(message, sender);
|
||||
@ -204,7 +196,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), response);
|
||||
processModel.setTradeMessage(response);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(response)
|
||||
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
|
||||
@ -213,17 +205,16 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
ProcessDepositResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, response);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, response, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -234,7 +225,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG)
|
||||
.with(request)
|
||||
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
|
||||
@ -244,18 +235,17 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
|
||||
MakerRemovesOpenOffer.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
stopTimeout();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
|
||||
|
@ -22,7 +22,6 @@ import bisq.core.offer.Offer;
|
||||
import bisq.core.trade.BuyerAsTakerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.Trade.State;
|
||||
import bisq.core.trade.TradeUtils;
|
||||
import bisq.core.trade.handlers.TradeResultHandler;
|
||||
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
|
||||
import bisq.core.trade.messages.DepositResponse;
|
||||
@ -58,7 +57,6 @@ import bisq.core.trade.protocol.tasks.taker.TakerSendsInitTradeRequestToArbitrat
|
||||
import bisq.core.trade.protocol.tasks.taker.TakerVerifyMakerFeePayment;
|
||||
import bisq.core.util.Validator;
|
||||
import bisq.network.p2p.NodeAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import bisq.common.handlers.ErrorMessageHandler;
|
||||
import bisq.common.handlers.ResultHandler;
|
||||
|
||||
@ -98,7 +96,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
synchronized (trade) {
|
||||
this.tradeResultHandler = tradeResultHandler;
|
||||
this.errorMessageHandler = errorMessageHandler;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(phase(Trade.Phase.INIT)
|
||||
.with(TakerEvent.TAKE_OFFER)
|
||||
.from(trade.getTradingPeerNodeAddress()))
|
||||
@ -108,15 +106,14 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
TakerSendsInitTradeRequestToArbitrator.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
handleError(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,7 +123,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(request)
|
||||
.from(sender))
|
||||
@ -135,17 +132,16 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
SendSignContractRequestAfterMultisig.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -155,7 +151,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -164,17 +160,16 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
ProcessSignContractRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -185,7 +180,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -194,17 +189,16 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
ProcessSignContractResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state != State.CONTRACT_SIGNATURE_REQUESTED) return;
|
||||
@ -220,7 +214,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), response);
|
||||
processModel.setTradeMessage(response);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(response)
|
||||
.from(sender))
|
||||
@ -229,17 +223,16 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
ProcessDepositResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, response);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, response, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -248,7 +241,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest()");
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
|
||||
processModel.setTradeMessage(request);
|
||||
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) // TODO (woodser): rename to RECEIVED_DEPOSIT_TX_PUBLISHED_MSG
|
||||
@ -259,19 +252,18 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
||||
ProcessPaymentAccountPayloadRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
stopTimeout();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
tradeResultHandler.handleResult(trade); // trade is initialized
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
|
||||
|
@ -19,7 +19,6 @@ package bisq.core.trade.protocol;
|
||||
|
||||
import bisq.core.trade.BuyerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.TradeUtils;
|
||||
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
|
||||
import bisq.core.trade.messages.DepositTxAndDelayedPayoutTxMessage;
|
||||
import bisq.core.trade.messages.PaymentReceivedMessage;
|
||||
@ -164,8 +163,7 @@ public abstract class BuyerProtocol extends DisputeProtocol {
|
||||
log.info("BuyerProtocol.handle(PaymentReceivedMessage)");
|
||||
synchronized (trade) {
|
||||
processModel.setTradeMessage(message);
|
||||
processModel.setTempTradingPeerNodeAddress(peer);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
|
||||
.with(message)
|
||||
.from(peer))
|
||||
@ -174,15 +172,15 @@ public abstract class BuyerProtocol extends DisputeProtocol {
|
||||
BuyerProcessesPaymentReceivedMessage.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(peer, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(peer, message, errorMessage);
|
||||
})))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,6 @@ package bisq.core.trade.protocol;
|
||||
import bisq.core.trade.SellerAsMakerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.Trade.State;
|
||||
import bisq.core.trade.TradeUtils;
|
||||
import bisq.core.trade.messages.PaymentSentMessage;
|
||||
import bisq.core.trade.messages.DepositResponse;
|
||||
import bisq.core.trade.messages.DepositTxMessage;
|
||||
@ -49,7 +48,6 @@ import bisq.core.trade.protocol.tasks.seller_as_maker.SellerAsMakerFinalizesDepo
|
||||
import bisq.core.trade.protocol.tasks.seller_as_maker.SellerAsMakerProcessDepositTxMessage;
|
||||
import bisq.core.util.Validator;
|
||||
import bisq.network.p2p.NodeAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import bisq.common.handlers.ErrorMessageHandler;
|
||||
import bisq.common.handlers.ResultHandler;
|
||||
|
||||
@ -80,7 +78,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
|
||||
synchronized (trade) {
|
||||
this.errorMessageHandler = errorMessageHandler;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(phase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(peer))
|
||||
@ -91,17 +89,16 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
MakerSendsInitTradeRequestIfUnreserved.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(peer, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(peer, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -111,7 +108,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(request)
|
||||
.from(sender))
|
||||
@ -120,17 +117,16 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
SendSignContractRequestAfterMultisig.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,7 +136,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -149,17 +145,16 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
ProcessSignContractRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,7 +165,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -179,17 +174,16 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
ProcessSignContractResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state == State.CONTRACT_SIGNATURE_REQUESTED) handleSignContractResponse(message, sender);
|
||||
@ -204,7 +198,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), response);
|
||||
processModel.setTradeMessage(response);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(response)
|
||||
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
|
||||
@ -213,17 +207,16 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
ProcessDepositResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, response);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, response, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -234,7 +227,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG)
|
||||
.with(request)
|
||||
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
|
||||
@ -244,18 +237,17 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
||||
MakerRemovesOpenOffer.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
stopTimeout();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
|
||||
|
@ -22,7 +22,6 @@ import bisq.core.offer.Offer;
|
||||
import bisq.core.trade.SellerAsTakerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.Trade.State;
|
||||
import bisq.core.trade.TradeUtils;
|
||||
import bisq.core.trade.handlers.TradeResultHandler;
|
||||
import bisq.core.trade.messages.PaymentSentMessage;
|
||||
import bisq.core.trade.messages.DepositResponse;
|
||||
@ -53,7 +52,6 @@ import bisq.core.trade.protocol.tasks.taker.TakerVerifyMakerFeePayment;
|
||||
import bisq.core.util.Validator;
|
||||
|
||||
import bisq.network.p2p.NodeAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import bisq.common.handlers.ErrorMessageHandler;
|
||||
import bisq.common.handlers.ResultHandler;
|
||||
|
||||
@ -91,7 +89,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
synchronized (trade) {
|
||||
this.tradeResultHandler = tradeResultHandler;
|
||||
this.errorMessageHandler = errorMessageHandler;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(phase(Trade.Phase.INIT)
|
||||
.with(TakerEvent.TAKE_OFFER)
|
||||
.from(trade.getTradingPeerNodeAddress()))
|
||||
@ -101,15 +99,14 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
TakerSendsInitTradeRequestToArbitrator.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
handleError(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -119,7 +116,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
processModel.setTradeMessage(request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(request)
|
||||
.from(sender))
|
||||
@ -128,17 +125,16 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
SendSignContractRequestAfterMultisig.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,7 +144,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(anyPhase(Trade.Phase.INIT)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -157,17 +153,16 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
ProcessSignContractRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -178,7 +173,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(message)
|
||||
.from(sender))
|
||||
@ -187,17 +182,16 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
ProcessSignContractResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, message);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, message, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state != State.CONTRACT_SIGNATURE_REQUESTED) return;
|
||||
@ -213,7 +207,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), response);
|
||||
processModel.setTradeMessage(response);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
|
||||
.with(response)
|
||||
.from(sender))
|
||||
@ -222,17 +216,16 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
ProcessDepositResponse.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
handleTaskRunnerSuccess(sender, response);
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, response, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -241,7 +234,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest()");
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), request);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
|
||||
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
|
||||
processModel.setTradeMessage(request);
|
||||
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) // TODO (woodser): rename to RECEIVED_DEPOSIT_TX_PUBLISHED_MSG
|
||||
@ -252,19 +245,18 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
||||
ProcessPaymentAccountPayloadRequest.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
latch.countDown();
|
||||
unlatchTrade();
|
||||
stopTimeout();
|
||||
handleTaskRunnerSuccess(sender, request);
|
||||
tradeResultHandler.handleResult(trade); // trade is initialized
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(sender, request, errorMessage);
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}))
|
||||
.withTimeout(TRADE_TIMEOUT))
|
||||
.executeTasks();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
} else {
|
||||
EasyBind.subscribe(trade.stateProperty(), state -> {
|
||||
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
|
||||
|
@ -30,7 +30,6 @@ import bisq.core.trade.protocol.tasks.seller.SellerSendsPaymentReceivedMessage;
|
||||
import bisq.core.trade.protocol.tasks.seller.SellerPreparesPaymentReceivedMessage;
|
||||
|
||||
import bisq.network.p2p.NodeAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import bisq.common.handlers.ErrorMessageHandler;
|
||||
import bisq.common.handlers.ResultHandler;
|
||||
|
||||
|
@ -64,6 +64,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
||||
|
||||
protected final ProcessModel processModel;
|
||||
protected final Trade trade;
|
||||
protected CountDownLatch tradeLatch; // to synchronize on trade
|
||||
private Timer timeoutTimer;
|
||||
protected TradeResultHandler tradeResultHandler;
|
||||
protected ErrorMessageHandler errorMessageHandler;
|
||||
@ -220,16 +221,15 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
||||
synchronized (trade) {
|
||||
Validator.checkTradeId(processModel.getOfferId(), message);
|
||||
processModel.setTradeMessage(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
latchTrade();
|
||||
TradeTaskRunner taskRunner = new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
unlatchTrade();
|
||||
stopTimeout();
|
||||
latch.countDown();
|
||||
handleTaskRunnerSuccess(peer, message, "handleUpdateMultisigRequest");
|
||||
},
|
||||
errorMessage -> {
|
||||
latch.countDown();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
handleError(errorMessage);
|
||||
handleTaskRunnerFault(peer, message, errorMessage);
|
||||
});
|
||||
taskRunner.addTasks(
|
||||
@ -237,7 +237,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
||||
);
|
||||
startTimeout(TRADE_TIMEOUT);
|
||||
taskRunner.run();
|
||||
TradeUtils.waitForLatch(latch);
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}
|
||||
|
||||
@ -365,6 +365,32 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: trade protocols block if these are synchronized
|
||||
|
||||
protected void handleError(String errorMessage) {
|
||||
log.error(errorMessage);
|
||||
unlatchTrade();
|
||||
trade.setErrorMessage(errorMessage);
|
||||
if (errorMessageHandler != null) errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
cleanup();
|
||||
}
|
||||
|
||||
protected void latchTrade() {
|
||||
if (tradeLatch != null) throw new RuntimeException("Trade latch is not null. This should never happen.");
|
||||
tradeLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
protected void unlatchTrade() {
|
||||
if (tradeLatch != null) tradeLatch.countDown();
|
||||
tradeLatch = null;
|
||||
}
|
||||
|
||||
protected void awaitTradeLatch() {
|
||||
if (tradeLatch == null) return;
|
||||
TradeUtils.awaitLatch(tradeLatch);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Timeout
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -372,11 +398,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
||||
protected void startTimeout(long timeoutSec) {
|
||||
stopTimeout();
|
||||
timeoutTimer = UserThread.runAfter(() -> {
|
||||
log.error("Timeout reached. TradeID={}, state={}, timeoutSec={}", trade.getId(), trade.stateProperty().get(), timeoutSec);
|
||||
trade.setErrorMessage("Timeout reached. Protocol did not complete in " + timeoutSec + " sec.");
|
||||
if (errorMessageHandler != null) errorMessageHandler.handleErrorMessage("Timeout reached. Protocol did not complete in " + timeoutSec + " sec. TradeID=" + trade.getId() + ", state=" + trade.stateProperty().get());
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
cleanup();
|
||||
handleError("Timeout reached. Protocol did not complete in " + timeoutSec + " sec. TradeID=" + trade.getId() + ", state=" + trade.stateProperty().get());
|
||||
}, timeoutSec);
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,7 @@ import bisq.desktop.main.offer.OfferDataModel;
|
||||
import bisq.desktop.main.offer.offerbook.OfferBook;
|
||||
import bisq.desktop.main.overlays.popups.Popup;
|
||||
import bisq.desktop.util.GUIUtil;
|
||||
|
||||
import bisq.common.handlers.ErrorMessageHandler;
|
||||
import bisq.core.account.witness.AccountAgeWitnessService;
|
||||
import bisq.core.btc.listeners.XmrBalanceListener;
|
||||
import bisq.core.btc.model.XmrAddressEntry;
|
||||
@ -302,7 +302,7 @@ class TakeOfferDataModel extends OfferDataModel {
|
||||
|
||||
// errorMessageHandler is used only in the check availability phase. As soon we have a trade we write the error msg in the trade object as we want to
|
||||
// have it persisted as well.
|
||||
void onTakeOffer(TradeResultHandler tradeResultHandler) {
|
||||
void onTakeOffer(TradeResultHandler tradeResultHandler, ErrorMessageHandler errorMessageHandler) {
|
||||
checkNotNull(txFeeFromFeeService, "txFeeFromFeeService must not be null");
|
||||
checkNotNull(getTakerFee(), "takerFee must not be null");
|
||||
|
||||
@ -334,7 +334,7 @@ class TakeOfferDataModel extends OfferDataModel {
|
||||
tradeResultHandler,
|
||||
errorMessage -> {
|
||||
log.warn(errorMessage);
|
||||
new Popup().warning(errorMessage).show();
|
||||
errorMessageHandler.handleErrorMessage(errorMessage);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
@ -29,7 +29,6 @@ import bisq.desktop.util.DisplayUtils;
|
||||
import bisq.desktop.util.GUIUtil;
|
||||
import bisq.core.account.witness.AccountAgeWitnessService;
|
||||
import bisq.core.btc.wallet.Restrictions;
|
||||
import bisq.core.locale.CurrencyUtil;
|
||||
import bisq.core.locale.Res;
|
||||
import bisq.core.monetary.Price;
|
||||
import bisq.core.offer.Offer;
|
||||
@ -124,13 +123,11 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
private ChangeListener<Coin> amountAsCoinListener;
|
||||
private ChangeListener<Boolean> isWalletFundedListener;
|
||||
private ChangeListener<Trade.State> tradeStateListener;
|
||||
private ChangeListener<String> tradeErrorListener;
|
||||
private ChangeListener<Offer.State> offerStateListener;
|
||||
private ChangeListener<String> offerErrorListener;
|
||||
private ChangeListener<Number> getMempoolStatusListener;
|
||||
private ConnectionListener connectionListener;
|
||||
// private Subscription isFeeSufficientSubscription;
|
||||
private Runnable takeOfferSucceededHandler;
|
||||
private Runnable takeOfferResultHandler;
|
||||
String marketPriceMargin;
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -220,11 +217,6 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
|
||||
checkNotNull(dataModel.getAddressEntry(), "dataModel.getAddressEntry() must not be null");
|
||||
|
||||
offerErrorListener = (observable, oldValue, newValue) -> {
|
||||
if (newValue != null)
|
||||
errorMessage.set(newValue);
|
||||
};
|
||||
offer.errorMessageProperty().addListener(offerErrorListener);
|
||||
errorMessage.set(offer.getErrorMessage());
|
||||
|
||||
btcValidator.setMaxValue(offer.getAmount());
|
||||
@ -237,7 +229,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void onTakeOffer(Runnable resultHandler) {
|
||||
takeOfferSucceededHandler = resultHandler;
|
||||
takeOfferResultHandler = resultHandler;
|
||||
takeOfferRequested = true;
|
||||
showTransactionPublishedScreen.set(false);
|
||||
dataModel.onTakeOffer(trade -> {
|
||||
@ -245,9 +237,10 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
takeOfferCompleted.set(true);
|
||||
trade.stateProperty().addListener(tradeStateListener);
|
||||
applyTradeState();
|
||||
trade.errorMessageProperty().addListener(tradeErrorListener);
|
||||
applyTradeErrorMessage(trade.getErrorMessage());
|
||||
takeOfferCompleted.set(true);
|
||||
}, errMessage -> {
|
||||
applyTradeErrorMessage(errMessage);
|
||||
});
|
||||
|
||||
updateButtonDisableState();
|
||||
@ -418,6 +411,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
private void applyTradeErrorMessage(@Nullable String errorMessage) {
|
||||
if (errorMessage != null) {
|
||||
String appendMsg = "";
|
||||
if (trade != null) {
|
||||
switch (trade.getState().getPhase()) {
|
||||
case INIT:
|
||||
appendMsg = Res.get("takeOffer.error.noFundsLost");
|
||||
@ -434,13 +428,16 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
case WITHDRAWN:
|
||||
appendMsg = Res.get("takeOffer.error.payoutPublished");
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
this.errorMessage.set(errorMessage + appendMsg);
|
||||
|
||||
updateSpinnerInfo();
|
||||
|
||||
if (takeOfferSucceededHandler != null)
|
||||
takeOfferSucceededHandler.run();
|
||||
if (takeOfferResultHandler != null)
|
||||
takeOfferResultHandler.run();
|
||||
} else {
|
||||
this.errorMessage.set(null);
|
||||
}
|
||||
@ -449,8 +446,8 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
private void applyTradeState() {
|
||||
if (trade.isTakerFeePublished()) {
|
||||
if (trade.getTakerFeeTxId() != null) {
|
||||
if (takeOfferSucceededHandler != null)
|
||||
takeOfferSucceededHandler.run();
|
||||
if (takeOfferResultHandler != null)
|
||||
takeOfferResultHandler.run();
|
||||
|
||||
showTransactionPublishedScreen.set(true);
|
||||
updateSpinnerInfo();
|
||||
@ -505,7 +502,6 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
isWalletFundedListener = (ov, oldValue, newValue) -> updateButtonDisableState();
|
||||
|
||||
tradeStateListener = (ov, oldValue, newValue) -> applyTradeState();
|
||||
tradeErrorListener = (ov, oldValue, newValue) -> applyTradeErrorMessage(newValue);
|
||||
offerStateListener = (ov, oldValue, newValue) -> applyOfferState(newValue);
|
||||
|
||||
getMempoolStatusListener = (observable, oldValue, newValue) -> {
|
||||
@ -583,12 +579,10 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||
dataModel.getIsBtcWalletFunded().removeListener(isWalletFundedListener);
|
||||
if (offer != null) {
|
||||
offer.stateProperty().removeListener(offerStateListener);
|
||||
offer.errorMessageProperty().removeListener(offerErrorListener);
|
||||
}
|
||||
|
||||
if (trade != null) {
|
||||
trade.stateProperty().removeListener(tradeStateListener);
|
||||
trade.errorMessageProperty().removeListener(tradeErrorListener);
|
||||
}
|
||||
p2PService.getNetworkNode().removeConnectionListener(connectionListener);
|
||||
//isFeeSufficientSubscription.unsubscribe();
|
||||
|
Loading…
Reference in New Issue
Block a user