handle connection change on dedicated thread, other thread improvements

This commit is contained in:
woodser 2023-12-18 08:01:07 -05:00
parent b1f8411641
commit f162cad439
11 changed files with 121 additions and 72 deletions

View File

@ -263,6 +263,7 @@ public final class XmrConnectionService {
public void verifyConnection() {
if (daemon == null) throw new RuntimeException("No connection to Monero node");
if (!Boolean.TRUE.equals(isConnected())) throw new RuntimeException("No connection to Monero node");
if (!isSyncedWithinTolerance()) throw new RuntimeException("Monero node is not synced");
}
@ -493,10 +494,11 @@ public final class XmrConnectionService {
}, getDefaultRefreshPeriodMs() * 2 / 1000);
}
// notify final connection
isInitialized = true;
onConnectionChanged(connectionManager.getConnection());
}
// notify initial connection
onConnectionChanged(connectionManager.getConnection());
}
private void maybeStartLocalNode() {
@ -541,7 +543,7 @@ public final class XmrConnectionService {
// notify listeners in parallel
synchronized (listenerLock) {
for (MoneroConnectionManagerListener listener : listeners) {
new Thread(() -> listener.onConnectionChanged(currentConnection)).start();
HavenoUtils.submitToPool(() -> listener.onConnectionChanged(currentConnection));
}
}
}

View File

@ -34,8 +34,6 @@ import haveno.network.p2p.BootstrapListener;
import haveno.network.p2p.P2PService;
import haveno.network.p2p.storage.HashMapChangedListener;
import haveno.network.p2p.storage.payload.ProtectedStorageEntry;
import monero.common.MoneroConnectionManagerListener;
import monero.common.MoneroRpcConnection;
import monero.daemon.model.MoneroKeyImageSpentStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -94,13 +92,10 @@ public class OfferBookService {
jsonFileManager = new JsonFileManager(storageDir);
// listen for connection changes to monerod
xmrConnectionService.addConnectionListener(new MoneroConnectionManagerListener() {
@Override
public void onConnectionChanged(MoneroRpcConnection connection) {
maybeInitializeKeyImagePoller();
keyImagePoller.setDaemon(xmrConnectionService.getDaemon());
keyImagePoller.setRefreshPeriodMs(getKeyImageRefreshPeriodMs());
}
xmrConnectionService.addConnectionListener((connection) -> {
maybeInitializeKeyImagePoller();
keyImagePoller.setDaemon(xmrConnectionService.getDaemon());
keyImagePoller.setRefreshPeriodMs(getKeyImageRefreshPeriodMs());
});
// listen for offers

View File

@ -75,8 +75,6 @@ import haveno.network.p2p.peers.PeerManager;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import lombok.Getter;
import monero.common.MoneroConnectionManagerListener;
import monero.common.MoneroRpcConnection;
import monero.daemon.model.MoneroKeyImageSpentStatus;
import monero.daemon.model.MoneroTx;
import monero.wallet.model.MoneroIncomingTransfer;
@ -208,12 +206,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
this.signedOfferPersistenceManager.initialize(signedOffers, "SignedOffers", PersistenceManager.Source.PRIVATE); // arbitrator stores reserve tx for signed offers
// listen for connection changes to monerod
xmrConnectionService.addConnectionListener(new MoneroConnectionManagerListener() {
@Override
public void onConnectionChanged(MoneroRpcConnection connection) {
maybeInitializeKeyImagePoller();
}
});
xmrConnectionService.addConnectionListener((connection) -> maybeInitializeKeyImagePoller());
// close open offer if reserved funds spent
offerBookService.addOfferBookChangedListener(new OfferBookChangedListener() {
@ -308,7 +301,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
public void shutDown(@Nullable Runnable completeHandler) {
HavenoUtils.shutDownThreadId(THREAD_ID);
HavenoUtils.removeThreadId(THREAD_ID);
stopped = true;
p2PService.getPeerManager().removeListener(this);
p2PService.removeDecryptedDirectMessageListener(this);

View File

@ -235,7 +235,7 @@ public class PriceFeedService {
if (baseUrlOfRespondingProvider == null) {
final String oldBaseUrl = priceProvider.getBaseUrl();
setNewPriceProvider();
log.warn("We did not received a response from provider {}. " +
log.warn("We did not receive a response from provider {}. " +
"We select the new provider {} and use that for a new request.", oldBaseUrl, priceProvider.getBaseUrl());
}
request(true);

View File

@ -398,7 +398,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
if (!expectedSellerAmount.equals(actualSellerAmount)) throw new RuntimeException("Unexpected seller payout: " + expectedSellerAmount + " vs " + actualSellerAmount);
// check wallet's daemon connection
trade.checkDaemonConnection();
trade.checkAndVerifyDaemonConnection();
// determine if we already signed dispute payout tx
// TODO: better way, such as by saving signed dispute payout tx hex in designated field instead of shared payoutTxHex field?

View File

@ -45,6 +45,7 @@ import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@ -474,7 +475,13 @@ public class HavenoUtils {
}
public static Future<?> submitToPool(Runnable task) {
return POOL.submit(task);
return submitToPool(Arrays.asList(task)).get(0);
}
public static List<Future<?>> submitToPool(List<Runnable> tasks) {
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) futures.add(POOL.submit(task));
return futures;
}
public static Future<?> submitToSharedThread(Runnable task) {
@ -488,7 +495,17 @@ public class HavenoUtils {
}
}
public static void shutDownThreadId(String threadId) {
public static Future<?> awaitThread(Runnable task, String threadId) {
Future<?> future = submitToThread(task, threadId);
try {
future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return future;
}
public static void removeThreadId(String threadId) {
synchronized (POOLS) {
if (POOLS.containsKey(threadId)) {
POOLS.get(threadId).shutdown();
@ -497,7 +514,11 @@ public class HavenoUtils {
}
}
// TODO: update monero-java and replace with GenUtils.awaitTasks()
// TODO: these are unused; remove? use monero-java awaitTasks() when updated
public static Future<?> awaitTask(Runnable task) {
return awaitTasks(Arrays.asList(task)).get(0);
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks) {
return awaitTasks(tasks, tasks.size());

View File

@ -102,6 +102,7 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@ -376,6 +377,8 @@ public abstract class Trade implements Tradable, Model {
// Immutable
@Getter
transient final private XmrWalletService xmrWalletService;
@Getter
transient final private XmrConnectionService xmrConnectionService;
transient final private DoubleProperty initProgressProperty = new SimpleDoubleProperty(0.0);
transient final private ObjectProperty<State> stateProperty = new SimpleObjectProperty<>(state);
@ -476,6 +479,7 @@ public abstract class Trade implements Tradable, Model {
this.takerFee = takerFee.longValueExact();
this.price = tradePrice;
this.xmrWalletService = xmrWalletService;
this.xmrConnectionService = xmrWalletService.getConnectionService();
this.processModel = processModel;
this.uid = uid;
this.takeOfferDate = new Date().getTime();
@ -588,9 +592,11 @@ public abstract class Trade implements Tradable, Model {
getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing());
});
// handle daemon changes with max parallelization
xmrWalletService.getConnectionService().addConnectionListener(newConnection -> {
HavenoUtils.submitToPool(() -> onConnectionChanged(newConnection));
// handle connection change on dedicated thread
xmrConnectionService.addConnectionListener(connection -> {
HavenoUtils.submitToPool(() -> {
HavenoUtils.submitToThread(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
});
});
// check if done
@ -644,7 +650,7 @@ public abstract class Trade implements Tradable, Model {
new Thread(() -> {
GenUtils.waitFor(1000);
if (isShutDownStarted) return;
if (Boolean.TRUE.equals(xmrWalletService.getConnectionService().isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet());
if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet());
}).start();
// complete disputed trade
@ -762,19 +768,31 @@ public abstract class Trade implements Tradable, Model {
return MONERO_TRADE_WALLET_PREFIX + getId();
}
public void checkDaemonConnection() {
XmrConnectionService xmrConnectionService = xmrWalletService.getConnectionService();
public void checkAndVerifyDaemonConnection() {
// check connection which might update
xmrConnectionService.checkConnection();
xmrConnectionService.verifyConnection();
if (!getWallet().isConnectedToDaemon()) throw new RuntimeException("Trade wallet is not connected to a Monero node");
// check wallet connection on same thread as connection change
CountDownLatch latch = new CountDownLatch(1);
HavenoUtils.submitToPool((() -> {
HavenoUtils.submitToThread(() -> {
if (!isWalletConnectedToDaemon()) throw new RuntimeException("Trade wallet is not connected to a Monero node"); // wallet connection is updated on trade thread
latch.countDown();
}, getConnectionChangedThreadId());
}));
HavenoUtils.awaitLatch(latch); // TODO: better way?
}
public boolean isWalletConnected() {
try {
checkDaemonConnection();
return true;
} catch (Exception e) {
return false;
public boolean isWalletConnectedToDaemon() {
synchronized (walletLock) {
try {
if (wallet == null) return false;
return wallet.isConnectedToDaemon();
} catch (Exception e) {
return false;
}
}
}
@ -790,7 +808,7 @@ public abstract class Trade implements Tradable, Model {
syncNormalStartTimeMs = System.currentTimeMillis();
// override wallet refresh period
setWalletRefreshPeriod(xmrWalletService.getConnectionService().getRefreshPeriodMs());
setWalletRefreshPeriod(xmrConnectionService.getRefreshPeriodMs());
// reset wallet refresh period after duration
new Thread(() -> {
@ -934,7 +952,7 @@ public abstract class Trade implements Tradable, Model {
public MoneroTxWallet createPayoutTx() {
// check connection to monero daemon
checkDaemonConnection();
checkAndVerifyDaemonConnection();
// check multisig import
if (getWallet().isMultisigImportNeeded()) throw new RuntimeException("Cannot create payout tx because multisig import is needed");
@ -1022,7 +1040,7 @@ public abstract class Trade implements Tradable, Model {
if (!sellerPayoutDestination.getAmount().equals(expectedSellerPayout)) throw new IllegalArgumentException("Seller destination amount is not deposit amount - trade amount - 1/2 tx costs, " + sellerPayoutDestination.getAmount() + " vs " + expectedSellerPayout);
// check wallet connection
if (sign || publish) checkDaemonConnection();
if (sign || publish) checkAndVerifyDaemonConnection();
// handle tx signing
if (sign) {
@ -1217,6 +1235,11 @@ public abstract class Trade implements Tradable, Model {
public void onComplete() {
}
public void onRemoved() {
HavenoUtils.removeThreadId(getId());
HavenoUtils.removeThreadId(getConnectionChangedThreadId());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Abstract
@ -1764,7 +1787,7 @@ public abstract class Trade implements Tradable, Model {
*/
public long getReprocessDelayInSeconds(int reprocessCount) {
int retryCycles = 3; // reprocess on next refresh periods for first few attempts (app might auto switch to a good connection)
if (reprocessCount < retryCycles) return xmrWalletService.getConnectionService().getRefreshPeriodMs() / 1000;
if (reprocessCount < retryCycles) return xmrConnectionService.getRefreshPeriodMs() / 1000;
long delay = 60;
for (int i = retryCycles; i < reprocessCount; i++) delay *= 2;
return Math.min(MAX_REPROCESS_DELAY_SECONDS, delay);
@ -1775,6 +1798,10 @@ public abstract class Trade implements Tradable, Model {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private String getConnectionChangedThreadId() {
return getId() + ".onConnectionChanged";
}
// lazy initialization
private ObjectProperty<BigInteger> getAmountProperty() {
if (tradeAmountProperty == null)
@ -1812,7 +1839,7 @@ public abstract class Trade implements Tradable, Model {
// sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) {
new Thread(() -> initSyncing()).start();
HavenoUtils.submitToPool(() -> initSyncing());
}
}
}
@ -1824,9 +1851,9 @@ public abstract class Trade implements Tradable, Model {
} else {
long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getWalletRefreshPeriod()); // random time to start syncing
UserThread.runAfter(() -> {
if (!isShutDownStarted) {
initSyncingAux();
}
HavenoUtils.submitToPool(() -> {
if (!isShutDownStarted) initSyncingAux();
});
}, startSyncingInMs / 1000l);
}
}
@ -1863,7 +1890,7 @@ public abstract class Trade implements Tradable, Model {
if (!wasWalletSynced) {
wasWalletSynced = true;
if (xmrWalletService.isProxyApplied(wasWalletSynced)) {
onConnectionChanged(xmrWalletService.getConnectionService().getConnection());
onConnectionChanged(xmrConnectionService.getConnection());
}
}
@ -1916,12 +1943,11 @@ public abstract class Trade implements Tradable, Model {
}
private void pollWallet() {
synchronized (walletLock) {
MoneroWallet wallet = getWallet();
try {
try {
synchronized (walletLock) {
// log warning if wallet is too far behind daemon
MoneroDaemonInfo lastInfo = xmrWalletService.getConnectionService().getLastInfo();
MoneroDaemonInfo lastInfo = xmrConnectionService.getLastInfo();
long walletHeight = wallet.getHeight();
if (wasWalletSynced && isDepositsPublished() && !isIdling() && lastInfo != null && walletHeight < lastInfo.getHeight() - 3 && !Config.baseCurrencyNetwork().isTestnet()) {
log.warn("Wallet is more than 3 blocks behind monerod for {} {}, wallet height={}, monerod height={},", getClass().getSimpleName(), getShortId(), walletHeight, lastInfo.getHeight());
@ -2000,18 +2026,20 @@ public abstract class Trade implements Tradable, Model {
}
}
}
} catch (Exception e) {
if (!isShutDownStarted && wallet != null && isWalletConnected()) {
e.printStackTrace();
log.warn("Error polling trade wallet for {} {}: {}. Monerod={}", getClass().getSimpleName(), getId(), e.getMessage(), getXmrWalletService().getConnectionService().getConnection());
}
}
} catch (Exception e) {
boolean isWalletConnected = isWalletConnectedToDaemon();
if (!isWalletConnected) xmrConnectionService.checkConnection(); // check connection if wallet is not connected
if (!isShutDownStarted && wallet != null && isWalletConnected) {
e.printStackTrace();
log.warn("Error polling trade wallet for {} {}: {}. Monerod={}", getClass().getSimpleName(), getId(), e.getMessage(), getXmrWalletService().getConnectionService().getConnection());
}
}
}
private long getWalletRefreshPeriod() {
if (isIdling()) return IDLE_SYNC_PERIOD_MS;
return xmrWalletService.getConnectionService().getRefreshPeriodMs();
return xmrConnectionService.getRefreshPeriodMs();
}
private void setStateDepositsPublished() {
@ -2082,8 +2110,12 @@ public abstract class Trade implements Tradable, Model {
processing = false;
} catch (Exception e) {
processing = false;
e.printStackTrace();
if (isInitialized && !isShutDownStarted && !isWalletConnected()) throw e;
boolean isWalletConnected = isWalletConnectedToDaemon();
if (!isWalletConnected) xmrConnectionService.checkConnection(); // check connection if wallet is not connected
if (isInitialized &&!isShutDownStarted && isWalletConnected) {
e.printStackTrace();
log.warn("Error polling idle trade for {} {}: {}. Monerod={}", getClass().getSimpleName(), getId(), e.getMessage(), getXmrWalletService().getConnectionService().getConnection());
};
}
}, getId());
}

View File

@ -1206,7 +1206,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// remove trade
tradableList.remove(trade);
HavenoUtils.shutDownThreadId(trade.getId());
trade.onRemoved();
// unregister and persist
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));

View File

@ -114,7 +114,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
new Thread(() -> handle(message, peerNodeAddress)).start();
HavenoUtils.submitToThread(() -> handle(message, peerNodeAddress), trade.getId());
}
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
@ -264,9 +264,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
private void maybeSendDepositsConfirmedMessage() {
new Thread(() -> maybeSendDepositsConfirmedMessages()).start();
HavenoUtils.submitToThread(() -> maybeSendDepositsConfirmedMessages(), trade.getId());
EasyBind.subscribe(trade.stateProperty(), state -> {
new Thread(() -> maybeSendDepositsConfirmedMessages()).start();
HavenoUtils.submitToThread(() -> maybeSendDepositsConfirmedMessages(), trade.getId());
});
}
@ -279,7 +279,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId());
new Thread(() -> handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError)).start();
HavenoUtils.submitToThread(() -> {
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
}, trade.getId());
}
}
@ -351,9 +353,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.executeTasks(true);
awaitTradeLatch();
} else {
// process sign contract request after multisig created
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock
if (state == Trade.State.MULTISIG_COMPLETED) HavenoUtils.submitToThread(() -> handleSignContractRequest(message, sender), trade.getId()); // process notification without trade lock
});
}
}
@ -393,9 +396,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.executeTasks(true);
awaitTradeLatch();
} else {
// process sign contract response after contract signed
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
if (state == Trade.State.CONTRACT_SIGNED) HavenoUtils.submitToThread(() -> handleSignContractResponse(message, sender), trade.getId()); // process notification without trade lock
});
}
}

View File

@ -37,7 +37,7 @@ public class SellerPreparePaymentReceivedMessage extends TradeTask {
runInterceptHook();
// check connection
trade.checkDaemonConnection();
trade.checkAndVerifyDaemonConnection();
// handle first time preparation
if (trade.getArbitrator().getPaymentReceivedMessage() == null) {

View File

@ -687,7 +687,9 @@ public class XmrWalletService {
private void initialize() {
// listen for connection changes
xmrConnectionService.addConnectionListener(newConnection -> onConnectionChanged(newConnection));
xmrConnectionService.addConnectionListener(connection -> {
HavenoUtils.submitToThread(() -> onConnectionChanged(connection), THREAD_ID);
});
// wait for monerod to sync
if (xmrConnectionService.downloadPercentageProperty().get() != 1) {
@ -937,7 +939,7 @@ public class XmrWalletService {
// sync wallet on new thread
if (connection != null) {
wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE);
HavenoUtils.submitToThread(() -> {
HavenoUtils.submitToPool(() -> {
synchronized (walletLock) {
try {
if (Boolean.TRUE.equals(connection.isConnected())) wallet.sync();
@ -946,7 +948,7 @@ public class XmrWalletService {
log.warn("Failed to sync main wallet after setting daemon connection: " + e.getMessage());
}
}
}, THREAD_ID);
});
}
log.info("Done setting main wallet daemon connection: " + (connection == null ? null : connection.getUri()));