From e0929653afaae358eda526c2415dccd468ca4487 Mon Sep 17 00:00:00 2001 From: woodser Date: Sat, 22 Apr 2023 17:26:06 -0400 Subject: [PATCH] refactor trade init error handling, fix deadlock in offer book service wait min of 1 min and 1 conf before deleting trade with fund request --- .../api/CoreMoneroConnectionsService.java | 2 +- .../haveno/core/offer/OfferBookService.java | 10 +- .../haveno/core/offer/OpenOfferManager.java | 9 +- .../main/java/haveno/core/trade/Trade.java | 4 +- .../java/haveno/core/trade/TradeManager.java | 169 +++++++++++++----- .../tasks/ProcessDepositResponse.java | 2 +- .../core/xmr/wallet/MoneroKeyImagePoller.java | 12 ++ .../core/xmr/wallet/XmrWalletService.java | 2 +- .../steps/buyer/BuyerStep4View.java | 3 - .../network/p2p/network/Connection.java | 55 +++--- 10 files changed, 179 insertions(+), 89 deletions(-) diff --git a/core/src/main/java/haveno/core/api/CoreMoneroConnectionsService.java b/core/src/main/java/haveno/core/api/CoreMoneroConnectionsService.java index f1075b6dc1..fb67a95f45 100644 --- a/core/src/main/java/haveno/core/api/CoreMoneroConnectionsService.java +++ b/core/src/main/java/haveno/core/api/CoreMoneroConnectionsService.java @@ -165,7 +165,7 @@ public final class CoreMoneroConnectionsService { return socks5ProxyProvider.getSocks5Proxy() == null ? null : socks5ProxyProvider.getSocks5Proxy().getInetAddress().getHostAddress() + ":" + socks5ProxyProvider.getSocks5Proxy().getPort(); } - public void addListener(MoneroConnectionManagerListener listener) { + public void addConnectionListener(MoneroConnectionManagerListener listener) { synchronized (lock) { listeners.add(listener); } diff --git a/core/src/main/java/haveno/core/offer/OfferBookService.java b/core/src/main/java/haveno/core/offer/OfferBookService.java index 76c195fedd..42f07ad71c 100644 --- a/core/src/main/java/haveno/core/offer/OfferBookService.java +++ b/core/src/main/java/haveno/core/offer/OfferBookService.java @@ -94,7 +94,7 @@ public class OfferBookService { jsonFileManager = new JsonFileManager(storageDir); // listen for connection changes to monerod - connectionsService.addListener(new MoneroConnectionManagerListener() { + connectionsService.addConnectionListener(new MoneroConnectionManagerListener() { @Override public void onConnectionChanged(MoneroRpcConnection connection) { maybeInitializeKeyImagePoller(); @@ -297,8 +297,12 @@ public class OfferBookService { if (offer.getOfferPayload().getReserveTxKeyImages().contains(keyImage)) { synchronized (offerBookChangedListeners) { offerBookChangedListeners.forEach(listener -> { - listener.onRemoved(offer); - listener.onAdded(offer); + + // notify off thread to avoid deadlocking + new Thread(() -> { + listener.onRemoved(offer); + listener.onAdded(offer); + }).start(); }); } } diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index b4b9dc9284..f25bd11ee3 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -198,20 +198,20 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe this.signedOfferPersistenceManager.initialize(signedOffers, "SignedOffers", PersistenceManager.Source.PRIVATE); // arbitrator stores reserve tx for signed offers // listen for connection changes to monerod - connectionsService.addListener(new MoneroConnectionManagerListener() { + connectionsService.addConnectionListener(new MoneroConnectionManagerListener() { @Override public void onConnectionChanged(MoneroRpcConnection connection) { maybeInitializeKeyImagePoller(); } }); - // remove open offer if reserved funds spent + // close open offer if reserved funds spent offerBookService.addOfferBookChangedListener(new OfferBookChangedListener() { @Override public void onAdded(Offer offer) { Optional openOfferOptional = getOpenOfferById(offer.getId()); if (openOfferOptional.isPresent() && openOfferOptional.get().getState() != OpenOffer.State.RESERVED && offer.isReservedFundsSpent()) { - removeOpenOffer(openOfferOptional.get(), null); + closeOpenOffer(offer); } } @Override @@ -637,6 +637,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } } + // remove open offer which thaws its key images private void onRemoved(@NotNull OpenOffer openOffer) { Offer offer = openOffer.getOffer(); if (offer.getOfferPayload().getReserveTxKeyImages() != null) { @@ -652,7 +653,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe requestPersistence(); } - // Close openOffer after deposit published + // close open offer after key images spent public void closeOpenOffer(Offer offer) { getOpenOfferById(offer.getId()).ifPresent(openOffer -> { removeOpenOffer(openOffer); diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 00ddb715d5..e4a41cb6e2 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -578,7 +578,7 @@ public abstract class Trade implements Tradable, Model { }); // listen to daemon connection - xmrWalletService.getConnectionsService().addListener(newConnection -> onConnectionChanged(newConnection)); + xmrWalletService.getConnectionsService().addConnectionListener(newConnection -> onConnectionChanged(newConnection)); // check if done if (isPayoutUnlocked()) { @@ -841,7 +841,7 @@ public abstract class Trade implements Tradable, Model { xmrWalletService.deleteWallet(getWalletName()); // delete trade wallet backups unless deposits requested and payouts not unlocked - if (isDepositRequested() && !isPayoutUnlocked()) { + if (isDepositRequested() && !isDepositFailed() && !isPayoutUnlocked()) { log.warn("Refusing to delete backup wallet for " + getClass().getSimpleName() + " " + getId() + " in the small chance it becomes funded"); } xmrWalletService.deleteWalletBackups(getWalletName()); diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 8a3a10be59..47e8ca7d5a 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import common.utils.GenUtils; import haveno.common.ClockWatcher; -import haveno.common.UserThread; import haveno.common.crypto.KeyRing; import haveno.common.handlers.ErrorMessageHandler; import haveno.common.handlers.FaultHandler; @@ -88,6 +87,7 @@ import monero.wallet.model.MoneroOutputQuery; import org.bitcoinj.core.Coin; import org.bouncycastle.crypto.params.KeyParameter; import org.fxmisc.easybind.EasyBind; +import org.fxmisc.easybind.Subscription; import org.fxmisc.easybind.monadic.MonadicBinding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -292,10 +292,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi log.info("{}.onShutDownStarted()", getClass().getSimpleName()); // collect trades to prepare - Set trades = new HashSet(); - trades.addAll(tradableList.getList()); - trades.addAll(closedTradableManager.getClosedTrades()); - trades.addAll(failedTradesManager.getObservableList()); + List trades = getAllTrades(); // prepare to shut down trades in parallel Set tasks = new HashSet(); @@ -408,14 +405,25 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // initialize trades in parallel int threadPoolSize = 10; Set tasks = new HashSet(); + Set uids = new HashSet(); + Set tradesToSkip = new HashSet(); for (Trade trade : trades) { tasks.add(() -> { try { + + // check for duplicate uid + if (!uids.add(trade.getUid())) { + log.warn("Found trade with duplicate uid, skipping. That should never happen. {} {}, uid={}", trade.getClass().getSimpleName(), trade.getId(), trade.getUid()); + tradesToSkip.add(trade); + return; + } + + // initialize trade initPersistedTrade(trade); // remove trade if protocol didn't initialize - if (getOpenTradeByUid(trade.getId()).isPresent() && !trade.isDepositRequested()) { - log.warn("Removing persisted {} {} with uid={} because it did not finish initializing (state={})", trade.getClass().getSimpleName(), trade.getId(), trade.getUid(), trade.getState()); + if (getOpenTradeByUid(trade.getUid()).isPresent() && !trade.isDepositsPublished()) { + log.warn("Maybe removing persisted {} {} with uid={} because it did not finish initializing (state={})", trade.getClass().getSimpleName(), trade.getId(), trade.getUid(), trade.getState()); maybeRemoveTradeOnError(trade); } } catch (Exception e) { @@ -429,11 +437,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi log.info("Done initializing persisted trades"); if (isShutDown) return; + // remove skipped trades + trades.removeAll(tradesToSkip); + // sync idle trades once in background after active trades for (Trade trade : trades) { - if (trade.isIdling()) { - HavenoUtils.submitTask(() -> trade.syncWallet()); - } + if (trade.isIdling()) HavenoUtils.submitTask(() -> trade.syncWallet()); } getObservableList().addListener((ListChangeListener) change -> onTradesChanged()); @@ -480,7 +489,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi if (getTradeProtocol(trade) != null) return; initTradeAndProtocol(trade, createTradeProtocol(trade)); requestPersistence(); - scheduleDeletionIfUnfunded(trade); + listenForCleanup(trade); } private void initTradeAndProtocol(Trade trade, TradeProtocol tradeProtocol) { @@ -585,11 +594,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi ((ArbitratorProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> { log.warn("Arbitrator error during trade initialization for trade {}: {}", trade.getId(), errorMessage); - if (trade.getMaker().getReserveTxHash() != null || trade.getTaker().getReserveTxHash() != null) { - onMoveInvalidTradeToFailedTrades(trade); // arbitrator retains failed trades for analysis and penalty - } else { - maybeRemoveTradeOnError(trade); - } + maybeRemoveTradeOnError(trade); if (takeOfferRequestErrorMessageHandler != null) takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage); }); @@ -625,7 +630,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } // reserve open offer - openOfferManager.reserveOpenOffer(openOffer); // TODO (woodser): reserve offer if arbitrator? probably. or, arbitrator does not have open offer? + openOfferManager.reserveOpenOffer(openOffer); // get expected taker fee BigInteger takerFee = HavenoUtils.getTakerFee(BigInteger.valueOf(offer.getOfferPayload().getAmount())); @@ -677,7 +682,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi ((MakerProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> { log.warn("Maker error during trade initialization: " + errorMessage); maybeRemoveTradeOnError(trade); - openOfferManager.unreserveOpenOffer(openOffer); // offer remains available // TODO: only unreserve if funds not deposited to multisig if (takeOfferRequestErrorMessageHandler != null) takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage); }); @@ -989,7 +993,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // If trade is in already in critical state (if taker role: taker fee; both roles: after deposit published) // we move the trade to failedTradesManager public void onMoveInvalidTradeToFailedTrades(Trade trade) { - maybeRemoveTradeOnError(trade); + removeTrade(trade); failedTradesManager.add(trade); } @@ -1160,6 +1164,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } } + public List getClosedTrades() { + return closedTradableManager.getClosedTrades(); + } + public Optional getClosedTrade(String tradeId) { return closedTradableManager.getClosedTrades().stream().filter(e -> e.getId().equals(tradeId)).findFirst(); } @@ -1187,9 +1195,18 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } private void maybeRemoveTradeOnError(Trade trade) { - log.info("TradeManager.maybeRemoveTradeOnError() " + trade.getId()); synchronized (tradableList) { - if (!tradableList.contains(trade)) return; + if (trade.isDepositRequested() && !trade.isDepositFailed()) { + listenForCleanup(trade); + } else { + removeTradeOnError(trade); + } + } + } + + private void removeTradeOnError(Trade trade) { + log.info("TradeManager.removeTradeOnError() " + trade.getId()); + synchronized (tradableList) { // unreserve taker key images if (trade instanceof TakerTrade && trade.getSelf().getReserveTxKeyImages() != null) { @@ -1198,42 +1215,100 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi trade.getSelf().setReserveTxKeyImages(null); } - // remove trade if wallet deleted - if (!trade.walletExists()) { - removeTrade(trade); - return; + // unreserve open offer + Optional openOffer = openOfferManager.getOpenOfferById(trade.getId()); + if (trade instanceof MakerTrade && openOffer.isPresent()) { + openOfferManager.unreserveOpenOffer(openOffer.get()); } - // remove trade and wallet unless deposit requested without nack - if (!trade.isDepositRequested() || trade.isDepositFailed()) { - removeTrade(trade); - if (trade.walletExists()) trade.deleteWallet(); + // remove trade from list + removeTrade(trade); + + // delete trade wallet + if (trade.walletExists()) trade.deleteWallet(); + } + } + + private void listenForCleanup(Trade trade) { + if (getOpenTrade(trade.getId()).isPresent() && trade.isDepositRequested()) { + if (trade.isDepositsPublished()) { + cleanupPublishedTrade(trade); } else { - scheduleDeletionIfUnfunded(trade); + log.warn("Scheduling to delete open trade if unfunded for {} {}", trade.getClass().getSimpleName(), trade.getId()); + new TradeCleanupListener(trade); // TODO: better way than creating listener? } } } - private void scheduleDeletionIfUnfunded(Trade trade) { - if (getOpenTrade(trade.getId()).isPresent() && trade.isDepositRequested() && !trade.isDepositsPublished()) { - log.warn("Scheduling to delete open trade if unfunded for {} {}", trade.getClass().getSimpleName(), trade.getId()); - UserThread.runAfter(() -> { - if (isShutDown) return; + private void cleanupPublishedTrade(Trade trade) { + if (trade instanceof MakerTrade && openOfferManager.getOpenOfferById(trade.getId()).isPresent()) { + log.warn("Closing open offer as cleanup step"); + openOfferManager.closeOpenOffer(checkNotNull(trade.getOffer())); + } + } - // get trade's deposit txs from daemon - MoneroTx makerDepositTx = trade.getMaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getMaker().getDepositTxHash()); - MoneroTx takerDepositTx = trade.getTaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getTaker().getDepositTxHash()); + private class TradeCleanupListener { - // delete multisig trade wallet if neither deposit tx published - if (makerDepositTx == null && takerDepositTx == null) { - log.warn("Deleting {} {} after protocol error", trade.getClass().getSimpleName(), trade.getId()); - removeTrade(trade); - failedTradesManager.removeTrade(trade); - if (trade.walletExists()) trade.deleteWallet(); - } else { - log.warn("Refusing to delete {} {} after protocol timeout because its wallet might be funded", trade.getClass().getSimpleName(), trade.getId()); + private static final long REMOVE_AFTER_MS = 60000; + private static final int REMOVE_AFTER_NUM_CONFIRMATIONS = 1; + private Long startHeight; + private Subscription stateSubscription; + private Subscription heightSubscription; + + public TradeCleanupListener(Trade trade) { + + // listen for deposits published to close open offer + stateSubscription = EasyBind.subscribe(trade.stateProperty(), state -> { + if (trade.isDepositsPublished()) { + cleanupPublishedTrade(trade); + if (stateSubscription != null) { + stateSubscription.unsubscribe(); + stateSubscription = null; + } } - }, 60); + }); + + // listen for block confirmation to remove trade + long startTime = System.currentTimeMillis(); + heightSubscription = EasyBind.subscribe(xmrWalletService.getConnectionsService().chainHeightProperty(), lastBlockHeight -> { + if (isShutDown) return; + if (startHeight == null) startHeight = lastBlockHeight.longValue(); + if (lastBlockHeight.longValue() >= startHeight + REMOVE_AFTER_NUM_CONFIRMATIONS) { + new Thread(() -> { + + // wait minimum time + GenUtils.waitFor(Math.max(0, REMOVE_AFTER_MS - (System.currentTimeMillis() - startTime))); + + // get trade's deposit txs from daemon + MoneroTx makerDepositTx = trade.getMaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getMaker().getDepositTxHash()); + MoneroTx takerDepositTx = trade.getTaker().getDepositTxHash() == null ? null : xmrWalletService.getDaemon().getTx(trade.getTaker().getDepositTxHash()); + + // remove trade and wallet if neither deposit tx published + if (makerDepositTx == null && takerDepositTx == null) { + log.warn("Deleting {} {} after protocol error", trade.getClass().getSimpleName(), trade.getId()); + if (trade instanceof ArbitratorTrade && (trade.getMaker().getReserveTxHash() != null || trade.getTaker().getReserveTxHash() != null)) { + onMoveInvalidTradeToFailedTrades(trade); // arbitrator retains trades with reserved funds for analysis and penalty + } else { + removeTradeOnError(trade); + failedTradesManager.removeTrade(trade); + } + } else if (!trade.isPayoutPublished()) { + + // set error that wallet may be partially funded + String errorMessage = "Refusing to delete " + trade.getClass().getSimpleName() + " " + trade.getId() + " after protocol timeout because its wallet might be funded"; + trade.prependErrorMessage(errorMessage); + log.warn(errorMessage); + } + + // unsubscribe + if (heightSubscription != null) { + heightSubscription.unsubscribe(); + heightSubscription = null; + } + + }).start(); + } + }); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositResponse.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositResponse.java index b5d98ec5a5..c210f488b8 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositResponse.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessDepositResponse.java @@ -39,6 +39,7 @@ public class ProcessDepositResponse extends TradeTask { // throw if error DepositResponse message = (DepositResponse) processModel.getTradeMessage(); if (message.getErrorMessage() != null) { + trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); throw new RuntimeException(message.getErrorMessage()); } @@ -48,7 +49,6 @@ public class ProcessDepositResponse extends TradeTask { processModel.getTradeManager().requestPersistence(); complete(); } catch (Throwable t) { - trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); failed(t); } } diff --git a/core/src/main/java/haveno/core/xmr/wallet/MoneroKeyImagePoller.java b/core/src/main/java/haveno/core/xmr/wallet/MoneroKeyImagePoller.java index c76b65d7e3..71f90bf8a9 100644 --- a/core/src/main/java/haveno/core/xmr/wallet/MoneroKeyImagePoller.java +++ b/core/src/main/java/haveno/core/xmr/wallet/MoneroKeyImagePoller.java @@ -219,6 +219,18 @@ public class MoneroKeyImagePoller { } } + /** + * Get the last known spent status for the given key image. + * + * @param keyImage the key image to get the spent status for + * @return the last known spent status of the key image + */ + public MoneroKeyImageSpentStatus getLastSpentStatus(String keyImage) { + synchronized (lastStatuses) { + return lastStatuses.get(keyImage); + } + } + public void poll() { if (daemon == null) { log.warn("Cannot poll key images because daemon is null"); diff --git a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java index 3243befb79..08c2a54f14 100644 --- a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java +++ b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java @@ -605,7 +605,7 @@ public class XmrWalletService { maybeInitMainWallet(); // set and listen to daemon connection - connectionsService.addListener(newConnection -> onConnectionChanged(newConnection)); + connectionsService.addConnectionListener(newConnection -> onConnectionChanged(newConnection)); } private synchronized void maybeInitMainWallet() { diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep4View.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep4View.java index 8ba61e0b9d..461702134d 100644 --- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep4View.java +++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/buyer/BuyerStep4View.java @@ -17,7 +17,6 @@ package haveno.desktop.main.portfolio.pendingtrades.steps.buyer; -import com.jfoenix.controls.JFXBadge; import haveno.common.UserThread; import haveno.common.app.DevEnv; import haveno.core.locale.Res; @@ -35,9 +34,7 @@ import haveno.desktop.main.portfolio.pendingtrades.PendingTradesViewModel; import haveno.desktop.main.portfolio.pendingtrades.steps.TradeStepView; import haveno.desktop.util.Layout; import javafx.geometry.Insets; -import javafx.geometry.Pos; import javafx.scene.control.Button; -import javafx.scene.control.Label; import javafx.scene.layout.GridPane; import javafx.scene.layout.HBox; import javafx.scene.layout.Priority; diff --git a/p2p/src/main/java/haveno/network/p2p/network/Connection.java b/p2p/src/main/java/haveno/network/p2p/network/Connection.java index 46e775bda6..4e554a05b0 100644 --- a/p2p/src/main/java/haveno/network/p2p/network/Connection.java +++ b/p2p/src/main/java/haveno/network/p2p/network/Connection.java @@ -544,37 +544,38 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { } private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) { - // Use UserThread.execute as its not clear if that is called from a non-UserThread - UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this)); - try { - socket.close(); - } catch (SocketException e) { - log.trace("SocketException at shutdown might be expected {}", e.getMessage()); - } catch (IOException e) { - log.error("Exception at shutdown. " + e.getMessage()); - e.printStackTrace(); - } finally { - protoOutputStream.onConnectionShutdown(); - - capabilitiesListeners.clear(); - + UserThread.execute(() -> { + connectionListener.onDisconnect(closeConnectionReason, this); try { - protoInputStream.close(); + socket.close(); + } catch (SocketException e) { + log.trace("SocketException at shutdown might be expected {}", e.getMessage()); } catch (IOException e) { - log.error(e.getMessage()); + log.error("Exception at shutdown. " + e.getMessage()); e.printStackTrace(); + } finally { + protoOutputStream.onConnectionShutdown(); + + capabilitiesListeners.clear(); + + try { + protoInputStream.close(); + } catch (IOException e) { + log.error(e.getMessage()); + e.printStackTrace(); + } + + //noinspection UnstableApiUsage + MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS); + //noinspection UnstableApiUsage + MoreExecutors.shutdownAndAwaitTermination(bundleSender, 500, TimeUnit.MILLISECONDS); + + log.debug("Connection shutdown complete {}", this.toString()); + // Use UserThread.execute as its not clear if that is called from a non-UserThread + if (shutDownCompleteHandler != null) + UserThread.execute(shutDownCompleteHandler); } - - //noinspection UnstableApiUsage - MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS); - //noinspection UnstableApiUsage - MoreExecutors.shutdownAndAwaitTermination(bundleSender, 500, TimeUnit.MILLISECONDS); - - log.debug("Connection shutdown complete {}", this.toString()); - // Use UserThread.execute as its not clear if that is called from a non-UserThread - if (shutDownCompleteHandler != null) - UserThread.execute(shutDownCompleteHandler); - } + }); } @Override