-import bitcoin
-from bitcoin import bind, _1, _2, _3
-from processor import Processor
import threading
import time
+import bitcoin
+from bitcoin import bind, _1, _2, _3
+
+from processor import Processor
import history1 as history
import membuf
+
class HistoryCache:
def __init__(self):
def clear(self, addresses):
with self.lock:
for address in addresses:
- if self.cache.has_key(address):
+ if address in self.cache:
del self.cache[address]
+
class MonitorAddress:
def __init__(self, processor, cache, backend):
self.monitor_output = {}
# key is address
self.monitor_address = set()
- # affected
- self.affected = {}
backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
def monitor(self, address, result):
for info in result:
- if not info.has_key("raw_output_script"):
+ if "raw_output_script" not in info:
continue
assert info["is_input"] == 0
tx_hash = info["tx_hash"]
addrs.append((output_index, str(address)))
return tx_hash, previous_outputs, addrs
- def tx_stored(self, tx):
+ def effect_notify(self, tx, delete_outs):
affected_addrs = set()
tx_hash, previous_outputs, addrs = self.unpack(tx)
for prevout in previous_outputs:
- with self.lock:
- if self.monitor_output.has_key(prevout):
+ try:
+ with self.lock:
affected_addrs.add(self.monitor_output[prevout])
+ if delete_outs:
+ del self.monitor_output[prevout]
+ except KeyError:
+ pass
for idx, address in addrs:
with self.lock:
if address in self.monitor_address:
affected_addrs.add(address)
- with self.lock:
- self.affected[tx_hash] = affected_addrs
self.cache.clear(affected_addrs)
self.notify(affected_addrs)
+ # Used in confirmed txs
+ return tx_hash, addrs, affected_addrs
- def tx_confirmed(self, tx_desc):
- tx_hash, previous_outputs, addrs = self.unpack(tx)
- with self.lock:
- affected_addrs = self.affected[tx_hash]
- del self.affected[tx_hash]
- self.cache.clear(affected_addrs)
- self.notify(affected_addrs)
+ def tx_stored(self, tx):
+ self.effect_notify(tx, False)
+
+ def tx_confirmed(self, tx):
+ tx_hash, addrs, affected_addrs = self.effect_notify(tx, True)
# add new outputs to monitor
for idx, address in addrs:
outpoint = "%s:%s" % (tx_hash, idx)
- with self.lock:
- if address in affected_addrs:
+ if address in affected_addrs:
+ with self.lock:
self.monitor_output[outpoint] = address
- # delete spent outpoints
- for prevout in previous_outputs:
- with self.lock:
- if self.monitor_output.has_key(prevout):
- del self.monitor_output[prevout]
def notify(self, affected_addrs):
- templ_response = {"id": None,
- "method": "blockchain.address.subscribe",
- "params": []}
service = self.backend.mempool_service
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
memory_buff = self.backend.memory_buffer
for address in affected_addrs:
- response = templ_response.copy()
- response["params"].append(address)
- history.payment_history(service, chain, txpool, memory_buff,
- address, bind(self.send_notify, _1, _2, response))
+ response = {"id": None,
+ "method": "blockchain.address.subscribe",
+ "params": [str(address)]}
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.send_notify, _1, _2, response))
def mempool_n(self, result):
assert result is not None
if ec:
print "Error: Monitor.send_notify()", ec
return
+ assert len(response["params"]) == 1
response["params"].append(self.mempool_n(result))
self.processor.push_response(response)
+
class Backend:
def __init__(self):
else:
print "Accepted transaction", tx_hash
+
class GhostValue:
def __init__(self):
self.value = value
self.event.set()
+
class NumblocksSubscribe:
def __init__(self, backend, processor):
latest = fork_point + len(arrivals)
self.latest.set(latest)
response = {"id": None, "method": "blockchain.numblocks.subscribe",
- "result": latest}
+ "params": [latest]}
self.processor.push_response(response)
self.backend.blockchain.subscribe_reorganize(self.reorganize)
def subscribe(self, request):
latest = self.latest.get()
response = {"id": request["id"],
- "method": "blockchain.numblocks.subscribe",
"result": latest,
"error": None}
self.processor.push_response(response)
+
class AddressGetHistory:
def __init__(self, backend, processor):
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
memory_buff = self.backend.memory_buffer
- history.payment_history(service, chain, txpool, memory_buff,
- address, bind(self.respond, _1, _2, request))
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.respond, _1, _2, request))
def respond(self, ec, result, request):
if ec:
response = {"id": request["id"], "result": result, "error": None}
self.processor.push_response(response)
+
class AddressSubscribe:
def __init__(self, backend, processor, cache, monitor):
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
memory_buff = self.backend.memory_buffer
- history.payment_history(service, chain, txpool, memory_buff,
- address, bind(self.construct, _1, _2, request))
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.construct, _1, _2, request))
def construct(self, ec, result, request):
if ec:
self.processor.push_response(response)
return True
+
class BlockchainProcessor(Processor):
def __init__(self, config):
self.broadcast_transaction(request)
def broadcast_transaction(self, request):
- raw_tx = bitcoin.data_chunk(str(request["params"]))
+ raw_tx = bitcoin.data_chunk(str(request["params"][0]))
exporter = bitcoin.satoshi_exporter()
try:
tx = exporter.load_transaction(raw_tx)
except RuntimeError:
- response = {"id": request["id"], "result": None,
- "error": {"message":
- "Exception while parsing the transaction data.",
- "code": -4}}
+ response = {
+ "id": request["id"],
+ "result": None,
+ "error": {
+ "message": "Exception while parsing the transaction data.",
+ "code": -4,
+ }
+ }
else:
self.backend.protocol.broadcast_transaction(tx)
tx_hash = str(bitcoin.hash_transaction(tx))
response = {"id": request["id"], "result": tx_hash, "error": None}
self.push_response(response)
-
- def run(self):
- print "Warning: pre-alpha prototype. Full of bugs."
- while not self.shared.stopped():
- time.sleep(1)
-