refactor trade and connection threading to new ThreadUtils

This commit is contained in:
woodser 2024-01-05 13:25:29 -05:00
parent 59b2a1121b
commit 0d60df2aa7
17 changed files with 739 additions and 652 deletions

View File

@ -0,0 +1,144 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ThreadUtils {
private static final Map<String, ExecutorService> EXECUTORS = new HashMap<>();
private static final Map<String, Thread> THREAD_BY_ID = new HashMap<>();
private static final int POOL_SIZE = 10;
private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
public static void execute(Runnable command, String threadId) {
synchronized (EXECUTORS) {
if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1));
EXECUTORS.get(threadId).execute(() -> {
synchronized (THREAD_BY_ID) {
THREAD_BY_ID.put(threadId, Thread.currentThread());
}
command.run();
});
}
}
public static void await(Runnable command, String threadId) {
if (isCurrentThread(Thread.currentThread(), threadId)) {
command.run();
} else {
CountDownLatch latch = new CountDownLatch(1);
execute(command, threadId); // run task
execute(() -> latch.countDown(), threadId); // await next tick
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void shutDown(String threadId, long timeoutMs) {
ExecutorService pool = null;
synchronized (EXECUTORS) {
if (!EXECUTORS.containsKey(threadId)) return; // thread not found
pool = EXECUTORS.get(threadId);
}
pool.shutdown();
try {
if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow();
} catch (InterruptedException e) {
pool.shutdownNow();
throw new RuntimeException(e);
} finally {
synchronized (EXECUTORS) {
EXECUTORS.remove(threadId);
}
synchronized (THREAD_BY_ID) {
THREAD_BY_ID.remove(threadId);
}
}
}
public static Future<?> submitToPool(Runnable task) {
return submitToPool(Arrays.asList(task)).get(0);
}
public static List<Future<?>> submitToPool(List<Runnable> tasks) {
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) futures.add(POOL.submit(task));
return futures;
}
// TODO: these are unused; remove? use monero-java awaitTasks() when updated
public static Future<?> awaitTask(Runnable task) {
return awaitTasks(Arrays.asList(task)).get(0);
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks) {
return awaitTasks(tasks, tasks.size());
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency) {
return awaitTasks(tasks, maxConcurrency, null);
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutSeconds) {
List<Future<?>> futures = new ArrayList<>();
if (tasks.isEmpty()) return futures;
ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
for (Runnable task : tasks) futures.add(pool.submit(task));
pool.shutdown();
// interrupt after timeout
if (timeoutSeconds != null) {
try {
if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow();
} catch (InterruptedException e) {
pool.shutdownNow();
throw new RuntimeException(e);
}
}
// throw exception from any tasks
try {
for (Future<?> future : futures) future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return futures;
}
private static boolean isCurrentThread(Thread thread, String threadId) {
synchronized (THREAD_BY_ID) {
if (!THREAD_BY_ID.containsKey(threadId)) return false;
return thread == THREAD_BY_ID.get(threadId);
}
}
}

View File

@ -1,5 +1,6 @@
package haveno.core.api;
import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.DevEnv;
import haveno.common.config.BaseCurrencyNetwork;
@ -562,7 +563,7 @@ public final class XmrConnectionService {
// notify listeners in parallel
synchronized (listenerLock) {
for (MoneroConnectionManagerListener listener : listeners) {
HavenoUtils.submitToPool(() -> listener.onConnectionChanged(currentConnection));
ThreadUtils.submitToPool(() -> listener.onConnectionChanged(currentConnection));
}
}
}

View File

@ -19,6 +19,8 @@ package haveno.core.app;
import com.google.inject.Guice;
import com.google.inject.Injector;
import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.AppModule;
import haveno.common.config.Config;
@ -41,7 +43,6 @@ import haveno.core.provider.price.PriceFeedService;
import haveno.core.setup.CorePersistedDataHost;
import haveno.core.setup.CoreSetup;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.trade.statistics.TradeStatisticsManager;
import haveno.core.xmr.setup.WalletsSetup;
@ -340,7 +341,7 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -18,6 +18,8 @@
package haveno.core.app.misc;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.DevEnv;
import haveno.common.config.Config;
@ -33,7 +35,6 @@ import haveno.core.offer.OfferBookService;
import haveno.core.offer.OpenOfferManager;
import haveno.core.provider.price.PriceFeedService;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.trade.statistics.TradeStatisticsManager;
import haveno.core.xmr.setup.WalletsSetup;
@ -103,7 +104,7 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable {
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -18,6 +18,7 @@
package haveno.core.offer;
import common.utils.GenUtils;
import haveno.common.ThreadUtils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.app.Capabilities;
@ -150,6 +151,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
// poll key images of signed offers
private XmrKeyImagePoller signedOfferKeyImagePoller;
private static final long SHUTDOWN_TIMEOUT_MS = 90000;
private static final long KEY_IMAGE_REFRESH_PERIOD_MS_LOCAL = 20000; // 20 seconds
private static final long KEY_IMAGE_REFRESH_PERIOD_MS_REMOTE = 300000; // 5 minutes
@ -301,7 +303,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
public void shutDown(@Nullable Runnable completeHandler) {
HavenoUtils.removeThreadId(THREAD_ID);
stopped = true;
p2PService.getPeerManager().removeListener(this);
p2PService.removeDecryptedDirectMessageListener(this);
@ -316,7 +317,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
int size = openOffers.size();
log.info("Remove open offers at shutDown. Number of open offers: {}", size);
if (offerBookService.isBootstrapped() && size > 0) {
HavenoUtils.submitToThread(() -> { // finish tasks
ThreadUtils.execute(() -> { // finish tasks
UserThread.execute(() -> {
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()));
@ -337,6 +338,9 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
if (completeHandler != null)
completeHandler.run();
}
// shut down pool
ThreadUtils.shutDown(THREAD_ID, SHUTDOWN_TIMEOUT_MS);
}
public void removeAllOpenOffers(@Nullable Runnable completeHandler) {
@ -400,7 +404,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
maybeUpdatePersistedOffers();
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
// Wait for prices to be available
priceFeedService.awaitExternalPrices();
@ -506,7 +510,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
OpenOffer openOffer = new OpenOffer(offer, triggerPrice, reserveExactAmount);
// schedule or post offer
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
synchronized (processOffersLock) {
CountDownLatch latch = new CountDownLatch(1);
processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> {
@ -807,7 +811,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private void processScheduledOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler
ErrorMessageHandler errorMessageHandler) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
synchronized (processOffersLock) {
List<String> errorMessages = new ArrayList<String>();
List<OpenOffer> openOffers = getOpenOffers();
@ -1571,7 +1575,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRefreshOffersTimer();
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
processListForRepublishOffers(getOpenOffers());
}, THREAD_ID);
}
@ -1607,7 +1611,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHandler) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
// determine if offer is valid
boolean isValid = true;

View File

@ -17,6 +17,7 @@
package haveno.core.support.dispute;
import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.Version;
import haveno.common.config.Config;
@ -447,6 +448,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
return;
}
ThreadUtils.execute(() -> {
synchronized (trade) {
String errorMessage = null;
PubKeyRing senderPubKeyRing = null;
@ -547,6 +549,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
requestPersistence();
}
}, trade.getId());
}
// arbitrator sends dispute opened message to opener's peer

View File

@ -20,6 +20,7 @@ package haveno.core.support.dispute.arbitration;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import common.utils.GenUtils;
import haveno.common.ThreadUtils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.app.Version;
@ -116,7 +117,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
log.info("Received {} from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getSenderNodeAddress(), message.getTradeId(), message.getUid());
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
if (message instanceof DisputeOpenedMessage) {
handleDisputeOpenedMessage((DisputeOpenedMessage) message);
} else if (message instanceof ChatMessage) {
@ -187,6 +188,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
}
// try to process dispute closed message
ThreadUtils.execute(() -> {
ChatMessage chatMessage = null;
Dispute dispute = null;
synchronized (trade) {
@ -325,6 +327,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
}
}
}
}, trade.getId());
}
public void maybeReprocessDisputeClosedMessage(Trade trade, boolean reprocessOnError) {

View File

@ -44,18 +44,8 @@ import java.security.PrivateKey;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import monero.common.MoneroRpcConnection;
@ -79,9 +69,6 @@ public class HavenoUtils {
private static final BigInteger XMR_AU_MULTIPLIER = new BigInteger("1000000000000");
public static final DecimalFormat XMR_FORMATTER = new DecimalFormat("##############0.000000000000", DECIMAL_FORMAT_SYMBOLS);
public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
private static final int POOL_SIZE = 10;
private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
private static final Map<String, ExecutorService> POOLS = new HashMap<>();
// TODO: better way to share references?
public static ArbitrationManager arbitrationManager;
@ -474,86 +461,6 @@ public class HavenoUtils {
}
}
public static Future<?> submitToPool(Runnable task) {
return submitToPool(Arrays.asList(task)).get(0);
}
public static List<Future<?>> submitToPool(List<Runnable> tasks) {
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) futures.add(POOL.submit(task));
return futures;
}
public static Future<?> submitToSharedThread(Runnable task) {
return submitToThread(task, HavenoUtils.class.getSimpleName());
}
public static Future<?> submitToThread(Runnable task, String threadId) {
synchronized (POOLS) {
if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1));
return POOLS.get(threadId).submit(task);
}
}
public static Future<?> awaitThread(Runnable task, String threadId) {
Future<?> future = submitToThread(task, threadId);
try {
future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return future;
}
public static void removeThreadId(String threadId) {
synchronized (POOLS) {
if (POOLS.containsKey(threadId)) {
POOLS.get(threadId).shutdown();
POOLS.remove(threadId);
}
}
}
// TODO: these are unused; remove? use monero-java awaitTasks() when updated
public static Future<?> awaitTask(Runnable task) {
return awaitTasks(Arrays.asList(task)).get(0);
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks) {
return awaitTasks(tasks, tasks.size());
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency) {
return awaitTasks(tasks, maxConcurrency, null);
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutSeconds) {
List<Future<?>> futures = new ArrayList<>();
if (tasks.isEmpty()) return futures;
ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
for (Runnable task : tasks) futures.add(pool.submit(task));
pool.shutdown();
// interrupt after timeout
if (timeoutSeconds != null) {
try {
if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow();
} catch (InterruptedException e) {
pool.shutdownNow();
throw new RuntimeException(e);
}
}
// throw exception from any tasks
try {
for (Future<?> future : futures) future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return futures;
}
public static String toCamelCase(String underscore) {
return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, underscore);
}

View File

@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import common.utils.GenUtils;
import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.crypto.Encryption;
@ -116,6 +117,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public abstract class Trade implements Tradable, Model {
private static final String MONERO_TRADE_WALLET_PREFIX = "xmr_trade_";
private static final long SHUTDOWN_TIMEOUT_MS = 90000;
private final Object walletLock = new Object();
private final Object pollLock = new Object();
private MoneroWallet wallet;
@ -586,7 +588,7 @@ public abstract class Trade implements Tradable, Model {
///////////////////////////////////////////////////////////////////////////////////////////
public void initialize(ProcessModelServiceProvider serviceProvider) {
synchronized (this) {
ThreadUtils.await(() -> {
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
// set arbitrator pub key ring once known
@ -596,8 +598,8 @@ public abstract class Trade implements Tradable, Model {
// handle connection change on dedicated thread
xmrConnectionService.addConnectionListener(connection -> {
HavenoUtils.submitToPool(() -> {
HavenoUtils.submitToThread(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
ThreadUtils.submitToPool(() -> { // TODO: remove this?
ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
});
});
@ -621,7 +623,7 @@ public abstract class Trade implements Tradable, Model {
// handle trade state events
tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
if (newValue == Trade.State.MULTISIG_COMPLETED) {
updateWalletRefreshPeriod();
startPolling();
@ -631,7 +633,7 @@ public abstract class Trade implements Tradable, Model {
// handle trade phase events
tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod();
if (isPaymentReceived()) {
UserThread.execute(() -> {
@ -646,7 +648,7 @@ public abstract class Trade implements Tradable, Model {
// handle payout events
payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod();
// handle when payout published
@ -677,7 +679,7 @@ public abstract class Trade implements Tradable, Model {
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
deleteWallet();
maybeClearProcessData();
if (idlePayoutSyncer != null) {
@ -722,7 +724,7 @@ public abstract class Trade implements Tradable, Model {
// initialize syncing and polling
initSyncing();
}
}, getId());
}
public void requestPersistence() {
@ -787,8 +789,8 @@ public abstract class Trade implements Tradable, Model {
// check wallet connection on same thread as connection change
CountDownLatch latch = new CountDownLatch(1);
HavenoUtils.submitToPool((() -> {
HavenoUtils.submitToThread(() -> {
ThreadUtils.submitToPool((() -> {
ThreadUtils.execute(() -> {
if (!isWalletConnectedToDaemon()) throw new RuntimeException("Trade wallet is not connected to a Monero node"); // wallet connection is updated on trade thread
latch.countDown();
}, getConnectionChangedThreadId());
@ -1222,36 +1224,47 @@ public abstract class Trade implements Tradable, Model {
}
public void onShutDownStarted() {
if (wallet != null) log.info("Preparing to shut down {} {}", getClass().getSimpleName(), getId());
isShutDownStarted = true;
if (wallet != null) log.info("{} {} preparing for shut down", getClass().getSimpleName(), getId());
stopPolling();
// repeatedly acquire trade lock to allow other threads to finish
for (int i = 0; i < 20; i++) {
synchronized (this) {
synchronized (walletLock) {
if (isShutDown) break;
}
}
}
}
public void shutDown() {
if (!isPayoutUnlocked()) log.info("{} {} shutting down", getClass().getSimpleName(), getId());
synchronized (this) {
if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId());
// shut down thread pools with timeout
List<Runnable> tasks = new ArrayList<>();
tasks.add(() -> ThreadUtils.shutDown(getId(), SHUTDOWN_TIMEOUT_MS));
tasks.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId(), SHUTDOWN_TIMEOUT_MS));
try {
ThreadUtils.awaitTasks(tasks);
} catch (Exception e) {
log.warn("Timeout shutting down {} {}", getClass().getSimpleName(), getId());
// force stop wallet
if (wallet != null) {
log.warn("Force stopping wallet for {} {}", getClass().getSimpleName(), getId());
xmrWalletService.stopWallet(wallet, wallet.getPath(), true);
wallet = null;
}
}
// de-initialize
isInitialized = false;
isShutDown = true;
synchronized (walletLock) {
if (idlePayoutSyncer != null) {
xmrWalletService.removeWalletListener(idlePayoutSyncer);
idlePayoutSyncer = null;
}
if (wallet != null) {
xmrWalletService.saveWallet(wallet, false); // skip backup
stopWallet();
}
}
UserThread.execute(() -> {
if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe();
if (tradePhaseSubscription != null) tradePhaseSubscription.unsubscribe();
if (payoutStateSubscription != null) payoutStateSubscription.unsubscribe();
idlePayoutSyncer = null; // main wallet removes listener itself
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -1262,11 +1275,6 @@ public abstract class Trade implements Tradable, Model {
public void onComplete() {
}
public void onRemoved() {
HavenoUtils.removeThreadId(getId());
HavenoUtils.removeThreadId(getConnectionChangedThreadId());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Abstract
@ -1864,7 +1872,7 @@ public abstract class Trade implements Tradable, Model {
// sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) {
HavenoUtils.submitToPool(() -> initSyncing());
ThreadUtils.execute(() -> initSyncing(), getId());
}
}
}
@ -1875,11 +1883,9 @@ public abstract class Trade implements Tradable, Model {
initSyncingAux();
} else {
long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getWalletRefreshPeriod()); // random time to start syncing
UserThread.runAfter(() -> {
HavenoUtils.submitToPool(() -> {
UserThread.runAfter(() -> ThreadUtils.execute(() -> {
if (!isShutDownStarted) initSyncingAux();
});
}, startSyncingInMs / 1000l);
}, getId()), startSyncingInMs / 1000l);
}
}
@ -2108,13 +2114,11 @@ public abstract class Trade implements Tradable, Model {
@Override
public void onNewBlock(long height) {
HavenoUtils.submitToThread(() -> { // allow rapid notifications
ThreadUtils.execute(() -> { // allow rapid notifications
// skip rapid succession blocks
synchronized (this) {
if (processing) return;
processing = true;
}
// skip if not idling and not waiting for payout to unlock
if (!isIdling() || !isPayoutPublished() || isPayoutUnlocked()) {

View File

@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
import common.utils.GenUtils;
import haveno.common.ClockWatcher;
import haveno.common.ThreadUtils;
import haveno.common.crypto.KeyRing;
import haveno.common.crypto.PubKeyRing;
import haveno.common.handlers.ErrorMessageHandler;
@ -234,7 +235,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
if (!(networkEnvelope instanceof TradeMessage)) return;
String tradeId = ((TradeMessage) networkEnvelope).getTradeId();
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
if (networkEnvelope instanceof InitTradeRequest) {
handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer);
} else if (networkEnvelope instanceof InitMultisigRequest) {
@ -315,10 +316,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
try {
HavenoUtils.awaitTasks(tasks);
ThreadUtils.awaitTasks(tasks);
} catch (Exception e) {
log.warn("Error notifying trades that shut down started: {}", e.getMessage());
e.printStackTrace();
throw e;
}
}
@ -345,7 +346,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
try {
HavenoUtils.awaitTasks(tasks);
ThreadUtils.awaitTasks(tasks);
} catch (Exception e) {
log.warn("Error shutting down trades: {}", e.getMessage());
e.printStackTrace();
@ -413,7 +414,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
};
HavenoUtils.awaitTasks(tasks, threadPoolSize);
ThreadUtils.awaitTasks(tasks, threadPoolSize);
log.info("Done initializing persisted trades");
if (isShutDownStarted) return;
@ -422,7 +423,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// sync idle trades once in background after active trades
for (Trade trade : trades) {
if (trade.isIdling()) HavenoUtils.submitToPool(() -> trade.syncAndPollWallet());
if (trade.isIdling()) ThreadUtils.submitToPool(() -> trade.syncAndPollWallet());
}
// process after all wallets initialized
@ -1224,7 +1225,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// remove trade
tradableList.remove(trade);
trade.onRemoved();
// unregister and persist
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));

View File

@ -17,6 +17,7 @@ e * This file is part of Haveno.
package haveno.core.trade.protocol;
import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.BuyerAsMakerTrade;
import haveno.core.trade.Trade;
@ -43,7 +44,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
new Thread(() -> {
ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
@ -66,6 +67,6 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
.executeTasks(true);
awaitTradeLatch();
}
}).start();
}, trade.getId());
}
}

View File

@ -18,6 +18,7 @@
package haveno.core.trade.protocol;
import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.BuyerAsTakerTrade;
import haveno.core.trade.Trade;
@ -47,7 +48,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
public void onTakeOffer(TradeResultHandler tradeResultHandler,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".onTakeOffer()");
new Thread(() -> {
ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.tradeResultHandler = tradeResultHandler;
@ -71,6 +72,6 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
.executeTasks(true);
awaitTradeLatch();
}
}).start();
}, trade.getId());
}
}

View File

@ -18,6 +18,7 @@
package haveno.core.trade.protocol;
import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.SellerAsMakerTrade;
import haveno.core.trade.Trade;
@ -48,7 +49,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
new Thread(() -> {
ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
@ -71,6 +72,6 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
.executeTasks(true);
awaitTradeLatch();
}
}).start();
}, trade.getId());
}
}

View File

@ -18,6 +18,7 @@
package haveno.core.trade.protocol;
import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.SellerAsTakerTrade;
import haveno.core.trade.Trade;
@ -48,7 +49,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
public void onTakeOffer(TradeResultHandler tradeResultHandler,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getSimpleName() + ".onTakeOffer()");
new Thread(() -> {
ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.tradeResultHandler = tradeResultHandler;
@ -72,6 +73,6 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
.executeTasks(true);
awaitTradeLatch();
}
}).start();
}, trade.getId());
}
}

View File

@ -17,6 +17,7 @@
package haveno.core.trade.protocol;
import haveno.common.ThreadUtils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing;
@ -113,12 +114,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
HavenoUtils.submitToThread(() -> handle(message, peerNodeAddress), trade.getId());
ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId());
}
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
handle(message, peerNodeAddress);
ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId());
}
private void handle(TradeMessage message, NodeAddress peerNodeAddress) {
@ -264,7 +265,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
public void maybeSendDepositsConfirmedMessages() {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return;
synchronized (trade) {
if (!trade.isInitialized() || trade.isShutDownStarted()) return; // skip if shutting down
@ -285,7 +286,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
synchronized (trade) {
// skip if no need to reprocess
@ -296,13 +297,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId());
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
}
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
}, trade.getId());
}
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleInitMultisigRequest()");
ThreadUtils.execute(() -> {
synchronized (trade) {
// check trade
@ -312,7 +312,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
Validator.checkTradeId(processModel.getOfferId(), request);
// proocess message
// process message
latchTrade();
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT)
@ -333,10 +333,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleSignContractRequest() " + trade.getId());
ThreadUtils.execute(() -> {
synchronized (trade) {
// check trade
@ -372,14 +374,16 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// process sign contract request after multisig created
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.MULTISIG_COMPLETED) HavenoUtils.submitToThread(() -> handleSignContractRequest(message, sender), trade.getId()); // process notification without trade lock
if (state == Trade.State.MULTISIG_COMPLETED) ThreadUtils.execute(() -> handleSignContractRequest(message, sender), trade.getId()); // process notification without trade lock
});
}
}
}, trade.getId());
}
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleSignContractResponse() " + trade.getId());
ThreadUtils.execute(() -> {
synchronized (trade) {
// check trade
@ -415,14 +419,16 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// process sign contract response after contract signed
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.CONTRACT_SIGNED) HavenoUtils.submitToThread(() -> handleSignContractResponse(message, sender), trade.getId()); // process notification without trade lock
if (state == Trade.State.CONTRACT_SIGNED) ThreadUtils.execute(() -> handleSignContractResponse(message, sender), trade.getId()); // process notification without trade lock
});
}
}
}, trade.getId());
}
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleDepositResponse()");
ThreadUtils.execute(() -> {
synchronized (trade) {
// check trade
@ -456,10 +462,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
public void handle(DepositsConfirmedMessage response, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)");
ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.errorMessageHandler = null;
@ -480,6 +488,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.executeTasks();
awaitTradeLatch();
}
}, trade.getId());
}
// received by seller and arbitrator
@ -489,6 +498,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
log.warn("Ignoring PaymentSentMessage since not seller or arbitrator");
return;
}
ThreadUtils.execute(() -> {
// We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case
// that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received
// a mailbox message with PaymentSentMessage.
@ -526,6 +536,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
// received by buyer and arbitrator
@ -535,6 +546,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
private void handle(PaymentReceivedMessage message, NodeAddress peer, boolean reprocessOnError) {
System.out.println(getClass().getSimpleName() + ".handle(PaymentReceivedMessage)");
ThreadUtils.execute(() -> {
if (!(trade instanceof BuyerTrade || trade instanceof ArbitratorTrade)) {
log.warn("Ignoring PaymentReceivedMessage since not buyer or arbitrator");
return;
@ -585,6 +597,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
public void onWithdrawCompleted() {

View File

@ -4,6 +4,7 @@ import com.google.common.util.concurrent.Service.State;
import com.google.inject.name.Named;
import common.utils.JsonUtils;
import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.file.FileUtil;
@ -707,6 +708,10 @@ public class XmrWalletService {
public void onShutDownStarted() {
log.info("XmrWalletService.onShutDownStarted()");
this.isShutDownStarted = true;
}
public void shutDown() {
log.info("Shutting down {}", getClass().getSimpleName());
// remove listeners which stops polling wallet
// TODO monero-java: wallet.stopPolling()?
@ -717,15 +722,11 @@ public class XmrWalletService {
}
}
}
}
public void shutDown() {
log.info("Shutting down {}", getClass().getSimpleName());
// shut down trade and main wallets at same time
walletListeners.clear();
closeMainWallet(true);
log.info("Done shutting down all wallets");
log.info("Done shutting down main wallet");
}
// ------------------------------ PRIVATE HELPERS -------------------------
@ -734,7 +735,7 @@ public class XmrWalletService {
// listen for connection changes
xmrConnectionService.addConnectionListener(connection -> {
HavenoUtils.submitToThread(() -> onConnectionChanged(connection), THREAD_ID);
ThreadUtils.execute(() -> onConnectionChanged(connection), THREAD_ID);
});
// initialize main wallet when daemon synced
@ -744,7 +745,7 @@ public class XmrWalletService {
}
private void initMainWalletIfConnected() {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
synchronized (walletLock) {
if (xmrConnectionService.downloadPercentageProperty().get() == 1 && wallet == null && !isShutDownStarted) {
maybeInitMainWallet(true);
@ -817,12 +818,12 @@ public class XmrWalletService {
// reschedule to init main wallet
UserThread.runAfter(() -> {
HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID);
ThreadUtils.execute(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
} else {
log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000);
UserThread.runAfter(() -> {
HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID);
ThreadUtils.execute(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
}
}
@ -994,7 +995,7 @@ public class XmrWalletService {
// sync wallet on new thread
if (connection != null) {
wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE);
HavenoUtils.submitToPool(() -> {
ThreadUtils.submitToPool(() -> {
if (isShutDownStarted) return;
wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs());
try {
@ -1034,7 +1035,7 @@ public class XmrWalletService {
}
// excute tasks in parallel
HavenoUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size()));
ThreadUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size()));
log.info("Done changing all wallet passwords");
}
@ -1318,7 +1319,7 @@ public class XmrWalletService {
BigInteger balance;
if (balanceListener.getSubaddressIndex() != null && balanceListener.getSubaddressIndex() != 0) balance = getBalanceForSubaddress(balanceListener.getSubaddressIndex());
else balance = getAvailableBalance();
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
try {
balanceListener.onBalanceChanged(balance);
} catch (Exception e) {
@ -1369,14 +1370,14 @@ public class XmrWalletService {
@Override
public void onSyncProgress(long height, long startHeight, long endHeight, double percentDone, String message) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onSyncProgress(height, startHeight, endHeight, percentDone, message);
}, THREAD_ID);
}
@Override
public void onNewBlock(long height) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
walletHeight.set(height);
for (MoneroWalletListenerI listener : walletListeners) listener.onNewBlock(height);
}, THREAD_ID);
@ -1384,7 +1385,7 @@ public class XmrWalletService {
@Override
public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onBalancesChanged(newBalance, newUnlockedBalance);
updateBalanceListeners();
}, THREAD_ID);
@ -1392,14 +1393,14 @@ public class XmrWalletService {
@Override
public void onOutputReceived(MoneroOutputWallet output) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputReceived(output);
}, THREAD_ID);
}
@Override
public void onOutputSpent(MoneroOutputWallet output) {
HavenoUtils.submitToThread(() -> {
ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputSpent(output);
}, THREAD_ID);
}

View File

@ -32,6 +32,7 @@ import haveno.network.p2p.storage.payload.CapabilityRequiringPayload;
import haveno.network.p2p.storage.payload.PersistableNetworkPayload;
import haveno.common.Proto;
import haveno.common.ThreadUtils;
import haveno.common.app.Capabilities;
import haveno.common.app.HasCapabilities;
import haveno.common.app.Version;
@ -73,7 +74,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@ -109,7 +109,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(240);
private static final int SHUTDOWN_TIMEOUT = 100;
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1); // one shared thread to handle messages sequentially
private static final String THREAD_ID = Connection.class.getSimpleName();
public static int getPermittedMessageSize() {
return PERMITTED_MESSAGE_SIZE;
@ -212,7 +212,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
reportInvalidRequest(RuleViolation.PEER_BANNED);
}
}
EXECUTOR.execute(() -> connectionListener.onConnection(this));
ThreadUtils.execute(() -> connectionListener.onConnection(this), THREAD_ID);
} catch (Throwable e) {
handleException(e);
}
@ -266,8 +266,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (!stopped) {
protoOutputStream.writeEnvelope(networkEnvelope);
EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)));
EXECUTOR.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize));
ThreadUtils.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)), THREAD_ID);
ThreadUtils.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize), THREAD_ID);
}
} catch (Throwable t) {
handleException(t);
@ -396,7 +396,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (networkEnvelope instanceof BundleOfEnvelopes) {
onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection);
} else {
EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
ThreadUtils.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)), THREAD_ID);
}
}
@ -432,8 +432,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
envelopesToProcess.add(networkEnvelope);
}
}
envelopesToProcess.forEach(envelope -> EXECUTOR.execute(() ->
messageListeners.forEach(listener -> listener.onMessage(envelope, connection))));
envelopesToProcess.forEach(envelope -> ThreadUtils.execute(() -> {
messageListeners.forEach(listener -> listener.onMessage(envelope, connection));
}, THREAD_ID));
}
@ -503,7 +504,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
t.printStackTrace();
} finally {
stopped = true;
EXECUTOR.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
ThreadUtils.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID);
}
}, "Connection:SendCloseConnectionMessage-" + this.uid).start();
} else {
@ -513,12 +514,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
} else {
//TODO find out why we get called that
log.debug("stopped was already at shutDown call");
EXECUTOR.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
ThreadUtils.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID);
}
}
private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
EXECUTOR.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
ThreadUtils.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this), THREAD_ID);
try {
protoOutputStream.onConnectionShutdown();
socket.close();
@ -541,7 +542,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.debug("Connection shutdown complete {}", this);
if (shutDownCompleteHandler != null)
EXECUTOR.execute(shutDownCompleteHandler);
ThreadUtils.execute(shutDownCompleteHandler, THREAD_ID);
}
}
@ -844,8 +845,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.info("We got a {} from a peer with yet unknown address on connection with uid={}", networkEnvelope.getClass().getSimpleName(), uid);
}
EXECUTOR.execute(() -> onMessage(networkEnvelope, this));
EXECUTOR.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size));
ThreadUtils.execute(() -> onMessage(networkEnvelope, this), THREAD_ID);
ThreadUtils.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size), THREAD_ID);
}
} catch (InvalidClassException e) {
log.error(e.getMessage());
@ -894,7 +895,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
EXECUTOR.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
ThreadUtils.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities), THREAD_ID);
}
});
return false;