CC = g++ -fPIC -Wall -ansi `pkg-config --cflags libbitcoin` -I/usr/include/python2.7
+membuf:
+ $(CC) -c membuf.cpp -o membuf.o
+ $(CC) -shared -Wl,-soname,membuf.so membuf.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o membuf.so
+
default:
$(CC) -c history.cpp -o history.o
$(CC) -shared -Wl,-soname,_history.so history.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o history1/_history.so
import threading
import time
-import history
+import history1 as history
+import membuf
class HistoryCache:
class MonitorAddress:
- def __init__(self, processor, cache):
+ def __init__(self, processor, cache, backend):
self.processor = processor
self.cache = cache
+ self.backend = backend
self.lock = threading.Lock()
# key is hash:index, value is address
self.monitor_output = {}
# affected
self.affected = {}
+ backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
+
def monitor(self, address, result):
for info in result:
if not info.has_key("raw_output_script"):
with self.lock:
self.monitor_address.add(address)
- def tx_stored(self, tx_desc):
- tx_hash, prevouts, addrs = tx_desc
+ def unpack(self, tx):
+ tx_hash = bitcoin.hash_transaction(tx)
+ previous_outputs = []
+ for input in tx.inputs:
+ prevout = input.previous_output
+ prevout = "%s:%s" % (prevout.hash, prevout.index)
+ previous_outputs.append(prevout)
+ addrs = []
+ for output_index, output in enumerate(tx.outputs):
+ address = bitcoin.payment_address()
+ if address.extract(output.output_script):
+ addrs.append((output_index, str(address)))
+ return tx_hash, previous_outputs, addrs
+
+ def tx_stored(self, tx):
affected_addrs = set()
- for prevout_hash, prevout_index in prevouts:
- prevout = "%s:%s" % (prevout_hash, prevout_index)
+ tx_hash, previous_outputs, addrs = self.unpack(tx)
+ for prevout in previous_outputs:
with self.lock:
if self.monitor_output.has_key(prevout):
affected_addrs.add(self.monitor_output[prevout])
self.notify(affected_addrs)
def tx_confirmed(self, tx_desc):
- tx_hash, prevouts, addrs = tx_desc
+ tx_hash, previous_outputs, addrs = self.unpack(tx)
with self.lock:
affected_addrs = self.affected[tx_hash]
del self.affected[tx_hash]
if address in affected_addrs:
self.monitor_output[outpoint] = address
# delete spent outpoints
- for prevout_hash, prevout_index in prevouts:
- prevout = "%s:%s" % (prevout_hash, prevout_index)
+ for prevout in previous_outputs:
with self.lock:
if self.monitor_output.has_key(prevout):
del self.monitor_output[prevout]
templ_response = {"id": None,
"method": "blockchain.address.subscribe",
"params": []}
+ service = self.backend.mempool_service
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
- membuf = self.backend.pool_buffer
+ memory_buff = self.backend.memory_buffer
for address in affected_addrs:
response = templ_response.copy()
response["params"].append(address)
- history.payment_history(chain, txpool, membuf, address,
- bind(self.send_notify, _1, response))
+ history.payment_history(service, chain, txpool, memory_buff,
+ address, bind(self.send_notify, _1, _2, response))
def mempool_n(self, result):
assert result is not None
last_id = last_info["block_hash"]
return last_id
- def send_notify(self, result, response):
+ def send_notify(self, ec, result, response):
+ if ec:
+ print "Error: Monitor.send_notify()", ec
+ return
response["params"].append(self.mempool_n(result))
self.processor.push_response(response)
class Backend:
- def __init__(self, monitor):
+ def __init__(self):
# Create 3 thread-pools each with 1 thread
self.network_service = bitcoin.async_service(1)
self.disk_service = bitcoin.async_service(1)
self.poller, self.transaction_pool)
self.session.start(self.handle_start)
- self.pool_buffer = \
- history.MemoryPoolBuffer(self.transaction_pool,
- self.blockchain, monitor)
+ self.memory_buffer = \
+ membuf.memory_buffer(self.mempool_service.internal_ptr,
+ self.blockchain.internal_ptr,
+ self.transaction_pool.internal_ptr)
def handle_start(self, ec):
if ec:
print "Error with new transaction:", ec
return
tx_hash = bitcoin.hash_transaction(tx)
- self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash))
+ self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
# Re-subscribe to new transactions from node
node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
def get(self, request):
address = str(request["params"][0])
+ service = self.backend.mempool_service
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
- membuf = self.backend.pool_buffer
- history.payment_history(chain, txpool, membuf, address,
- bind(self.respond, _1, request))
+ memory_buff = self.backend.memory_buffer
+ history.payment_history(service, chain, txpool, memory_buff,
+ address, bind(self.respond, _1, _2, request))
- def respond(self, result, request):
- if result is None:
+ def respond(self, ec, result, request):
+ if ec:
response = {"id": request["id"], "result": None,
- "error": {"message": "Error", "code": -4}}
+ "error": {"message": str(ec), "code": -4}}
else:
response = {"id": request["id"], "result": result, "error": None}
self.processor.push_response(response)
self.cache = cache
self.monitor = monitor
- self.backend.pool_buffer.cheat = self
-
def subscribe(self, request):
address = str(request["params"][0])
+ service = self.backend.mempool_service
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
- membuf = self.backend.pool_buffer
- history.payment_history(chain, txpool, membuf, address,
- bind(self.construct, _1, request))
+ memory_buff = self.backend.memory_buffer
+ history.payment_history(service, chain, txpool, memory_buff,
+ address, bind(self.construct, _1, _2, request))
- def construct(self, result, request):
- if result is None:
+ def construct(self, ec, result, request):
+ if ec:
response = {"id": request["id"], "result": None,
- "error": {"message": "Error", "code": -4}}
+ "error": {"message": str(ec), "code": -4}}
+ self.processor.push_response(response)
return
last_id = self.monitor.mempool_n(result)
response = {"id": request["id"], "result": last_id, "error": None}
def __init__(self, config):
Processor.__init__(self)
cache = HistoryCache()
- monitor = MonitorAddress(self, cache)
- self.backend = Backend(monitor)
- monitor.backend = self.backend
+ self.backend = Backend()
+ monitor = MonitorAddress(self, cache, self.backend)
self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
self.address_get_history = AddressGetHistory(self.backend, self)
self.address_subscribe = \
def finish(ec, result):
print "Finish:", ec
- print result
+ for line in result:
+ for k, v in line.iteritems():
+ begin = k + ":"
+ print begin, " " * (12 - len(begin)), v
+ print
a = bitcoin.async_service(1)
chain = bitcoin.bdb_blockchain(a, "/home/genjix/libbitcoin/database",
blockchain_started)
txpool = bitcoin.transaction_pool(a, chain)
-address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D"
+address = "1FpES68UNcxnXeoaFciqvUSGiKGZ33gbfQ"
history.payment_history(a, chain, txpool, address, finish)
raw_input()
#include <boost/python.hpp>
namespace python = boost::python;
-#include "/home/genjix/python-bitcoin/src/primitive.h"
+#include "memory_buffer.hpp"
namespace ph = std::placeholders;
: public std::enable_shared_from_this<query_history>
{
public:
- query_history(async_service& service,
- blockchain_ptr chain, transaction_pool_ptr txpool)
+ query_history(async_service& service, blockchain_ptr chain,
+ transaction_pool_ptr txpool, memory_buffer_ptr membuf)
: strand_(service.get_service()), chain_(chain),
txpool_(txpool), stopped_(false)
{
BITCOIN_ASSERT(entry->loaded_input && entry->loaded_output);
// value of the input is simply the inverse of
// the corresponding output
- entry->loaded_input->value = -entry->loaded_output->value;
+ entry->loaded_input->value = entry->loaded_output->value;
// Unspent outputs have a raw_output_script field
// Blank this field as it isn't used
entry->loaded_output->raw_output_script.clear();
<< "\"tx_hash\": \"" << pretty_hex(info->tx_hash) << "\","
<< "\"block_hash\": \"" << pretty_hex(info->block_hash) << "\","
<< "\"index\": " << info->index << ","
- << "\"value\": " << info->value << ","
+ // x for received, and -x for sent amounts
+ << "\"value\": " << (info->is_input ? "-" : "") << info->value << ","
<< "\"height\": " << info->height << ","
<< "\"timestamp\": " << info->timestamp << ","
<< "\"is_input\": " << info->is_input << ",";
auto entry = *it;
BITCOIN_ASSERT(entry->loaded_output);
write_info(json, entry->loaded_output);
+ if (entry->input_exists)
+ {
+ BITCOIN_ASSERT(entry->loaded_input);
+ json += ",";
+ write_info(json, entry->loaded_input);
+ }
}
json += "]";
pyfunction<const std::error_code&, const std::string&> f(handle_finish);
}
void payment_history(async_service_ptr service, blockchain_ptr chain,
- transaction_pool_ptr txpool, const std::string& address,
- python::object handle_finish)
+ transaction_pool_ptr txpool, memory_buffer_ptr membuf,
+ const std::string& address, python::object handle_finish)
{
query_history_ptr history =
- std::make_shared<query_history>(*service, chain, txpool);
+ std::make_shared<query_history>(*service, chain, txpool, membuf);
history->start(address,
std::bind(keep_query_alive_proxy, ph::_1, ph::_2,
handle_finish, history));
def wrap_finish(handle_finish, ec, result_json):
handle_finish(ec, json.loads(result_json))
-def payment_history(service, chain, txpool, address, finish):
+def payment_history(service, chain, txpool, membuf, address, finish):
_history.payment_history(service.internal_ptr, chain.internal_ptr,
- txpool.internal_ptr, address,
- bind(wrap_finish, finish, _1, _2))
+ txpool.internal_ptr, membuf.internal_ptr,
+ address, bind(wrap_finish, finish, _1, _2))
--- /dev/null
+#include <boost/python.hpp>
+namespace python = boost::python;
+
+#include "memory_buffer.hpp"
+
+struct memory_buffer_wrapper
+{
+ memory_buffer_wrapper(async_service_ptr service, blockchain_ptr chain,
+ transaction_pool_ptr txpool)
+ {
+ membuf = std::make_shared<memory_buffer>(service, chain, txpool);
+ }
+
+ void set_handles(python::object handle_tx_stored,
+ python::object handle_tx_confirmed)
+ {
+ membuf->set_handles(handle_tx_stored, handle_tx_confirmed);
+ }
+
+ void receive(const message::transaction& tx,
+ python::object handle_receive)
+ {
+ membuf->receive(tx, handle_receive);
+ }
+
+ memory_buffer_ptr membuf;
+};
+
+BOOST_PYTHON_MODULE(membuf)
+{
+ using namespace boost::python;
+ class_<memory_buffer_wrapper>("memory_buffer", init<
+ async_service_ptr, blockchain_ptr, transaction_pool_ptr>())
+ .def("receive", &memory_buffer_wrapper::receive)
+ .def("set_handles", &memory_buffer_wrapper::set_handles)
+ .def_readonly("internal_ptr", &memory_buffer_wrapper::membuf)
+ ;
+ class_<memory_buffer_ptr>("internal_memory_buffer", no_init);
+}
+
--- /dev/null
+#include <bitcoin/bitcoin.hpp>
+using namespace libbitcoin;
+
+#include "/home/genjix/python-bitcoin/src/primitive.h"
+
+namespace ph = std::placeholders;
+
+int cmp(const bc::message::output_point& a,
+ const bc::message::output_point& b)
+{
+ if (a.index < b.index)
+ return -1;
+ else if (a.index > b.index)
+ return 1;
+ // a.index == b.index
+ if (a.hash < b.hash)
+ return -1;
+ else if (a.hash > b.hash)
+ return 1;
+ return 0;
+}
+
+struct outpoint_less_cmp
+{
+ bool operator()(const bc::message::output_point& a,
+ const bc::message::output_point& b)
+ {
+ return cmp(a, b) == -1;
+ }
+};
+
+struct address_less_cmp
+{
+ bool operator()(const payment_address& a, const payment_address& b)
+ {
+ if (a.hash() < b.hash())
+ return true;
+ else if (a.hash() > b.hash())
+ return false;
+ if (a.version() < b.version())
+ return true;
+ return false;
+ }
+};
+
+class memory_buffer
+ : public std::enable_shared_from_this<memory_buffer>
+{
+public:
+ struct check_item
+ {
+ hash_digest tx_hash;
+ size_t index;
+ bool is_input;
+ uint64_t timestamp;
+ // Convenient storage
+ message::output_point previous_output;
+ };
+ typedef std::vector<check_item> check_result;
+ typedef std::shared_ptr<check_result> check_result_ptr;
+
+ typedef std::function<
+ void (const std::error_code&)> receive_handler;
+ typedef std::function<
+ void (const std::error_code&, check_result_ptr)> check_handler;
+
+ memory_buffer(async_service_ptr service, blockchain_ptr chain,
+ transaction_pool_ptr txpool)
+ : strand_(service->get_service()), chain_(chain), txpool_(txpool)
+ {
+ }
+
+ void set_handles(python::object handle_tx_stored,
+ python::object handle_tx_confirmed)
+ {
+ auto this_ptr = shared_from_this();
+ strand_.post(
+ [&, this_ptr, handle_tx_stored, handle_tx_confirmed]()
+ {
+ handle_tx_stored_ = handle_tx_stored;
+ handle_tx_confirmed_ = handle_tx_confirmed;
+ });
+ }
+
+ void receive(const message::transaction& tx,
+ python::object handle_receive)
+ {
+ txpool_->store(tx,
+ strand_.wrap(std::bind(&memory_buffer::confirmed,
+ shared_from_this(), ph::_1, tx)),
+ strand_.wrap(std::bind(&memory_buffer::stored,
+ shared_from_this(), ph::_1, tx, handle_receive)));
+ }
+
+ void check(const message::output_point_list& output_points,
+ const std::string& address, check_handler handle_check)
+ {
+ }
+
+private:
+ void stored(const std::error_code& ec, const message::transaction& tx,
+ python::object handle_receive)
+ {
+ if (ec)
+ {
+ pyfunction<const std::error_code&> f(handle_receive);
+ f(ec);
+ return;
+ }
+ const hash_digest& tx_hash = hash_transaction(tx);
+ for (uint32_t input_index = 0;
+ input_index < tx.inputs.size(); ++input_index)
+ {
+ const auto& prevout = tx.inputs[input_index].previous_output;
+ lookup_input_[prevout] =
+ message::input_point{tx_hash, input_index};
+ }
+ for (uint32_t output_index = 0;
+ output_index < tx.outputs.size(); ++output_index)
+ {
+ payment_address address;
+ if (extract(address, tx.outputs[output_index].output_script))
+ {
+ lookup_address_.insert(
+ std::make_pair(address,
+ message::output_point{tx_hash, output_index}));
+ }
+ }
+ timestamps_[tx_hash] = time(nullptr);
+ pyfunction<const std::error_code&> f(handle_receive);
+ f(std::error_code());
+ // tx stored
+ if (!handle_tx_stored_.is_none())
+ {
+ pyfunction<const message::transaction&> g(handle_tx_stored_);
+ g(tx);
+ }
+ }
+
+ void confirmed(const std::error_code& ec, const message::transaction& tx)
+ {
+ const hash_digest& tx_hash = hash_transaction(tx);
+ if (ec)
+ {
+ std::cerr << "Problem confirming transaction "
+ << pretty_hex(tx_hash) << " : " << ec.message() << std::endl;
+ return;
+ }
+ std::cout << "Confirmed " << pretty_hex(tx_hash) << std::endl;
+ for (uint32_t input_index = 0;
+ input_index < tx.inputs.size(); ++input_index)
+ {
+ const auto& prevout = tx.inputs[input_index].previous_output;
+ auto it = lookup_input_.find(prevout);
+ BITCOIN_ASSERT(it != lookup_input_.end());
+ BITCOIN_ASSERT((it->second ==
+ message::input_point{tx_hash, input_index}));
+ lookup_input_.erase(it);
+ }
+ for (uint32_t output_index = 0;
+ output_index < tx.outputs.size(); ++output_index)
+ {
+ message::output_point outpoint{tx_hash, output_index};
+ payment_address address;
+ if (extract(address, tx.outputs[output_index].output_script))
+ {
+ auto range = lookup_address_.equal_range(address);
+ auto it = range.first;
+ for (; it != range.second; ++it)
+ {
+ if (it->second == outpoint)
+ {
+ lookup_address_.erase(it);
+ break;
+ }
+ }
+ BITCOIN_ASSERT(it != range.second);
+ }
+ }
+ auto time_it = timestamps_.find(tx_hash);
+ BITCOIN_ASSERT(time_it != timestamps_.end());
+ timestamps_.erase(time_it);
+ // tx_confirmed
+ if (!handle_tx_stored_.is_none())
+ {
+ pyfunction<const message::transaction&> f(handle_tx_confirmed_);
+ f(tx);
+ }
+ }
+
+ io_service::strand strand_;
+ blockchain_ptr chain_;
+ transaction_pool_ptr txpool_;
+
+ std::map<message::output_point,
+ message::input_point, outpoint_less_cmp> lookup_input_;
+ std::multimap<payment_address,
+ message::output_point, address_less_cmp> lookup_address_;
+ std::map<hash_digest, uint64_t> timestamps_;
+
+ python::object handle_tx_stored_, handle_tx_confirmed_;
+};
+
+typedef std::shared_ptr<memory_buffer> memory_buffer_ptr;
+
def push_response(self, response):
print "response", response
- #self.dispatcher.request_dispatcher.push_response(response)
+ self.dispatcher.request_dispatcher.push_response(response)