C++ core with working memory pool
authorgenjix <fake@lol.u>
Sun, 22 Apr 2012 00:49:59 +0000 (01:49 +0100)
committergenjix <fake@lol.u>
Sun, 22 Apr 2012 00:49:59 +0000 (01:49 +0100)
backends/libbitcoin/Makefile
backends/libbitcoin/__init__.py
backends/libbitcoin/h1.py
backends/libbitcoin/history.cpp
backends/libbitcoin/history1/__init__.py
backends/libbitcoin/membuf.cpp [new file with mode: 0644]
backends/libbitcoin/memory_buffer.hpp [new file with mode: 0644]
processor.py

index 3f44322..96cc1cf 100644 (file)
@@ -1,5 +1,9 @@
 CC = g++ -fPIC -Wall -ansi `pkg-config --cflags libbitcoin` -I/usr/include/python2.7 
 
+membuf:
+       $(CC) -c membuf.cpp -o membuf.o
+       $(CC) -shared -Wl,-soname,membuf.so membuf.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o membuf.so
+
 default:
        $(CC) -c history.cpp -o history.o
        $(CC) -shared -Wl,-soname,_history.so history.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o history1/_history.so
index cf2a462..8e51169 100644 (file)
@@ -4,7 +4,8 @@ from processor import Processor
 import threading
 import time
 
-import history 
+import history1 as history
+import membuf
 
 class HistoryCache:
 
@@ -31,9 +32,10 @@ class HistoryCache:
 
 class MonitorAddress:
 
-    def __init__(self, processor, cache):
+    def __init__(self, processor, cache, backend):
         self.processor = processor
         self.cache = cache
+        self.backend = backend
         self.lock = threading.Lock()
         # key is hash:index, value is address
         self.monitor_output = {}
@@ -42,6 +44,8 @@ class MonitorAddress:
         # affected
         self.affected = {}
 
+        backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
+
     def monitor(self, address, result):
         for info in result:
             if not info.has_key("raw_output_script"):
@@ -55,11 +59,24 @@ class MonitorAddress:
         with self.lock:
             self.monitor_address.add(address)
 
-    def tx_stored(self, tx_desc):
-        tx_hash, prevouts, addrs = tx_desc
+    def unpack(self, tx):
+        tx_hash = bitcoin.hash_transaction(tx)
+        previous_outputs = []
+        for input in tx.inputs:
+            prevout = input.previous_output
+            prevout = "%s:%s" % (prevout.hash, prevout.index)
+            previous_outputs.append(prevout)
+        addrs = []
+        for output_index, output in enumerate(tx.outputs):
+            address = bitcoin.payment_address()
+            if address.extract(output.output_script):
+                addrs.append((output_index, str(address)))
+        return tx_hash, previous_outputs, addrs
+
+    def tx_stored(self, tx):
         affected_addrs = set()
-        for prevout_hash, prevout_index in prevouts:
-            prevout = "%s:%s" % (prevout_hash, prevout_index)
+        tx_hash, previous_outputs, addrs = self.unpack(tx)
+        for prevout in previous_outputs:
             with self.lock:
                 if self.monitor_output.has_key(prevout):
                     affected_addrs.add(self.monitor_output[prevout])
@@ -73,7 +90,7 @@ class MonitorAddress:
         self.notify(affected_addrs)
 
     def tx_confirmed(self, tx_desc):
-        tx_hash, prevouts, addrs = tx_desc
+        tx_hash, previous_outputs, addrs = self.unpack(tx)
         with self.lock:
             affected_addrs = self.affected[tx_hash]
             del self.affected[tx_hash]
@@ -86,8 +103,7 @@ class MonitorAddress:
                 if address in affected_addrs:
                     self.monitor_output[outpoint] = address
         # delete spent outpoints
-        for prevout_hash, prevout_index in prevouts:
-            prevout = "%s:%s" % (prevout_hash, prevout_index)
+        for prevout in previous_outputs:
             with self.lock:
                 if self.monitor_output.has_key(prevout):
                     del self.monitor_output[prevout]
@@ -96,14 +112,15 @@ class MonitorAddress:
         templ_response = {"id": None,
                           "method": "blockchain.address.subscribe",
                           "params": []}
+        service = self.backend.mempool_service
         chain = self.backend.blockchain
         txpool = self.backend.transaction_pool
-        membuf = self.backend.pool_buffer
+        memory_buff = self.backend.memory_buffer
         for address in affected_addrs:
             response = templ_response.copy()
             response["params"].append(address)
-            history.payment_history(chain, txpool, membuf, address,
-                bind(self.send_notify, _1, response))
+            history.payment_history(service, chain, txpool, memory_buff,
+                address, bind(self.send_notify, _1, _2, response))
 
     def mempool_n(self, result):
         assert result is not None
@@ -118,13 +135,16 @@ class MonitorAddress:
             last_id = last_info["block_hash"]
         return last_id
 
-    def send_notify(self, result, response):
+    def send_notify(self, ec, result, response):
+        if ec:
+            print "Error: Monitor.send_notify()", ec
+            return
         response["params"].append(self.mempool_n(result))
         self.processor.push_response(response)
 
 class Backend:
 
-    def __init__(self, monitor):
+    def __init__(self):
         # Create 3 thread-pools each with 1 thread
         self.network_service = bitcoin.async_service(1)
         self.disk_service = bitcoin.async_service(1)
@@ -150,9 +170,10 @@ class Backend:
                             self.poller, self.transaction_pool)
         self.session.start(self.handle_start)
 
-        self.pool_buffer = \
-            history.MemoryPoolBuffer(self.transaction_pool,
-                                     self.blockchain, monitor)
+        self.memory_buffer = \
+            membuf.memory_buffer(self.mempool_service.internal_ptr,
+                                 self.blockchain.internal_ptr,
+                                 self.transaction_pool.internal_ptr)
 
     def handle_start(self, ec):
         if ec:
@@ -183,7 +204,7 @@ class Backend:
             print "Error with new transaction:", ec
             return
         tx_hash = bitcoin.hash_transaction(tx)
-        self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash))
+        self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
         # Re-subscribe to new transactions from node
         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
 
@@ -247,16 +268,17 @@ class AddressGetHistory:
 
     def get(self, request):
         address = str(request["params"][0])
+        service = self.backend.mempool_service
         chain = self.backend.blockchain
         txpool = self.backend.transaction_pool
-        membuf = self.backend.pool_buffer
-        history.payment_history(chain, txpool, membuf, address,
-            bind(self.respond, _1, request))
+        memory_buff = self.backend.memory_buffer
+        history.payment_history(service, chain, txpool, memory_buff,
+            address, bind(self.respond, _1, _2, request))
 
-    def respond(self, result, request):
-        if result is None:
+    def respond(self, ec, result, request):
+        if ec:
             response = {"id": request["id"], "result": None,
-                        "error": {"message": "Error", "code": -4}}
+                        "error": {"message": str(ec), "code": -4}}
         else:
             response = {"id": request["id"], "result": result, "error": None}
         self.processor.push_response(response)
@@ -269,20 +291,20 @@ class AddressSubscribe:
         self.cache = cache
         self.monitor = monitor
 
-        self.backend.pool_buffer.cheat = self
-
     def subscribe(self, request):
         address = str(request["params"][0])
+        service = self.backend.mempool_service
         chain = self.backend.blockchain
         txpool = self.backend.transaction_pool
-        membuf = self.backend.pool_buffer
-        history.payment_history(chain, txpool, membuf, address,
-            bind(self.construct, _1, request))
+        memory_buff = self.backend.memory_buffer
+        history.payment_history(service, chain, txpool, memory_buff,
+            address, bind(self.construct, _1, _2, request))
 
-    def construct(self, result, request):
-        if result is None:
+    def construct(self, ec, result, request):
+        if ec:
             response = {"id": request["id"], "result": None,
-                        "error": {"message": "Error", "code": -4}}
+                        "error": {"message": str(ec), "code": -4}}
+            self.processor.push_response(response)
             return
         last_id = self.monitor.mempool_n(result)
         response = {"id": request["id"], "result": last_id, "error": None}
@@ -306,9 +328,8 @@ class BlockchainProcessor(Processor):
     def __init__(self, config):
         Processor.__init__(self)
         cache = HistoryCache()
-        monitor = MonitorAddress(self, cache)
-        self.backend = Backend(monitor)
-        monitor.backend = self.backend
+        self.backend = Backend()
+        monitor = MonitorAddress(self, cache, self.backend)
         self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
         self.address_get_history = AddressGetHistory(self.backend, self)
         self.address_subscribe = \
index 659a3cf..460ba53 100644 (file)
@@ -6,13 +6,17 @@ def blockchain_started(ec, chain):
 
 def finish(ec, result):
     print "Finish:", ec
-    print result
+    for line in result:
+        for k, v in line.iteritems():
+            begin = k + ":"
+            print begin, " " * (12 - len(begin)), v
+        print
 
 a = bitcoin.async_service(1)
 chain = bitcoin.bdb_blockchain(a, "/home/genjix/libbitcoin/database",
                                blockchain_started)
 txpool = bitcoin.transaction_pool(a, chain)
-address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D"
+address = "1FpES68UNcxnXeoaFciqvUSGiKGZ33gbfQ"
 history.payment_history(a, chain, txpool, address, finish)
 raw_input()
 
index d146a86..a1b5bbc 100644 (file)
@@ -7,7 +7,7 @@ using namespace libbitcoin;
 #include <boost/python.hpp>
 namespace python = boost::python;
 
-#include "/home/genjix/python-bitcoin/src/primitive.h"
+#include "memory_buffer.hpp"
 
 namespace ph = std::placeholders;
 
@@ -68,8 +68,8 @@ class query_history
   : public std::enable_shared_from_this<query_history>
 {
 public:
-    query_history(async_service& service,
-        blockchain_ptr chain, transaction_pool_ptr txpool)
+    query_history(async_service& service, blockchain_ptr chain,
+        transaction_pool_ptr txpool, memory_buffer_ptr membuf)
       : strand_(service.get_service()), chain_(chain),
         txpool_(txpool), stopped_(false)
     {
@@ -158,7 +158,7 @@ private:
                 BITCOIN_ASSERT(entry->loaded_input && entry->loaded_output);
                 // value of the input is simply the inverse of
                 // the corresponding output
-                entry->loaded_input->value = -entry->loaded_output->value;
+                entry->loaded_input->value = entry->loaded_output->value;
                 // Unspent outputs have a raw_output_script field
                 // Blank this field as it isn't used
                 entry->loaded_output->raw_output_script.clear();
@@ -351,7 +351,8 @@ void write_info(std::string& json, info_unit_ptr info)
         << "\"tx_hash\": \"" << pretty_hex(info->tx_hash) << "\","
         << "\"block_hash\": \"" << pretty_hex(info->block_hash) << "\","
         << "\"index\": " << info->index << ","
-        << "\"value\": " << info->value << ","
+        // x for received, and -x for sent amounts
+        << "\"value\": " << (info->is_input ? "-" : "") << info->value << ","
         << "\"height\": " << info->height << ","
         << "\"timestamp\": " << info->timestamp << ","
         << "\"is_input\": " << info->is_input << ",";
@@ -378,6 +379,12 @@ void keep_query_alive_proxy(const std::error_code& ec,
         auto entry = *it;
         BITCOIN_ASSERT(entry->loaded_output);
         write_info(json, entry->loaded_output);
+        if (entry->input_exists)
+        {
+            BITCOIN_ASSERT(entry->loaded_input);
+            json += ",";
+            write_info(json, entry->loaded_input);
+        }
     }
     json += "]";
     pyfunction<const std::error_code&, const std::string&> f(handle_finish);
@@ -385,11 +392,11 @@ void keep_query_alive_proxy(const std::error_code& ec,
 }
 
 void payment_history(async_service_ptr service, blockchain_ptr chain,
-    transaction_pool_ptr txpool, const std::string& address,
-    python::object handle_finish)
+    transaction_pool_ptr txpool, memory_buffer_ptr membuf,
+    const std::string& address, python::object handle_finish)
 {
     query_history_ptr history =
-        std::make_shared<query_history>(*service, chain, txpool);
+        std::make_shared<query_history>(*service, chain, txpool, membuf);
     history->start(address,
         std::bind(keep_query_alive_proxy, ph::_1, ph::_2,
             handle_finish, history));
index f257a17..25cd9e6 100644 (file)
@@ -5,8 +5,8 @@ import json
 def wrap_finish(handle_finish, ec, result_json):
     handle_finish(ec, json.loads(result_json))
 
-def payment_history(service, chain, txpool, address, finish):
+def payment_history(service, chain, txpool, membuf, address, finish):
     _history.payment_history(service.internal_ptr, chain.internal_ptr,
-                             txpool.internal_ptr, address,
-                             bind(wrap_finish, finish, _1, _2))
+                             txpool.internal_ptr, membuf.internal_ptr,
+                             address, bind(wrap_finish, finish, _1, _2))
 
diff --git a/backends/libbitcoin/membuf.cpp b/backends/libbitcoin/membuf.cpp
new file mode 100644 (file)
index 0000000..07c09d1
--- /dev/null
@@ -0,0 +1,40 @@
+#include <boost/python.hpp>
+namespace python = boost::python;
+
+#include "memory_buffer.hpp"
+
+struct memory_buffer_wrapper
+{
+    memory_buffer_wrapper(async_service_ptr service, blockchain_ptr chain,
+        transaction_pool_ptr txpool)
+    {
+        membuf = std::make_shared<memory_buffer>(service, chain, txpool);
+    }
+
+    void set_handles(python::object handle_tx_stored,
+        python::object handle_tx_confirmed)
+    {
+        membuf->set_handles(handle_tx_stored, handle_tx_confirmed);
+    }
+
+    void receive(const message::transaction& tx,
+        python::object handle_receive)
+    {
+        membuf->receive(tx, handle_receive);
+    }
+
+    memory_buffer_ptr membuf;
+};
+
+BOOST_PYTHON_MODULE(membuf)
+{
+    using namespace boost::python;
+    class_<memory_buffer_wrapper>("memory_buffer", init<
+            async_service_ptr, blockchain_ptr, transaction_pool_ptr>())
+        .def("receive", &memory_buffer_wrapper::receive)
+        .def("set_handles", &memory_buffer_wrapper::set_handles)
+        .def_readonly("internal_ptr", &memory_buffer_wrapper::membuf)
+    ;
+    class_<memory_buffer_ptr>("internal_memory_buffer", no_init);
+}
+
diff --git a/backends/libbitcoin/memory_buffer.hpp b/backends/libbitcoin/memory_buffer.hpp
new file mode 100644 (file)
index 0000000..9d7621c
--- /dev/null
@@ -0,0 +1,205 @@
+#include <bitcoin/bitcoin.hpp>
+using namespace libbitcoin;
+
+#include "/home/genjix/python-bitcoin/src/primitive.h"
+
+namespace ph = std::placeholders;
+
+int cmp(const bc::message::output_point& a,
+    const bc::message::output_point& b)
+{
+    if (a.index < b.index)
+        return -1;
+    else if (a.index > b.index)
+        return 1;
+    // a.index == b.index
+    if (a.hash < b.hash)
+        return -1;
+    else if (a.hash > b.hash)
+        return 1;
+    return 0;
+}
+
+struct outpoint_less_cmp
+{
+    bool operator()(const bc::message::output_point& a,
+        const bc::message::output_point& b)
+    {
+        return cmp(a, b) == -1;
+    }
+};
+
+struct address_less_cmp
+{
+    bool operator()(const payment_address& a, const payment_address& b)
+    {
+        if (a.hash() < b.hash())
+            return true;
+        else if (a.hash() > b.hash())
+            return false;
+        if (a.version() < b.version())
+            return true;
+        return false;
+    }
+};
+
+class memory_buffer
+  : public std::enable_shared_from_this<memory_buffer>
+{
+public:
+    struct check_item
+    {
+        hash_digest tx_hash;
+        size_t index;
+        bool is_input;
+        uint64_t timestamp;
+        // Convenient storage
+        message::output_point previous_output;
+    };
+    typedef std::vector<check_item> check_result;
+    typedef std::shared_ptr<check_result> check_result_ptr;
+
+    typedef std::function<
+        void (const std::error_code&)> receive_handler;
+    typedef std::function<
+        void (const std::error_code&, check_result_ptr)> check_handler;
+
+    memory_buffer(async_service_ptr service, blockchain_ptr chain,
+        transaction_pool_ptr txpool)
+      : strand_(service->get_service()), chain_(chain), txpool_(txpool)
+    {
+    }
+
+    void set_handles(python::object handle_tx_stored,
+        python::object handle_tx_confirmed)
+    {
+        auto this_ptr = shared_from_this();
+        strand_.post(
+            [&, this_ptr, handle_tx_stored, handle_tx_confirmed]()
+            {
+                handle_tx_stored_ = handle_tx_stored;
+                handle_tx_confirmed_ = handle_tx_confirmed;
+            });
+    }
+
+    void receive(const message::transaction& tx,
+        python::object handle_receive)
+    {
+        txpool_->store(tx,
+            strand_.wrap(std::bind(&memory_buffer::confirmed,
+                shared_from_this(), ph::_1, tx)),
+            strand_.wrap(std::bind(&memory_buffer::stored,
+                shared_from_this(), ph::_1, tx, handle_receive)));
+    }
+
+    void check(const message::output_point_list& output_points,
+        const std::string& address, check_handler handle_check)
+    {
+    }
+
+private:
+    void stored(const std::error_code& ec, const message::transaction& tx,
+        python::object handle_receive)
+    {
+        if (ec)
+        {
+            pyfunction<const std::error_code&> f(handle_receive);
+            f(ec);
+            return;
+        }
+        const hash_digest& tx_hash = hash_transaction(tx);
+        for (uint32_t input_index = 0;
+            input_index < tx.inputs.size(); ++input_index)
+        {
+            const auto& prevout = tx.inputs[input_index].previous_output;
+            lookup_input_[prevout] =
+                message::input_point{tx_hash, input_index};
+        }
+        for (uint32_t output_index = 0;
+            output_index < tx.outputs.size(); ++output_index)
+        {
+            payment_address address;
+            if (extract(address, tx.outputs[output_index].output_script))
+            {
+                lookup_address_.insert(
+                    std::make_pair(address,
+                        message::output_point{tx_hash, output_index}));
+            }
+        }
+        timestamps_[tx_hash] = time(nullptr);
+        pyfunction<const std::error_code&> f(handle_receive);
+        f(std::error_code());
+        // tx stored
+        if (!handle_tx_stored_.is_none())
+        {
+            pyfunction<const message::transaction&> g(handle_tx_stored_);
+            g(tx);
+        }
+    }
+
+    void confirmed(const std::error_code& ec, const message::transaction& tx)
+    {
+        const hash_digest& tx_hash = hash_transaction(tx);
+        if (ec)
+        {
+            std::cerr << "Problem confirming transaction "
+                << pretty_hex(tx_hash) << " : " << ec.message() << std::endl;
+            return;
+        }
+        std::cout << "Confirmed " << pretty_hex(tx_hash) << std::endl;
+        for (uint32_t input_index = 0;
+            input_index < tx.inputs.size(); ++input_index)
+        {
+            const auto& prevout = tx.inputs[input_index].previous_output;
+            auto it = lookup_input_.find(prevout);
+            BITCOIN_ASSERT(it != lookup_input_.end());
+            BITCOIN_ASSERT((it->second ==
+                message::input_point{tx_hash, input_index}));
+            lookup_input_.erase(it);
+        }
+        for (uint32_t output_index = 0;
+            output_index < tx.outputs.size(); ++output_index)
+        {
+            message::output_point outpoint{tx_hash, output_index};
+            payment_address address;
+            if (extract(address, tx.outputs[output_index].output_script))
+            {
+                auto range = lookup_address_.equal_range(address);
+                auto it = range.first;
+                for (; it != range.second; ++it)
+                {
+                    if (it->second == outpoint)
+                    {
+                        lookup_address_.erase(it);
+                        break;
+                    }
+                }
+                BITCOIN_ASSERT(it != range.second);
+            }
+        }
+        auto time_it = timestamps_.find(tx_hash);
+        BITCOIN_ASSERT(time_it != timestamps_.end());
+        timestamps_.erase(time_it);
+        // tx_confirmed
+        if (!handle_tx_stored_.is_none())
+        {
+            pyfunction<const message::transaction&> f(handle_tx_confirmed_);
+            f(tx);
+        }
+    }
+
+    io_service::strand strand_;
+    blockchain_ptr chain_;
+    transaction_pool_ptr txpool_;
+
+    std::map<message::output_point,
+        message::input_point, outpoint_less_cmp> lookup_input_;
+    std::multimap<payment_address,
+        message::output_point, address_less_cmp> lookup_address_;
+    std::map<hash_digest, uint64_t> timestamps_;
+
+    python::object handle_tx_stored_, handle_tx_confirmed_;
+};
+
+typedef std::shared_ptr<memory_buffer> memory_buffer_ptr;
+
index 446c37f..42b22d0 100644 (file)
@@ -41,7 +41,7 @@ class Processor(threading.Thread):
 
     def push_response(self, response):
         print "response", response
-        #self.dispatcher.request_dispatcher.push_response(response)
+        self.dispatcher.request_dispatcher.push_response(response)