From 273ceadcc0475b03a2be03ffbeded0831d8232fc Mon Sep 17 00:00:00 2001 From: CryptoManiac Date: Wed, 16 Jul 2014 05:02:26 +0400 Subject: [PATCH] Multi-threaded signatures checking support, rename threads. --- src/bitcoinrpc.cpp | 4 +- src/checkqueue.h | 206 ++++++++++++++++++++++++++++++++++++++++++++++++ src/init.cpp | 22 +++++- src/irc.cpp | 2 +- src/main.cpp | 54 ++++++++++++- src/main.h | 11 +++- src/miner.cpp | 2 +- src/net.cpp | 18 ++-- src/net.h | 1 + src/qt/qtipcserver.cpp | 2 +- src/rpcwallet.cpp | 4 +- src/walletdb.cpp | 2 +- 12 files changed, 305 insertions(+), 23 deletions(-) create mode 100644 src/checkqueue.h diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index 6610390..b9318f9 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -668,7 +668,7 @@ private: void ThreadRPCServer(void* parg) { // Make this thread recognisable as the RPC listener - RenameThread("bitcoin-rpclist"); + RenameThread("novacoin-rpclist"); try { @@ -968,7 +968,7 @@ static CCriticalSection cs_THREAD_RPCHANDLER; void ThreadRPCServer3(void* parg) { // Make this thread recognisable as the RPC handler - RenameThread("bitcoin-rpchand"); + RenameThread("novacoin-rpchand"); { LOCK(cs_THREAD_RPCHANDLER); diff --git a/src/checkqueue.h b/src/checkqueue.h new file mode 100644 index 0000000..36141dd --- /dev/null +++ b/src/checkqueue.h @@ -0,0 +1,206 @@ +// Copyright (c) 2012 The Bitcoin developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. +#ifndef CHECKQUEUE_H +#define CHECKQUEUE_H + +#include +#include +#include + +#include +#include + +template class CCheckQueueControl; + +/** Queue for verifications that have to be performed. + * The verifications are represented by a type T, which must provide an + * operator(), returning a bool. + * + * One thread (the master) is assumed to push batches of verifications + * onto the queue, where they are processed by N-1 worker threads. When + * the master is done adding work, it temporarily joins the worker pool + * as an N'th worker, until all jobs are done. + */ +template class CCheckQueue { +private: + // Mutex to protect the inner state + boost::mutex mutex; + + // Worker threads block on this when out of work + boost::condition_variable condWorker; + + // Master thread blocks on this when out of work + boost::condition_variable condMaster; + + // Quit method blocks on this until all workers are gone + boost::condition_variable condQuit; + + // The queue of elements to be processed. + // As the order of booleans doesn't matter, it is used as a LIFO (stack) + std::vector queue; + + // The number of workers (including the master) that are idle. + int nIdle; + + // The total number of workers (including the master). + int nTotal; + + // The temporary evaluation result. + bool fAllOk; + + // Number of verifications that haven't completed yet. + // This includes elements that are not anymore in queue, but still in + // worker's own batches. + unsigned int nTodo; + + // Whether we're shutting down. + bool fQuit; + + // The maximum number of elements to be processed in one batch + unsigned int nBatchSize; + + // Internal function that does bulk of the verification work. + bool Loop(bool fMaster = false) { + boost::condition_variable &cond = fMaster ? condMaster : condWorker; + std::vector vChecks; + vChecks.reserve(nBatchSize); + unsigned int nNow = 0; + bool fOk = true; + do { + { + boost::unique_lock lock(mutex); + // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) + if (nNow) { + fAllOk &= fOk; + nTodo -= nNow; + if (nTodo == 0 && !fMaster) + // We processed the last element; inform the master he can exit and return the result + condMaster.notify_one(); + } else { + // first iteration + nTotal++; + } + // logically, the do loop starts here + while (queue.empty()) { + if ((fMaster || fQuit) && nTodo == 0) { + nTotal--; + if (nTotal==0) + condQuit.notify_one(); + bool fRet = fAllOk; + // reset the status for new work later + if (fMaster) + fAllOk = true; + // return the current status + return fRet; + } + nIdle++; + cond.wait(lock); // wait + nIdle--; + } + // Decide how many work units to process now. + // * Do not try to do everything at once, but aim for increasingly smaller batches so + // all workers finish approximately simultaneously. + // * Try to account for idle jobs which will instantly start helping. + // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. + nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); + vChecks.resize(nNow); + for (unsigned int i = 0; i < nNow; i++) { + // We want the lock on the mutex to be as short as possible, so swap jobs from the global + // queue to the local batch vector instead of copying. + vChecks[i].swap(queue.back()); + queue.pop_back(); + } + // Check whether we need to do work at all + fOk = fAllOk; + } + // execute work + BOOST_FOREACH(T &check, vChecks) + if (fOk) + fOk = check(); + vChecks.clear(); + } while(true); + } + +public: + // Create a new check queue + CCheckQueue(unsigned int nBatchSizeIn) : + nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {} + + // Worker thread + void Thread() { + Loop(); + } + + // Wait until execution finishes, and return whether all evaluations where succesful. + bool Wait() { + return Loop(true); + } + + // Add a batch of checks to the queue + void Add(std::vector &vChecks) { + boost::unique_lock lock(mutex); + BOOST_FOREACH(T &check, vChecks) { + queue.push_back(T()); + check.swap(queue.back()); + } + nTodo += vChecks.size(); + if (vChecks.size() == 1) + condWorker.notify_one(); + else if (vChecks.size() > 1) + condWorker.notify_all(); + } + + // Shut the queue down + void Quit() { + boost::unique_lock lock(mutex); + fQuit = true; + // No need to wake the master, as he will quit automatically when all jobs are + // done. + condWorker.notify_all(); + + while (nTotal > 0) + condQuit.wait(lock); + } + + friend class CCheckQueueControl; +}; + +/** RAII-style controller object for a CCheckQueue that guarantees the passed + * queue is finished before continuing. + */ +template class CCheckQueueControl { +private: + CCheckQueue *pqueue; + bool fDone; + +public: + CCheckQueueControl(CCheckQueue *pqueueIn) : pqueue(pqueueIn), fDone(false) { + // passed queue is supposed to be unused, or NULL + if (pqueue != NULL) { + assert(pqueue->nTotal == pqueue->nIdle); + assert(pqueue->nTodo == 0); + assert(pqueue->fAllOk == true); + } + } + + bool Wait() { + if (pqueue == NULL) + return true; + bool fRet = pqueue->Wait(); + fDone = true; + return fRet; + } + + void Add(std::vector &vChecks) { + if (pqueue != NULL) + pqueue->Add(vChecks); + } + + ~CCheckQueueControl() { + if (!fDone) + Wait(); + } +}; + +#endif diff --git a/src/init.cpp b/src/init.cpp index 37c43a5..9c65dab 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -68,7 +68,7 @@ void Shutdown(void* parg) static bool fTaken; // Make this thread recognisable as the shutdown thread - RenameThread("bitcoin-shutoff"); + RenameThread("novacoin-shutoff"); bool fFirstThread = false; { @@ -86,6 +86,10 @@ void Shutdown(void* parg) nTransactionsUpdated++; // CTxDB().Close(); bitdb.Flush(false); + { + LOCK(cs_main); + ThreadScriptCheckQuit(); + } StopNode(); bitdb.Flush(true); boost::filesystem::remove(GetPidFile()); @@ -298,6 +302,7 @@ std::string HelpMessage() " -salvagewallet " + _("Attempt to recover private keys from a corrupt wallet.dat") + "\n" + " -checkblocks= " + _("How many blocks to check at startup (default: 2500, 0 = all)") + "\n" + " -checklevel= " + _("How thorough the block verification is (0-6, default: 1)") + "\n" + + " -par=N " + _("Set the number of script verification threads (1-16, 0=auto, default: 0)") + "\n" + " -loadblock= " + _("Imports blocks from external blk000?.dat file") + "\n" + "\n" + _("Block creation options:") + "\n" + @@ -421,6 +426,15 @@ bool AppInit2() // ********************************************************* Step 3: parameter-to-internal-flags + // -par=0 means autodetect, but nScriptCheckThreads==0 means no concurrency + nScriptCheckThreads = GetArg("-par", 0); + if (nScriptCheckThreads == 0) + nScriptCheckThreads = boost::thread::hardware_concurrency(); + if (nScriptCheckThreads <= 1) + nScriptCheckThreads = 0; + else if (nScriptCheckThreads > MAX_SCRIPTCHECK_THREADS) + nScriptCheckThreads = MAX_SCRIPTCHECK_THREADS; + fDebug = GetBoolArg("-debug"); // -debug implies fDebug* @@ -533,6 +547,12 @@ bool AppInit2() if (fDaemon) fprintf(stdout, "NovaCoin server starting\n"); + if (nScriptCheckThreads) { + printf("Using %u threads for script verification\n", nScriptCheckThreads); + for (int i=0; i @@ -59,6 +60,7 @@ uint256 nBestInvalidTrust = 0; uint256 hashBestChain = 0; CBlockIndex* pindexBest = NULL; int64 nTimeBestReceived = 0; +int nScriptCheckThreads = 0; CMedianFilter cPeerBlockCounts(5, 0); // Amount of blocks that other nodes claim to have @@ -724,7 +726,7 @@ bool CTxMemPool::accept(CTxDB& txdb, CTransaction &tx, bool fCheckInputs, // Check against previous transactions // This is done last to help prevent CPU exhaustion denial-of-service attacks. - if (!tx.ConnectInputs(txdb, mapInputs, mapUnused, CDiskTxPos(1,1,1), pindexBest, false, false, STANDARD_SCRIPT_VERIFY_FLAGS)) + if (!tx.ConnectInputs(txdb, mapInputs, mapUnused, CDiskTxPos(1,1,1), pindexBest, false, false, true, STANDARD_SCRIPT_VERIFY_FLAGS)) { return error("CTxMemPool::accept() : ConnectInputs failed %s", hash.ToString().substr(0,10).c_str()); } @@ -1471,13 +1473,12 @@ bool VerifySignature(const CTransaction& txFrom, const CTransaction& txTo, unsig } bool CTransaction::ConnectInputs(CTxDB& txdb, MapPrevTx inputs, map& mapTestPool, const CDiskTxPos& posThisTx, - const CBlockIndex* pindexBlock, bool fBlock, bool fMiner, unsigned int flags) + const CBlockIndex* pindexBlock, bool fBlock, bool fMiner, bool fScriptChecks, unsigned int flags, std::vector *pvChecks) { // Take over previous transactions' spent pointers // fBlock is true when this is called from AcceptBlock when a new best-block is added to the blockchain // fMiner is true when called from the internal bitcoin miner // ... both are false when called from CTransaction::AcceptToMemoryPool - bool fScriptChecks = !(fBlock && Checkpoints::GetTotalBlocksEstimate() >= nBestHeight); if (!IsCoinBase()) { @@ -1509,6 +1510,10 @@ bool CTransaction::ConnectInputs(CTxDB& txdb, MapPrevTx inputs, mapreserve(vin.size()); + // The first loop above does all the inexpensive checks. // Only if ALL inputs pass do we perform expensive ECDSA signature checks. // Helps prevent CPU exhaustion attacks. @@ -1531,6 +1536,25 @@ bool CTransaction::ConnectInputs(CTxDB& txdb, MapPrevTx inputs, mappush_back(CScriptCheck()); + check.swap(pvChecks->back()); + } + else if (!check()) + { + if (flags & STANDARD_NOT_MANDATORY_VERIFY_FLAGS) + { + CScriptCheck check(txPrev, *this, i, flags & ~STANDARD_NOT_MANDATORY_VERIFY_FLAGS, 0); + if (!check()) + return error("ConnectInputs() : %s STANDARD_NOT_MANDATORY_VERIFY_FLAGS VerifySignature failed", GetHash().ToString().substr(0,10).c_str()); + } + + return DoS(100,error("ConnectInputs() : %s STANDARD_MANDATORY_VERIFY_FLAGS VerifySignature failed", GetHash().ToString().substr(0,10).c_str())); + } + +/* if (!VerifySignature(txPrev, *this, i, flags, 0)) { if (flags & STANDARD_NOT_MANDATORY_VERIFY_FLAGS) @@ -1540,6 +1564,7 @@ bool CTransaction::ConnectInputs(CTxDB& txdb, MapPrevTx inputs, map scriptcheckqueue(128); + +void ThreadScriptCheck(void*) { + vnThreadsRunning[THREAD_SCRIPTCHECK]++; + RenameThread("novacoin-scriptch"); + scriptcheckqueue.Thread(); + vnThreadsRunning[THREAD_SCRIPTCHECK]--; +} + +void ThreadScriptCheckQuit() { + scriptcheckqueue.Quit(); +} + bool CBlock::ConnectBlock(CTxDB& txdb, CBlockIndex* pindex, bool fJustCheck) { // Check it again in case a previous version let a bad block in, but skip BlockSig checking @@ -1681,6 +1719,7 @@ bool CBlock::ConnectBlock(CTxDB& txdb, CBlockIndex* pindex, bool fJustCheck) // two in the chain that violate it. This prevents exploiting the issue against nodes in their // initial block download. bool fEnforceBIP30 = true; // Always active in NovaCoin + bool fScriptChecks = pindex->nHeight >= Checkpoints::GetTotalBlocksEstimate(); //// issue here: it doesn't know the version unsigned int nTxPos; @@ -1692,6 +1731,8 @@ bool CBlock::ConnectBlock(CTxDB& txdb, CBlockIndex* pindex, bool fJustCheck) nTxPos = pindex->nBlockPos + ::GetSerializeSize(CBlock(), SER_DISK, CLIENT_VERSION) - (2 * GetSizeOfCompactSize(0)) + GetSizeOfCompactSize(vtx.size()); map mapQueuedChanges; + CCheckQueueControl control(fScriptChecks && nScriptCheckThreads ? &scriptcheckqueue : NULL); + int64 nFees = 0; int64 nValueIn = 0; int64 nValueOut = 0; @@ -1740,13 +1781,18 @@ bool CBlock::ConnectBlock(CTxDB& txdb, CBlockIndex* pindex, bool fJustCheck) if (!tx.IsCoinStake()) nFees += nTxValueIn - nTxValueOut; - if (!tx.ConnectInputs(txdb, mapInputs, mapQueuedChanges, posThisTx, pindex, true, false, SCRIPT_VERIFY_NOCACHE | SCRIPT_VERIFY_P2SH)) + std::vector vChecks; + if (!tx.ConnectInputs(txdb, mapInputs, mapQueuedChanges, posThisTx, pindex, true, false, fScriptChecks, SCRIPT_VERIFY_NOCACHE | SCRIPT_VERIFY_P2SH, nScriptCheckThreads ? &vChecks : NULL)) return false; + control.Add(vChecks); } mapQueuedChanges[hashTx] = CTxIndex(posThisTx, tx.vout.size()); } + if (!control.Wait()) + return DoS(100, false); + if (IsProofOfWork()) { int64 nBlockReward = GetProofOfWorkReward(nBits, fProtocol048 ? nFees : 0); diff --git a/src/main.h b/src/main.h index dc8499a..4c42454 100644 --- a/src/main.h +++ b/src/main.h @@ -44,6 +44,8 @@ static const int64 MIN_TXOUT_AMOUNT = CENT/100; inline bool MoneyRange(int64 nValue) { return (nValue >= 0 && nValue <= MAX_MONEY); } // Threshold for nLockTime: below this value it is interpreted as block number, otherwise as UNIX timestamp. static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC +// Maximum number of script-checking threads allowed +static const int MAX_SCRIPTCHECK_THREADS = 16; #ifdef USE_UPNP static const int fHaveUPnP = true; @@ -87,6 +89,7 @@ extern int64 nTransactionFee; extern int64 nMinimumInputValue; extern bool fUseFastIndex; extern unsigned int nDerivationMethodIndex; +extern int nScriptCheckThreads; // Minimum disk space required - used in CheckDiskSpace() static const uint64 nMinDiskSpace = 52428800; @@ -94,6 +97,7 @@ static const uint64 nMinDiskSpace = 52428800; class CReserveKey; class CTxDB; class CTxIndex; +class CScriptCheck; void RegisterWallet(CWallet* pwalletIn); void UnregisterWallet(CWallet* pwalletIn); @@ -109,6 +113,11 @@ bool ProcessMessages(CNode* pfrom); bool SendMessages(CNode* pto, bool fSendTrickle); bool LoadExternalBlockFile(FILE* fileIn); +// Run an instance of the script checking thread +void ThreadScriptCheck(void* parg); +// Stop the script checking threads +void ThreadScriptCheckQuit(); + bool CheckProofOfWork(uint256 hash, unsigned int nBits); unsigned int GetNextTargetRequired(const CBlockIndex* pindexLast, bool fProofOfStake); int64 GetProofOfWorkReward(unsigned int nBits, int64 nFees=0); @@ -695,7 +704,7 @@ public: */ bool ConnectInputs(CTxDB& txdb, MapPrevTx inputs, std::map& mapTestPool, const CDiskTxPos& posThisTx, - const CBlockIndex* pindexBlock, bool fBlock, bool fMiner, unsigned int flags=STANDARD_SCRIPT_VERIFY_FLAGS); + const CBlockIndex* pindexBlock, bool fBlock, bool fMiner, bool fScriptChecks=true, unsigned int flags=STANDARD_SCRIPT_VERIFY_FLAGS, std::vector *pvChecks = NULL); bool ClientConnectInputs(); bool CheckTransaction() const; bool AcceptToMemoryPool(CTxDB& txdb, bool fCheckInputs=true, bool* pfMissingInputs=NULL); diff --git a/src/miner.cpp b/src/miner.cpp index 959175b..6dff176 100644 --- a/src/miner.cpp +++ b/src/miner.cpp @@ -308,7 +308,7 @@ CBlock* CreateNewBlock(CWallet* pwallet, bool fProofOfStake) if (nBlockSigOps + nTxSigOps >= MAX_BLOCK_SIGOPS) continue; - if (!tx.ConnectInputs(txdb, mapInputs, mapTestPoolTmp, CDiskTxPos(1,1,1), pindexPrev, false, true, MANDATORY_SCRIPT_VERIFY_FLAGS)) + if (!tx.ConnectInputs(txdb, mapInputs, mapTestPoolTmp, CDiskTxPos(1,1,1), pindexPrev, false, true, true, MANDATORY_SCRIPT_VERIFY_FLAGS)) continue; mapTestPoolTmp[tx.GetHash()] = CTxIndex(CDiskTxPos(1,1,1), tx.vout.size()); swap(mapTestPool, mapTestPoolTmp); diff --git a/src/net.cpp b/src/net.cpp index 206d164..13fbbdf 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -409,7 +409,7 @@ bool GetMyExternalIP(CNetAddr& ipRet) void ThreadGetMyExternalIP(void* parg) { // Make this thread recognisable as the external IP detection thread - RenameThread("bitcoin-ext-ip"); + RenameThread("novacoin-ext-ip"); CNetAddr addrLocalHost; if (GetMyExternalIP(addrLocalHost)) @@ -641,7 +641,7 @@ void CNode::copyStats(CNodeStats &stats) void ThreadSocketHandler(void* parg) { // Make this thread recognisable as the networking thread - RenameThread("bitcoin-net"); + RenameThread("novacoin-net"); try { @@ -1000,7 +1000,7 @@ void ThreadSocketHandler2(void* parg) void ThreadMapPort(void* parg) { // Make this thread recognisable as the UPnP thread - RenameThread("bitcoin-UPnP"); + RenameThread("novacoin-UPnP"); try { @@ -1159,7 +1159,7 @@ static const char *strDNSSeed[][2] = { void ThreadDNSAddressSeed(void* parg) { // Make this thread recognisable as the DNS seeding thread - RenameThread("bitcoin-dnsseed"); + RenameThread("novacoin-dnsseed"); try { @@ -1254,7 +1254,7 @@ void ThreadDumpAddress2(void* parg) void ThreadDumpAddress(void* parg) { // Make this thread recognisable as the address dumping thread - RenameThread("bitcoin-adrdump"); + RenameThread("novacoin-adrdump"); try { @@ -1269,7 +1269,7 @@ void ThreadDumpAddress(void* parg) void ThreadOpenConnections(void* parg) { // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); + RenameThread("novacoin-opencon"); try { @@ -1451,7 +1451,7 @@ void ThreadOpenConnections2(void* parg) void ThreadOpenAddedConnections(void* parg) { // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); + RenameThread("novacoin-opencon"); try { @@ -1582,7 +1582,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu void ThreadMessageHandler(void* parg) { // Make this thread recognisable as the message handling thread - RenameThread("bitcoin-msghand"); + RenameThread("novacoin-msghand"); try { @@ -1833,7 +1833,7 @@ void static Discover() void StartNode(void* parg) { // Make this thread recognisable as the startup thread - RenameThread("bitcoin-start"); + RenameThread("novacoin-start"); if (semOutbound == NULL) { // initialize semaphore diff --git a/src/net.h b/src/net.h index be71da9..0d51bff 100644 --- a/src/net.h +++ b/src/net.h @@ -107,6 +107,7 @@ enum threadId THREAD_DUMPADDRESS, THREAD_RPCHANDLER, THREAD_MINTER, + THREAD_SCRIPTCHECK, THREAD_MAX }; diff --git a/src/qt/qtipcserver.cpp b/src/qt/qtipcserver.cpp index bb49dac..739e4d1 100644 --- a/src/qt/qtipcserver.cpp +++ b/src/qt/qtipcserver.cpp @@ -75,7 +75,7 @@ void ipcScanRelay(int argc, char *argv[]) static void ipcThread(void* pArg) { // Make this thread recognisable as the GUI-IPC thread - RenameThread("bitcoin-gui-ipc"); + RenameThread("novacoin-gui-ipc"); try { diff --git a/src/rpcwallet.cpp b/src/rpcwallet.cpp index 78066f2..aa1747a 100644 --- a/src/rpcwallet.cpp +++ b/src/rpcwallet.cpp @@ -1377,7 +1377,7 @@ Value keypoolrefill(const Array& params, bool fHelp) void ThreadTopUpKeyPool(void* parg) { // Make this thread recognisable as the key-topping-up thread - RenameThread("bitcoin-key-top"); + RenameThread("novacoin-key-top"); pwalletMain->TopUpKeyPool(); } @@ -1385,7 +1385,7 @@ void ThreadTopUpKeyPool(void* parg) void ThreadCleanWalletPassphrase(void* parg) { // Make this thread recognisable as the wallet relocking thread - RenameThread("bitcoin-lock-wa"); + RenameThread("novacoin-lock-wa"); int64 nMyWakeTime = GetTimeMillis() + *((int64*)parg) * 1000; diff --git a/src/walletdb.cpp b/src/walletdb.cpp index e23b291..556faf3 100644 --- a/src/walletdb.cpp +++ b/src/walletdb.cpp @@ -543,7 +543,7 @@ DBErrors CWalletDB::LoadWallet(CWallet* pwallet) void ThreadFlushWalletDB(void* parg) { // Make this thread recognisable as the wallet flushing thread - RenameThread("bitcoin-wallet"); + RenameThread("novacoin-wallet"); const string& strFile = ((const string*)parg)[0]; static bool fOneThread; -- 1.7.1