From 8600c0cb0d7a6ee80bbdc932f02c5ebade084bff Mon Sep 17 00:00:00 2001 From: woodser Date: Fri, 26 Jan 2024 09:07:03 -0500 Subject: [PATCH] shut down http connections with 5s timeout --- .../main/java/haveno/common/ThreadUtils.java | 9 +- .../haveno/core/app/HavenoExecutable.java | 4 +- .../app/misc/ExecutableForAppWithP2p.java | 2 +- .../haveno/core/offer/OpenOfferManager.java | 109 +++++++++--------- .../core/provider/price/PriceFeedService.java | 1 + .../haveno/network/http/HttpClientImpl.java | 29 ++--- 6 files changed, 85 insertions(+), 69 deletions(-) diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java index d366b869b0..518dd74dd6 100644 --- a/common/src/main/java/haveno/common/ThreadUtils.java +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -35,7 +35,12 @@ public class ThreadUtils { private static final int POOL_SIZE = 10; private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); - + /** + * Execute the given command in a thread with the given id. + * + * @param command the command to execute + * @param threadId the thread id + */ public static void execute(Runnable command, String threadId) { synchronized (EXECUTORS) { if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1)); @@ -107,6 +112,8 @@ public class ThreadUtils { } } + // TODO: consolidate and cleanup apis + public static Future submitToPool(Runnable task) { return submitToPool(Arrays.asList(task)).get(0); } diff --git a/core/src/main/java/haveno/core/app/HavenoExecutable.java b/core/src/main/java/haveno/core/app/HavenoExecutable.java index 77571041c2..c6d83ec380 100644 --- a/core/src/main/java/haveno/core/app/HavenoExecutable.java +++ b/core/src/main/java/haveno/core/app/HavenoExecutable.java @@ -341,7 +341,7 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); try { - ThreadUtils.awaitTasks(tasks, tasks.size(), 120000l); // run in parallel with timeout + ThreadUtils.awaitTasks(tasks, tasks.size(), 90000l); // run in parallel with timeout } catch (Exception e) { e.printStackTrace(); } @@ -361,9 +361,9 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven // shut down p2p service injector.getInstance(P2PService.class).shutDown(() -> { - log.info("Done shutting down OpenOfferManager, OfferBookService, and P2PService"); // shut down monero wallets and connections + log.info("Shutting down wallet and connection services"); injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> { // done shutting down diff --git a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java index e102dfc37f..127e3fb5e8 100644 --- a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java +++ b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java @@ -125,9 +125,9 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable { // shut down p2p service injector.getInstance(P2PService.class).shutDown(() -> { - log.info("Done shutting down OpenOfferManager, OfferBookService, and P2PService"); // shut down monero wallets and connections + log.info("Shutting down wallet and connection services"); injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> { module.close(injector); PersistenceManager.flushAllDataToDiskAtShutdown(() -> { diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index 7f2eb84f2e..acaa51bd8b 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -416,62 +416,67 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe maybeUpdatePersistedOffers(); - ThreadUtils.execute(() -> { - - // Wait for prices to be available + // run off user thread so app is not blocked from starting + ThreadUtils.submitToPool(() -> { + + // wait for prices to be available priceFeedService.awaitExternalPrices(); - // Republish means we send the complete offer object - republishOffers(); - startPeriodicRepublishOffersTimer(); - - // Refresh is started once we get a success from republish - - // We republish after a bit as it might be that our connected node still has the offer in the data map - // but other peers have it already removed because of expired TTL. - // Those other not directly connected peers would not get the broadcast of the new offer, as the first - // connected peer (seed node) does not broadcast if it has the data in the map. - // To update quickly to the whole network we repeat the republishOffers call after a few seconds when we - // are better connected to the network. There is no guarantee that all peers will receive it but we also - // have our periodic timer, so after that longer interval the offer should be available to all peers. - if (retryRepublishOffersTimer == null) - retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, - REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC); - - p2PService.getPeerManager().addListener(this); - - // TODO: add to invalid offers on failure - // openOffers.stream() - // .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService) - // .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg)))); - - // process scheduled offers - processScheduledOffers((transaction) -> {}, (errorMessage) -> { - log.warn("Error processing unposted offers: " + errorMessage); - }); - - // register to process unposted offers when unlocked balance increases - if (xmrWalletService.getWallet() != null) lastUnlockedBalance = xmrWalletService.getWallet().getUnlockedBalance(0); - xmrWalletService.addWalletListener(new MoneroWalletListener() { - @Override - public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) { - if (lastUnlockedBalance == null || lastUnlockedBalance.compareTo(newUnlockedBalance) < 0) { - processScheduledOffers((transaction) -> {}, (errorMessage) -> { - log.warn("Error processing unposted offers on new unlocked balance: " + errorMessage); // TODO: popup to notify user that offer did not post - }); + // process open offers on dedicated thread + ThreadUtils.execute(() -> { + + // Republish means we send the complete offer object + republishOffers(); + startPeriodicRepublishOffersTimer(); + + // Refresh is started once we get a success from republish + + // We republish after a bit as it might be that our connected node still has the offer in the data map + // but other peers have it already removed because of expired TTL. + // Those other not directly connected peers would not get the broadcast of the new offer, as the first + // connected peer (seed node) does not broadcast if it has the data in the map. + // To update quickly to the whole network we repeat the republishOffers call after a few seconds when we + // are better connected to the network. There is no guarantee that all peers will receive it but we also + // have our periodic timer, so after that longer interval the offer should be available to all peers. + if (retryRepublishOffersTimer == null) + retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, + REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC); + + p2PService.getPeerManager().addListener(this); + + // TODO: add to invalid offers on failure + // openOffers.stream() + // .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService) + // .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg)))); + + // process scheduled offers + processScheduledOffers((transaction) -> {}, (errorMessage) -> { + log.warn("Error processing unposted offers: " + errorMessage); + }); + + // register to process unposted offers when unlocked balance increases + if (xmrWalletService.getWallet() != null) lastUnlockedBalance = xmrWalletService.getWallet().getUnlockedBalance(0); + xmrWalletService.addWalletListener(new MoneroWalletListener() { + @Override + public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) { + if (lastUnlockedBalance == null || lastUnlockedBalance.compareTo(newUnlockedBalance) < 0) { + processScheduledOffers((transaction) -> {}, (errorMessage) -> { + log.warn("Error processing unposted offers on new unlocked balance: " + errorMessage); // TODO: popup to notify user that offer did not post + }); + } + lastUnlockedBalance = newUnlockedBalance; } - lastUnlockedBalance = newUnlockedBalance; + }); + + // initialize key image poller for signed offers + maybeInitializeKeyImagePoller(); + + // poll spent status of key images + for (SignedOffer signedOffer : signedOffers.getList()) { + signedOfferKeyImagePoller.addKeyImages(signedOffer.getReserveTxKeyImages()); } - }); - - // initialize key image poller for signed offers - maybeInitializeKeyImagePoller(); - - // poll spent status of key images - for (SignedOffer signedOffer : signedOffers.getList()) { - signedOfferKeyImagePoller.addKeyImages(signedOffer.getReserveTxKeyImages()); - } - }, THREAD_ID); + }, THREAD_ID); + }); } diff --git a/core/src/main/java/haveno/core/provider/price/PriceFeedService.java b/core/src/main/java/haveno/core/provider/price/PriceFeedService.java index eca0c8fb97..203d755694 100644 --- a/core/src/main/java/haveno/core/provider/price/PriceFeedService.java +++ b/core/src/main/java/haveno/core/provider/price/PriceFeedService.java @@ -116,6 +116,7 @@ public class PriceFeedService { /////////////////////////////////////////////////////////////////////////////////////////// public void shutDown() { + log.info("Shutting down {}", getClass().getSimpleName()); if (requestTimer != null) { requestTimer.stop(); requestTimer = null; diff --git a/p2p/src/main/java/haveno/network/http/HttpClientImpl.java b/p2p/src/main/java/haveno/network/http/HttpClientImpl.java index b28072efe6..e1d13b05fd 100644 --- a/p2p/src/main/java/haveno/network/http/HttpClientImpl.java +++ b/p2p/src/main/java/haveno/network/http/HttpClientImpl.java @@ -18,6 +18,8 @@ package haveno.network.http; import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; + +import haveno.common.ThreadUtils; import haveno.common.app.Version; import haveno.common.util.Utilities; import haveno.network.Socks5ProxyProvider; @@ -65,6 +67,7 @@ public class HttpClientImpl implements HttpClient { private HttpURLConnection connection; @Nullable private CloseableHttpClient closeableHttpClient; + private static final long SHUTDOWN_TIMEOUT_MS = 5000l; @Getter @Setter @@ -88,6 +91,18 @@ public class HttpClientImpl implements HttpClient { @Override public void shutDown() { + try { + ThreadUtils.awaitTask(() -> { + doShutDown(connection, closeableHttpClient); + connection = null; + closeableHttpClient = null; + }, SHUTDOWN_TIMEOUT_MS); + } catch (Exception e) { + // ignore + } + } + + private void doShutDown(HttpURLConnection connection, CloseableHttpClient closeableHttpClient) { try { if (connection != null) { connection.getInputStream().close(); @@ -137,19 +152,7 @@ public class HttpClientImpl implements HttpClient { public void cancelPendingRequest() { if (!hasPendingRequest) return; - try { - if (connection != null) { - connection.getInputStream().close(); - connection.disconnect(); - connection = null; - } - if (closeableHttpClient != null) { - closeableHttpClient.close(); - closeableHttpClient = null; - } - } catch (IOException err) { - // igbnore - } + shutDown(); hasPendingRequest = false; }