-from Abe.util import hash_to_address, decode_check_address
-from Abe.DataStore import DataStore as Datastore_class
-from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
-
import binascii
-
-import thread, traceback, sys, urllib, operator
from json import dumps, loads
+import operator
from Queue import Queue
-import time, threading
-
-
-import hashlib
-encode = lambda x: x[::-1].encode('hex')
-decode = lambda x: x.decode('hex')[::-1]
-Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
-
-def rev_hex(s):
- return s.decode('hex')[::-1].encode('hex')
-
-def int_to_hex(i, length=1):
- s = hex(i)[2:].rstrip('L')
- s = "0"*(2*length - len(s)) + s
- return rev_hex(s)
+import sys
+import thread
+import threading
+import time
+import traceback
+import urllib
+
+from Abe import DataStore, readconf, BCDataStream, deserialize
+from Abe.util import hash_to_address, decode_check_address
-def header_to_string(res):
- s = int_to_hex(res.get('version'),4) \
- + rev_hex(res.get('prev_block_hash')) \
- + rev_hex(res.get('merkle_root')) \
- + int_to_hex(int(res.get('timestamp')),4) \
- + int_to_hex(int(res.get('bits')),4) \
- + int_to_hex(int(res.get('nonce')),4)
- return s
+from processor import Processor, print_log
+from utils import *
-class AbeStore(Datastore_class):
+class AbeStore(DataStore.DataStore):
def __init__(self, config):
conf = DataStore.CONFIG_DEFAULTS
- args, argv = readconf.parse_argv( [], conf)
- args.dbtype = config.get('database','type')
+ args, argv = readconf.parse_argv([], conf)
+ args.dbtype = config.get('database', 'type')
if args.dbtype == 'sqlite3':
- args.connect_args = { 'database' : config.get('database','database') }
+ args.connect_args = {'database': config.get('database', 'database')}
elif args.dbtype == 'MySQLdb':
- args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
+ args.connect_args = {'db': config.get('database', 'database'), 'user': config.get('database', 'username'), 'passwd': config.get('database', 'password')}
elif args.dbtype == 'psycopg2':
- args.connect_args = { 'database' : config.get('database','database') }
+ args.connect_args = {'database': config.get('database', 'database')}
coin = config.get('server', 'coin')
self.addrtype = 0
if coin == 'litecoin':
- print_log ('Litecoin settings:')
- datadir = config.get('server','datadir')
- print_log (' datadir = ' + datadir)
- args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
- print_log (' addrtype = 48')
+ print_log('Litecoin settings:')
+ datadir = config.get('server', 'datadir')
+ print_log(' datadir = ' + datadir)
+ args.datadir = [{"dirname": datadir, "chain": "Litecoin", "code3": "LTC", "address_version": "\x30"}]  # 0x30 == 48; "\u0030" is a literal 6-char string in a py2 str
+ print_log(' addrtype = 48')
self.addrtype = 48
- Datastore_class.__init__(self,args)
+ DataStore.DataStore.__init__(self, args)
# Use 1 (Bitcoin) if chain_id is not sent
self.chain_id = self.datadirs[0]["chain_id"] or 1
- print_log ('Coin chain_id = %d' % self.chain_id)
+ print_log('Coin chain_id = %d' % self.chain_id)
- self.sql_limit = int( config.get('database','limit') )
+ self.sql_limit = int(config.get('database', 'limit'))
self.tx_cache = {}
- self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
+ self.bitcoind_url = 'http://%s:%s@%s:%s/' % (config.get('bitcoind', 'user'), config.get('bitcoind', 'password'), config.get('bitcoind', 'host'), config.get('bitcoind', 'port'))
self.chunk_cache = {}
self.last_tx_id = 0
self.known_mempool_hashes = []
-
-
def import_tx(self, tx, is_coinbase):
tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
self.last_tx_id = tx_id
return tx_id
-
-
-
def import_block(self, b, chain_ids=frozenset()):
- #print_log ("import block")
+ #print_log("import block")
block_id = super(AbeStore, self).import_block(b, chain_ids)
for pos in xrange(len(b['transactions'])):
tx = b['transactions'][pos]
if 'hash' not in tx:
- tx['hash'] = util.double_sha256(tx['tx'])
+ tx['hash'] = Hash(tx['tx'])
tx_id = self.tx_find_id_and_value(tx)
if tx_id:
self.update_tx_cache(tx_id)
else:
- print_log ("error: import_block: no tx_id")
+ print_log("error: import_block: no tx_id")
return block_id
-
def update_tx_cache(self, txid):
inrows = self.get_tx_inputs(txid, False)
for row in inrows:
_hash = self.binout(row[6])
if not _hash:
- #print_log ("WARNING: missing tx_in for tx", txid)
+ #print_log("WARNING: missing tx_in for tx", txid)
continue
address = hash_to_address(chr(self.addrtype), _hash)
with self.cache_lock:
- if self.tx_cache.has_key(address):
- print_log ("cache: invalidating", address)
+ if address in self.tx_cache:
+ print_log("cache: invalidating", address)
self.tx_cache.pop(address)
self.address_queue.put(address)
for row in outrows:
_hash = self.binout(row[6])
if not _hash:
- #print_log ("WARNING: missing tx_out for tx", txid)
+ #print_log("WARNING: missing tx_out for tx", txid)
continue
address = hash_to_address(chr(self.addrtype), _hash)
with self.cache_lock:
- if self.tx_cache.has_key(address):
- print_log ("cache: invalidating", address)
+ if address in self.tx_cache:
+ print_log("cache: invalidating", address)
self.tx_cache.pop(address)
self.address_queue.put(address)
- def safe_sql(self,sql, params=(), lock=True):
-
+ def safe_sql(self, sql, params=(), lock=True):
error = False
try:
- if lock: self.lock.acquire()
- ret = self.selectall(sql,params)
+ if lock:
+ self.lock.acquire()
+ ret = self.selectall(sql, params)
except:
error = True
traceback.print_exc(file=sys.stdout)
finally:
- if lock: self.lock.release()
+ if lock:
+ self.lock.release()
- if error:
- raise BaseException('sql error')
+ if error:
+ raise Exception('sql error')
return ret
-
def get_tx_outputs(self, tx_id, lock=True):
return self.safe_sql("""SELECT
LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
- WHERE txout.tx_id = %d
+ WHERE txout.tx_id = %d
ORDER BY txout.txout_pos
- """%(tx_id), (), lock)
+ """ % (tx_id), (), lock)
def get_tx_inputs(self, tx_id, lock=True):
return self.safe_sql(""" SELECT
LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
WHERE txin.tx_id = %d
ORDER BY txin.txin_pos
- """%(tx_id,), (), lock)
-
+ """ % (tx_id,), (), lock)
def get_address_out_rows(self, dbhash):
out = self.safe_sql(""" SELECT
AND cc.in_longest = 1
LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
- if len(out)==self.sql_limit:
- raise BaseException('limit reached')
+ if len(out) == self.sql_limit:
+ raise Exception('limit reached')
return out
def get_address_out_rows_memorypool(self, dbhash):
tx.tx_id,
txin.txin_pos,
-prevout.txout_value
- FROM tx
+ FROM tx
JOIN txin ON (txin.tx_id = tx.tx_id)
JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
WHERE pubkey.pubkey_hash = ?
- LIMIT ? """, (dbhash,self.sql_limit))
+ LIMIT ? """, (dbhash, self.sql_limit))
- if len(out)==self.sql_limit:
- raise BaseException('limit reached')
+ if len(out) == self.sql_limit:
+ raise Exception('limit reached')
return out
def get_address_in_rows(self, dbhash):
AND cc.in_longest = 1
LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
- if len(out)==self.sql_limit:
- raise BaseException('limit reached')
+ if len(out) == self.sql_limit:
+ raise Exception('limit reached')
return out
def get_address_in_rows_memorypool(self, dbhash):
- out = self.safe_sql( """ SELECT
+ out = self.safe_sql(""" SELECT
0,
tx.tx_hash,
tx.tx_id,
JOIN txout ON (txout.tx_id = tx.tx_id)
JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
WHERE pubkey.pubkey_hash = ?
- LIMIT ? """, (dbhash,self.sql_limit))
+ LIMIT ? """, (dbhash, self.sql_limit))
- if len(out)==self.sql_limit:
- raise BaseException('limit reached')
+ if len(out) == self.sql_limit:
+ raise Exception('limit reached')
return out
-
-
def get_history(self, addr, cache_only=False):
+ # todo: make this more efficient. it iterates over txpoints multiple times
with self.cache_lock:
- cached_version = self.tx_cache.get( addr )
+ cached_version = self.tx_cache.get(addr)
if cached_version is not None:
return cached_version
- if cache_only: return -1
+ if cache_only:
+ return -1
version, binaddr = decode_check_address(addr)
if binaddr is None:
dbhash = self.binin(binaddr)
rows = []
- rows += self.get_address_out_rows( dbhash )
- rows += self.get_address_in_rows( dbhash )
+ rows += self.get_address_out_rows(dbhash)
+ rows += self.get_address_in_rows(dbhash)
txpoints = []
known_tx = []
try:
nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
except:
- print_log ("cannot unpack row", row)
+ print_log("cannot unpack row", row)
break
tx_hash = self.hashout_hex(tx_hash)
- txpoint = {
- "timestamp": int(nTime),
- "height": int(height),
- "is_input": int(is_in),
- "block_hash": self.hashout_hex(blk_hash),
- "tx_hash": tx_hash,
- "tx_id": int(tx_id),
- "index": int(pos),
- "value": int(value),
- }
-
- txpoints.append(txpoint)
- known_tx.append(self.hashout_hex(tx_hash))
+ txpoints.append({
+ "timestamp": int(nTime),
+ "height": int(height),
+ "is_input": int(is_in),
+ "block_hash": self.hashout_hex(blk_hash),
+ "tx_hash": tx_hash,
+ "tx_id": int(tx_id),
+ "index": int(pos),
+ "value": int(value),
+ })
+ known_tx.append(self.hashout_hex(tx_hash))
# todo: sort them really...
txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
# read memory pool
rows = []
- rows += self.get_address_in_rows_memorypool( dbhash )
- rows += self.get_address_out_rows_memorypool( dbhash )
+ rows += self.get_address_in_rows_memorypool(dbhash)
+ rows += self.get_address_out_rows_memorypool(dbhash)
address_has_mempool = False
for row in rows:
# discard transactions that are too old
if self.last_tx_id - tx_id > 50000:
- print_log ("discarding tx id", tx_id)
+ print_log("discarding tx id", tx_id)
continue
# this means that pending transactions were added to the db, even if they are not returned by getmemorypool
address_has_mempool = True
- #print_log ("mempool", tx_hash)
- txpoint = {
- "timestamp": 0,
- "height": 0,
- "is_input": int(is_in),
- "block_hash": 'mempool',
- "tx_hash": tx_hash,
- "tx_id": int(tx_id),
- "index": int(pos),
- "value": int(value),
- }
- txpoints.append(txpoint)
-
+ #print_log("mempool", tx_hash)
+ txpoints.append({
+ "timestamp": 0,
+ "height": 0,
+ "is_input": int(is_in),
+ "block_hash": 'mempool',
+ "tx_hash": tx_hash,
+ "tx_id": int(tx_id),
+ "index": int(pos),
+ "value": int(value),
+ })
for txpoint in txpoints:
tx_id = txpoint['tx_id']
-
+
txinputs = []
inrows = self.get_tx_inputs(tx_id)
for row in inrows:
_hash = self.binout(row[6])
if not _hash:
- #print_log ("WARNING: missing tx_in for tx", tx_id, addr)
+ #print_log("WARNING: missing tx_in for tx", tx_id, addr)
continue
address = hash_to_address(chr(self.addrtype), _hash)
txinputs.append(address)
for row in outrows:
_hash = self.binout(row[6])
if not _hash:
- #print_log ("WARNING: missing tx_out for tx", tx_id, addr)
+ #print_log("WARNING: missing tx_out for tx", tx_id, addr)
continue
address = hash_to_address(chr(self.addrtype), _hash)
txoutputs.append(address)
if not txpoint['is_input']:
# detect if already redeemed...
for row in outrows:
- if row[6] == dbhash: break
+ if row[6] == dbhash:
+ break
else:
raise
#row = self.get_tx_output(tx_id,dbhash)
# pos, script, value, o_hash, o_id, o_pos, binaddr = row
# if not redeemed, we add the script
if row:
- if not row[4]: txpoint['raw_output_script'] = row[1]
+ if not row[4]:
+ txpoint['raw_output_script'] = row[1]
txpoint.pop('tx_id')
-
- txpoints = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, txpoints)
+ txpoints = map(lambda x: {'tx_hash': x['tx_hash'], 'height': x['height']}, txpoints)
out = []
for item in txpoints:
- if item not in out: out.append(item)
+ if item not in out:
+ out.append(item)
# cache result
## do not cache mempool results because statuses are ambiguous
#if not address_has_mempool:
with self.cache_lock:
self.tx_cache[addr] = out
-
- return out
+ return out
def get_status(self, addr, cache_only=False):
# for 0.5 clients
tx_points = self.get_history(addr, cache_only)
- if cache_only and tx_points == -1: return -1
+ if cache_only and tx_points == -1:
+ return -1
- if not tx_points: return None
+ if not tx_points:
+ return None
status = ''
for tx in tx_points:
status += tx.get('tx_hash') + ':%d:' % tx.get('height')
- return hashlib.sha256( status ).digest().encode('hex')
-
+ return hashlib.sha256(status).digest().encode('hex')
def get_block_header(self, block_height):
out = self.safe_sql("""
prev_block_hash,
block_id
FROM chain_summary
- WHERE block_height = %d AND in_longest = 1"""%block_height)
+ WHERE block_height = %d AND in_longest = 1""" % block_height)
- if not out: raise BaseException("block not found")
+ if not out:
+ raise Exception("block not found")
row = out[0]
- (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
- = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
-
- out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash,
- "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
- return out
-
+ (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_id) \
+ = (self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]))
+
+ return {
+ "block_height": block_height,
+ "version": block_version,
+ "prev_block_hash": prev_block_hash,
+ "merkle_root": hashMerkleRoot,
+ "timestamp": nTime,
+ "bits": nBits,
+ "nonce": nNonce,
+ }
def get_chunk(self, index):
with self.cache_lock:
msg = self.chunk_cache.get(index)
- if msg: return msg
+ if msg:
+ return msg
sql = """
SELECT
prev_block_hash,
block_height
FROM chain_summary
- WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height"""%(index*2016, (index+1)*2016)
+ WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height""" % (index * 2016, (index+1) * 2016)
out = self.safe_sql(sql)
msg = ''
for row in out:
(block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
- = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
- h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash,
- "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
-
- if h.get('block_height')==0: h['prev_block_hash'] = "0"*64
+ = (self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]))
+ h = {
+ "block_height": block_height,
+ "version": block_version,
+ "prev_block_hash": prev_block_hash,
+ "merkle_root": hashMerkleRoot,
+ "timestamp": nTime,
+ "bits": nBits,
+ "nonce": nNonce,
+ }
+
+ if h.get('block_height') == 0:
+ h['prev_block_hash'] = "0" * 64
msg += header_to_string(h)
- #print_log ("hash", encode(Hash(msg.decode('hex'))))
+ #print_log("hash", encode(Hash(msg.decode('hex'))))
#if h.get('block_height')==1:break
with self.cache_lock:
self.chunk_cache[index] = msg
- print_log ("get_chunk", index, len(msg))
+ print_log("get_chunk", index, len(msg))
return msg
-
-
def get_raw_tx(self, tx_hash, height):
- postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'})
+ postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id': 'jsonrpc'})
respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
r = loads(respdata)
- if r['error'] != None:
- raise BaseException(r['error'])
-
- hextx = r.get('result')
- return hextx
+ if r['error'] is not None:
+ raise Exception(r['error'])
+ return r.get('result')
def get_tx_merkle(self, tx_hash):
-
out = self.safe_sql("""
- SELECT block_tx.block_id FROM tx
- JOIN block_tx on tx.tx_id = block_tx.tx_id
+ SELECT block_tx.block_id FROM tx
+ JOIN block_tx on tx.tx_id = block_tx.tx_id
JOIN chain_summary on chain_summary.block_id = block_tx.block_id
- WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
+ WHERE tx_hash='%s' AND in_longest = 1""" % tx_hash)
- if not out: raise BaseException("not in a block")
+ if not out:
+ raise Exception("not in a block")
block_id = int(out[0][0])
# get block height
- out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id)
+ out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1" % block_id)
- if not out: raise BaseException("block not found")
+ if not out:
+ raise Exception("block not found")
block_height = int(out[0][0])
merkle = []
ORDER BY tx_pos""", (block_id,)):
_id, _pos, _hash = row
merkle.append(_hash)
- if _hash == tx_hash: tx_pos = int(_pos)
+ if _hash == tx_hash:
+ tx_pos = int(_pos)
# find subset.
# TODO: do not compute this on client request, better store the hash tree of each block in a database...
s = []
while len(merkle) != 1:
- if len(merkle)%2: merkle.append( merkle[-1] )
+ if len(merkle) % 2:
+ merkle.append(merkle[-1])
n = []
while merkle:
- new_hash = Hash( merkle[0] + merkle[1] )
+ new_hash = Hash(merkle[0] + merkle[1])
if merkle[0] == target_hash:
- s.append( encode(merkle[1]))
+ s.append(encode(merkle[1]))
target_hash = new_hash
elif merkle[1] == target_hash:
- s.append( encode(merkle[0]))
+ s.append(encode(merkle[0]))
target_hash = new_hash
- n.append( new_hash )
+ n.append(new_hash)
merkle = merkle[2:]
merkle = n
# send result
- return {"block_height":block_height, "merkle":s, "pos":tx_pos}
-
-
-
+ return {"block_height": block_height, "merkle": s, "pos": tx_pos}
def memorypool_update(store):
-
ds = BCDataStream.BCDataStream()
- postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'})
+ postdata = dumps({"method": 'getrawmempool', 'params': [], 'id': 'jsonrpc'})
respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
r = loads(respdata)
- if r['error'] != None:
- print_log (r['error'])
+ if r['error'] is not None:
+ print_log(r['error'])
return
mempool_hashes = r.get('result')
- num_new_tx = 0
+ num_new_tx = 0
for tx_hash in mempool_hashes:
- if tx_hash in store.known_mempool_hashes: continue
+ if tx_hash in store.known_mempool_hashes:
+ continue
store.known_mempool_hashes.append(tx_hash)
num_new_tx += 1
- postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'})
+ postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id': 'jsonrpc'})
respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
r = loads(respdata)
- if r['error'] != None:
+ if r['error'] is not None:
continue
hextx = r.get('result')
ds.clear()
ds.write(hextx.decode('hex'))
tx = deserialize.parse_Transaction(ds)
- tx['hash'] = util.double_sha256(tx['tx'])
-
+ tx['hash'] = Hash(tx['tx'])
+
if store.tx_find_id_and_value(tx):
pass
else:
tx_id = store.import_tx(tx, False)
store.update_tx_cache(tx_id)
- #print_log (tx_hash)
+ #print_log(tx_hash)
store.commit()
store.known_mempool_hashes = mempool_hashes
return num_new_tx
-
- def send_tx(self,tx):
- postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'})
+ def send_tx(self, tx):
+ postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id': 'jsonrpc'})
respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
r = loads(respdata)
- if r['error'] != None:
+ if r['error'] is not None:
msg = r['error'].get('message')
out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
else:
out = r['result']
return out
-
def main_iteration(self):
with self.lock:
t1 = time.time()
time_catch_up = t2 - t1
n = self.memorypool_update()
time_mempool = time.time() - t2
- height = self.get_block_number( self.chain_id )
+ height = self.get_block_number(self.chain_id)
with self.cache_lock:
- try:
- self.chunk_cache.pop(height/2016)
- except:
+ try:
+ self.chunk_cache.pop(height/2016)
+ except:
pass
- block_header = self.get_block_header( height )
+ block_header = self.get_block_header(height)
return block_header, time_catch_up, time_mempool, n
-
-
-
def catch_up(store):
# if there is an exception, do rollback and then re-raise the exception
for dircfg in store.datadirs:
raise e
-
-
-from processor import Processor, print_log
-
class BlockchainProcessor(Processor):
def __init__(self, config, shared):
# catch_up first
self.block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
self.block_number = self.block_header.get('block_height')
- print_log ("blockchain: %d blocks"%self.block_number)
+ print_log("blockchain: %d blocks" % self.block_number)
threading.Timer(10, self.run_store_iteration).start()
-
def add_request(self, request):
# see if we can get if from cache. if not, add to queue
- if self.process( request, cache_only = True) == -1:
+ if self.process(request, cache_only=True) == -1:
self.queue.put(request)
-
- def process(self, request, cache_only = False):
- #print_log ("abe process", request)
+ def process(self, request, cache_only=False):
+ #print_log("abe process", request)
message_id = request['id']
method = request['method']
- params = request.get('params',[])
+ params = request.get('params', [])
result = None
error = None
address = params[0]
result = self.store.get_status(address, cache_only)
self.watch_address(address)
- except BaseException, e:
+ except Exception, e:
error = str(e) + ': ' + address
- print_log ("error:", error)
+ print_log("error:", error)
elif method == 'blockchain.address.get_history':
try:
address = params[0]
- result = self.store.get_history( address, cache_only )
- except BaseException, e:
+ result = self.store.get_history(address, cache_only)
+ except Exception, e:
error = str(e) + ': ' + address
- print_log ("error:", error)
+ print_log("error:", error)
elif method == 'blockchain.block.get_header':
- if cache_only:
+ if cache_only:
result = -1
else:
try:
height = params[0]
- result = self.store.get_block_header( height )
- except BaseException, e:
- error = str(e) + ': %d'% height
- print_log ("error:", error)
-
+ result = self.store.get_block_header(height)
+ except Exception, e:
+ error = str(e) + ': %d' % height
+ print_log("error:", error)
+
elif method == 'blockchain.block.get_chunk':
if cache_only:
result = -1
else:
try:
index = params[0]
- result = self.store.get_chunk( index )
- except BaseException, e:
- error = str(e) + ': %d'% index
- print_log ("error:", error)
-
+ result = self.store.get_chunk(index)
+ except Exception, e:
+ error = str(e) + ': %d' % index
+ print_log("error:", error)
+
elif method == 'blockchain.transaction.broadcast':
txo = self.store.send_tx(params[0])
- print_log ("sent tx:", txo)
- result = txo
+ print_log("sent tx:", txo)
+ result = txo
elif method == 'blockchain.transaction.get_merkle':
if cache_only:
else:
try:
tx_hash = params[0]
- result = self.store.get_tx_merkle(tx_hash )
- except BaseException, e:
+ result = self.store.get_tx_merkle(tx_hash)
+ except Exception, e:
error = str(e) + ': ' + tx_hash
- print_log ("error:", error)
-
+ print_log("error:", error)
+
elif method == 'blockchain.transaction.get':
try:
tx_hash = params[0]
height = params[1]
- result = self.store.get_raw_tx(tx_hash, height )
- except BaseException, e:
+ result = self.store.get_raw_tx(tx_hash, height)
+ except Exception, e:
error = str(e) + ': ' + tx_hash
- print_log ("error:", error)
+ print_log("error:", error)
else:
- error = "unknown method:%s"%method
+ error = "unknown method:%s" % method
- if cache_only and result == -1: return -1
+ if cache_only and result == -1:
+ return -1
if error:
- response = { 'id':message_id, 'error':error }
+ response = {'id': message_id, 'error': error}
self.push_response(response)
elif result != '':
- response = { 'id':message_id, 'result':result }
+ response = {'id': message_id, 'result': result}
self.push_response(response)
-
def watch_address(self, addr):
if addr not in self.watched_addresses:
self.watched_addresses.append(addr)
-
def run_store_iteration(self):
-
try:
block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
except:
traceback.print_exc(file=sys.stdout)
- print_log ("terminating")
+ print_log("terminating")
self.shared.stop()
- if self.shared.stopped():
- print_log ("exit timer")
+ if self.shared.stopped():
+ print_log("exit timer")
return
- #print_log ("block number: %d (%.3fs) mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool))
+ #print_log("block number: %d (%.3fs) mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool))
if self.block_number != block_header.get('block_height'):
self.block_number = block_header.get('block_height')
- print_log ("block number: %d (%.3fs)"%(self.block_number, time_catch_up))
- self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
+ print_log("block number: %d (%.3fs)" % (self.block_number, time_catch_up))
+ self.push_response({'id': None, 'method': 'blockchain.numblocks.subscribe', 'params': [self.block_number]})
if self.block_header != block_header:
self.block_header = block_header
- self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] })
-
+ self.push_response({'id': None, 'method': 'blockchain.headers.subscribe', 'params': [self.block_header]})
while True:
try:
break
if addr in self.watched_addresses:
try:
- status = self.store.get_status( addr )
- self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
+ status = self.store.get_status(addr)
+ self.push_response({'id': None, 'method': 'blockchain.address.subscribe', 'params': [addr, status]})
except:
break
threading.Timer(10, self.run_store_iteration).start()
-
-
+import ast
+import hashlib
from json import dumps, loads
-import leveldb, urllib
-import deserialize
-import ast, time, threading, hashlib
+import leveldb
+import os
from Queue import Queue
-import traceback, sys, os, random
-
-
-from util import Hash, hash_encode, hash_decode, rev_hex, int_to_hex
-from util import bc_address_to_hash_160, hash_160_to_bc_address, header_to_string, header_from_string
+import random
+import sys
+import time
+import threading
+import traceback
+import urllib
+
+from backends.bitcoind import deserialize
from processor import Processor, print_log
+from utils import *
+
class BlockchainProcessor(Processor):
self.shared.stop()
self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
- config.get('bitcoind','user'),
- config.get('bitcoind','password'),
- config.get('bitcoind','host'),
- config.get('bitcoind','port'))
+ config.get('bitcoind', 'user'),
+ config.get('bitcoind', 'password'),
+ config.get('bitcoind', 'host'),
+ config.get('bitcoind', 'port'))
self.height = 0
self.is_test = False
self.sent_height = 0
self.sent_header = None
-
try:
hist = self.deserialize(self.db.Get('height'))
- self.last_hash, self.height, _ = hist[0]
- print_log( "hist", hist )
+ self.last_hash, self.height, _ = hist[0]
+ print_log("hist", hist)
except:
#traceback.print_exc(file=sys.stdout)
print_log('initializing database')
shared.stop()
sys.exit(0)
- print_log( "blockchain is up to date." )
+ print_log("blockchain is up to date.")
threading.Timer(10, self.main_iteration).start()
-
-
def bitcoind(self, method, params=[]):
- postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
+ postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
try:
respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
except:
self.shared.stop()
r = loads(respdata)
- if r['error'] != None:
+ if r['error'] is not None:
raise BaseException(r['error'])
return r.get('result')
-
def serialize(self, h):
s = ''
s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
return s.decode('hex')
-
def deserialize(self, s):
h = []
while s:
txid = s[0:32].encode('hex')
- txpos = int( rev_hex( s[32:36].encode('hex') ), 16 )
- height = int( rev_hex( s[36:40].encode('hex') ), 16 )
- h.append( ( txid, txpos, height ) )
+ txpos = int(rev_hex(s[32:36].encode('hex')), 16)
+ height = int(rev_hex(s[36:40].encode('hex')), 16)
+ h.append((txid, txpos, height))
s = s[40:]
return h
-
def block2header(self, b):
- return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'),
- "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":int(b.get('bits'),16), "nonce":b.get('nonce')}
-
+ return {
+ "block_height": b.get('height'),
+ "version": b.get('version'),
+ "prev_block_hash": b.get('previousblockhash'),
+ "merkle_root": b.get('merkleroot'),
+ "timestamp": b.get('time'),
+ "bits": int(b.get('bits'), 16),
+ "nonce": b.get('nonce'),
+ }
def get_header(self, height):
block_hash = self.bitcoind('getblockhash', [height])
b = self.bitcoind('getblock', [block_hash])
return self.block2header(b)
-
def init_headers(self, db_height):
self.chunk_cache = {}
- self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')
+ self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers')
if os.path.exists(self.headers_filename):
height = os.path.getsize(self.headers_filename)/80 - 1 # the current height
else:
prev_hash = None
else:
- open(self.headers_filename,'wb').close()
+ open(self.headers_filename, 'wb').close()
prev_hash = None
height = -1
if height < db_height:
- print_log( "catching up missing headers:", height, db_height)
+ print_log("catching up missing headers:", height, db_height)
try:
while height < db_height:
height = height + 1
header = self.get_header(height)
- if height>1:
+ if height > 1:
assert prev_hash == header.get('prev_block_hash')
self.write_header(header, sync=False)
prev_hash = self.hash_header(header)
- if height%1000==0: print_log("headers file:",height)
+ if (height % 1000) == 0:
+ print_log("headers file:", height)
except KeyboardInterrupt:
self.flush_headers()
sys.exit()
self.flush_headers()
-
def hash_header(self, header):
return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
-
def read_header(self, block_height):
if os.path.exists(self.headers_filename):
- f = open(self.headers_filename,'rb')
- f.seek(block_height*80)
- h = f.read(80)
- f.close()
+ with open(self.headers_filename, 'rb') as f:
+ f.seek(block_height * 80)
+ h = f.read(80)
if len(h) == 80:
h = header_from_string(h)
return h
-
def read_chunk(self, index):
- f = open(self.headers_filename,'rb')
- f.seek(index*2016*80)
- chunk = f.read(2016*80)
- f.close()
+ with open(self.headers_filename, 'rb') as f:
+ f.seek(index*2016*80)
+ chunk = f.read(2016*80)
return chunk.encode('hex')
-
def write_header(self, header, sync=True):
if not self.headers_data:
self.headers_offset = header.get('block_height')
self.headers_data = self.headers_data[:-40]
def flush_headers(self):
- if not self.headers_data: return
- f = open(self.headers_filename,'rb+')
- f.seek(self.headers_offset*80)
- f.write(self.headers_data)
- f.close()
+ if not self.headers_data:
+ return
+ with open(self.headers_filename, 'rb+') as f:
+ f.seek(self.headers_offset*80)
+ f.write(self.headers_data)
self.headers_data = ''
-
def get_chunk(self, i):
# store them on disk; store the current chunk in memory
with self.cache_lock:
return chunk
-
def get_mempool_transaction(self, txid):
try:
raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
vds = deserialize.BCDataStream()
vds.write(raw_tx.decode('hex'))
- out = deserialize.parse_Transaction(vds, is_coinbase = False)
- return out
+ return deserialize.parse_Transaction(vds, is_coinbase=False)
def get_history(self, addr, cache_only=False):
- with self.cache_lock: hist = self.history_cache.get( addr )
- if hist is not None: return hist
- if cache_only: return -1
+ with self.cache_lock:
+ hist = self.history_cache.get(addr)
+ if hist is not None:
+ return hist
+ if cache_only:
+ return -1
with self.dblock:
try:
hash_160 = bc_address_to_hash_160(addr)
hist = self.deserialize(self.db.Get(hash_160))
is_known = True
- except:
+ except:
hist = []
is_known = False
# should not be necessary
- hist.sort( key=lambda tup: tup[1])
+ hist.sort(key=lambda tup: tup[1])
# check uniqueness too...
# add memory pool
with self.mempool_lock:
- for txid in self.mempool_hist.get(addr,[]):
+ for txid in self.mempool_hist.get(addr, []):
hist.append((txid, 0, 0))
- hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
+ hist = map(lambda x: {'tx_hash': x[0], 'height': x[2]}, hist)
# add something to distinguish between unused and empty addresses
- if hist == [] and is_known: hist = ['*']
+ if hist == [] and is_known:
+ hist = ['*']
- with self.cache_lock: self.history_cache[addr] = hist
+ with self.cache_lock:
+ self.history_cache[addr] = hist
return hist
-
def get_status(self, addr, cache_only=False):
tx_points = self.get_history(addr, cache_only)
- if cache_only and tx_points == -1: return -1
+ if cache_only and tx_points == -1:
+ return -1
- if not tx_points: return None
- if tx_points == ['*']: return '*'
+ if not tx_points:
+ return None
+ if tx_points == ['*']:
+ return '*'
status = ''
for tx in tx_points:
status += tx.get('tx_hash') + ':%d:' % tx.get('height')
- return hashlib.sha256( status ).digest().encode('hex')
-
+ return hashlib.sha256(status).digest().encode('hex')
def get_merkle(self, tx_hash, height):
b = self.bitcoind('getblock', [block_hash])
tx_list = b.get('tx')
tx_pos = tx_list.index(tx_hash)
-
+
merkle = map(hash_decode, tx_list)
target_hash = hash_decode(tx_hash)
s = []
while len(merkle) != 1:
- if len(merkle)%2: merkle.append( merkle[-1] )
+ if len(merkle) % 2:
+ merkle.append(merkle[-1])
n = []
while merkle:
- new_hash = Hash( merkle[0] + merkle[1] )
+ new_hash = Hash(merkle[0] + merkle[1])
if merkle[0] == target_hash:
- s.append( hash_encode( merkle[1]))
+ s.append(hash_encode(merkle[1]))
target_hash = new_hash
elif merkle[1] == target_hash:
- s.append( hash_encode( merkle[0]))
+ s.append(hash_encode(merkle[0]))
target_hash = new_hash
- n.append( new_hash )
+ n.append(new_hash)
merkle = merkle[2:]
merkle = n
- return {"block_height":height, "merkle":s, "pos":tx_pos}
-
-
-
+ return {"block_height": height, "merkle": s, "pos": tx_pos}
def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
-
# keep it sorted
s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
- serialized_hist = self.batch_list[addr]
+ serialized_hist = self.batch_list[addr]
l = len(serialized_hist)/40
for i in range(l-1, -1, -1):
item = serialized_hist[40*i:40*(i+1)]
- item_height = int( rev_hex( item[36:40].encode('hex') ), 16 )
+ item_height = int(rev_hex(item[36:40].encode('hex')), 16)
if item_height < tx_height:
serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
break
txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
self.batch_txio[txo] = addr
-
def remove_from_history(self, addr, tx_hash, tx_pos):
-
txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
if addr is None:
addr = self.batch_txio[txi]
except:
raise BaseException(tx_hash, tx_pos)
-
+
serialized_hist = self.batch_list[addr]
l = len(serialized_hist)/40
for i in range(l):
item = serialized_hist[40*i:40*(i+1)]
if item[0:36] == txi:
- height = int( rev_hex( item[36:40].encode('hex') ), 16 )
+ height = int(rev_hex(item[36:40].encode('hex')), 16)
serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
break
else:
self.batch_list[addr] = serialized_hist
return height, addr
-
def deserialize_block(self, block):
txlist = block.get('tx')
tx_hashes = [] # ordered txids
return tx_hashes, txdict
def get_undo_info(self, height):
- s = self.db.Get("undo%d"%(height%100))
+ s = self.db.Get("undo%d" % (height % 100))
return eval(s)
def write_undo_info(self, batch, height, undo_info):
if self.is_test or height > self.bitcoind_height - 100:
- batch.Put("undo%d"%(height%100), repr(undo_info))
-
+ batch.Put("undo%d" % (height % 100), repr(undo_info))
def import_block(self, block, block_hash, block_height, sync, revert=False):
t00 = time.time()
-
if not revert:
# read addresses of tx inputs
for tx in txdict.values():
for x in tx.get('outputs'):
txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
block_outputs.append(txo)
-
+
# read histories of addresses
for txid, tx in txdict.items():
for x in tx.get('outputs'):
for addr in addr_to_read:
try:
self.batch_list[addr] = self.db.Get(addr)
- except:
+ except:
self.batch_list[addr] = ''
-
- if revert:
+ if revert:
undo_info = self.get_undo_info(block_height)
# print "undo", block_height, undo_info
- else: undo_info = {}
+ else:
+ undo_info = {}
# process
t1 = time.time()
- if revert: tx_hashes = tx_hashes[::-1]
- for txid in tx_hashes: # must be ordered
+ if revert:
+ tx_hashes = tx_hashes[::-1]
+ for txid in tx_hashes: # must be ordered
tx = txdict[txid]
if not revert:
undo = []
for x in tx.get('inputs'):
- prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n'))
- undo.append( (prevout_height, prevout_addr) )
+ prevout_height, prevout_addr = self.remove_from_history(None, x.get('prevout_hash'), x.get('prevout_n'))
+ undo.append((prevout_height, prevout_addr))
undo_info[txid] = undo
for x in tx.get('outputs'):
hash_160 = bc_address_to_hash_160(x.get('address'))
- self.add_to_history( hash_160, txid, x.get('index'), block_height)
-
+ self.add_to_history(hash_160, txid, x.get('index'), block_height)
+
else:
for x in tx.get('outputs'):
hash_160 = bc_address_to_hash_160(x.get('address'))
- self.remove_from_history( hash_160, txid, x.get('index'))
+ self.remove_from_history(hash_160, txid, x.get('index'))
i = 0
for x in tx.get('inputs'):
self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
# re-add them to the history
- self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
- # print_log( "new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) )
+ self.add_to_history(prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
+ # print_log("new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) )
# write
max_len = 0
# delete spent inputs
for txi in block_inputs:
batch.Delete(txi)
- # add undo info
+ # add undo info
self.write_undo_info(batch, block_height, undo_info)
else:
# restore spent inputs
for txo in block_outputs:
batch.Delete(txo)
-
# add the max
- batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
+ batch.Put('height', self.serialize([(block_hash, block_height, 0)]))
# actual write
- self.db.Write(batch, sync = sync)
+ self.db.Write(batch, sync=sync)
t3 = time.time()
- if t3 - t0 > 10 and not sync:
- print_log("block", block_height,
- "parse:%0.2f "%(t00 - t0),
- "read:%0.2f "%(t1 - t00),
- "proc:%.2f "%(t2-t1),
- "write:%.2f "%(t3-t2),
+ if t3 - t0 > 10 and not sync:
+ print_log("block", block_height,
+ "parse:%0.2f " % (t00 - t0),
+ "read:%0.2f " % (t1 - t00),
+ "proc:%.2f " % (t2-t1),
+ "write:%.2f " % (t3-t2),
"max:", max_len, hash_160_to_bc_address(max_addr))
- for h160 in self.batch_list.keys():
+ for h160 in self.batch_list.keys():
addr = hash_160_to_bc_address(h160)
self.invalidate_cache(addr)
-
-
def add_request(self, request):
# see if we can get if from cache. if not, add to queue
- if self.process( request, cache_only = True) == -1:
+ if self.process(request, cache_only=True) == -1:
self.queue.put(request)
-
-
- def process(self, request, cache_only = False):
+ def process(self, request, cache_only=False):
#print "abe process", request
message_id = request['id']
method = request['method']
- params = request.get('params',[])
+ params = request.get('params', [])
result = None
error = None
self.watch_address(address)
except BaseException, e:
error = str(e) + ': ' + address
- print_log( "error:", error )
+ print_log("error:", error)
elif method == 'blockchain.address.unsubscribe':
try:
password = params[0]
address = params[1]
- if password == self.config.get('server','password'):
+ if password == self.config.get('server', 'password'):
self.watched_addresses.remove(address)
print_log('unsubscribed', address)
result = "ok"
result = "authentication error"
except BaseException, e:
error = str(e) + ': ' + address
- print_log( "error:", error )
+ print_log("error:", error)
elif method == 'blockchain.address.get_history':
try:
address = params[0]
- result = self.get_history( address, cache_only )
+ result = self.get_history(address, cache_only)
except BaseException, e:
error = str(e) + ': ' + address
- print_log( "error:", error )
+ print_log("error:", error)
elif method == 'blockchain.block.get_header':
- if cache_only:
+ if cache_only:
result = -1
else:
try:
height = params[0]
- result = self.get_header( height )
+ result = self.get_header(height)
except BaseException, e:
- error = str(e) + ': %d'% height
- print_log( "error:", error )
-
+ error = str(e) + ': %d' % height
+ print_log("error:", error)
+
elif method == 'blockchain.block.get_chunk':
if cache_only:
result = -1
else:
try:
index = params[0]
- result = self.get_chunk( index )
+ result = self.get_chunk(index)
except BaseException, e:
- error = str(e) + ': %d'% index
- print_log( "error:", error)
+ error = str(e) + ': %d' % index
+ print_log("error:", error)
elif method == 'blockchain.transaction.broadcast':
try:
txo = self.bitcoind('sendrawtransaction', params)
- print_log( "sent tx:", txo )
- result = txo
+ print_log("sent tx:", txo)
+ result = txo
except BaseException, e:
- result = str(e) # do not send an error
- print_log( "error:", str(e), params )
+ result = str(e) # do not send an error
+ print_log("error:", result, params)
elif method == 'blockchain.transaction.get_merkle':
if cache_only:
try:
tx_hash = params[0]
tx_height = params[1]
- result = self.get_merkle(tx_hash, tx_height)
+ result = self.get_merkle(tx_hash, tx_height)
except BaseException, e:
error = str(e) + ': ' + tx_hash
- print_log( "error:", error )
-
+ print_log("error:", error)
+
elif method == 'blockchain.transaction.get':
try:
tx_hash = params[0]
height = params[1]
- result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] )
+ result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
except BaseException, e:
error = str(e) + ': ' + tx_hash
- print_log( "error:", error )
+ print_log("error:", error)
else:
- error = "unknown method:%s"%method
+ error = "unknown method:%s" % method
-        if cache_only and result == -1: return -1
+        if cache_only and result == -1:
+            return -1

        if error:
-            response = { 'id':message_id, 'error':error }
-            self.push_response(response)
+            # keep the push inside BOTH branches: dropping it here would
+            # silently swallow error responses, and dedenting it would
+            # raise NameError when neither branch assigns `response`
+            response = {'id': message_id, 'error': error}
+            self.push_response(response)
        elif result != '':
-            response = { 'id':message_id, 'result':result }
-            self.push_response(response)
-
+            response = {'id': message_id, 'result': result}
+            self.push_response(response)
def watch_address(self, addr):
if addr not in self.watched_addresses:
self.watched_addresses.append(addr)
-
-
- def catch_up(self, sync = True):
-
+ def catch_up(self, sync=True):
t1 = time.time()
while not self.shared.stopped():
-
# are we done yet?
info = self.bitcoind('getinfo')
self.bitcoind_height = info.get('blocks')
bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
- if self.last_hash == bitcoind_block_hash:
+ if self.last_hash == bitcoind_block_hash:
self.up_to_date = True
break
# not done..
self.up_to_date = False
- next_block_hash = self.bitcoind('getblockhash', [self.height+1])
+ next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
next_block = self.bitcoind('getblock', [next_block_hash, 1])
- # fixme: this is unsafe, if we revert when the undo info is not yet written
- revert = (random.randint(1, 100)==1) if self.is_test else False
+ # fixme: this is unsafe, if we revert when the undo info is not yet written
+ revert = (random.randint(1, 100) == 1) if self.is_test else False
if (next_block.get('previousblockhash') == self.last_hash) and not revert:
self.write_header(self.block2header(next_block), sync)
self.last_hash = next_block_hash
- if (self.height)%100 == 0 and not sync:
+ if self.height % 100 == 0 and not sync:
t2 = time.time()
- print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
+ print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1))
t1 = t2
-
+
else:
# revert current block
block = self.bitcoind('getblock', [self.last_hash, 1])
- print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash )
+ print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
self.import_block(block, self.last_hash, self.height, sync, revert=True)
self.pop_header()
self.flush_headers()
- self.height = self.height -1
+ self.height -= 1
# read previous header from disk
self.header = self.read_header(self.height)
self.last_hash = self.hash_header(self.header)
-
self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
-
-
-
def memorypool_update(self):
-
mempool_hashes = self.bitcoind('getrawmempool')
for tx_hash in mempool_hashes:
- if tx_hash in self.mempool_hashes: continue
+ if tx_hash in self.mempool_hashes:
+ continue
tx = self.get_mempool_transaction(tx_hash)
- if not tx: continue
+ if not tx:
+ continue
for x in tx.get('inputs'):
txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
except:
continue
l = self.mempool_addresses.get(tx_hash, [])
- if addr not in l:
- l.append( addr )
+ if addr not in l:
+ l.append(addr)
self.mempool_addresses[tx_hash] = l
for x in tx.get('outputs'):
addr = x.get('address')
l = self.mempool_addresses.get(tx_hash, [])
- if addr not in l:
- l.append( addr )
+ if addr not in l:
+ l.append(addr)
self.mempool_addresses[tx_hash] = l
self.mempool_hashes.append(tx_hash)
for tx_hash, addresses in self.mempool_addresses.items():
for addr in addresses:
h = new_mempool_hist.get(addr, [])
- if tx_hash not in h:
- h.append( tx_hash )
+ if tx_hash not in h:
+ h.append(tx_hash)
new_mempool_hist[addr] = h
for addr in new_mempool_hist.keys():
if addr in self.mempool_hist.keys():
- if self.mempool_hist[addr] != new_mempool_hist[addr]:
+ if self.mempool_hist[addr] != new_mempool_hist[addr]:
self.invalidate_cache(addr)
else:
self.invalidate_cache(addr)
with self.mempool_lock:
self.mempool_hist = new_mempool_hist
-
-
def invalidate_cache(self, address):
with self.cache_lock:
- if self.history_cache.has_key(address):
- print_log( "cache: invalidating", address )
+ if 'address' in self.history_cache:
+ print_log("cache: invalidating", address)
self.history_cache.pop(address)
if address in self.watched_addresses:
self.address_queue.put(address)
-
-
def main_iteration(self):
-
- if self.shared.stopped():
- print_log( "blockchain processor terminating" )
+ if self.shared.stopped():
+ print_log("blockchain processor terminating")
return
with self.dblock:
t3 = time.time()
# print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
-
if self.sent_height != self.height:
self.sent_height = self.height
- self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })
+ self.push_response({
+ 'id': None,
+ 'method': 'blockchain.numblocks.subscribe',
+ 'params': [self.height],
+ })
if self.sent_header != self.header:
- print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) )
+ print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
self.sent_header = self.header
- self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })
+ self.push_response({
+ 'id': None,
+ 'method': 'blockchain.headers.subscribe',
+ 'params': [self.header],
+ })
while True:
try:
except:
break
if addr in self.watched_addresses:
- status = self.get_status( addr )
- self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
-
- if not self.shared.stopped():
+ status = self.get_status(addr)
+ self.push_response({
+ 'id': None,
+ 'method': 'blockchain.address.subscribe',
+ 'params': [addr, status],
+ })
+
+ if not self.shared.stopped():
threading.Timer(10, self.main_iteration).start()
else:
- print_log( "blockchain processor terminating" )
-
-
-
-
+ print_log("blockchain processor terminating")
#
#
-#from bitcoin import public_key_to_bc_address, hash_160_to_bc_address, hash_encode
-#import socket
-import time, hashlib
+import mmap
+import string
import struct
-addrtype = 0
-
-
-Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
-hash_encode = lambda x: x[::-1].encode('hex')
-hash_decode = lambda x: x.decode('hex')[::-1]
-
-def hash_160(public_key):
- md = hashlib.new('ripemd160')
- md.update(hashlib.sha256(public_key).digest())
- return md.digest()
-
-def public_key_to_bc_address(public_key):
- h160 = hash_160(public_key)
- return hash_160_to_bc_address(h160)
-
-def hash_160_to_bc_address(h160):
- vh160 = chr(addrtype) + h160
- h = Hash(vh160)
- addr = vh160 + h[0:4]
- return b58encode(addr)
+import types
-__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
-__b58base = len(__b58chars)
+from utils import *
-def b58encode(v):
- """ encode v, which is a string of bytes, to base58."""
-
- long_value = 0L
- for (i, c) in enumerate(v[::-1]):
- long_value += (256**i) * ord(c)
-
- result = ''
- while long_value >= __b58base:
- div, mod = divmod(long_value, __b58base)
- result = __b58chars[mod] + result
- long_value = div
- result = __b58chars[long_value] + result
-
- # Bitcoin does a little leading-zero-compression:
- # leading 0-bytes in the input become leading-1s
- nPad = 0
- for c in v:
- if c == '\0': nPad += 1
- else: break
-
- return (__b58chars[0]*nPad) + result
-
-def b58decode(v, length):
- """ decode v into a string of len bytes."""
- long_value = 0L
- for (i, c) in enumerate(v[::-1]):
- long_value += __b58chars.find(c) * (__b58base**i)
-
- result = ''
- while long_value >= 256:
- div, mod = divmod(long_value, 256)
- result = chr(mod) + result
- long_value = div
- result = chr(long_value) + result
- nPad = 0
- for c in v:
- if c == __b58chars[0]: nPad += 1
- else: break
+class SerializationError(Exception):
+ """Thrown when there's a problem deserializing or serializing."""
- result = chr(0)*nPad + result
- if length is not None and len(result) != length:
- return None
- return result
+class BCDataStream(object):
+ """Workalike python implementation of Bitcoin's CDataStream class."""
+ def __init__(self):
+ self.input = None
+ self.read_cursor = 0
+ def clear(self):
+ self.input = None
+ self.read_cursor = 0
+
+ def write(self, bytes): # Initialize with string of bytes
+ if self.input is None:
+ self.input = bytes
+ else:
+ self.input += bytes
+
+ def map_file(self, file, start): # Initialize with bytes from file
+ self.input = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
+ self.read_cursor = start
+
+ def seek_file(self, position):
+ self.read_cursor = position
+
+ def close_file(self):
+ self.input.close()
+
+ def read_string(self):
+ # Strings are encoded depending on length:
+ # 0 to 252 : 1-byte-length followed by bytes (if any)
+ # 253 to 65,535 : byte'253' 2-byte-length followed by bytes
+ # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
+ # ... and the Bitcoin client is coded to understand:
+ # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
+ # ... but I don't think it actually handles any strings that big.
+ if self.input is None:
+ raise SerializationError("call write(bytes) before trying to deserialize")
+
+ try:
+ length = self.read_compact_size()
+ except IndexError:
+ raise SerializationError("attempt to read past end of buffer")
+
+ return self.read_bytes(length)
+
+ def write_string(self, string):
+ # Length-encoded as with read-string
+ self.write_compact_size(len(string))
+ self.write(string)
+
+ def read_bytes(self, length):
+ try:
+ result = self.input[self.read_cursor:self.read_cursor+length]
+ self.read_cursor += length
+ return result
+ except IndexError:
+ raise SerializationError("attempt to read past end of buffer")
+
+ return ''
+
+ def read_boolean(self):
+ return self.read_bytes(1)[0] != chr(0)
+
+ def read_int16(self):
+ return self._read_num('<h')
+
+ def read_uint16(self):
+ return self._read_num('<H')
+
+ def read_int32(self):
+ return self._read_num('<i')
+
+ def read_uint32(self):
+ return self._read_num('<I')
+
+ def read_int64(self):
+ return self._read_num('<q')
+
+ def read_uint64(self):
+ return self._read_num('<Q')
+
+ def write_boolean(self, val):
+ return self.write(chr(1) if val else chr(0))
+
+ def write_int16(self, val):
+ return self._write_num('<h', val)
+
+ def write_uint16(self, val):
+ return self._write_num('<H', val)
+
+ def write_int32(self, val):
+ return self._write_num('<i', val)
+
+ def write_uint32(self, val):
+ return self._write_num('<I', val)
+
+ def write_int64(self, val):
+ return self._write_num('<q', val)
+
+ def write_uint64(self, val):
+ return self._write_num('<Q', val)
+
+ def read_compact_size(self):
+ size = ord(self.input[self.read_cursor])
+ self.read_cursor += 1
+ if size == 253:
+ size = self._read_num('<H')
+ elif size == 254:
+ size = self._read_num('<I')
+ elif size == 255:
+ size = self._read_num('<Q')
+ return size
+
+ def write_compact_size(self, size):
+ if size < 0:
+ raise SerializationError("attempt to write size < 0")
+ elif size < 253:
+ self.write(chr(size))
+ elif size < 2**16:
+ self.write('\xfd')
+ self._write_num('<H', size)
+ elif size < 2**32:
+ self.write('\xfe')
+ self._write_num('<I', size)
+ elif size < 2**64:
+ self.write('\xff')
+ self._write_num('<Q', size)
+
+ def _read_num(self, format):
+ (i,) = struct.unpack_from(format, self.input, self.read_cursor)
+ self.read_cursor += struct.calcsize(format)
+ return i
+
+ def _write_num(self, format, num):
+ s = struct.pack(format, num)
+ self.write(s)
-#
-# Workalike python implementation of Bitcoin's CDataStream class.
-#
-import struct
-import StringIO
-import mmap
-class SerializationError(Exception):
- """ Thrown when there's a problem deserializing or serializing """
+class EnumException(Exception):
+ pass
-class BCDataStream(object):
- def __init__(self):
- self.input = None
- self.read_cursor = 0
-
- def clear(self):
- self.input = None
- self.read_cursor = 0
-
- def write(self, bytes): # Initialize with string of bytes
- if self.input is None:
- self.input = bytes
- else:
- self.input += bytes
-
- def map_file(self, file, start): # Initialize with bytes from file
- self.input = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
- self.read_cursor = start
- def seek_file(self, position):
- self.read_cursor = position
- def close_file(self):
- self.input.close()
-
- def read_string(self):
- # Strings are encoded depending on length:
- # 0 to 252 : 1-byte-length followed by bytes (if any)
- # 253 to 65,535 : byte'253' 2-byte-length followed by bytes
- # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
- # ... and the Bitcoin client is coded to understand:
- # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
- # ... but I don't think it actually handles any strings that big.
- if self.input is None:
- raise SerializationError("call write(bytes) before trying to deserialize")
-
- try:
- length = self.read_compact_size()
- except IndexError:
- raise SerializationError("attempt to read past end of buffer")
-
- return self.read_bytes(length)
-
- def write_string(self, string):
- # Length-encoded as with read-string
- self.write_compact_size(len(string))
- self.write(string)
-
- def read_bytes(self, length):
- try:
- result = self.input[self.read_cursor:self.read_cursor+length]
- self.read_cursor += length
- return result
- except IndexError:
- raise SerializationError("attempt to read past end of buffer")
-
- return ''
-
- def read_boolean(self): return self.read_bytes(1)[0] != chr(0)
- def read_int16(self): return self._read_num('<h')
- def read_uint16(self): return self._read_num('<H')
- def read_int32(self): return self._read_num('<i')
- def read_uint32(self): return self._read_num('<I')
- def read_int64(self): return self._read_num('<q')
- def read_uint64(self): return self._read_num('<Q')
-
- def write_boolean(self, val): return self.write(chr(1) if val else chr(0))
- def write_int16(self, val): return self._write_num('<h', val)
- def write_uint16(self, val): return self._write_num('<H', val)
- def write_int32(self, val): return self._write_num('<i', val)
- def write_uint32(self, val): return self._write_num('<I', val)
- def write_int64(self, val): return self._write_num('<q', val)
- def write_uint64(self, val): return self._write_num('<Q', val)
-
- def read_compact_size(self):
- size = ord(self.input[self.read_cursor])
- self.read_cursor += 1
- if size == 253:
- size = self._read_num('<H')
- elif size == 254:
- size = self._read_num('<I')
- elif size == 255:
- size = self._read_num('<Q')
- return size
-
- def write_compact_size(self, size):
- if size < 0:
- raise SerializationError("attempt to write size < 0")
- elif size < 253:
- self.write(chr(size))
- elif size < 2**16:
- self.write('\xfd')
- self._write_num('<H', size)
- elif size < 2**32:
- self.write('\xfe')
- self._write_num('<I', size)
- elif size < 2**64:
- self.write('\xff')
- self._write_num('<Q', size)
-
- def _read_num(self, format):
- (i,) = struct.unpack_from(format, self.input, self.read_cursor)
- self.read_cursor += struct.calcsize(format)
- return i
-
- def _write_num(self, format, num):
- s = struct.pack(format, num)
- self.write(s)
-#
-# enum-like type
-# From the Python Cookbook, downloaded from http://code.activestate.com/recipes/67107/
-#
-import types, string, exceptions
+class Enumeration:
+ """enum-like type
-class EnumException(exceptions.Exception):
- pass
+ From the Python Cookbook, downloaded from http://code.activestate.com/recipes/67107/
+ """
-class Enumeration:
def __init__(self, name, enumList):
self.__doc__ = name
- lookup = { }
- reverseLookup = { }
+ lookup = {}
+ reverseLookup = {}
i = 0
- uniqueNames = [ ]
- uniqueValues = [ ]
+ uniqueNames = []
+ uniqueValues = []
for x in enumList:
- if type(x) == types.TupleType:
+ if isinstance(x, types.TupleType):
x, i = x
- if type(x) != types.StringType:
- raise EnumException, "enum name is not a string: " + x
- if type(i) != types.IntType:
- raise EnumException, "enum value is not an integer: " + i
+ if not isinstance(x, types.StringType):
+ raise EnumException("enum name is not a string: %r" % x)
+ if not isinstance(i, types.IntType):
+ raise EnumException("enum value is not an integer: %r" % i)
if x in uniqueNames:
- raise EnumException, "enum name is not unique: " + x
+ raise EnumException("enum name is not unique: %r" % x)
if i in uniqueValues:
- raise EnumException, "enum value is not unique for " + x
+ raise EnumException("enum value is not unique for %r" % x)
uniqueNames.append(x)
uniqueValues.append(i)
lookup[x] = i
i = i + 1
self.lookup = lookup
self.reverseLookup = reverseLookup
+
def __getattr__(self, attr):
- if not self.lookup.has_key(attr):
+ if attr not in self.lookup:
raise AttributeError
return self.lookup[attr]
+
def whatis(self, value):
return self.reverseLookup[value]
def long_hex(bytes):
return bytes.encode('hex_codec')
+
# This function comes from bitcointools, bct-LICENSE.txt.
def short_hex(bytes):
t = bytes.encode('hex_codec')
return t[0:4]+"..."+t[-4:]
-
def parse_TxIn(vds):
- d = {}
- d['prevout_hash'] = hash_encode(vds.read_bytes(32))
- d['prevout_n'] = vds.read_uint32()
- scriptSig = vds.read_bytes(vds.read_compact_size())
- d['sequence'] = vds.read_uint32()
- # actually I don't need that at all
- # if not is_coinbase: d['address'] = extract_public_key(scriptSig)
- # d['script'] = decode_script(scriptSig)
- return d
+ d = {}
+ d['prevout_hash'] = hash_encode(vds.read_bytes(32))
+ d['prevout_n'] = vds.read_uint32()
+ scriptSig = vds.read_bytes(vds.read_compact_size())
+ d['sequence'] = vds.read_uint32()
+ # actually I don't need that at all
+ # if not is_coinbase: d['address'] = extract_public_key(scriptSig)
+ # d['script'] = decode_script(scriptSig)
+ return d
def parse_TxOut(vds, i):
- d = {}
- d['value'] = vds.read_int64()
- scriptPubKey = vds.read_bytes(vds.read_compact_size())
- d['address'] = extract_public_key(scriptPubKey)
- #d['script'] = decode_script(scriptPubKey)
- d['raw_output_script'] = scriptPubKey.encode('hex')
- d['index'] = i
- return d
+ d = {}
+ d['value'] = vds.read_int64()
+ scriptPubKey = vds.read_bytes(vds.read_compact_size())
+ d['address'] = extract_public_key(scriptPubKey)
+ #d['script'] = decode_script(scriptPubKey)
+ d['raw_output_script'] = scriptPubKey.encode('hex')
+ d['index'] = i
+ return d
def parse_Transaction(vds, is_coinbase):
- d = {}
- start = vds.read_cursor
- d['version'] = vds.read_int32()
- n_vin = vds.read_compact_size()
- d['inputs'] = []
- for i in xrange(n_vin):
- o = parse_TxIn(vds)
- if not is_coinbase:
- d['inputs'].append(o)
- n_vout = vds.read_compact_size()
- d['outputs'] = []
- for i in xrange(n_vout):
- o = parse_TxOut(vds, i)
-
- #if o['address'] == "None" and o['value']==0:
- # print("skipping strange tx output with zero value")
- # continue
- # if o['address'] != "None":
- d['outputs'].append(o)
-
- d['lockTime'] = vds.read_uint32()
- return d
-
-
+ d = {}
+ start = vds.read_cursor
+ d['version'] = vds.read_int32()
+ n_vin = vds.read_compact_size()
+ d['inputs'] = []
+ for i in xrange(n_vin):
+ o = parse_TxIn(vds)
+ if not is_coinbase:
+ d['inputs'].append(o)
+ n_vout = vds.read_compact_size()
+ d['outputs'] = []
+ for i in xrange(n_vout):
+ o = parse_TxOut(vds, i)
+
+ #if o['address'] == "None" and o['value']==0:
+ # print("skipping strange tx output with zero value")
+ # continue
+ # if o['address'] != "None":
+ d['outputs'].append(o)
+
+ d['lockTime'] = vds.read_uint32()
+ return d
opcodes = Enumeration("Opcodes", [
- ("OP_0", 0), ("OP_PUSHDATA1",76), "OP_PUSHDATA2", "OP_PUSHDATA4", "OP_1NEGATE", "OP_RESERVED",
+ ("OP_0", 0), ("OP_PUSHDATA1", 76), "OP_PUSHDATA2", "OP_PUSHDATA4", "OP_1NEGATE", "OP_RESERVED",
"OP_1", "OP_2", "OP_3", "OP_4", "OP_5", "OP_6", "OP_7",
"OP_8", "OP_9", "OP_10", "OP_11", "OP_12", "OP_13", "OP_14", "OP_15", "OP_16",
"OP_NOP", "OP_VER", "OP_IF", "OP_NOTIF", "OP_VERIF", "OP_VERNOTIF", "OP_ELSE", "OP_ENDIF", "OP_VERIFY",
("OP_INVALIDOPCODE", 0xFFFF),
])
+
def script_GetOp(bytes):
- i = 0
- while i < len(bytes):
- vch = None
- opcode = ord(bytes[i])
- i += 1
- if opcode >= opcodes.OP_SINGLEBYTE_END:
- opcode <<= 8
- opcode |= ord(bytes[i])
- i += 1
-
- if opcode <= opcodes.OP_PUSHDATA4:
- nSize = opcode
- if opcode == opcodes.OP_PUSHDATA1:
- nSize = ord(bytes[i])
+ i = 0
+ while i < len(bytes):
+ vch = None
+ opcode = ord(bytes[i])
i += 1
- elif opcode == opcodes.OP_PUSHDATA2:
- (nSize,) = struct.unpack_from('<H', bytes, i)
- i += 2
- elif opcode == opcodes.OP_PUSHDATA4:
- (nSize,) = struct.unpack_from('<I', bytes, i)
- i += 4
- vch = bytes[i:i+nSize]
- i += nSize
+ if opcode >= opcodes.OP_SINGLEBYTE_END:
+ opcode <<= 8
+ opcode |= ord(bytes[i])
+ i += 1
+
+ if opcode <= opcodes.OP_PUSHDATA4:
+ nSize = opcode
+ if opcode == opcodes.OP_PUSHDATA1:
+ nSize = ord(bytes[i])
+ i += 1
+ elif opcode == opcodes.OP_PUSHDATA2:
+ (nSize,) = struct.unpack_from('<H', bytes, i)
+ i += 2
+ elif opcode == opcodes.OP_PUSHDATA4:
+ (nSize,) = struct.unpack_from('<I', bytes, i)
+ i += 4
+ vch = bytes[i:i+nSize]
+ i += nSize
+
+ yield (opcode, vch, i)
- yield (opcode, vch, i)
def script_GetOpName(opcode):
- return (opcodes.whatis(opcode)).replace("OP_", "")
+ return (opcodes.whatis(opcode)).replace("OP_", "")
+
def decode_script(bytes):
- result = ''
- for (opcode, vch, i) in script_GetOp(bytes):
- if len(result) > 0: result += " "
- if opcode <= opcodes.OP_PUSHDATA4:
- result += "%d:"%(opcode,)
- result += short_hex(vch)
- else:
- result += script_GetOpName(opcode)
- return result
+ result = ''
+ for (opcode, vch, i) in script_GetOp(bytes):
+ if len(result) > 0:
+ result += " "
+ if opcode <= opcodes.OP_PUSHDATA4:
+ result += "%d:" % (opcode,)
+ result += short_hex(vch)
+ else:
+ result += script_GetOpName(opcode)
+ return result
+
def match_decoded(decoded, to_match):
- if len(decoded) != len(to_match):
- return False;
- for i in range(len(decoded)):
- if to_match[i] == opcodes.OP_PUSHDATA4 and decoded[i][0] <= opcodes.OP_PUSHDATA4:
- continue # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent.
- if to_match[i] != decoded[i][0]:
- return False
- return True
+ if len(decoded) != len(to_match):
+ return False
+ for i in range(len(decoded)):
+ if to_match[i] == opcodes.OP_PUSHDATA4 and decoded[i][0] <= opcodes.OP_PUSHDATA4:
+ continue # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent.
+ if to_match[i] != decoded[i][0]:
+ return False
+ return True
+
def extract_public_key(bytes):
- decoded = [ x for x in script_GetOp(bytes) ]
-
- # non-generated TxIn transactions push a signature
- # (seventy-something bytes) and then their public key
- # (65 bytes) onto the stack:
- match = [ opcodes.OP_PUSHDATA4, opcodes.OP_PUSHDATA4 ]
- if match_decoded(decoded, match):
- return public_key_to_bc_address(decoded[1][1])
-
- # The Genesis Block, self-payments, and pay-by-IP-address payments look like:
- # 65 BYTES:... CHECKSIG
- match = [ opcodes.OP_PUSHDATA4, opcodes.OP_CHECKSIG ]
- if match_decoded(decoded, match):
- return public_key_to_bc_address(decoded[0][1])
-
- # coins sent to black hole
- # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG
- match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_0, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ]
- if match_decoded(decoded, match):
+ decoded = list(script_GetOp(bytes))
+
+ # non-generated TxIn transactions push a signature
+ # (seventy-something bytes) and then their public key
+ # (65 bytes) onto the stack:
+ match = [opcodes.OP_PUSHDATA4, opcodes.OP_PUSHDATA4]
+ if match_decoded(decoded, match):
+ return public_key_to_bc_address(decoded[1][1])
+
+ # The Genesis Block, self-payments, and pay-by-IP-address payments look like:
+ # 65 BYTES:... CHECKSIG
+ match = [opcodes.OP_PUSHDATA4, opcodes.OP_CHECKSIG]
+ if match_decoded(decoded, match):
+ return public_key_to_bc_address(decoded[0][1])
+
+ # coins sent to black hole
+ # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG
+ match = [opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_0, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG]
+ if match_decoded(decoded, match):
+ return "None"
+
+ # Pay-by-Bitcoin-address TxOuts look like:
+ # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG
+ match = [opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG]
+ if match_decoded(decoded, match):
+ return hash_160_to_bc_address(decoded[2][1])
+
+ # strange tx
+ match = [opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG, opcodes.OP_NOP]
+ if match_decoded(decoded, match):
+ return hash_160_to_bc_address(decoded[2][1])
+
+    # raise BaseException("address not found in script") -- deliberately disabled; some real txs have unmatched scripts, e.g. tx ce35795fb64c268a52324b884793b3165233b1e6d678ccaadf760628ec34d76b
return "None"
-
- # Pay-by-Bitcoin-address TxOuts look like:
- # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG
- match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ]
- if match_decoded(decoded, match):
- return hash_160_to_bc_address(decoded[2][1])
-
- # strange tx
- match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG, opcodes.OP_NOP ]
- if match_decoded(decoded, match):
- return hash_160_to_bc_address(decoded[2][1])
-
- #raise BaseException("address not found in script") see ce35795fb64c268a52324b884793b3165233b1e6d678ccaadf760628ec34d76b
- return "None"
-import threading, socket, traceback, time, sys
-
-def random_string(N):
- import random, string
- return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
+import socket
+import sys
+import threading
+import time
+import traceback
from processor import Processor
+from utils import Hash, print_log
from version import VERSION
+
class IrcThread(threading.Thread):
def __init__(self, processor, config):
threading.Thread.__init__(self)
+
self.processor = processor
self.daemon = True
- self.stratum_tcp_port = config.get('server','stratum_tcp_port')
- self.stratum_http_port = config.get('server','stratum_http_port')
- self.stratum_tcp_ssl_port = config.get('server','stratum_tcp_ssl_port')
- self.stratum_http_ssl_port = config.get('server','stratum_http_ssl_port')
- self.report_stratum_tcp_port = config.get('server','report_stratum_tcp_port')
- self.report_stratum_http_port = config.get('server','report_stratum_http_port')
- self.report_stratum_tcp_ssl_port = config.get('server','report_stratum_tcp_ssl_port')
- self.report_stratum_http_ssl_port = config.get('server','report_stratum_http_ssl_port')
+ self.stratum_tcp_port = config.get('server', 'stratum_tcp_port')
+ self.stratum_http_port = config.get('server', 'stratum_http_port')
+ self.stratum_tcp_ssl_port = config.get('server', 'stratum_tcp_ssl_port')
+ self.stratum_http_ssl_port = config.get('server', 'stratum_http_ssl_port')
+ self.report_stratum_tcp_port = config.get('server', 'report_stratum_tcp_port')
+ self.report_stratum_http_port = config.get('server', 'report_stratum_http_port')
+ self.report_stratum_tcp_ssl_port = config.get('server', 'report_stratum_tcp_ssl_port')
+ self.report_stratum_http_ssl_port = config.get('server', 'report_stratum_http_ssl_port')
self.peers = {}
- self.host = config.get('server','host')
- self.report_host = config.get('server','report_host')
+ self.host = config.get('server', 'host')
+ self.report_host = config.get('server', 'report_host')
self.nick = config.get('server', 'irc_nick')
- if self.report_stratum_tcp_port: self.stratum_tcp_port = self.report_stratum_tcp_port
- if self.report_stratum_http_port: self.stratum_http_port = self.report_stratum_http_port
- if self.report_stratum_tcp_ssl_port: self.stratum_tcp_ssl_port = self.report_stratum_tcp_ssl_port
- if self.report_stratum_http_ssl_port: self.stratum_http_ssl_port = self.report_stratum_http_ssl_port
- if self.report_host: self.host = self.report_host
- if not self.nick: self.nick = random_string(10)
+ if self.report_stratum_tcp_port:
+ self.stratum_tcp_port = self.report_stratum_tcp_port
+ if self.report_stratum_http_port:
+ self.stratum_http_port = self.report_stratum_http_port
+ if self.report_stratum_tcp_ssl_port:
+ self.stratum_tcp_ssl_port = self.report_stratum_tcp_ssl_port
+ if self.report_stratum_http_ssl_port:
+ self.stratum_http_ssl_port = self.report_stratum_http_ssl_port
+ if self.report_host:
+ self.host = self.report_host
+ if not self.nick:
+ self.nick = Hash(self.report_host)[:10]
self.prepend = 'E_'
if config.get('server', 'coin') == 'litecoin':
self.prepend = 'EL_'
def get_peers(self):
return self.peers.values()
-
def getname(self):
s = 'v' + VERSION + ' '
- if self.pruning: s += 'p '
+ if self.pruning:
+ s += 'p '
if self.stratum_tcp_port:
- s += 't' + self.stratum_tcp_port + ' '
+ s += 't' + self.stratum_tcp_port + ' '
if self.stratum_http_port:
s += 'h' + self.stratum_http_port + ' '
if self.stratum_tcp_port:
- s += 's' + self.stratum_tcp_ssl_port + ' '
+ s += 's' + self.stratum_tcp_ssl_port + ' '
if self.stratum_http_port:
s += 'g' + self.stratum_http_ssl_port + ' '
return s
-
def run(self):
ircname = self.getname()
sf = s.makefile('r', 0)
t = 0
while not self.processor.shared.stopped():
- line = sf.readline()
- line = line.rstrip('\r\n')
- line = line.split()
- if not line: continue
- if line[0]=='PING':
- s.send('PONG '+line[1]+'\n')
- elif '353' in line: # answer to /names
+ line = sf.readline().rstrip('\r\n').split()
+ if not line:
+ continue
+ if line[0] == 'PING':
+ s.send('PONG ' + line[1] + '\n')
+ elif '353' in line: # answer to /names
k = line.index('353')
for item in line[k+1:]:
if item.startswith(self.prepend):
- s.send('WHO %s\n'%item)
- elif '352' in line: # answer to /who
+ s.send('WHO %s\n' % item)
+ elif '352' in line: # answer to /who
# warning: this is a horrible hack which apparently works
k = line.index('352')
- ip = line[k+4]
- ip = socket.gethostbyname(ip)
+ ip = socket.gethostbyname(line[k+4])
name = line[k+6]
host = line[k+9]
- ports = line[k+10:]
+ ports = line[k+10:]
self.peers[name] = (ip, host, ports)
if time.time() - t > 5*60:
- self.processor.push_response({'method':'server.peers', 'params':[self.get_peers()]})
+ self.processor.push_response({'method': 'server.peers', 'params': [self.get_peers()]})
s.send('NAMES #electrum\n')
t = time.time()
self.peers = {}
sf.close()
s.close()
- print "quitting IRC"
-
+ print_log("quitting IRC")
class ServerProcessor(Processor):
def __init__(self, config):
Processor.__init__(self)
self.daemon = True
- self.banner = config.get('server','banner')
- self.password = config.get('server','password')
+ self.banner = config.get('server', 'banner')
+ self.password = config.get('server', 'password')
if config.get('server', 'irc') == 'yes':
self.irc = IrcThread(self, config)
- else:
+ else:
self.irc = None
-
def get_peers(self):
if self.irc:
return self.irc.get_peers()
else:
return []
-
def run(self):
if self.irc:
self.irc.start()
password = None
if password != self.password:
- response = { 'id':request['id'], 'result':None, 'error':'incorrect password'}
- self.push_response(response)
+ self.push_response({'id': request['id'],
+ 'result': None,
+ 'error': 'incorrect password'})
return
if method == 'server.banner':
- result = self.banner.replace('\\n','\n')
+ result = self.banner.replace('\\n', '\n')
elif method == 'server.peers.subscribe':
result = self.get_peers()
result = 'stopping, please wait until all threads terminate.'
elif method == 'server.info':
- result = map(lambda s: { "time":s.time,
- "name":s.name,
- "address":s.address,
- "version":s.version,
- "subscriptions":len(s.subscriptions)},
+ result = map(lambda s: {"time": s.time,
+ "name": s.name,
+ "address": s.address,
+ "version": s.version,
+ "subscriptions": len(s.subscriptions)},
self.dispatcher.request_dispatcher.get_sessions())
elif method == 'server.cache':
result = p.queue.qsize()
else:
- print "unknown method", request
-
- if result!='':
- response = { 'id':request['id'], 'result':result }
- self.push_response(response)
+ print_log("unknown method", request)
+ if result != '':
+ self.push_response({'id': request['id'], 'result': result})
-import bitcoin
-from bitcoin import bind, _1, _2, _3
-from processor import Processor
import threading
import time
+import bitcoin
+from bitcoin import bind, _1, _2, _3
+
+from processor import Processor
import history1 as history
import membuf
+
class HistoryCache:
def __init__(self):
def clear(self, addresses):
with self.lock:
for address in addresses:
- if self.cache.has_key(address):
+ if address in self.cache:
del self.cache[address]
+
class MonitorAddress:
def __init__(self, processor, cache, backend):
def monitor(self, address, result):
for info in result:
- if not info.has_key("raw_output_script"):
+ if "raw_output_script" not in info:
continue
assert info["is_input"] == 0
tx_hash = info["tx_hash"]
response = {"id": None,
"method": "blockchain.address.subscribe",
"params": [str(address)]}
- history.payment_history(service, chain, txpool, memory_buff,
- address, bind(self.send_notify, _1, _2, response))
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.send_notify, _1, _2, response))
def mempool_n(self, result):
assert result is not None
response["params"].append(self.mempool_n(result))
self.processor.push_response(response)
+
class Backend:
def __init__(self):
else:
print "Accepted transaction", tx_hash
+
class GhostValue:
def __init__(self):
self.value = value
self.event.set()
+
class NumblocksSubscribe:
def __init__(self, backend, processor):
"error": None}
self.processor.push_response(response)
+
class AddressGetHistory:
def __init__(self, backend, processor):
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
memory_buff = self.backend.memory_buffer
- history.payment_history(service, chain, txpool, memory_buff,
- address, bind(self.respond, _1, _2, request))
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.respond, _1, _2, request))
def respond(self, ec, result, request):
if ec:
response = {"id": request["id"], "result": result, "error": None}
self.processor.push_response(response)
+
class AddressSubscribe:
def __init__(self, backend, processor, cache, monitor):
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
memory_buff = self.backend.memory_buffer
- history.payment_history(service, chain, txpool, memory_buff,
- address, bind(self.construct, _1, _2, request))
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.construct, _1, _2, request))
def construct(self, ec, result, request):
if ec:
self.processor.push_response(response)
return True
+
class BlockchainProcessor(Processor):
def __init__(self, config):
try:
tx = exporter.load_transaction(raw_tx)
except RuntimeError:
- response = {"id": request["id"], "result": None,
- "error": {"message":
- "Exception while parsing the transaction data.",
- "code": -4}}
+ response = {
+ "id": request["id"],
+ "result": None,
+ "error": {
+ "message": "Exception while parsing the transaction data.",
+ "code": -4,
+ }
+ }
else:
self.backend.protocol.broadcast_transaction(tx)
tx_hash = str(bitcoin.hash_transaction(tx))
response = {"id": request["id"], "result": tx_hash, "error": None}
self.push_response(response)
-
-import bitcoin
import threading
import time
+import bitcoin
+
+
class ExpiryQueue(threading.Thread):
def __init__(self):
expiry_queue = ExpiryQueue()
+
class StatementLine:
def __init__(self, output_point):
return False
return True
+
class PaymentHistory:
def __init__(self, chain):
for outpoint in output_points:
statement_line = StatementLine(outpoint)
self.statement.append(statement_line)
- self.chain.fetch_spend(outpoint,
- bitcoin.bind(self.load_spend,
- bitcoin._1, bitcoin._2, statement_line))
+ self.chain.fetch_spend(
+ outpoint,
+ bitcoin.bind(self.load_spend, bitcoin._1, bitcoin._2, statement_line)
+ )
self.load_tx_info(outpoint, statement_line, False)
def load_spend(self, ec, inpoint, statement_line):
line.input_loaded["value"] = -line.output_loaded["value"]
result.append(line.input_loaded)
else:
- line.output_loaded["raw_output_script"] = \
- line.raw_output_script
+ line.output_loaded["raw_output_script"] = line.raw_output_script
result.append(line.output_loaded)
self.handle_finish(result)
self.stop()
info["tx_hash"] = str(point.hash)
info["index"] = point.index
info["is_input"] = 1 if is_input else 0
- self.chain.fetch_transaction_index(point.hash,
- bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3,
- statement_line, info))
+ self.chain.fetch_transaction_index(
+ point.hash,
+ bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3, statement_line, info)
+ )
def tx_index(self, ec, block_depth, offset, statement_line, info):
info["height"] = block_depth
- self.chain.fetch_block_header_by_depth(block_depth,
- bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2,
- statement_line, info))
+ self.chain.fetch_block_header_by_depth(
+ block_depth,
+ bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2, statement_line, info)
+ )
def block_header(self, ec, blk_head, statement_line, info):
info["timestamp"] = blk_head.timestamp
info["block_hash"] = str(bitcoin.hash_block_header(blk_head))
tx_hash = bitcoin.hash_digest(info["tx_hash"])
- self.chain.fetch_transaction(tx_hash,
- bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2,
- statement_line, info))
+ self.chain.fetch_transaction(
+ tx_hash,
+ bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2, statement_line, info)
+ )
def load_tx(self, ec, tx, statement_line, info):
outputs = []
for tx_idx, tx_in in enumerate(tx.inputs):
if info["is_input"] == 1 and info["index"] == tx_idx:
continue
- self.chain.fetch_transaction(tx_in.previous_output.hash,
- bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2,
- tx_in.previous_output.index, statement_line, info, tx_idx))
+ self.chain.fetch_transaction(
+ tx_in.previous_output.hash,
+ bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2, tx_in.previous_output.index, statement_line, info, tx_idx)
+ )
def load_input(self, ec, tx, index, statement_line, info, inputs_index):
script = tx.outputs[index].output_script
statement_line.output_loaded = info
self.finish_if_done()
+
def payment_history(chain, address, handle_finish):
ph = PaymentHistory(chain)
expiry_queue.add(ph)
ph.run(address, handle_finish)
+
if __name__ == "__main__":
def finish(result):
print result
+
def last(ec, depth):
print "D:", depth
print "Looking up", address
payment_history(chain, address, finish)
raw_input()
-
import history1 as history
import membuf
+
def blockchain_started(ec, chain):
print "Blockchain initialisation:", ec
+
def finish(ec, result):
print "Finish:", ec
for line in result:
print begin, " " * (12 - len(begin)), v
print
+
a = bitcoin.async_service(1)
chain = bitcoin.bdb_blockchain(a, "/home/genjix/libbitcoin/database",
blockchain_started)
txdat = bitcoin.data_chunk("0100000001d6cad920a04acd6c0609cd91fe4dafa1f3b933ac90e032c78fdc19d98785f2bb010000008b483045022043f8ce02784bd7231cb362a602920f2566c18e1877320bf17d4eabdac1019b2f022100f1fd06c57330683dff50e1b4571fb0cdab9592f36e3d7e98d8ce3f94ce3f255b01410453aa8d5ddef56731177915b7b902336109326f883be759ec9da9c8f1212c6fa3387629d06e5bf5e6bcc62ec5a70d650c3b1266bb0bcc65ca900cff5311cb958bffffffff0280969800000000001976a9146025cabdbf823949f85595f3d1c54c54cd67058b88ac602d2d1d000000001976a914c55c43631ab14f7c4fd9c5f153f6b9123ec32c8888ac00000000")
ex = bitcoin.satoshi_exporter()
tx = ex.load_transaction(txdat)
+
+
def stored(ec):
print "mbuff", ec
+
mbuff = membuf.memory_buffer(a.internal_ptr, chain.internal_ptr,
txpool.internal_ptr)
mbuff.receive(tx, stored)
raw_input()
history.payment_history(a, chain, txpool, mbuff, address, finish)
raw_input()
-
+import threading
+import time
+
import bitcoin
from bitcoin import bind, _1, _2, _3
-import threading
import multimap
-import time
+
class ExpiryQueue(threading.Thread):
with self.lock:
self.items.append(item)
+
expiry_queue = ExpiryQueue()
+
class MemoryPoolBuffer:
def __init__(self, txpool, chain, monitor):
address = bitcoin.payment_address()
if address.extract(output.output_script):
desc[2].append((idx, str(address)))
- self.txpool.store(tx,
+ self.txpool.store(
+ tx,
bind(self.confirmed, _1, desc),
- bind(self.mempool_stored, _1, desc, handle_store))
+ bind(self.mempool_stored, _1, desc, handle_store)
+ )
def mempool_stored(self, ec, desc, handle_store):
tx_hash, prevouts, addrs = desc
pass
result = []
for outpoint in output_points:
- if self.lookup_input.has_key(str(outpoint)):
+ if str(outpoint) in self.lookup_input:
point = self.lookup_input[str(outpoint)]
info = ExtendableDict()
info["tx_hash"] = point[0]
info["is_input"] = 1
info["timestamp"] = self.timestamps[info["tx_hash"]]
result.append(info)
- if self.lookup_address.has_key(str(address)):
+ if str(address) in self.lookup_address:
addr_points = self.lookup_address[str(address)]
for point in addr_points:
info = ExtendableDict()
result.append(info)
handle(result)
+
class PaymentEntry:
def __init__(self, output_point):
def has_input(self):
return self.input_point is not False
+
class History:
def __init__(self, chain, txpool, membuf):
address = bitcoin.payment_address(address)
# To begin we fetch all the outputs (payments in)
# associated with this address
- self.chain.fetch_outputs(address,
- bind(self.check_membuf, _1, _2))
+ self.chain.fetch_outputs(address, bind(self.check_membuf, _1, _2))
def stop(self):
with self.lock:
- assert self._stopped == False
+ assert self._stopped is False
self._stopped = True
def stopped(self):
def check_membuf(self, ec, output_points):
if self.stop_on_error(ec):
return
- self.membuf.check(output_points, self.address,
- bind(self.start_loading, _1, output_points))
+ self.membuf.check(output_points, self.address, bind(self.start_loading, _1, output_points))
def start_loading(self, membuf_result, output_points):
if len(membuf_result) == 0 and len(output_points) == 0:
with self.lock:
self.statement.append(entry)
# Attempt to fetch the spend of this output
- self.chain.fetch_spend(outpoint,
- bind(self.load_spend, _1, _2, entry))
+ self.chain.fetch_spend(outpoint, bind(self.load_spend, _1, _2, entry))
self.load_tx_info(outpoint, entry, False)
# Load memory pool transactions
with self.lock:
self.membuf_result = membuf_result
for info in self.membuf_result:
- self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]),
- bind(self.load_pool_tx, _1, _2, info))
+ self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]), bind(self.load_pool_tx, _1, _2, info))
def load_spend(self, ec, inpoint, entry):
# Need a custom self.stop_on_error(...) as a missing spend
if any(not entry.is_loaded() for entry in self.statement):
return
# Memory buffer transactions finished loading?
- if any(not info.has_key("height") for info in self.membuf_result):
+ if any("height" not in info for info in self.membuf_result):
return
# Whole operation completed successfully! Finish up.
result = []
# Before loading the transaction, Stratum requires the hash
# of the parent block, so we load the block depth and then
# fetch the block header and hash it.
- self.chain.fetch_transaction_index(point.hash,
- bind(self.tx_index, _1, _2, _3, entry, info))
+ self.chain.fetch_transaction_index(point.hash, bind(self.tx_index, _1, _2, _3, entry, info))
def tx_index(self, ec, block_depth, offset, entry, info):
if self.stop_on_error(ec):
return
info["height"] = block_depth
# And now for the block hash
- self.chain.fetch_block_header_by_depth(block_depth,
- bind(self.block_header, _1, _2, entry, info))
+ self.chain.fetch_block_header_by_depth(block_depth, bind(self.block_header, _1, _2, entry, info))
def block_header(self, ec, blk_head, entry, info):
if self.stop_on_error(ec):
info["block_hash"] = str(bitcoin.hash_block_header(blk_head))
tx_hash = bitcoin.hash_digest(info["tx_hash"])
# Now load the actual main transaction for this input or output
- self.chain.fetch_transaction(tx_hash,
- bind(self.load_chain_tx, _1, _2, entry, info))
+ self.chain.fetch_transaction(tx_hash, bind(self.load_chain_tx, _1, _2, entry, info))
def load_pool_tx(self, ec, tx, info):
if self.stop_on_error(ec):
info["block_hash"] = "mempool"
self.finish_if_done()
create_handler = lambda prevout_index, input_index: \
- bind(self.load_input_pool_tx, _1, _2,
- prevout_index, info, input_index)
+ bind(self.load_input_pool_tx, _1, _2, prevout_index, info, input_index)
self.fetch_input_txs(tx, info, create_handler)
def load_tx(self, tx, info):
entry.input_loaded = info
self.finish_if_done()
create_handler = lambda prevout_index, input_index: \
- bind(self.load_input_chain_tx, _1, _2,
- prevout_index, entry, info, input_index)
+ bind(self.load_input_chain_tx, _1, _2, prevout_index, entry, info, input_index)
self.fetch_input_txs(tx, info, create_handler)
def inputs_all_loaded(self, info_inputs):
info["block_hash"] = "mempool"
self.finish_if_done()
+
def payment_history(chain, txpool, membuf, address, handle_finish):
h = History(chain, txpool, membuf)
expiry_queue.add(h)
h.start(address, handle_finish)
+
if __name__ == "__main__":
ex = bitcoin.satoshi_exporter()
tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
def blockchain_started(ec, chain):
print "Blockchain initialisation:", ec
+
def store_tx(ec):
print "Tx", ec
+
def finish(result):
print "Finish"
if result is None:
class FakeMonitor:
def tx_stored(self, tx):
pass
+
def tx_confirmed(self, tx):
pass
#payment_history(chain, txpool, membuf, address[1], finish)
raw_input()
print "Stopping..."
-
-import _history
-from bitcoin import bind, _1, _2
import json
+from bitcoin import bind, _1, _2
+import _history
+
+
def wrap_finish(handle_finish, ec, result_json):
try:
result = json.loads(result_json)
else:
handle_finish(ec, result)
+
def payment_history(service, chain, txpool, membuf, address, finish):
_history.payment_history(service.internal_ptr, chain.internal_ptr,
txpool.internal_ptr, membuf.internal_ptr,
str(address), bind(wrap_finish, finish, _1, _2))
-
return self.multi[key]
def __setitem__(self, key, value):
- if not self.multi.has_key(key):
+ if key not in self.multi:
self.multi[key] = []
self.multi[key].append(value)
def __repr__(self):
return repr(self.multi)
+
def __str__(self):
return str(self.multi)
def has_key(self, key):
- return self.multi.has_key(key)
+ return key in self.multi
+
if __name__ == "__main__":
m = MultiMap()
m.delete("foo", 1)
m.delete("bar", 2)
print m.multi
-
-import bitcoin, trace_tx
+import bitcoin
+
+import trace_tx
+
def blockchain_started(ec, chain):
print "Blockchain initialisation:", ec
+
+
def handle_tx(ec, tx):
if ec:
print ec
trace_tx.trace_tx(service.internal_ptr, chain.internal_ptr, tx, finish)
+
def finish(ec, result):
print ec
print result
-service = bitcoin.async_service(1)
-chain = bitcoin.bdb_blockchain(service, "/home/genjix/libbitcoin/database",
- blockchain_started)
-chain.fetch_transaction(
- bitcoin.hash_digest("16e3e3bfbaa072e33e6a9be1df7a13ecde5ad46a8d4d4893dbecaf0c0aeeb842"),
- handle_tx)
-raw_input()
+if __name__ == '__main__':
+ service = bitcoin.async_service(1)
+ chain = bitcoin.bdb_blockchain(service, "/home/genjix/libbitcoin/database",
+ blockchain_started)
+ chain.fetch_transaction(
+ bitcoin.hash_digest("16e3e3bfbaa072e33e6a9be1df7a13ecde5ad46a8d4d4893dbecaf0c0aeeb842"),
+ handle_tx
+ )
+ raw_input()
import json
+import Queue as queue
import socket
import threading
import time
-import traceback, sys
-import Queue as queue
-
-def random_string(N):
- import random, string
- return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
-
-def timestr():
- return time.strftime("[%d/%m/%Y-%H:%M:%S]")
-
-
-print_lock = threading.Lock()
-def print_log(*args):
- args = [str(item) for item in args]
- with print_lock:
- sys.stderr.write(timestr() + " " + " ".join(args) + "\n")
- sys.stderr.flush()
+import traceback
+import sys
+from utils import random_string, timestr, print_log
class Shared:
self.config = config
def stop(self):
- print_log( "Stopping Stratum" )
+ print_log("Stopping Stratum")
with self.lock:
self._stopped = True
except:
traceback.print_exc(file=sys.stdout)
- print_log( "processor terminating")
-
+ print_log("processor terminating")
class Dispatcher:
self.request_dispatcher.processors[prefix] = processor
-
class RequestDispatcher(threading.Thread):
def __init__(self, shared):
return self.response_queue.get()
def push_request(self, session, item):
- self.request_queue.put((session,item))
+ self.request_queue.put((session, item))
def pop_request(self):
return self.request_queue.get()
self.do_dispatch(session, request)
except:
traceback.print_exc(file=sys.stdout)
-
self.stop()
""" dispatch request to the relevant processor """
method = request['method']
- params = request.get('params',[])
+ params = request.get('params', [])
suffix = method.split('.')[-1]
if session is not None:
try:
p = self.processors[prefix]
except:
- print_log( "error: no processor for", prefix)
+ print_log("error: no processor for", prefix)
return
p.add_request(request)
addr = None
if self.subscriptions:
- print_log( "%4s"%self.name, "%15s"%self.address, "%35s"%addr, "%3d"%len(self.subscriptions), self.version )
+ print_log("%4s" % self.name,
+ "%15s" % self.address,
+ "%35s" % addr,
+ "%3d" % len(self.subscriptions),
+ self.version)
def stopped(self):
with self.lock:
def contains_subscription(self, subdesc):
with self.lock:
return subdesc in self.subscriptions
-
+
class ResponseDispatcher(threading.Thread):
params = response.get('params')
# A notification
- if internal_id is None: # and method is not None and params is not None:
+ if internal_id is None: # and method is not None and params is not None:
found = self.notification(method, params, response)
if not found and method == 'blockchain.address.subscribe':
- params2 = [self.shared.config.get('server','password')] + params
- self.request_dispatcher.push_request(None,{'method':method.replace('.subscribe', '.unsubscribe'), 'params':params2, 'id':None})
+ request = {
+ 'id': None,
+ 'method': method.replace('.subscribe', '.unsubscribe'),
+ 'params': [self.shared.config.get('server', 'password')] + params,
+ }
+ self.request_dispatcher.push_request(None, request)
# A response
- elif internal_id is not None:
+ elif internal_id is not None:
self.send_response(internal_id, response)
else:
- print_log( "no method", response)
+ print_log("no method", response)
def notification(self, method, params, response):
subdesc = Session.build_subdesc(method, params)
if session.contains_subscription(subdesc):
session.send_response(response)
found = True
- # if not found: print_log( "no subscriber for", subdesc)
+ # if not found: print_log("no subscriber for", subdesc)
return found
def send_response(self, internal_id, response):
response['id'] = message_id
session.send_response(response)
#else:
- # print_log( "send_response: no session", message_id, internal_id, response )
-
+ # print_log("send_response: no session", message_id, internal_id, response )
# License along with this program. If not, see
# <http://www.gnu.org/licenses/agpl.html>.
-import time, sys, traceback, threading
import ConfigParser
-
import logging
+import socket
+import sys
+import time
+import threading
+import traceback
+
+import json
+
logging.basicConfig()
if sys.maxsize <= 2**32:
except IOError:
pass
+
def create_config():
config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
try:
with open('/etc/electrum.banner', 'r') as f:
- config.set('server','banner', f.read())
+ config.set('server', 'banner', f.read())
except IOError:
pass
return config
+
def run_rpc_command(command, stratum_tcp_port):
+    # Send one authenticated JSON-RPC request ('server.' + command) to the
+    # local stratum TCP port and print the reply.  Relies on the
+    # module-level `host` and `password` globals read from the config file.
-    import socket, json
    try:
-        s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
-        s.connect(( host, int(stratum_tcp_port )))
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.connect((host, int(stratum_tcp_port)))
-    except:
+    # a bare except also hid NameError/ValueError bugs (e.g. a malformed
+    # port value) behind the "cannot connect" message; catch only the
+    # failures this message is actually about
+    except (socket.error, ValueError):
        print "cannot connect to server."
        return
method = 'server.' + command
- request = json.dumps( { 'id':0, 'method':method, 'params':[password] } )
+ request = json.dumps({'id': 0, 'method': method, 'params': [password]})
s.send(request + '\n')
msg = ''
while True:
o = s.recv(1024)
msg += o
- if msg.find('\n') != -1: break
+ if msg.find('\n') != -1:
+ break
s.close()
r = json.loads(msg).get('result')
- if command == 'info':
+ if command == 'info':
now = time.time()
- print 'type address sub version time'
+ print 'type address sub version time'
for item in r:
- print '%4s %15s %3s %7s %.2f'%( item.get('name'),
- item.get('address'),
- item.get('subscriptions'),
- item.get('version'),
- (now - item.get('time')) )
+ print '%4s %15s %3s %7s %.2f' % (item.get('name'),
+ item.get('address'),
+ item.get('subscriptions'),
+ item.get('version'),
+ (now - item.get('time')),
+ )
else:
print r
+
if __name__ == '__main__':
config = create_config()
password = config.get('server', 'password')
ssl_certfile = config.get('server', 'ssl_certfile')
ssl_keyfile = config.get('server', 'ssl_keyfile')
- if stratum_tcp_ssl_port or stratum_http_ssl_port: assert ssl_certfile and ssl_keyfile
+ if stratum_tcp_ssl_port or stratum_http_ssl_port:
+ assert ssl_certfile and ssl_keyfile
if len(sys.argv) > 1:
run_rpc_command(sys.argv[1], stratum_tcp_port)
print "Unknown backend '%s' specified\n" % backend_name
sys.exit(1)
- for i in range(5): print ""
- print_log( "Starting Electrum server on", host)
+ for i in xrange(5):
+ print ""
+ print_log("Starting Electrum server on", host)
# Create hub
dispatcher = Dispatcher(config)
except:
shared.stop()
- print_log( "Electrum Server stopped")
-
+ print_log("Electrum Server stopped")
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/agpl.html>.
+"""
+sessions are identified with cookies
+ - each session has a buffer of responses to requests
-import jsonrpclib
-from jsonrpclib import Fault
-from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
+
+from the processor's point of view:
+ - the user only defines process(); everything else is session management, so sessions should not belong to the processor
+
+"""
+import json
+import logging
+import os
+import Queue
import SimpleXMLRPCServer
-import SocketServer
import socket
-import logging
-import os, time
-import types
+import SocketServer
+import sys
+import time
+import threading
import traceback
-import sys, threading
+import types
+import jsonrpclib
+from jsonrpclib import Fault
+from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
from OpenSSL import SSL
try:
# For Windows
fcntl = None
-import json
-
-
-"""
-sessions are identified with cookies
- - each session has a buffer of responses to requests
-
-from the processor point of view:
- - the user only defines process() ; the rest is session management. thus sessions should not belong to processor
-
-"""
-
-
-from processor import random_string, print_log
+from processor import Session
+from utils import random_string, print_log
def get_version(request):
if 'id' in request.keys():
return 1.0
return None
-
+
+
def validate_request(request):
+    """Return True if `request` is a well-formed JSON-RPC call dict,
+    otherwise a jsonrpclib Fault (-32600) describing the problem."""
-    if type(request) is not types.DictType:
-        fault = Fault(
-            -32600, 'Request must be {}, not %s.' % type(request)
-        )
-        return fault
+    if not isinstance(request, types.DictType):
+        return Fault(-32600, 'Request must be {}, not %s.' % type(request))
    rpcid = request.get('id', None)
    version = get_version(request)
    if not version:
-        fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
-        return fault
+        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')
    param_types = (types.ListType, types.DictType, types.TupleType)
-    if not method or type(method) not in types.StringTypes or \
-        type(params) not in param_types:
-        fault = Fault(
-            -32600, 'Invalid request parameters or method.', rpcid=rpcid
-        )
-        return fault
+    if not method or type(method) not in types.StringTypes or type(params) not in param_types:
+        return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid)
    return True
+
class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
def __init__(self, encoding=None):
- SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self,
- allow_none=True,
- encoding=encoding)
+ # todo: use super
+ SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, allow_none=True, encoding=encoding)
- def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
+ def _marshaled_dispatch(self, session_id, data, dispatch_method=None):
response = None
try:
request = jsonrpclib.loads(data)
session.time = time.time()
responses = []
- if type(request) is not types.ListType:
- request = [ request ]
+ if not isinstance(request, types.ListType):
+ request = [request]
for req_entry in request:
result = validate_request(req_entry)
continue
self.dispatcher.do_dispatch(session, req_entry)
-
+
if req_entry['method'] == 'server.stop':
- return json.dumps({'result':'ok'})
+ return json.dumps({'result': 'ok'})
r = self.poll_session(session)
for item in r:
responses.append(json.dumps(item))
-
+
if len(responses) > 1:
response = '[%s]' % ','.join(responses)
elif len(responses) == 1:
return response
-
def create_session(self):
session_id = random_string(10)
session = HttpSession(session_id)
return responses
+class StratumJSONRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
-
-class StratumJSONRPCRequestHandler(
- SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
-
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Allow', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', '*')
self.send_header('Content-Length', '0')
self.end_headers()
-
+
def do_GET(self):
if not self.is_rpc_path_valid():
self.report_404()
session_id = None
c = self.headers.get('cookie')
if c:
- if c[0:8]=='SESSION=':
+ if c[0:8] == 'SESSION=':
#print "found cookie", c[8:]
session_id = c[8:]
fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
response = fault.response()
print "500", trace_string
- if response == None:
+ if response is None:
response = ''
if session_id:
- self.send_header("Set-Cookie", "SESSION=%s"%session_id)
+ self.send_header("Set-Cookie", "SESSION=%s" % session_id)
self.send_header("Content-type", "application/json-rpc")
self.send_header("Access-Control-Allow-Origin", "*")
self.wfile.flush()
self.shutdown_connection()
-
def do_POST(self):
if not self.is_rpc_path_valid():
self.report_404()
session_id = None
c = self.headers.get('cookie')
if c:
- if c[0:8]=='SESSION=':
+ if c[0:8] == 'SESSION=':
#print "found cookie", c[8:]
session_id = c[8:]
fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
response = fault.response()
print "500", trace_string
- if response == None:
+ if response is None:
response = ''
if session_id:
- self.send_header("Set-Cookie", "SESSION=%s"%session_id)
+ self.send_header("Set-Cookie", "SESSION=%s" % session_id)
self.send_header("Content-type", "application/json-rpc")
self.send_header("Access-Control-Allow-Origin", "*")
self.server_bind()
self.server_activate()
- def shutdown_request(self,request):
+ def shutdown_request(self, request):
#request.shutdown()
pass
# Unix sockets can't be bound if they already exist in the
# filesystem. The convention of e.g. X11 is to unlink
# before binding again.
- if os.path.exists(addr):
+ if os.path.exists(addr):
try:
os.unlink(addr)
except OSError:
# Unix sockets can't be bound if they already exist in the
# filesystem. The convention of e.g. X11 is to unlink
# before binding again.
- if os.path.exists(addr):
+ if os.path.exists(addr):
try:
os.unlink(addr)
except OSError:
fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
-
-
-
-
-from processor import Session
-import Queue
-
class HttpSession(Session):
def __init__(self, session_id):
self._stopped = True
return self._stopped
+
class HttpServer(threading.Thread):
def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile):
self.shared = dispatcher.shared
self.keyfile = keyfile
self.lock = threading.Lock()
-
def run(self):
# see http://code.google.com/p/jsonrpclib/
from SocketServer import ThreadingMixIn
if self.use_ssl:
- class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): pass
- self.server = StratumThreadedServer(( self.host, self.port), self.certfile, self.keyfile)
- print_log( "HTTPS server started.")
+ class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer):
+ pass
+ self.server = StratumThreadedServer((self.host, self.port), self.certfile, self.keyfile)
+ print_log("HTTPS server started.")
else:
- class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): pass
- self.server = StratumThreadedServer(( self.host, self.port))
- print_log( "HTTP server started.")
+ class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer):
+ pass
+ self.server = StratumThreadedServer((self.host, self.port))
+ print_log("HTTP server started.")
self.server.dispatcher = self.dispatcher
self.server.register_function(None, 'server.stop')
self.server.register_function(None, 'server.info')
self.server.serve_forever()
-
import json
+import Queue as queue
import socket
import threading
import time
-import Queue as queue
-from processor import Session, Dispatcher, print_log
+from processor import Session, Dispatcher
+from utils import print_log
+
class TcpSession(Session):
self.stop()
-
class TcpClientRequestor(threading.Thread):
def __init__(self, dispatcher, session):
raw_command = self.message[0:raw_buffer].strip()
self.message = self.message[raw_buffer + 1:]
- if raw_command == 'quit':
+ if raw_command == 'quit':
self.session.stop()
return False
# Return an error JSON in response.
self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
else:
- self.dispatcher.push_request(self.session,command)
+ self.dispatcher.push_request(self.session, command)
return True
+
class TcpServer(threading.Thread):
def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile):
def run(self):
if self.use_ssl:
- print_log( "TCP/SSL server started.")
+ print_log("TCP/SSL server started.")
else:
- print_log( "TCP server started.")
+ print_log("TCP server started.")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((self.host, self.port))
self.dispatcher.collect_garbage()
client_req = TcpClientRequestor(self.dispatcher, session)
client_req.start()
-
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import base64
+from functools import partial
+from itertools import imap
+import random
+import string
+import threading
+import time
+import hashlib
+import re
+import sys
-
-import hashlib, base64, re
+__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
+__b58base = len(__b58chars)
def rev_hex(s):
return s.decode('hex')[::-1].encode('hex')
+
def int_to_hex(i, length=1):
+    """Encode integer `i` as a little-endian hex string padded to `length` bytes."""
    s = hex(i)[2:].rstrip('L')
    s = "0"*(2*length - len(s)) + s
    return rev_hex(s)
+
def var_int(i):
- if i<0xfd:
+ if i < 0xfd:
return int_to_hex(i)
- elif i<=0xffff:
- return "fd"+int_to_hex(i,2)
- elif i<=0xffffffff:
- return "fe"+int_to_hex(i,4)
+ elif i <= 0xffff:
+ return "fd" + int_to_hex(i, 2)
+ elif i <= 0xffffffff:
+ return "fe" + int_to_hex(i, 4)
else:
- return "ff"+int_to_hex(i,8)
+ return "ff" + int_to_hex(i, 8)
Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
+
+
hash_encode = lambda x: x[::-1].encode('hex')
+
+
hash_decode = lambda x: x.decode('hex')[::-1]
def header_to_string(res):
+    """Serialize a block-header dict to its hex string; a missing
+    prev_block_hash is encoded as 64 zero characters."""
    pbh = res.get('prev_block_hash')
-    if pbh is None: pbh = '0'*64
-    s = int_to_hex(res.get('version'),4) \
+    if pbh is None:
+        pbh = '0'*64
+
+    return int_to_hex(res.get('version'), 4) \
        + rev_hex(pbh) \
        + rev_hex(res.get('merkle_root')) \
-        + int_to_hex(int(res.get('timestamp')),4) \
-        + int_to_hex(int(res.get('bits')),4) \
-        + int_to_hex(int(res.get('nonce')),4)
-    return s
+        + int_to_hex(int(res.get('timestamp')), 4) \
+        + int_to_hex(int(res.get('bits')), 4) \
+        + int_to_hex(int(res.get('nonce')), 4)
+
+
+def hex_to_int(s):
+    """Interpret byte string `s` as a little-endian unsigned integer."""
+    # int(..., 16) gives the same result as eval('0x' + ...) without
+    # executing constructed source on data bytes
+    return int(s[::-1].encode('hex'), 16)
-def header_from_string( s):
- hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex'))
- h = {}
- h['version'] = hex_to_int(s[0:4])
- h['prev_block_hash'] = hash_encode(s[4:36])
- h['merkle_root'] = hash_encode(s[36:68])
- h['timestamp'] = hex_to_int(s[68:72])
- h['bits'] = hex_to_int(s[72:76])
- h['nonce'] = hex_to_int(s[76:80])
- return h
+def header_from_string(s):
+    """Deserialize an 80-byte packed block header into a dict of its fields."""
+    return {
+        'version': hex_to_int(s[0:4]),
+        'prev_block_hash': hash_encode(s[4:36]),
+        'merkle_root': hash_encode(s[36:68]),
+        'timestamp': hex_to_int(s[68:72]),
+        'bits': hex_to_int(s[72:76]),
+        'nonce': hex_to_int(s[76:80]),
+    }
-############ functions from pywallet #####################
+
+############ functions from pywallet #####################
addrtype = 0
+
def hash_160(public_key):
try:
md = hashlib.new('ripemd160')
def public_key_to_bc_address(public_key):
- h160 = hash_160(public_key)
- return hash_160_to_bc_address(h160)
+ return hash_160_to_bc_address(hash_160(public_key))
+
def hash_160_to_bc_address(h160):
+    """Base58Check-encode a hash160: prepend the version byte (addrtype),
+    append a 4-byte double-SHA256 checksum, then base58-encode."""
-    if h160 == 'None': return 'None'
+    if h160 == 'None':
+        return 'None'
    vh160 = chr(addrtype) + h160
    h = Hash(vh160)
    addr = vh160 + h[0:4]
    return b58encode(addr)
+
def bc_address_to_hash_160(addr):
+    """Decode a 25-byte base58 address and return its 20-byte hash160
+    payload (version byte and checksum stripped)."""
-    if addr == 'None': return 'None'
+    if addr == 'None':
+        return 'None'
    bytes = b58decode(addr, 25)
    return bytes[1:21]
-__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
-__b58base = len(__b58chars)
-
def b58encode(v):
- """ encode v, which is a string of bytes, to base58."""
+ """encode v, which is a string of bytes, to base58."""
long_value = 0L
for (i, c) in enumerate(v[::-1]):
# leading 0-bytes in the input become leading-1s
nPad = 0
for c in v:
- if c == '\0': nPad += 1
- else: break
+ if c == '\0':
+ nPad += 1
+ else:
+ break
return (__b58chars[0]*nPad) + result
+
def b58decode(v, length):
""" decode v into a string of len bytes."""
long_value = 0L
nPad = 0
for c in v:
- if c == __b58chars[0]: nPad += 1
- else: break
+ if c == __b58chars[0]:
+ nPad += 1
+ else:
+ break
result = chr(0)*nPad + result
if length is not None and len(result) != length:
hash = Hash(vchIn)
return b58encode(vchIn + hash[0:4])
+
def DecodeBase58Check(psz):
vchRet = b58decode(psz, None)
key = vchRet[0:-4]
else:
return key
+
def PrivKeyToSecret(privkey):
return privkey[9:9+32]
+
def SecretToASecret(secret):
vchIn = chr(addrtype+128) + secret
return EncodeBase58Check(vchIn)
+
def ASecretToSecret(key):
vch = DecodeBase58Check(key)
if vch and vch[0] == chr(addrtype+128):
else:
return False
+
########### end pywallet functions #######################
+def random_string(length):
+    """Return `length` characters drawn from upper-case letters and digits."""
+    alphabet = string.ascii_uppercase + string.digits
+    return ''.join(random.choice(alphabet) for _ in xrange(length))
+
+
+def timestr():
+    """Timestamp prefix for log lines, formatted [DD/MM/YYYY-HH:MM:SS]."""
+    return time.strftime("[%d/%m/%Y-%H:%M:%S]")
+
+
+# serializes writes from print_log so concurrent threads do not interleave
+print_lock = threading.Lock()
+
+
+def print_log(*args):
+    """Write one timestamped, space-joined line to stderr, under the lock."""
+    with print_lock:
+        sys.stderr.write(timestr() + " " + " ".join(imap(str, args)) + "\n")
+        sys.stderr.flush()