From ce2fafb656aa4a5ae43bda3617c6bdedcf9c5a84 Mon Sep 17 00:00:00 2001 From: genjix Date: Sun, 22 Apr 2012 01:49:59 +0100 Subject: [PATCH] C++ core with working memory pool --- backends/libbitcoin/Makefile | 4 + backends/libbitcoin/__init__.py | 91 ++++++++----- backends/libbitcoin/h1.py | 8 +- backends/libbitcoin/history.cpp | 23 ++- backends/libbitcoin/history1/__init__.py | 6 +- backends/libbitcoin/membuf.cpp | 40 ++++++ backends/libbitcoin/memory_buffer.hpp | 205 ++++++++++++++++++++++++++++++ processor.py | 2 +- 8 files changed, 330 insertions(+), 49 deletions(-) create mode 100644 backends/libbitcoin/membuf.cpp create mode 100644 backends/libbitcoin/memory_buffer.hpp diff --git a/backends/libbitcoin/Makefile b/backends/libbitcoin/Makefile index 3f44322..96cc1cf 100644 --- a/backends/libbitcoin/Makefile +++ b/backends/libbitcoin/Makefile @@ -1,5 +1,9 @@ CC = g++ -fPIC -Wall -ansi `pkg-config --cflags libbitcoin` -I/usr/include/python2.7 +membuf: + $(CC) -c membuf.cpp -o membuf.o + $(CC) -shared -Wl,-soname,membuf.so membuf.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o membuf.so + default: $(CC) -c history.cpp -o history.o $(CC) -shared -Wl,-soname,_history.so history.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o history1/_history.so diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index cf2a462..8e51169 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -4,7 +4,8 @@ from processor import Processor import threading import time -import history +import history1 as history +import membuf class HistoryCache: @@ -31,9 +32,10 @@ class HistoryCache: class MonitorAddress: - def __init__(self, processor, cache): + def __init__(self, processor, cache, backend): self.processor = processor self.cache = cache + self.backend = backend self.lock = threading.Lock() # key is hash:index, value is address self.monitor_output = {} @@ -42,6 +44,8 @@ class MonitorAddress: # affected self.affected = {} + backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed) + def monitor(self, address, result): for info in result: if not info.has_key("raw_output_script"): @@ -55,11 +59,24 @@ class MonitorAddress: with self.lock: self.monitor_address.add(address) - def tx_stored(self, tx_desc): - tx_hash, prevouts, addrs = tx_desc + def unpack(self, tx): + tx_hash = bitcoin.hash_transaction(tx) + previous_outputs = [] + for input in tx.inputs: + prevout = input.previous_output + prevout = "%s:%s" % (prevout.hash, prevout.index) + previous_outputs.append(prevout) + addrs = [] + for output_index, output in enumerate(tx.outputs): + address = bitcoin.payment_address() + if address.extract(output.output_script): + addrs.append((output_index, str(address))) + return tx_hash, previous_outputs, addrs + + def tx_stored(self, tx): affected_addrs = set() - for prevout_hash, prevout_index in prevouts: - prevout = "%s:%s" % (prevout_hash, prevout_index) + tx_hash, previous_outputs, addrs = self.unpack(tx) + for prevout in previous_outputs: with self.lock: if self.monitor_output.has_key(prevout): affected_addrs.add(self.monitor_output[prevout]) @@ -73,7 +90,7 @@ class MonitorAddress: self.notify(affected_addrs) def tx_confirmed(self, tx_desc): - tx_hash, prevouts, addrs = tx_desc + tx_hash, previous_outputs, addrs = self.unpack(tx) with self.lock: affected_addrs = self.affected[tx_hash] del self.affected[tx_hash] @@ -86,8 +103,7 @@ class MonitorAddress: if address in affected_addrs: self.monitor_output[outpoint] = address # delete spent outpoints - for prevout_hash, prevout_index in prevouts: - prevout = "%s:%s" % (prevout_hash, prevout_index) + for prevout in previous_outputs: with self.lock: if self.monitor_output.has_key(prevout): del self.monitor_output[prevout] @@ -96,14 +112,15 @@ class MonitorAddress: templ_response = {"id": None, "method": "blockchain.address.subscribe", "params": []} + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer + memory_buff = self.backend.memory_buffer for address in affected_addrs: response = templ_response.copy() response["params"].append(address) - history.payment_history(chain, txpool, membuf, address, - bind(self.send_notify, _1, response)) + history.payment_history(service, chain, txpool, memory_buff, + address, bind(self.send_notify, _1, _2, response)) def mempool_n(self, result): assert result is not None @@ -118,13 +135,16 @@ class MonitorAddress: last_id = last_info["block_hash"] return last_id - def send_notify(self, result, response): + def send_notify(self, ec, result, response): + if ec: + print "Error: Monitor.send_notify()", ec + return response["params"].append(self.mempool_n(result)) self.processor.push_response(response) class Backend: - def __init__(self, monitor): + def __init__(self): # Create 3 thread-pools each with 1 thread self.network_service = bitcoin.async_service(1) self.disk_service = bitcoin.async_service(1) @@ -150,9 +170,10 @@ class Backend: self.poller, self.transaction_pool) self.session.start(self.handle_start) - self.pool_buffer = \ - history.MemoryPoolBuffer(self.transaction_pool, - self.blockchain, monitor) + self.memory_buffer = \ + membuf.memory_buffer(self.mempool_service.internal_ptr, + self.blockchain.internal_ptr, + self.transaction_pool.internal_ptr) def handle_start(self, ec): if ec: @@ -183,7 +204,7 @@ class Backend: print "Error with new transaction:", ec return tx_hash = bitcoin.hash_transaction(tx) - self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash)) + self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash)) # Re-subscribe to new transactions from node node.subscribe_transaction(bind(self.recv_tx, _1, _2, node)) @@ -247,16 +268,17 @@ class AddressGetHistory: def get(self, request): address = str(request["params"][0]) + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer - history.payment_history(chain, txpool, membuf, address, - bind(self.respond, _1, request)) + memory_buff = self.backend.memory_buffer + history.payment_history(service, chain, txpool, memory_buff, + address, bind(self.respond, _1, _2, request)) - def respond(self, result, request): - if result is None: + def respond(self, ec, result, request): + if ec: response = {"id": request["id"], "result": None, - "error": {"message": "Error", "code": -4}} + "error": {"message": str(ec), "code": -4}} else: response = {"id": request["id"], "result": result, "error": None} self.processor.push_response(response) @@ -269,20 +291,20 @@ class AddressSubscribe: self.cache = cache self.monitor = monitor - self.backend.pool_buffer.cheat = self - def subscribe(self, request): address = str(request["params"][0]) + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer - history.payment_history(chain, txpool, membuf, address, - bind(self.construct, _1, request)) + memory_buff = self.backend.memory_buffer + history.payment_history(service, chain, txpool, memory_buff, + address, bind(self.construct, _1, _2, request)) - def construct(self, result, request): - if result is None: + def construct(self, ec, result, request): + if ec: response = {"id": request["id"], "result": None, - "error": {"message": "Error", "code": -4}} + "error": {"message": str(ec), "code": -4}} + self.processor.push_response(response) return last_id = self.monitor.mempool_n(result) response = {"id": request["id"], "result": last_id, "error": None} @@ -306,9 +328,8 @@ class BlockchainProcessor(Processor): def __init__(self, config): Processor.__init__(self) cache = HistoryCache() - monitor = MonitorAddress(self, cache) - self.backend = Backend(monitor) - monitor.backend = self.backend + self.backend = Backend() + monitor = MonitorAddress(self, cache, self.backend) self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) self.address_get_history = AddressGetHistory(self.backend, self) self.address_subscribe = \ diff --git a/backends/libbitcoin/h1.py b/backends/libbitcoin/h1.py index 659a3cf..460ba53 100644 --- a/backends/libbitcoin/h1.py +++ b/backends/libbitcoin/h1.py @@ -6,13 +6,17 @@ def blockchain_started(ec, chain): def finish(ec, result): print "Finish:", ec - print result + for line in result: + for k, v in line.iteritems(): + begin = k + ":" + print begin, " " * (12 - len(begin)), v + print a = bitcoin.async_service(1) chain = bitcoin.bdb_blockchain(a, "/home/genjix/libbitcoin/database", blockchain_started) txpool = bitcoin.transaction_pool(a, chain) -address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D" +address = "1FpES68UNcxnXeoaFciqvUSGiKGZ33gbfQ" history.payment_history(a, chain, txpool, address, finish) raw_input() diff --git a/backends/libbitcoin/history.cpp b/backends/libbitcoin/history.cpp index d146a86..a1b5bbc 100644 --- a/backends/libbitcoin/history.cpp +++ b/backends/libbitcoin/history.cpp @@ -7,7 +7,7 @@ using namespace libbitcoin; #include namespace python = boost::python; -#include "/home/genjix/python-bitcoin/src/primitive.h" +#include "memory_buffer.hpp" namespace ph = std::placeholders; @@ -68,8 +68,8 @@ class query_history : public std::enable_shared_from_this { public: - query_history(async_service& service, - blockchain_ptr chain, transaction_pool_ptr txpool) + query_history(async_service& service, blockchain_ptr chain, + transaction_pool_ptr txpool, memory_buffer_ptr membuf) : strand_(service.get_service()), chain_(chain), txpool_(txpool), stopped_(false) { @@ -158,7 +158,7 @@ private: BITCOIN_ASSERT(entry->loaded_input && entry->loaded_output); // value of the input is simply the inverse of // the corresponding output - entry->loaded_input->value = -entry->loaded_output->value; + entry->loaded_input->value = entry->loaded_output->value; // Unspent outputs have a raw_output_script field // Blank this field as it isn't used entry->loaded_output->raw_output_script.clear(); @@ -351,7 +351,8 @@ void write_info(std::string& json, info_unit_ptr info) << "\"tx_hash\": \"" << pretty_hex(info->tx_hash) << "\"," << "\"block_hash\": \"" << pretty_hex(info->block_hash) << "\"," << "\"index\": " << info->index << "," - << "\"value\": " << info->value << "," + // x for received, and -x for sent amounts + << "\"value\": " << (info->is_input ? "-" : "") << info->value << "," << "\"height\": " << info->height << "," << "\"timestamp\": " << info->timestamp << "," << "\"is_input\": " << info->is_input << ","; @@ -378,6 +379,12 @@ void keep_query_alive_proxy(const std::error_code& ec, auto entry = *it; BITCOIN_ASSERT(entry->loaded_output); write_info(json, entry->loaded_output); + if (entry->input_exists) + { + BITCOIN_ASSERT(entry->loaded_input); + json += ","; + write_info(json, entry->loaded_input); + } } json += "]"; pyfunction f(handle_finish); @@ -385,11 +392,11 @@ void keep_query_alive_proxy(const std::error_code& ec, } void payment_history(async_service_ptr service, blockchain_ptr chain, - transaction_pool_ptr txpool, const std::string& address, - python::object handle_finish) + transaction_pool_ptr txpool, memory_buffer_ptr membuf, + const std::string& address, python::object handle_finish) { query_history_ptr history = - std::make_shared(*service, chain, txpool); + std::make_shared(*service, chain, txpool, membuf); history->start(address, std::bind(keep_query_alive_proxy, ph::_1, ph::_2, handle_finish, history)); diff --git a/backends/libbitcoin/history1/__init__.py b/backends/libbitcoin/history1/__init__.py index f257a17..25cd9e6 100644 --- a/backends/libbitcoin/history1/__init__.py +++ b/backends/libbitcoin/history1/__init__.py @@ -5,8 +5,8 @@ import json def wrap_finish(handle_finish, ec, result_json): handle_finish(ec, json.loads(result_json)) -def payment_history(service, chain, txpool, address, finish): +def payment_history(service, chain, txpool, membuf, address, finish): _history.payment_history(service.internal_ptr, chain.internal_ptr, - txpool.internal_ptr, address, - bind(wrap_finish, finish, _1, _2)) + txpool.internal_ptr, membuf.internal_ptr, + address, bind(wrap_finish, finish, _1, _2)) diff --git a/backends/libbitcoin/membuf.cpp b/backends/libbitcoin/membuf.cpp new file mode 100644 index 0000000..07c09d1 --- /dev/null +++ b/backends/libbitcoin/membuf.cpp @@ -0,0 +1,40 @@ +#include +namespace python = boost::python; + +#include "memory_buffer.hpp" + +struct memory_buffer_wrapper +{ + memory_buffer_wrapper(async_service_ptr service, blockchain_ptr chain, + transaction_pool_ptr txpool) + { + membuf = std::make_shared(service, chain, txpool); + } + + void set_handles(python::object handle_tx_stored, + python::object handle_tx_confirmed) + { + membuf->set_handles(handle_tx_stored, handle_tx_confirmed); + } + + void receive(const message::transaction& tx, + python::object handle_receive) + { + membuf->receive(tx, handle_receive); + } + + memory_buffer_ptr membuf; +}; + +BOOST_PYTHON_MODULE(membuf) +{ + using namespace boost::python; + class_("memory_buffer", init< + async_service_ptr, blockchain_ptr, transaction_pool_ptr>()) + .def("receive", &memory_buffer_wrapper::receive) + .def("set_handles", &memory_buffer_wrapper::set_handles) + .def_readonly("internal_ptr", &memory_buffer_wrapper::membuf) + ; + class_("internal_memory_buffer", no_init); +} + diff --git a/backends/libbitcoin/memory_buffer.hpp b/backends/libbitcoin/memory_buffer.hpp new file mode 100644 index 0000000..9d7621c --- /dev/null +++ b/backends/libbitcoin/memory_buffer.hpp @@ -0,0 +1,205 @@ +#include +using namespace libbitcoin; + +#include "/home/genjix/python-bitcoin/src/primitive.h" + +namespace ph = std::placeholders; + +int cmp(const bc::message::output_point& a, + const bc::message::output_point& b) +{ + if (a.index < b.index) + return -1; + else if (a.index > b.index) + return 1; + // a.index == b.index + if (a.hash < b.hash) + return -1; + else if (a.hash > b.hash) + return 1; + return 0; +} + +struct outpoint_less_cmp +{ + bool operator()(const bc::message::output_point& a, + const bc::message::output_point& b) + { + return cmp(a, b) == -1; + } +}; + +struct address_less_cmp +{ + bool operator()(const payment_address& a, const payment_address& b) + { + if (a.hash() < b.hash()) + return true; + else if (a.hash() > b.hash()) + return false; + if (a.version() < b.version()) + return true; + return false; + } +}; + +class memory_buffer + : public std::enable_shared_from_this +{ +public: + struct check_item + { + hash_digest tx_hash; + size_t index; + bool is_input; + uint64_t timestamp; + // Convenient storage + message::output_point previous_output; + }; + typedef std::vector check_result; + typedef std::shared_ptr check_result_ptr; + + typedef std::function< + void (const std::error_code&)> receive_handler; + typedef std::function< + void (const std::error_code&, check_result_ptr)> check_handler; + + memory_buffer(async_service_ptr service, blockchain_ptr chain, + transaction_pool_ptr txpool) + : strand_(service->get_service()), chain_(chain), txpool_(txpool) + { + } + + void set_handles(python::object handle_tx_stored, + python::object handle_tx_confirmed) + { + auto this_ptr = shared_from_this(); + strand_.post( + [&, this_ptr, handle_tx_stored, handle_tx_confirmed]() + { + handle_tx_stored_ = handle_tx_stored; + handle_tx_confirmed_ = handle_tx_confirmed; + }); + } + + void receive(const message::transaction& tx, + python::object handle_receive) + { + txpool_->store(tx, + strand_.wrap(std::bind(&memory_buffer::confirmed, + shared_from_this(), ph::_1, tx)), + strand_.wrap(std::bind(&memory_buffer::stored, + shared_from_this(), ph::_1, tx, handle_receive))); + } + + void check(const message::output_point_list& output_points, + const std::string& address, check_handler handle_check) + { + } + +private: + void stored(const std::error_code& ec, const message::transaction& tx, + python::object handle_receive) + { + if (ec) + { + pyfunction f(handle_receive); + f(ec); + return; + } + const hash_digest& tx_hash = hash_transaction(tx); + for (uint32_t input_index = 0; + input_index < tx.inputs.size(); ++input_index) + { + const auto& prevout = tx.inputs[input_index].previous_output; + lookup_input_[prevout] = + message::input_point{tx_hash, input_index}; + } + for (uint32_t output_index = 0; + output_index < tx.outputs.size(); ++output_index) + { + payment_address address; + if (extract(address, tx.outputs[output_index].output_script)) + { + lookup_address_.insert( + std::make_pair(address, + message::output_point{tx_hash, output_index})); + } + } + timestamps_[tx_hash] = time(nullptr); + pyfunction f(handle_receive); + f(std::error_code()); + // tx stored + if (!handle_tx_stored_.is_none()) + { + pyfunction g(handle_tx_stored_); + g(tx); + } + } + + void confirmed(const std::error_code& ec, const message::transaction& tx) + { + const hash_digest& tx_hash = hash_transaction(tx); + if (ec) + { + std::cerr << "Problem confirming transaction " + << pretty_hex(tx_hash) << " : " << ec.message() << std::endl; + return; + } + std::cout << "Confirmed " << pretty_hex(tx_hash) << std::endl; + for (uint32_t input_index = 0; + input_index < tx.inputs.size(); ++input_index) + { + const auto& prevout = tx.inputs[input_index].previous_output; + auto it = lookup_input_.find(prevout); + BITCOIN_ASSERT(it != lookup_input_.end()); + BITCOIN_ASSERT((it->second == + message::input_point{tx_hash, input_index})); + lookup_input_.erase(it); + } + for (uint32_t output_index = 0; + output_index < tx.outputs.size(); ++output_index) + { + message::output_point outpoint{tx_hash, output_index}; + payment_address address; + if (extract(address, tx.outputs[output_index].output_script)) + { + auto range = lookup_address_.equal_range(address); + auto it = range.first; + for (; it != range.second; ++it) + { + if (it->second == outpoint) + { + lookup_address_.erase(it); + break; + } + } + BITCOIN_ASSERT(it != range.second); + } + } + auto time_it = timestamps_.find(tx_hash); + BITCOIN_ASSERT(time_it != timestamps_.end()); + timestamps_.erase(time_it); + // tx_confirmed + if (!handle_tx_stored_.is_none()) + { + pyfunction f(handle_tx_confirmed_); + f(tx); + } + } + + io_service::strand strand_; + blockchain_ptr chain_; + transaction_pool_ptr txpool_; + + std::map lookup_input_; + std::multimap lookup_address_; + std::map timestamps_; + + python::object handle_tx_stored_, handle_tx_confirmed_; +}; + +typedef std::shared_ptr memory_buffer_ptr; + diff --git a/processor.py b/processor.py index 446c37f..42b22d0 100644 --- a/processor.py +++ b/processor.py @@ -41,7 +41,7 @@ class Processor(threading.Thread): def push_response(self, response): print "response", response - #self.dispatcher.request_dispatcher.push_response(response) + self.dispatcher.request_dispatcher.push_response(response) -- 1.7.1