refactor trade init error handling, fix deadlock in offer book service

wait min of 1 min and 1 conf before deleting trade with fund request
This commit is contained in:
woodser 2023-04-22 17:26:06 -04:00
parent a7ab31d44e
commit e0929653af
10 changed files with 179 additions and 89 deletions

View File

@ -165,7 +165,7 @@ public final class CoreMoneroConnectionsService {
return socks5ProxyProvider.getSocks5Proxy() == null ? null : socks5ProxyProvider.getSocks5Proxy().getInetAddress().getHostAddress() + ":" + socks5ProxyProvider.getSocks5Proxy().getPort();
}
public void addListener(MoneroConnectionManagerListener listener) {
public void addConnectionListener(MoneroConnectionManagerListener listener) {
synchronized (lock) {
listeners.add(listener);
}

View File

@ -94,7 +94,7 @@ public class OfferBookService {
jsonFileManager = new JsonFileManager(storageDir);
// listen for connection changes to monerod
connectionsService.addListener(new MoneroConnectionManagerListener() {
connectionsService.addConnectionListener(new MoneroConnectionManagerListener() {
@Override
public void onConnectionChanged(MoneroRpcConnection connection) {
maybeInitializeKeyImagePoller();
@ -297,8 +297,12 @@ public class OfferBookService {
if (offer.getOfferPayload().getReserveTxKeyImages().contains(keyImage)) {
synchronized (offerBookChangedListeners) {
offerBookChangedListeners.forEach(listener -> {
listener.onRemoved(offer);
listener.onAdded(offer);
// notify off thread to avoid deadlocking
new Thread(() -> {
listener.onRemoved(offer);
listener.onAdded(offer);
}).start();
});
}
}

View File

@ -198,20 +198,20 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
this.signedOfferPersistenceManager.initialize(signedOffers, "SignedOffers", PersistenceManager.Source.PRIVATE); // arbitrator stores reserve tx for signed offers
// listen for connection changes to monerod
connectionsService.addListener(new MoneroConnectionManagerListener() {
connectionsService.addConnectionListener(new MoneroConnectionManagerListener() {
@Override
public void onConnectionChanged(MoneroRpcConnection connection) {
maybeInitializeKeyImagePoller();
}
});
// remove open offer if reserved funds spent
// close open offer if reserved funds spent
offerBookService.addOfferBookChangedListener(new OfferBookChangedListener() {
@Override
public void onAdded(Offer offer) {
Optional<OpenOffer> openOfferOptional = getOpenOfferById(offer.getId());
if (openOfferOptional.isPresent() && openOfferOptional.get().getState() != OpenOffer.State.RESERVED && offer.isReservedFundsSpent()) {
removeOpenOffer(openOfferOptional.get(), null);
closeOpenOffer(offer);
}
}
@Override
@ -637,6 +637,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
}
// remove open offer which thaws its key images
private void onRemoved(@NotNull OpenOffer openOffer) {
Offer offer = openOffer.getOffer();
if (offer.getOfferPayload().getReserveTxKeyImages() != null) {
@ -652,7 +653,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
requestPersistence();
}
// Close openOffer after deposit published
// close open offer after key images spent
public void closeOpenOffer(Offer offer) {
getOpenOfferById(offer.getId()).ifPresent(openOffer -> {
removeOpenOffer(openOffer);

View File

@ -578,7 +578,7 @@ public abstract class Trade implements Tradable, Model {
});
// listen to daemon connection
xmrWalletService.getConnectionsService().addListener(newConnection -> onConnectionChanged(newConnection));
xmrWalletService.getConnectionsService().addConnectionListener(newConnection -> onConnectionChanged(newConnection));
// check if done
if (isPayoutUnlocked()) {
@ -841,7 +841,7 @@ public abstract class Trade implements Tradable, Model {
xmrWalletService.deleteWallet(getWalletName());
// delete trade wallet backups unless deposits requested and payouts not unlocked
if (isDepositRequested() && !isPayoutUnlocked()) {
if (isDepositRequested() && !isDepositFailed() && !isPayoutUnlocked()) {
log.warn("Refusing to delete backup wallet for " + getClass().getSimpleName() + " " + getId() + " in the small chance it becomes funded");
}
xmrWalletService.deleteWalletBackups(getWalletName());

View File

@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList;
import common.utils.GenUtils;
import haveno.common.ClockWatcher;
import haveno.common.UserThread;
import haveno.common.crypto.KeyRing;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.common.handlers.FaultHandler;
@ -88,6 +87,7 @@ import monero.wallet.model.MoneroOutputQuery;
import org.bitcoinj.core.Coin;
import org.bouncycastle.crypto.params.KeyParameter;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.Subscription;
import org.fxmisc.easybind.monadic.MonadicBinding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -292,10 +292,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
log.info("{}.onShutDownStarted()", getClass().getSimpleName());
// collect trades to prepare
Set<Trade> trades = new HashSet<Trade>();
trades.addAll(tradableList.getList());
trades.addAll(closedTradableManager.getClosedTrades());
trades.addAll(failedTradesManager.getObservableList());
List<Trade> trades = getAllTrades();
// prepare to shut down trades in parallel
Set<Runnable> tasks = new HashSet<Runnable>();
@ -408,14 +405,25 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// initialize trades in parallel
int threadPoolSize = 10;
Set<Runnable> tasks = new HashSet<Runnable>();
Set<String> uids = new HashSet<String>();
Set<Trade> tradesToSkip = new HashSet<Trade>();
for (Trade trade : trades) {
tasks.add(() -> {
try {
// check for duplicate uid
if (!uids.add(trade.getUid())) {
log.warn("Found trade with duplicate uid, skipping. That should never happen. {} {}, uid={}", trade.getClass().getSimpleName(), trade.getId(), trade.getUid());
tradesToSkip.add(trade);
return;
}
// initialize trade
initPersistedTrade(trade);
// remove trade if protocol didn't initialize
if (getOpenTradeByUid(trade.getId()).isPresent() && !trade.isDepositRequested()) {
log.warn("Removing persisted {} {} with uid={} because it did not finish initializing (state={})", trade.getClass().getSimpleName(), trade.getId(), trade.getUid(), trade.getState());
if (getOpenTradeByUid(trade.getUid()).isPresent() && !trade.isDepositsPublished()) {
log.warn("Maybe removing persisted {} {} with uid={} because it did not finish initializing (state={})", trade.getClass().getSimpleName(), trade.getId(), trade.getUid(), trade.getState());
maybeRemoveTradeOnError(trade);
}
} catch (Exception e) {
@ -429,11 +437,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
log.info("Done initializing persisted trades");
if (isShutDown) return;
// remove skipped trades
trades.removeAll(tradesToSkip);
// sync idle trades once in background after active trades
for (Trade trade : trades) {
if (trade.isIdling()) {
HavenoUtils.submitTask(() -> trade.syncWallet());
}
if (trade.isIdling()) HavenoUtils.submitTask(() -> trade.syncWallet());
}
getObservableList().addListener((ListChangeListener<Trade>) change -> onTradesChanged());
@ -480,7 +489,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
if (getTradeProtocol(trade) != null) return;
initTradeAndProtocol(trade, createTradeProtocol(trade));
requestPersistence();
scheduleDeletionIfUnfunded(trade);
listenForCleanup(trade);
}
private void initTradeAndProtocol(Trade trade, TradeProtocol tradeProtocol) {
@ -585,11 +594,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
((ArbitratorProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> {
log.warn("Arbitrator error during trade initialization for trade {}: {}", trade.getId(), errorMessage);
if (trade.getMaker().getReserveTxHash() != null || trade.getTaker().getReserveTxHash() != null) {
onMoveInvalidTradeToFailedTrades(trade); // arbitrator retains failed trades for analysis and penalty
} else {
maybeRemoveTradeOnError(trade);
}
maybeRemoveTradeOnError(trade);
if (takeOfferRequestErrorMessageHandler != null) takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
});
@ -625,7 +630,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
// reserve open offer
openOfferManager.reserveOpenOffer(openOffer); // TODO (woodser): reserve offer if arbitrator? probably. or, arbitrator does not have open offer?
openOfferManager.reserveOpenOffer(openOffer);
// get expected taker fee
BigInteger takerFee = HavenoUtils.getTakerFee(BigInteger.valueOf(offer.getOfferPayload().getAmount()));
@ -677,7 +682,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
((MakerProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> {
log.warn("Maker error during trade initialization: " + errorMessage);
maybeRemoveTradeOnError(trade);
openOfferManager.unreserveOpenOffer(openOffer); // offer remains available // TODO: only unreserve if funds not deposited to multisig
if (takeOfferRequestErrorMessageHandler != null) takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
});
@ -989,7 +993,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// If trade is in already in critical state (if taker role: taker fee; both roles: after deposit published)
// we move the trade to failedTradesManager
public void onMoveInvalidTradeToFailedTrades(Trade trade) {
maybeRemoveTradeOnError(trade);
removeTrade(trade);
failedTradesManager.add(trade);
}
@ -1160,6 +1164,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
}
public List<Trade> getClosedTrades() {
return closedTradableManager.getClosedTrades();
}
public Optional<Trade> getClosedTrade(String tradeId) {
return closedTradableManager.getClosedTrades().stream().filter(e -> e.getId().equals(tradeId)).findFirst();
}
@ -1187,9 +1195,18 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
private void maybeRemoveTradeOnError(Trade trade) {
log.info("TradeManager.maybeRemoveTradeOnError() " + trade.getId());
synchronized (tradableList) {
if (!tradableList.contains(trade)) return;
if (trade.isDepositRequested() && !trade.isDepositFailed()) {
listenForCleanup(trade);
} else {
removeTradeOnError(trade);
}
}
}
private void removeTradeOnError(Trade trade) {
log.info("TradeManager.removeTradeOnError() " + trade.getId());
synchronized (tradableList) {
// unreserve taker key images
if (trade instanceof TakerTrade && trade.getSelf().getReserveTxKeyImages() != null) {
@ -1198,42 +1215,100 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
trade.getSelf().setReserveTxKeyImages(null);
}
// remove trade if wallet deleted
if (!trade.walletExists()) {
removeTrade(trade);
return;
// unreserve open offer
Optional<OpenOffer> openOffer = openOfferManager.getOpenOfferById(trade.getId());
if (trade instanceof MakerTrade && openOffer.isPresent()) {
openOfferManager.unreserveOpenOffer(openOffer.get());
}
// remove trade and wallet unless deposit requested without nack
if (!trade.isDepositRequested() || trade.isDepositFailed()) {
removeTrade(trade);
if (trade.walletExists()) trade.deleteWallet();
// remove trade from list
removeTrade(trade);
// delete trade wallet
if (trade.walletExists()) trade.deleteWallet();
}
}
private void listenForCleanup(Trade trade) {
if (getOpenTrade(trade.getId()).isPresent() && trade.isDepositRequested()) {
if (trade.isDepositsPublished()) {
cleanupPublishedTrade(trade);
} else {
scheduleDeletionIfUnfunded(trade);
log.warn("Scheduling to delete open trade if unfunded for {} {}", trade.getClass().getSimpleName(), trade.getId());
new TradeCleanupListener(trade); // TODO: better way than creating listener?
}
}
}
private void scheduleDeletionIfUnfunded(Trade trade) {
if (getOpenTrade(trade.getId()).isPresent() && trade.isDepositRequested() && !trade.isDepositsPublished()) {
log.warn("Scheduling to delete open trade if unfunded for {} {}", trade.getClass().getSimpleName(), trade.getId());
UserThread.runAfter(() -> {
if (isShutDown) return;
private void cleanupPublishedTrade(Trade trade) {
if (trade instanceof MakerTrade && openOfferManager.getOpenOfferById(trade.getId()).isPresent()) {
log.warn("Closing open offer as cleanup step");
openOfferManager.closeOpenOffer(checkNotNull(trade.getOffer()));
}
}
// get trade's deposit txs from daemon
MoneroTx makerDepositTx = trade.getMaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getMaker().getDepositTxHash());
MoneroTx takerDepositTx = trade.getTaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getTaker().getDepositTxHash());
private class TradeCleanupListener {
// delete multisig trade wallet if neither deposit tx published
if (makerDepositTx == null && takerDepositTx == null) {
log.warn("Deleting {} {} after protocol error", trade.getClass().getSimpleName(), trade.getId());
removeTrade(trade);
failedTradesManager.removeTrade(trade);
if (trade.walletExists()) trade.deleteWallet();
} else {
log.warn("Refusing to delete {} {} after protocol timeout because its wallet might be funded", trade.getClass().getSimpleName(), trade.getId());
private static final long REMOVE_AFTER_MS = 60000;
private static final int REMOVE_AFTER_NUM_CONFIRMATIONS = 1;
private Long startHeight;
private Subscription stateSubscription;
private Subscription heightSubscription;
public TradeCleanupListener(Trade trade) {
// listen for deposits published to close open offer
stateSubscription = EasyBind.subscribe(trade.stateProperty(), state -> {
if (trade.isDepositsPublished()) {
cleanupPublishedTrade(trade);
if (stateSubscription != null) {
stateSubscription.unsubscribe();
stateSubscription = null;
}
}
}, 60);
});
// listen for block confirmation to remove trade
long startTime = System.currentTimeMillis();
heightSubscription = EasyBind.subscribe(xmrWalletService.getConnectionsService().chainHeightProperty(), lastBlockHeight -> {
if (isShutDown) return;
if (startHeight == null) startHeight = lastBlockHeight.longValue();
if (lastBlockHeight.longValue() >= startHeight + REMOVE_AFTER_NUM_CONFIRMATIONS) {
new Thread(() -> {
// wait minimum time
GenUtils.waitFor(Math.max(0, REMOVE_AFTER_MS - (System.currentTimeMillis() - startTime)));
// get trade's deposit txs from daemon
MoneroTx makerDepositTx = trade.getMaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getMaker().getDepositTxHash());
MoneroTx takerDepositTx = trade.getTaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getTaker().getDepositTxHash());
// remove trade and wallet if neither deposit tx published
if (makerDepositTx == null && takerDepositTx == null) {
log.warn("Deleting {} {} after protocol error", trade.getClass().getSimpleName(), trade.getId());
if (trade instanceof ArbitratorTrade && (trade.getMaker().getReserveTxHash() != null || trade.getTaker().getReserveTxHash() != null)) {
onMoveInvalidTradeToFailedTrades(trade); // arbitrator retains trades with reserved funds for analysis and penalty
} else {
removeTradeOnError(trade);
failedTradesManager.removeTrade(trade);
}
} else if (!trade.isPayoutPublished()) {
// set error that wallet may be partially funded
String errorMessage = "Refusing to delete " + trade.getClass().getSimpleName() + " " + trade.getId() + " after protocol timeout because its wallet might be funded";
trade.prependErrorMessage(errorMessage);
log.warn(errorMessage);
}
// unsubscribe
if (heightSubscription != null) {
heightSubscription.unsubscribe();
heightSubscription = null;
}
}).start();
}
});
}
}

View File

@ -39,6 +39,7 @@ public class ProcessDepositResponse extends TradeTask {
// throw if error
DepositResponse message = (DepositResponse) processModel.getTradeMessage();
if (message.getErrorMessage() != null) {
trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
throw new RuntimeException(message.getErrorMessage());
}
@ -48,7 +49,6 @@ public class ProcessDepositResponse extends TradeTask {
processModel.getTradeManager().requestPersistence();
complete();
} catch (Throwable t) {
trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
failed(t);
}
}

View File

@ -219,6 +219,18 @@ public class MoneroKeyImagePoller {
}
}
/**
* Get the last known spent status for the given key image.
*
* @param keyImage the key image to get the spent status for
* @return the last known spent status of the key image
*/
public MoneroKeyImageSpentStatus getLastSpentStatus(String keyImage) {
synchronized (lastStatuses) {
return lastStatuses.get(keyImage);
}
}
public void poll() {
if (daemon == null) {
log.warn("Cannot poll key images because daemon is null");

View File

@ -605,7 +605,7 @@ public class XmrWalletService {
maybeInitMainWallet();
// set and listen to daemon connection
connectionsService.addListener(newConnection -> onConnectionChanged(newConnection));
connectionsService.addConnectionListener(newConnection -> onConnectionChanged(newConnection));
}
private synchronized void maybeInitMainWallet() {

View File

@ -17,7 +17,6 @@
package haveno.desktop.main.portfolio.pendingtrades.steps.buyer;
import com.jfoenix.controls.JFXBadge;
import haveno.common.UserThread;
import haveno.common.app.DevEnv;
import haveno.core.locale.Res;
@ -35,9 +34,7 @@ import haveno.desktop.main.portfolio.pendingtrades.PendingTradesViewModel;
import haveno.desktop.main.portfolio.pendingtrades.steps.TradeStepView;
import haveno.desktop.util.Layout;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.layout.GridPane;
import javafx.scene.layout.HBox;
import javafx.scene.layout.Priority;

View File

@ -544,37 +544,38 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
// Use UserThread.execute as its not clear if that is called from a non-UserThread
UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
try {
socket.close();
} catch (SocketException e) {
log.trace("SocketException at shutdown might be expected {}", e.getMessage());
} catch (IOException e) {
log.error("Exception at shutdown. " + e.getMessage());
e.printStackTrace();
} finally {
protoOutputStream.onConnectionShutdown();
capabilitiesListeners.clear();
UserThread.execute(() -> {
connectionListener.onDisconnect(closeConnectionReason, this);
try {
protoInputStream.close();
socket.close();
} catch (SocketException e) {
log.trace("SocketException at shutdown might be expected {}", e.getMessage());
} catch (IOException e) {
log.error(e.getMessage());
log.error("Exception at shutdown. " + e.getMessage());
e.printStackTrace();
} finally {
protoOutputStream.onConnectionShutdown();
capabilitiesListeners.clear();
try {
protoInputStream.close();
} catch (IOException e) {
log.error(e.getMessage());
e.printStackTrace();
}
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(bundleSender, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete {}", this.toString());
// Use UserThread.execute as its not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(bundleSender, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete {}", this.toString());
// Use UserThread.execute as its not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
});
}
@Override