self.headers_data = ''
self.headers_path = config.get('leveldb', 'path_fulltree')
+ self.mempool_values = {}
self.mempool_addresses = {}
self.mempool_hist = {}
self.mempool_hashes = set([])
self.memorypool_update()
print_log("Memory pool initialized.")
- threading.Timer(10, self.main_iteration).start()
+ self.timer = threading.Timer(10, self.main_iteration)
+ self.timer.start()
try:
respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
except:
+ print_log("error calling bitcoind")
traceback.print_exc(file=sys.stdout)
self.shared.stop()
hist = self.storage.get_history(addr)
is_known = True
except:
+ print_log("error get_history")
self.shared.stop()
raise
if hist:
# add memory pool
with self.mempool_lock:
- for txid in self.mempool_hist.get(addr, []):
+ for txid, delta in self.mempool_hist.get(addr, []):
hist.append({'tx_hash':txid, 'height':0})
# add something to distinguish between unused and empty addresses
return hist
+ def get_unconfirmed_value(self, addr):
+ v = 0
+ with self.mempool_lock:
+ for txid, delta in self.mempool_hist.get(addr, []):
+ v += delta
+ return v
+
+
def get_status(self, addr, cache_only=False):
tx_points = self.get_history(addr, cache_only)
if cache_only and tx_points == -1:
if session in l:
l.remove(session)
if session in l:
- print "error rc!!"
+ print_log("error rc!!")
self.shared.stop()
if l == []:
self.watched_addresses.pop(addr)
error = str(e) + ': ' + address
print_log("error:", error)
+ elif method == 'blockchain.address.get_mempool':
+ try:
+ address = str(params[0])
+ result = self.get_unconfirmed_history(address, cache_only)
+ except BaseException, e:
+ error = str(e) + ': ' + address
+ print_log("error:", error)
+
elif method == 'blockchain.address.get_balance':
try:
address = str(params[0])
- result = self.storage.get_balance(address)
+ confirmed = self.storage.get_balance(address)
+ unconfirmed = self.get_unconfirmed_value(address)
+ result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
except BaseException, e:
error = str(e) + ': ' + address
print_log("error:", error)
result = self.storage.get_address(txi)
except BaseException, e:
error = str(e)
- print_log("error:", error, txid, pos)
+ print_log("error:", error, params)
elif method == 'blockchain.block.get_header':
if cache_only:
try:
respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
except:
+ print_log("bitcoind error (getfullblock)")
traceback.print_exc(file=sys.stdout)
self.shared.stop()
mempool_hashes = set(self.bitcoind('getrawmempool'))
touched_addresses = set([])
+ # get new transactions
+ new_tx = {}
for tx_hash in mempool_hashes:
if tx_hash in self.mempool_hashes:
continue
if not tx:
continue
- mpa = self.mempool_addresses.get(tx_hash, [])
+ new_tx[tx_hash] = tx
+ self.mempool_hashes.add(tx_hash)
+
+ # remove older entries from mempool_hashes
+ self.mempool_hashes = mempool_hashes
+
+
+ # check all tx outputs
+ for tx_hash, tx in new_tx.items():
+ mpa = self.mempool_addresses.get(tx_hash, {})
+ out_values = []
+ for x in tx.get('outputs'):
+ out_values.append( x['value'] )
+
+ addr = x.get('address')
+ if not addr:
+ continue
+ v = mpa.get(addr,0)
+ v += x['value']
+ mpa[addr] = v
+ touched_addresses.add(addr)
+
+ self.mempool_addresses[tx_hash] = mpa
+ self.mempool_values[tx_hash] = out_values
+
+ # check all inputs
+ for tx_hash, tx in new_tx.items():
+ mpa = self.mempool_addresses.get(tx_hash, {})
for x in tx.get('inputs'):
# we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
addr = x.get('address')
- if addr and addr not in mpa:
- mpa.append(addr)
- touched_addresses.add(addr)
+ if not addr:
+ continue
- for x in tx.get('outputs'):
- addr = x.get('address')
- if addr and addr not in mpa:
- mpa.append(addr)
- touched_addresses.add(addr)
+ v = self.mempool_values.get(x.get('prevout_hash'))
+ if v:
+ value = v[ x.get('prevout_n')]
+ else:
+ txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+ value = self.storage.get_utxo_value(addr,txi)
+
+ v = mpa.get(addr,0)
+ v -= value
+ mpa[addr] = v
+ touched_addresses.add(addr)
self.mempool_addresses[tx_hash] = mpa
- self.mempool_hashes.add(tx_hash)
- # remove older entries from mempool_hashes
- self.mempool_hashes = mempool_hashes
# remove deprecated entries from mempool_addresses
for tx_hash, addresses in self.mempool_addresses.items():
if tx_hash not in self.mempool_hashes:
self.mempool_addresses.pop(tx_hash)
+ self.mempool_values.pop(tx_hash)
for addr in addresses:
touched_addresses.add(addr)
# rebuild mempool histories
new_mempool_hist = {}
for tx_hash, addresses in self.mempool_addresses.items():
- for addr in addresses:
+ for addr, delta in addresses.items():
h = new_mempool_hist.get(addr, [])
if tx_hash not in h:
- h.append(tx_hash)
+ h.append((tx_hash, delta))
new_mempool_hist[addr] = h
with self.mempool_lock:
# TODO: update cache here. if new value equals cached value, do not send notification
self.address_queue.put((address,sessions))
+
+ def close(self):
+ self.timer.join()
+ print_log("Closing database...")
+ self.storage.close()
+ print_log("Database is closed")
+
+
def main_iteration(self):
if self.shared.stopped():
- print_log("blockchain processor terminating")
- self.storage.close()
+ print_log("Stopping timer")
return
with self.dblock:
'params': [addr, status],
})
- if not self.shared.stopped():
- threading.Timer(10, self.main_iteration).start()
- else:
- print_log("blockchain processor terminating")
+ # next iteration
+ self.timer = threading.Timer(10, self.main_iteration)
+ self.timer.start()
+