new bitcoind backend (experimental)
authorThomasV <thomasv@gitorious>
Sun, 11 Nov 2012 16:45:50 +0000 (20:45 +0400)
committerThomasV <thomasv@gitorious>
Sun, 11 Nov 2012 16:45:50 +0000 (20:45 +0400)
backends/bitcoind/__init__.py [new file with mode: 0644]
backends/bitcoind/blockchain_processor.py [new file with mode: 0644]
backends/bitcoind/deserialize.py [new file with mode: 0644]
processor.py
server.py
transports/stratum_http.py
transports/stratum_tcp.py

diff --git a/backends/bitcoind/__init__.py b/backends/bitcoind/__init__.py
new file mode 100644 (file)
index 0000000..7a67281
--- /dev/null
@@ -0,0 +1 @@
+from blockchain_processor import Blockchain2Processor
diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py
new file mode 100644 (file)
index 0000000..24aa0d0
--- /dev/null
@@ -0,0 +1,512 @@
+from json import dumps, loads
+import leveldb, urllib
+import deserialize
+import ast, time, threading, hashlib
+from Queue import Queue
+
+
+def rev_hex(s):
+    return s.decode('hex')[::-1].encode('hex')
+
+
+def int_to_hex(i, length=1):
+    s = hex(i)[2:].rstrip('L')
+    s = "0"*(2*length - len(s)) + s
+    return rev_hex(s)
+
+
+from processor import Processor, print_log
+
+
+class Blockchain2Processor(Processor):
+
+    def __init__(self, config):
+        Processor.__init__(self)
+
+        self.watched_addresses = []
+        self.history_cache = {}
+        self.chunk_cache = {}
+        self.cache_lock = threading.Lock()
+
+        self.mempool_hist = {}
+        self.known_mempool_hashes = []
+        self.address_queue = Queue()
+
+        self.dblock = threading.Lock()
+        try:
+            self.db = leveldb.LevelDB(config.get('leveldb', 'path'))
+        except:
+            traceback.print_exc(file=sys.stdout)
+            self.shared.stop()
+
+        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
+            config.get('bitcoind','user'),
+            config.get('bitcoind','password'),
+            config.get('bitcoind','host'),
+            config.get('bitcoind','port'))
+
+        self.height = 0
+        self.sent_height = 0
+        self.sent_header = None
+
+        # catch_up first
+        try:
+            hist = self.deserialize(self.db.Get('0'))
+            hh, self.height = hist[0] 
+            self.block_hashes = [hh]
+            print_log( "hist", hist )
+        except:
+            traceback.print_exc(file=sys.stdout)
+            self.height = 0
+            self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
+
+        threading.Timer(10, self.main_iteration).start()
+
+
+    def bitcoind(self, method, params=[]):
+        postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
+        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
+        r = loads(respdata)
+        if r['error'] != None:
+            raise BaseException(r['error'])
+        return r.get('result')
+    
+
+    def serialize(self, h):
+        s = ''
+        for txid, height in h:
+            s += txid + int_to_hex(height, 4)
+        return s.decode('hex')
+
+    def deserialize(self, s):
+        h = []
+        while s:
+            txid = s[0:32].encode('hex')
+            height = s[32:36].encode('hex')
+            height = int( rev_hex( height ), 16 )
+            h.append( ( txid, height ) )
+            s = s[36:]
+        return h
+
+
+    def block2header(self, b):
+        return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'), 
+                "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":b.get('bits'), "nonce":b.get('nonce')}
+
+    def get_header(self, height):
+        block_hash = self.bitcoind('getblockhash', [height])
+        b = self.bitcoind('getblock', [block_hash])
+        return self.block2header(b)
+    
+
+    def get_chunk(self):
+        # store them on disk; store the current chunk in memory
+        pass
+
+
+    def get_transaction(self, txid, block_height=-1):
+        raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
+        vds = deserialize.BCDataStream()
+        vds.write(raw_tx.decode('hex'))
+        return deserialize.parse_Transaction(vds)
+
+
+    def get_history(self, addr, cache_only=False):
+        with self.cache_lock: hist = self.history_cache.get( addr )
+        if hist is not None: return hist
+        if cache_only: return -1
+
+        with self.dblock:
+            try:
+                hist = self.deserialize(self.db.Get(addr))
+            except: 
+                hist = []
+
+        # should not be necessary
+        hist.sort( key=lambda tup: tup[1])
+        # check uniqueness too...
+
+        # add memory pool
+        for txid in self.mempool_hist.get(addr,[]):
+            hist.append((txid, 0))
+
+        hist = map(lambda x: {'tx_hash':x[0], 'height':x[1]}, hist)
+        with self.cache_lock: self.history_cache[addr] = hist
+        return hist
+
+
+    def get_status(self, addr, cache_only=False):
+        tx_points = self.get_history(addr, cache_only)
+        if cache_only and tx_points == -1: return -1
+
+        if not tx_points: return None
+        status = ''
+        for tx in tx_points:
+            status += tx.get('tx_hash') + ':%d:' % tx.get('height')
+        return hashlib.sha256( status ).digest().encode('hex')
+
+
+    def get_merkle(self, target_hash, height):
+
+        block_hash = self.bitcoind('getblockhash', [height])
+        b = self.bitcoind('getblock', [block_hash])
+        merkle = b.get('tx')
+
+        s = []
+        while len(merkle) != 1:
+            if len(merkle)%2: merkle.append( merkle[-1] )
+            n = []
+            while merkle:
+                new_hash = Hash( merkle[0] + merkle[1] )
+                if merkle[0] == target_hash:
+                    s.append( merkle[1])
+                    target_hash = new_hash
+                elif merkle[1] == target_hash:
+                    s.append( merkle[0])
+                    target_hash = new_hash
+                n.append( new_hash )
+                merkle = merkle[2:]
+            merkle = n
+
+        return {"block_height":height, "merkle":s, "pos":tx_pos}
+
+        
+
+
+    def import_block(self, block, block_hash, block_height):
+        #print "importing block", block_hash, block_height
+
+        txlist = block.get('tx')
+        batch_list = {}
+
+        for txid in txlist:
+            tx = self.get_transaction(txid, block_height)
+            for x in tx.get('inputs') + tx.get('outputs'):
+                addr = x.get('address')
+                serialized_hist = batch_list.get(addr)
+                if serialized_hist is None:
+                    try:
+                        serialized_hist = self.db.Get(addr)
+                    except: 
+                        serialized_hist = ''
+
+                s = (txid + int_to_hex(block_height, 4)).decode('hex')
+
+                found = False
+                for i in range(len(serialized_hist)/36):
+                    item = serialized_hist[-36*(1+i):]
+                    item = item[0:36]
+
+                    h = int( rev_hex( item[32:36].encode('hex') ), 16 )
+                    if h > block_height:
+                        txhash = item[0:32].encode('hex')
+                        print_log('warning: non-chronological order at', addr, (txhash, h), (txid, block_height))
+                        hist = self.deserialize(serialized_hist)
+                        print_log(hist)
+                        hist.sort( key=lambda tup: tup[1])
+                        while hist:
+                            last = hist[-1]
+                            if last[1] > block_height:
+                                hist = hist[0:-1]
+                            else:
+                                break
+                        found = (txhash, h) in hist
+                        print_log('new sorted hist', hist, found)
+                        serialized_hist = self.serialize(hist)
+                        break
+                    elif h < block_height:
+                        break
+                    elif item == s:
+                        found = True
+                        break
+
+                if not found:
+                    serialized_hist += s
+
+                batch_list[addr] = serialized_hist
+
+        # batch write
+        batch = leveldb.WriteBatch()
+        for addr, hist in batch_list.items():
+            batch.Put(addr, serialized_hist)
+        batch.Put('0', self.serialize( [(block_hash, block_height)] ) )
+        self.db.Write(batch, sync = True)
+
+        # invalidate cache
+        for addr in batch_list.keys(): self.update_history_cache(addr)
+
+        return len(txlist)
+
+
+
+    def revert_block(self, block, block_hash, block_height):
+
+        txlist = block.get('tx')
+        batch_list = {}
+
+        for txid in txlist:
+            tx = self.get_transaction(txid, block_height)
+            for x in tx.get('inputs') + tx.get('outputs'):
+
+                addr = x.get('address')
+
+                hist = batch_list.get(addr)
+                if hist is None:
+                    try:
+                        hist = self.deserialize(self.db.Get(addr))
+                    except: 
+                        hist = []
+
+                if (txid, block_height) in hist:
+                    hist.remove( (txid, block_height) )
+                else:
+                    print "error: txid not found during block revert", txid, block_height
+
+                batch_list[addr] = hist
+
+        # batch write
+        batch = leveldb.WriteBatch()
+        for addr, hist in batch_list.items():
+            batch.Put(addr, self.serialize(hist))
+        batch.Put('0', self.serialize( [(block_hash, block_height)] ) )
+        self.db.Write(batch, sync = True)
+
+        # invalidate cache
+        for addr in batch_list.keys(): self.update_history_cache(addr)
+
+        return len(txlist)
+
+
+
+    def add_request(self, request):
+        # see if we can get if from cache. if not, add to queue
+        if self.process( request, cache_only = True) == -1:
+            self.queue.put(request)
+
+
+
+    def process(self, request, cache_only = False):
+        #print "abe process", request
+
+        message_id = request['id']
+        method = request['method']
+        params = request.get('params',[])
+        result = None
+        error = None
+
+        if method == 'blockchain2.numblocks.subscribe':
+            result = self.height
+
+        elif method == 'blockchain2.headers.subscribe':
+            result = self.header
+
+        elif method == 'blockchain2.address.subscribe':
+            try:
+                address = params[0]
+                result = self.get_status(address, cache_only)
+                self.watch_address(address)
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log( "error:", error )
+
+        elif method == 'blockchain2.address.subscribe2':
+            try:
+                address = params[0]
+                result = self.get_status2(address, cache_only)
+                self.watch_address(address)
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log( "error:", error )
+
+        elif method == 'blockchain2.address.get_history':
+            try:
+                address = params[0]
+                result = self.get_history( address, cache_only )
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log( "error:", error )
+
+        elif method == 'blockchain2.block.get_header':
+            if cache_only: 
+                result = -1
+            else:
+                try:
+                    height = params[0]
+                    result = self.get_header( height ) 
+                except BaseException, e:
+                    error = str(e) + ': %d'% height
+                    print_log( "error:", error )
+                    
+        elif method == 'blockchain2.block.get_chunk':
+            if cache_only:
+                result = -1
+            else:
+                try:
+                    index = params[0]
+                    result = self.get_chunk( index ) 
+                except BaseException, e:
+                    error = str(e) + ': %d'% index
+                    print_log( "error:", error)
+
+        elif method == 'blockchain2.transaction.broadcast':
+            txo = self.bitcoind('sendrawtransaction', params[0])
+            print_log( "sent tx:", txo )
+            result = txo 
+
+        elif method == 'blockchain2.transaction.get_merkle':
+            if cache_only:
+                result = -1
+            else:
+                try:
+                    tx_hash = params[0]
+                    tx_height = params[1]
+                    result = self.get_merkle(tx_hash, tx_height) 
+                except BaseException, e:
+                    error = str(e) + ': ' + tx_hash
+                    print_log( "error:", error )
+                    
+        elif method == 'blockchain2.transaction.get':
+            try:
+                tx_hash = params[0]
+                height = params[1]
+                result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] ) 
+            except BaseException, e:
+                error = str(e) + ': ' + tx_hash
+                print_log( "error:", error )
+
+        else:
+            error = "unknown method:%s"%method
+
+        if cache_only and result == -1: return -1
+
+        if error:
+            response = { 'id':message_id, 'error':error }
+            self.push_response(response)
+        elif result != '':
+            response = { 'id':message_id, 'result':result }
+            self.push_response(response)
+
+
+    def watch_address(self, addr):
+        if addr not in self.watched_addresses:
+            self.watched_addresses.append(addr)
+
+
+
+    def last_hash(self):
+        return self.block_hashes[-1]
+
+
+    def catch_up(self):
+
+        t1 = time.time()
+
+        while not self.shared.stopped():
+
+            # are we done yet?
+            info = self.bitcoind('getinfo')
+            bitcoind_height = info.get('blocks')
+            bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
+            if self.last_hash() == bitcoind_block_hash: break
+
+            # not done..
+            block_hash = self.bitcoind('getblockhash', [self.height+1])
+            block = self.bitcoind('getblock', [block_hash])
+
+            if block.get('previousblockhash') == self.last_hash():
+
+                self.import_block(block, block_hash, self.height+1)
+
+                if (self.height+1)%100 == 0: 
+                    t2 = time.time()
+                    print_log( "bc2: block %d (%.3fs)"%( self.height+1, t2 - t1 ) )
+                    t1 = t2
+
+                self.height = self.height + 1
+                self.block_hashes.append(block_hash)
+                self.block_hashes = self.block_hashes[-10:]
+                    
+            else:
+                # revert current block
+                print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
+                block_hash = self.last_hash()
+                block = self.bitcoind('getblock', [block_hash])
+                self.height = self.height -1
+                self.block_hashes.remove(block_hash)
+                self.revert_block(block, self.last_hash(), self.height)
+        
+
+        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
+
+        
+
+            
+    def memorypool_update(self):
+
+        mempool_hashes = self.bitcoind('getrawmempool')
+
+        for tx_hash in mempool_hashes:
+            if tx_hash in self.known_mempool_hashes: continue
+            self.known_mempool_hashes.append(tx_hash)
+
+            tx = self.get_transaction(tx_hash)
+            if not tx: continue
+
+            for x in tx.get('inputs') + tx.get('outputs'):
+                addr = x.get('address')
+                hist = self.mempool_hist.get(addr, [])
+                if tx_hash not in hist: 
+                    hist.append( tx_hash )
+                    self.mempool_hist[addr] = hist
+                    self.update_history_cache(addr)
+
+        self.known_mempool_hashes = mempool_hashes
+
+
+    def update_history_cache(self, address):
+        with self.cache_lock:
+            if self.history_cache.has_key(address):
+                print_log( "cache: invalidating", address )
+                self.history_cache.pop(address)
+
+
+
+    def main_iteration(self):
+
+        if self.shared.stopped(): 
+            print_log( "bc2 terminating")
+            return
+
+        with self.dblock:
+            t1 = time.time()
+            self.catch_up()
+            t2 = time.time()
+            print_log( "blockchain: %d (%.3fs)"%( self.height+1, t2 - t1 ) )
+        self.memorypool_update()
+
+        if self.sent_height != self.height:
+            self.sent_height = self.height
+            self.push_response({ 'id': None, 'method':'blockchain2.numblocks.subscribe', 'params':[self.height] })
+
+        if self.sent_header != self.header:
+            self.sent_header = self.header
+            self.push_response({ 'id': None, 'method':'blockchain2.headers.subscribe', 'params':[self.header] })
+
+        while True:
+            try:
+                addr = self.address_queue.get(False)
+            except:
+                break
+            if addr in self.watched_addresses:
+                status = self.get_status( addr )
+                self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
+
+
+        if not self.shared.stopped(): 
+            threading.Timer(10, self.main_iteration).start()
+        else:
+            print_log( "bc2 terminating" )
+
+
+
+
diff --git a/backends/bitcoind/deserialize.py b/backends/bitcoind/deserialize.py
new file mode 100644 (file)
index 0000000..d92a2b5
--- /dev/null
@@ -0,0 +1,390 @@
+# this code comes from ABE. it can probably be simplified
+#
+#
+
+#from bitcoin import public_key_to_bc_address, hash_160_to_bc_address, hash_encode
+#import socket
+import time, hashlib
+import struct
+addrtype = 0
+
+
+Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
+hash_encode = lambda x: x[::-1].encode('hex')
+hash_decode = lambda x: x.decode('hex')[::-1]
+
+def hash_160(public_key):
+    md = hashlib.new('ripemd160')
+    md.update(hashlib.sha256(public_key).digest())
+    return md.digest()
+
+def public_key_to_bc_address(public_key):
+    h160 = hash_160(public_key)
+    return hash_160_to_bc_address(h160)
+
+def hash_160_to_bc_address(h160):
+    vh160 = chr(addrtype) + h160
+    h = Hash(vh160)
+    addr = vh160 + h[0:4]
+    return b58encode(addr)
+
+__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
+__b58base = len(__b58chars)
+
+def b58encode(v):
+    """ encode v, which is a string of bytes, to base58."""
+
+    long_value = 0L
+    for (i, c) in enumerate(v[::-1]):
+        long_value += (256**i) * ord(c)
+
+    result = ''
+    while long_value >= __b58base:
+        div, mod = divmod(long_value, __b58base)
+        result = __b58chars[mod] + result
+        long_value = div
+    result = __b58chars[long_value] + result
+
+    # Bitcoin does a little leading-zero-compression:
+    # leading 0-bytes in the input become leading-1s
+    nPad = 0
+    for c in v:
+        if c == '\0': nPad += 1
+        else: break
+
+    return (__b58chars[0]*nPad) + result
+
+def b58decode(v, length):
+    """ decode v into a string of len bytes."""
+    long_value = 0L
+    for (i, c) in enumerate(v[::-1]):
+        long_value += __b58chars.find(c) * (__b58base**i)
+
+    result = ''
+    while long_value >= 256:
+        div, mod = divmod(long_value, 256)
+        result = chr(mod) + result
+        long_value = div
+    result = chr(long_value) + result
+
+    nPad = 0
+    for c in v:
+        if c == __b58chars[0]: nPad += 1
+        else: break
+
+    result = chr(0)*nPad + result
+    if length is not None and len(result) != length:
+        return None
+
+    return result
+
+
+#
+# Workalike python implementation of Bitcoin's CDataStream class.
+#
+import struct
+import StringIO
+import mmap
+
+class SerializationError(Exception):
+  """ Thrown when there's a problem deserializing or serializing """
+
+class BCDataStream(object):
+  def __init__(self):
+    self.input = None
+    self.read_cursor = 0
+
+  def clear(self):
+    self.input = None
+    self.read_cursor = 0
+
+  def write(self, bytes):  # Initialize with string of bytes
+    if self.input is None:
+      self.input = bytes
+    else:
+      self.input += bytes
+
+  def map_file(self, file, start):  # Initialize with bytes from file
+    self.input = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
+    self.read_cursor = start
+  def seek_file(self, position):
+    self.read_cursor = position
+  def close_file(self):
+    self.input.close()
+
+  def read_string(self):
+    # Strings are encoded depending on length:
+    # 0 to 252 :  1-byte-length followed by bytes (if any)
+    # 253 to 65,535 : byte'253' 2-byte-length followed by bytes
+    # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
+    # ... and the Bitcoin client is coded to understand:
+    # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
+    # ... but I don't think it actually handles any strings that big.
+    if self.input is None:
+      raise SerializationError("call write(bytes) before trying to deserialize")
+
+    try:
+      length = self.read_compact_size()
+    except IndexError:
+      raise SerializationError("attempt to read past end of buffer")
+
+    return self.read_bytes(length)
+
+  def write_string(self, string):
+    # Length-encoded as with read-string
+    self.write_compact_size(len(string))
+    self.write(string)
+
+  def read_bytes(self, length):
+    try:
+      result = self.input[self.read_cursor:self.read_cursor+length]
+      self.read_cursor += length
+      return result
+    except IndexError:
+      raise SerializationError("attempt to read past end of buffer")
+
+    return ''
+
+  def read_boolean(self): return self.read_bytes(1)[0] != chr(0)
+  def read_int16(self): return self._read_num('<h')
+  def read_uint16(self): return self._read_num('<H')
+  def read_int32(self): return self._read_num('<i')
+  def read_uint32(self): return self._read_num('<I')
+  def read_int64(self): return self._read_num('<q')
+  def read_uint64(self): return self._read_num('<Q')
+
+  def write_boolean(self, val): return self.write(chr(1) if val else chr(0))
+  def write_int16(self, val): return self._write_num('<h', val)
+  def write_uint16(self, val): return self._write_num('<H', val)
+  def write_int32(self, val): return self._write_num('<i', val)
+  def write_uint32(self, val): return self._write_num('<I', val)
+  def write_int64(self, val): return self._write_num('<q', val)
+  def write_uint64(self, val): return self._write_num('<Q', val)
+
+  def read_compact_size(self):
+    size = ord(self.input[self.read_cursor])
+    self.read_cursor += 1
+    if size == 253:
+      size = self._read_num('<H')
+    elif size == 254:
+      size = self._read_num('<I')
+    elif size == 255:
+      size = self._read_num('<Q')
+    return size
+
+  def write_compact_size(self, size):
+    if size < 0:
+      raise SerializationError("attempt to write size < 0")
+    elif size < 253:
+       self.write(chr(size))
+    elif size < 2**16:
+      self.write('\xfd')
+      self._write_num('<H', size)
+    elif size < 2**32:
+      self.write('\xfe')
+      self._write_num('<I', size)
+    elif size < 2**64:
+      self.write('\xff')
+      self._write_num('<Q', size)
+
+  def _read_num(self, format):
+    (i,) = struct.unpack_from(format, self.input, self.read_cursor)
+    self.read_cursor += struct.calcsize(format)
+    return i
+
+  def _write_num(self, format, num):
+    s = struct.pack(format, num)
+    self.write(s)
+
+#
+# enum-like type
+# From the Python Cookbook, downloaded from http://code.activestate.com/recipes/67107/
+#
+import types, string, exceptions
+
+class EnumException(exceptions.Exception):
+    pass
+
+class Enumeration:
+    def __init__(self, name, enumList):
+        self.__doc__ = name
+        lookup = { }
+        reverseLookup = { }
+        i = 0
+        uniqueNames = [ ]
+        uniqueValues = [ ]
+        for x in enumList:
+            if type(x) == types.TupleType:
+                x, i = x
+            if type(x) != types.StringType:
+                raise EnumException, "enum name is not a string: " + x
+            if type(i) != types.IntType:
+                raise EnumException, "enum value is not an integer: " + i
+            if x in uniqueNames:
+                raise EnumException, "enum name is not unique: " + x
+            if i in uniqueValues:
+                raise EnumException, "enum value is not unique for " + x
+            uniqueNames.append(x)
+            uniqueValues.append(i)
+            lookup[x] = i
+            reverseLookup[i] = x
+            i = i + 1
+        self.lookup = lookup
+        self.reverseLookup = reverseLookup
+    def __getattr__(self, attr):
+        if not self.lookup.has_key(attr):
+            raise AttributeError
+        return self.lookup[attr]
+    def whatis(self, value):
+        return self.reverseLookup[value]
+
+
+# This function comes from bitcointools, bct-LICENSE.txt.
+def long_hex(bytes):
+    return bytes.encode('hex_codec')
+
+# This function comes from bitcointools, bct-LICENSE.txt.
+def short_hex(bytes):
+    t = bytes.encode('hex_codec')
+    if len(t) < 11:
+        return t
+    return t[0:4]+"..."+t[-4:]
+
+
+
+def parse_TxIn(vds):
+  d = {}
+  d['prevout_hash'] = hash_encode(vds.read_bytes(32))
+  d['prevout_n'] = vds.read_uint32()
+  scriptSig = vds.read_bytes(vds.read_compact_size())
+  d['sequence'] = vds.read_uint32()
+  d['address'] = extract_public_key(scriptSig)
+  #d['script'] = decode_script(scriptSig)
+  return d
+
+
+def parse_TxOut(vds, i):
+  d = {}
+  d['value'] = vds.read_int64()
+  scriptPubKey = vds.read_bytes(vds.read_compact_size())
+  d['address'] = extract_public_key(scriptPubKey)
+  #d['script'] = decode_script(scriptPubKey)
+  d['raw_output_script'] = scriptPubKey.encode('hex')
+  d['index'] = i
+  return d
+
+
+def parse_Transaction(vds):
+  d = {}
+  start = vds.read_cursor
+  d['version'] = vds.read_int32()
+  n_vin = vds.read_compact_size()
+  d['inputs'] = []
+  for i in xrange(n_vin):
+    d['inputs'].append(parse_TxIn(vds))
+  n_vout = vds.read_compact_size()
+  d['outputs'] = []
+  for i in xrange(n_vout):
+    d['outputs'].append(parse_TxOut(vds, i))
+  d['lockTime'] = vds.read_uint32()
+  return d
+
+
+
+
+opcodes = Enumeration("Opcodes", [
+    ("OP_0", 0), ("OP_PUSHDATA1",76), "OP_PUSHDATA2", "OP_PUSHDATA4", "OP_1NEGATE", "OP_RESERVED",
+    "OP_1", "OP_2", "OP_3", "OP_4", "OP_5", "OP_6", "OP_7",
+    "OP_8", "OP_9", "OP_10", "OP_11", "OP_12", "OP_13", "OP_14", "OP_15", "OP_16",
+    "OP_NOP", "OP_VER", "OP_IF", "OP_NOTIF", "OP_VERIF", "OP_VERNOTIF", "OP_ELSE", "OP_ENDIF", "OP_VERIFY",
+    "OP_RETURN", "OP_TOALTSTACK", "OP_FROMALTSTACK", "OP_2DROP", "OP_2DUP", "OP_3DUP", "OP_2OVER", "OP_2ROT", "OP_2SWAP",
+    "OP_IFDUP", "OP_DEPTH", "OP_DROP", "OP_DUP", "OP_NIP", "OP_OVER", "OP_PICK", "OP_ROLL", "OP_ROT",
+    "OP_SWAP", "OP_TUCK", "OP_CAT", "OP_SUBSTR", "OP_LEFT", "OP_RIGHT", "OP_SIZE", "OP_INVERT", "OP_AND",
+    "OP_OR", "OP_XOR", "OP_EQUAL", "OP_EQUALVERIFY", "OP_RESERVED1", "OP_RESERVED2", "OP_1ADD", "OP_1SUB", "OP_2MUL",
+    "OP_2DIV", "OP_NEGATE", "OP_ABS", "OP_NOT", "OP_0NOTEQUAL", "OP_ADD", "OP_SUB", "OP_MUL", "OP_DIV",
+    "OP_MOD", "OP_LSHIFT", "OP_RSHIFT", "OP_BOOLAND", "OP_BOOLOR",
+    "OP_NUMEQUAL", "OP_NUMEQUALVERIFY", "OP_NUMNOTEQUAL", "OP_LESSTHAN",
+    "OP_GREATERTHAN", "OP_LESSTHANOREQUAL", "OP_GREATERTHANOREQUAL", "OP_MIN", "OP_MAX",
+    "OP_WITHIN", "OP_RIPEMD160", "OP_SHA1", "OP_SHA256", "OP_HASH160",
+    "OP_HASH256", "OP_CODESEPARATOR", "OP_CHECKSIG", "OP_CHECKSIGVERIFY", "OP_CHECKMULTISIG",
+    "OP_CHECKMULTISIGVERIFY",
+    ("OP_SINGLEBYTE_END", 0xF0),
+    ("OP_DOUBLEBYTE_BEGIN", 0xF000),
+    "OP_PUBKEY", "OP_PUBKEYHASH",
+    ("OP_INVALIDOPCODE", 0xFFFF),
+])
+
+def script_GetOp(bytes):
+  i = 0
+  while i < len(bytes):
+    vch = None
+    opcode = ord(bytes[i])
+    i += 1
+    if opcode >= opcodes.OP_SINGLEBYTE_END:
+      opcode <<= 8
+      opcode |= ord(bytes[i])
+      i += 1
+
+    if opcode <= opcodes.OP_PUSHDATA4:
+      nSize = opcode
+      if opcode == opcodes.OP_PUSHDATA1:
+        nSize = ord(bytes[i])
+        i += 1
+      elif opcode == opcodes.OP_PUSHDATA2:
+        (nSize,) = struct.unpack_from('<H', bytes, i)
+        i += 2
+      elif opcode == opcodes.OP_PUSHDATA4:
+        (nSize,) = struct.unpack_from('<I', bytes, i)
+        i += 4
+      vch = bytes[i:i+nSize]
+      i += nSize
+
+    yield (opcode, vch, i)
+
+def script_GetOpName(opcode):
+  return (opcodes.whatis(opcode)).replace("OP_", "")
+
+def decode_script(bytes):
+  result = ''
+  for (opcode, vch, i) in script_GetOp(bytes):
+    if len(result) > 0: result += " "
+    if opcode <= opcodes.OP_PUSHDATA4:
+      result += "%d:"%(opcode,)
+      result += short_hex(vch)
+    else:
+      result += script_GetOpName(opcode)
+  return result
+
+def match_decoded(decoded, to_match):
+  if len(decoded) != len(to_match):
+    return False;
+  for i in range(len(decoded)):
+    if to_match[i] == opcodes.OP_PUSHDATA4 and decoded[i][0] <= opcodes.OP_PUSHDATA4:
+      continue  # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent.
+    if to_match[i] != decoded[i][0]:
+      return False
+  return True
+
+def extract_public_key(bytes):
+  decoded = [ x for x in script_GetOp(bytes) ]
+
+  # non-generated TxIn transactions push a signature
+  # (seventy-something bytes) and then their public key
+  # (65 bytes) onto the stack:
+  match = [ opcodes.OP_PUSHDATA4, opcodes.OP_PUSHDATA4 ]
+  if match_decoded(decoded, match):
+    return public_key_to_bc_address(decoded[1][1])
+
+  # The Genesis Block, self-payments, and pay-by-IP-address payments look like:
+  # 65 BYTES:... CHECKSIG
+  match = [ opcodes.OP_PUSHDATA4, opcodes.OP_CHECKSIG ]
+  if match_decoded(decoded, match):
+    return public_key_to_bc_address(decoded[0][1])
+
+  # Pay-by-Bitcoin-address TxOuts look like:
+  # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG
+  match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ]
+  if match_decoded(decoded, match):
+    return hash_160_to_bc_address(decoded[2][1])
+
+  return "(None)"
index dfe6128..56cea3d 100644 (file)
@@ -13,6 +13,15 @@ def timestr():
     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
 
 
+print_lock = threading.Lock()
+def print_log(*args):
+    args = [str(item) for item in args]
+    with print_lock:
+        sys.stderr.write(" ".join(args) + "\n")
+        sys.stderr.flush()
+
+
+
 class Shared:
 
     def __init__(self):
@@ -20,7 +29,7 @@ class Shared:
         self._stopped = False
 
     def stop(self):
-        print "Stopping Stratum"
+        print_log( "Stopping Stratum" )
         with self.lock:
             self._stopped = True
 
@@ -55,7 +64,7 @@ class Processor(threading.Thread):
             except:
                 traceback.print_exc(file=sys.stdout)
 
-        print "processor terminating"
+        print_log( "processor terminating")
             
 
 
@@ -162,7 +171,7 @@ class RequestDispatcher(threading.Thread):
         try:
             p = self.processors[prefix]
         except:
-            print "error: no processor for", prefix
+            print_log( "error: no processor for", prefix)
             return
 
         p.add_request(request)
@@ -221,8 +230,7 @@ class Session:
             addr = None
 
         if self.subscriptions:
-            print timestr(), self.name, self.address, addr,\
-                len(self.subscriptions), self.version
+            print_log( timestr(), self.name, self.address, addr, len(self.subscriptions), self.version )
 
     def stopped(self):
         with self.lock:
@@ -280,7 +288,7 @@ class ResponseDispatcher(threading.Thread):
         elif internal_id is not None: # and method is None and params is None:
             self.send_response(internal_id, response)
         else:
-            print "no method", response
+            print_log( "no method", response)
 
     def notification(self, method, params, response):
         subdesc = Session.build_subdesc(method, params)
@@ -298,5 +306,5 @@ class ResponseDispatcher(threading.Thread):
             response['id'] = message_id
             session.send_response(response)
         else:
-            print "send_response: no session", message_id, internal_id, response
+            print_log( "send_response: no session", message_id, internal_id, response )
 
index c24dcfc..517299e 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -122,19 +122,19 @@ if __name__ == '__main__':
 
     print "Starting Electrum server on", host
 
-    from backends.bitcoind import Blockchain2Processor
-
     # Create hub
     dispatcher = Dispatcher()
     shared = dispatcher.shared
 
     # Create and register processors
+
+    # from backends.bitcoind import Blockchain2Processor
+    # chain2_proc = Blockchain2Processor(config)
+    # dispatcher.register('blockchain2', chain2_proc)
+
     chain_proc = backend.BlockchainProcessor(config)
     dispatcher.register('blockchain', chain_proc)
 
-    chain2_proc = Blockchain2Processor(config)
-    dispatcher.register('blockchain2', chain2_proc)
-
     server_proc = ServerProcessor(config)
     dispatcher.register('server', server_proc)
 
index 51d5219..f60b320 100644 (file)
@@ -49,7 +49,7 @@ from the processor point of view:
 """
 
 
-from processor import random_string
+from processor import random_string, print_log
 
 
 def get_version(request):
@@ -390,11 +390,11 @@ class HttpServer(threading.Thread):
         if self.use_ssl:
             class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): pass
             self.server = StratumThreadedServer(( self.host, self.port), self.certfile, self.keyfile)
-            print "HTTPS server started."
+            print_log( "HTTPS server started.")
         else:
             class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): pass
             self.server = StratumThreadedServer(( self.host, self.port))
-            print "HTTP server started."
+            print_log( "HTTP server started.")
 
         self.server.dispatcher = self.dispatcher
         self.server.register_function(None, 'server.stop')
index e1ebcd1..c843f77 100644 (file)
@@ -4,7 +4,7 @@ import threading
 import time
 import Queue as queue
 
-from processor import Session, Dispatcher, timestr
+from processor import Session, Dispatcher, print_log
 
 class TcpSession(Session):
 
@@ -131,7 +131,10 @@ class TcpServer(threading.Thread):
         self.ssl_certfile = ssl_certfile
 
     def run(self):
-        print "TCP server started.", self.use_ssl
+        if self.use_ssl:
+            print_log( "TCP/SSL server started.")
+        else:
+            print_log( "TCP server started.")
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         sock.bind((self.host, self.port))