Merge pull request #7 from Ramblurr/master
authorThomasV <thomasv1@gmx.de>
Wed, 21 Nov 2012 14:02:22 +0000 (06:02 -0800)
committerThomasV <thomasv1@gmx.de>
Wed, 21 Nov 2012 14:02:22 +0000 (06:02 -0800)
Update patch to work with latest git sources

README.leveldb [new file with mode: 0644]
backends/bitcoind/blockchain_processor.py
backends/bitcoind/deserialize.py
backends/bitcoind/util.py [new file with mode: 0644]
backends/irc/__init__.py
processor.py
server.py

diff --git a/README.leveldb b/README.leveldb
new file mode 100644 (file)
index 0000000..a06d5a8
--- /dev/null
@@ -0,0 +1,63 @@
+How to run a pruning node with leveldb
+
+Pruning nodes use a lightweight database to store address histories.
+Only unspent coins are kept in that database; spent outputs are
+pruned.
+
+
+__________________________________________________________
+1. patch and compile bitcoind.
+
+Install version 0.8 or equivalent.
+Patch it with the patch distributed with Electrum.
+
+Note: Even though Electrum's database uses pruning, you cannot use it
+with a pruning bitcoind. A full bitcoin node is required in order to
+know for each address if it has been used. Pruning occurs only at the
+level of the Electrum database.
+
+__________________________________________________________
+
+2. Install python-leveldb: 
+
+sudo apt-get install python-leveldb
+
+__________________________________________________________
+
+3. edit /etc/electrum.conf : 
+
+[server]
+backend = leveldb
+
+[leveldb]
+path = /path/to/your/database
+
+______________________________________________________________
+
+4. catch up with the blockchain.
+
+In order to speed up the initial catch_up phase, it is recommended to
+locate your database in shared memory:
+
+ path = /run/shm/electrum_db
+
+Once your server has finished catching up, copy your database to disk
+and update the path in /etc/electrum.conf
+
+During the catch_up phase, you can interrupt the server with Ctrl-C;
+it will safely write the current status in the database and exit.
+
+_________________________________
+
+5. enjoy!
+
+Once the server is synchronized, it will listen to ports, and the
+normal way to stop it is to type: ./server.py stop
+
+Other commands are available: 
+
+./server info   : view connections
+./server load   : view the size of the queue
+
+
+
index ae3d590..5e7901a 100644 (file)
@@ -3,50 +3,11 @@ import leveldb, urllib
 import deserialize
 import ast, time, threading, hashlib
 from Queue import Queue
-import traceback, sys, os
-
-
-
-Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
-hash_encode = lambda x: x[::-1].encode('hex')
-hash_decode = lambda x: x.decode('hex')[::-1]
-
-
-
-def rev_hex(s):
-    return s.decode('hex')[::-1].encode('hex')
-
-
-def int_to_hex(i, length=1):
-    s = hex(i)[2:].rstrip('L')
-    s = "0"*(2*length - len(s)) + s
-    return rev_hex(s)
-
-def header_to_string(res):
-    pbh = res.get('prev_block_hash')
-    if pbh is None: pbh = '0'*64
-    s = int_to_hex(res.get('version'),4) \
-        + rev_hex(pbh) \
-        + rev_hex(res.get('merkle_root')) \
-        + int_to_hex(int(res.get('timestamp')),4) \
-        + int_to_hex(int(res.get('bits')),4) \
-        + int_to_hex(int(res.get('nonce')),4)
-    return s
-
-def header_from_string( s):
-    hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex'))
-    h = {}
-    h['version'] = hex_to_int(s[0:4])
-    h['prev_block_hash'] = hash_encode(s[4:36])
-    h['merkle_root'] = hash_encode(s[36:68])
-    h['timestamp'] = hex_to_int(s[68:72])
-    h['bits'] = hex_to_int(s[72:76])
-    h['nonce'] = hex_to_int(s[76:80])
-    return h
-
-
+import traceback, sys, os, random
 
 
+from util import Hash, hash_encode, hash_decode, rev_hex, int_to_hex
+from util import bc_address_to_hash_160, hash_160_to_bc_address, header_to_string, header_from_string
 from processor import Processor, print_log
 
 class BlockchainProcessor(Processor):
@@ -55,6 +16,7 @@ class BlockchainProcessor(Processor):
         Processor.__init__(self)
 
         self.shared = shared
+        self.config = config
         self.up_to_date = False
         self.watched_addresses = []
         self.history_cache = {}
@@ -84,20 +46,20 @@ class BlockchainProcessor(Processor):
             config.get('bitcoind','port'))
 
         self.height = 0
+        self.is_test = False
         self.sent_height = 0
         self.sent_header = None
 
 
         try:
-            hist = self.deserialize(self.db.Get('0'))
-            hh, self.height, _ = hist[0] 
-            self.block_hashes = [hh]
+            hist = self.deserialize(self.db.Get('height'))
+            self.last_hash, self.height, _ = hist[0] 
             print_log( "hist", hist )
         except:
             #traceback.print_exc(file=sys.stdout)
             print_log('initializing database')
             self.height = 0
-            self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
+            self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
 
         # catch_up headers
         self.init_headers(self.height)
@@ -111,7 +73,7 @@ class BlockchainProcessor(Processor):
                 shared.stop()
                 sys.exit(0)
 
-        print "blockchain is up to date."
+        print_log( "blockchain is up to date." )
 
         threading.Timer(10, self.main_iteration).start()
 
@@ -159,28 +121,29 @@ class BlockchainProcessor(Processor):
         self.chunk_cache = {}
         self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')
 
-        height = 0
         if os.path.exists(self.headers_filename):
-            height = os.path.getsize(self.headers_filename)/80
-
-        if height:
-            prev_header = self.read_header(height -1)
-            prev_hash = self.hash_header(prev_header)
+            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
+            if height > 0:
+                prev_hash = self.hash_header(self.read_header(height))
+            else:
+                prev_hash = None
         else:
             open(self.headers_filename,'wb').close()
             prev_hash = None
+            height = -1
 
-        if height != db_height:
+        if height < db_height:
             print_log( "catching up missing headers:", height, db_height)
 
-        s = ''
         try:
-            for i in range(height, db_height):
-                header = self.get_header(i)
-                assert prev_hash == header.get('prev_block_hash')
+            while height < db_height:
+                height = height + 1
+                header = self.get_header(height)
+                if height>1: 
+                    assert prev_hash == header.get('prev_block_hash')
                 self.write_header(header, sync=False)
                 prev_hash = self.hash_header(header)
-                if i%1000==0: print_log("headers file:",i)
+                if height%1000==0: print_log("headers file:",height)
         except KeyboardInterrupt:
             self.flush_headers()
             sys.exit()
@@ -214,6 +177,7 @@ class BlockchainProcessor(Processor):
     def write_header(self, header, sync=True):
         if not self.headers_data:
             self.headers_offset = header.get('block_height')
+
         self.headers_data += header_to_string(header).decode('hex')
         if sync or len(self.headers_data) > 40*100:
             self.flush_headers()
@@ -256,7 +220,8 @@ class BlockchainProcessor(Processor):
 
         with self.dblock:
             try:
-                hist = self.deserialize(self.db.Get(addr))
+                hash_160 = bc_address_to_hash_160(addr)
+                hist = self.deserialize(self.db.Get(hash_160))
                 is_known = True
             except: 
                 hist = []
@@ -319,37 +284,57 @@ class BlockchainProcessor(Processor):
         return {"block_height":height, "merkle":s, "pos":tx_pos}
 
         
-    def add_to_batch(self, addr, tx_hash, tx_pos, tx_height):
 
-        # we do it chronologically, so nothing wrong can happen...
+
+    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
+
+        # keep it sorted
         s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
-        self.batch_list[addr] += s
+
+        serialized_hist = self.batch_list[addr] 
+
+        l = len(serialized_hist)/40
+        for i in range(l-1, -1, -1):
+            item = serialized_hist[40*i:40*(i+1)]
+            item_height = int( rev_hex( item[36:40].encode('hex') ), 16 )
+            if item_height < tx_height:
+                serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
+                break
+        else:
+            serialized_hist = s + serialized_hist
+
+        self.batch_list[addr] = serialized_hist
 
         # backlink
         txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
         self.batch_txio[txo] = addr
 
 
-    def remove_from_batch(self, tx_hash, tx_pos):
+    def remove_from_history(self, addr, tx_hash, tx_pos):
                     
         txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
-        try:
-            addr = self.batch_txio[txi]
-        except:
-            #raise BaseException(tx_hash, tx_pos)
-            print "WARNING: cannot find address for", (tx_hash, tx_pos)
-            return
 
+        if addr is None:
+            try:
+                addr = self.batch_txio[txi]
+            except:
+                raise BaseException(tx_hash, tx_pos)
+        
         serialized_hist = self.batch_list[addr]
 
         l = len(serialized_hist)/40
         for i in range(l):
-            if serialized_hist[40*i:40*i+36] == txi:
+            item = serialized_hist[40*i:40*(i+1)]
+            if item[0:36] == txi:
+                height = int( rev_hex( item[36:40].encode('hex') ), 16 )
                 serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
                 break
         else:
+            hist = self.deserialize(serialized_hist)
             raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
+
         self.batch_list[addr] = serialized_hist
+        return height, addr
 
 
     def deserialize_block(self, block):
@@ -367,40 +352,59 @@ class BlockchainProcessor(Processor):
             is_coinbase = False
         return tx_hashes, txdict
 
+    def get_undo_info(self, height):
+        s = self.db.Get("undo%d"%(height%100))
+        return eval(s)
+
+    def write_undo_info(self, batch, height, undo_info):
+        if self.is_test or height > self.bitcoind_height - 100:
+            batch.Put("undo%d"%(height%100), repr(undo_info))
+
 
     def import_block(self, block, block_hash, block_height, sync, revert=False):
 
         self.batch_list = {}  # address -> history
         self.batch_txio = {}  # transaction i/o -> address
 
-        inputs_to_read = []
+        block_inputs = []
+        block_outputs = []
         addr_to_read = []
 
         # deserialize transactions
         t0 = time.time()
         tx_hashes, txdict = self.deserialize_block(block)
 
-        # read addresses of tx inputs
         t00 = time.time()
-        for tx in txdict.values():
-            for x in tx.get('inputs'):
-                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
-                inputs_to_read.append(txi)
 
-        inputs_to_read.sort()
-        for txi in inputs_to_read:
-            try:
-                addr = self.db.Get(txi)    
-            except:
-                # the input could come from the same block
-                continue
-            self.batch_txio[txi] = addr
-            addr_to_read.append(addr)
 
+        if not revert:
+            # read addresses of tx inputs
+            for tx in txdict.values():
+                for x in tx.get('inputs'):
+                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+                    block_inputs.append(txi)
+
+            block_inputs.sort()
+            for txi in block_inputs:
+                try:
+                    addr = self.db.Get(txi)
+                except:
+                    # the input could come from the same block
+                    continue
+                self.batch_txio[txi] = addr
+                addr_to_read.append(addr)
+
+        else:
+            for txid, tx in txdict.items():
+                for x in tx.get('outputs'):
+                    txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
+                    block_outputs.append(txo)
+            
         # read histories of addresses
         for txid, tx in txdict.items():
             for x in tx.get('outputs'):
-                addr_to_read.append(x.get('address'))
+                hash_160 = bc_address_to_hash_160(x.get('address'))
+                addr_to_read.append(hash_160)
 
         addr_to_read.sort()
         for addr in addr_to_read:
@@ -408,22 +412,48 @@ class BlockchainProcessor(Processor):
                 self.batch_list[addr] = self.db.Get(addr)
             except: 
                 self.batch_list[addr] = ''
-              
+
+
+        if revert: 
+            undo_info = self.get_undo_info(block_height)
+            # print "undo", block_height, undo_info
+        else: undo_info = {}
+
         # process
         t1 = time.time()
 
+        if revert: tx_hashes = tx_hashes[::-1]
         for txid in tx_hashes: # must be ordered
             tx = txdict[txid]
             if not revert:
+
+                undo = []
                 for x in tx.get('inputs'):
-                    self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
+                    prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n'))
+                    undo.append( (prevout_height, prevout_addr) )
+                undo_info[txid] = undo
+
                 for x in tx.get('outputs'):
-                    self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
+                    hash_160 = bc_address_to_hash_160(x.get('address'))
+                    self.add_to_history( hash_160, txid, x.get('index'), block_height)
+                    
             else:
                 for x in tx.get('outputs'):
-                    self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
+                    hash_160 = bc_address_to_hash_160(x.get('address'))
+                    self.remove_from_history( hash_160, txid, x.get('index'))
+
+                i = 0
                 for x in tx.get('inputs'):
-                    self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
+                    prevout_height, prevout_addr = undo_info.get(txid)[i]
+                    i += 1
+
+                    # read the history into batch list
+                    if self.batch_list.get(prevout_addr) is None:
+                        self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
+
+                    # re-add them to the history
+                    self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
+                    print_log( "new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) )
 
         # write
         max_len = 0
@@ -438,26 +468,42 @@ class BlockchainProcessor(Processor):
                 max_len = l
                 max_addr = addr
 
-        for txio, addr in self.batch_txio.items():
-            batch.Put(txio, addr)
-        # delete spent inputs
-        for txi in inputs_to_read:
-            batch.Delete(txi)
-        batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) )
+        if not revert:
+            # add new created outputs
+            for txio, addr in self.batch_txio.items():
+                batch.Put(txio, addr)
+            # delete spent inputs
+            for txi in block_inputs:
+                batch.Delete(txi)
+            # add undo info 
+            self.write_undo_info(batch, block_height, undo_info)
+        else:
+            # restore spent inputs
+            for txio, addr in self.batch_txio.items():
+                batch.Put(txio, addr)
+            # delete spent outputs
+            for txo in block_outputs:
+                batch.Delete(txo)
+
+
+        # add the max
+        batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
 
         # actual write
         self.db.Write(batch, sync = sync)
 
         t3 = time.time()
-        if t3 - t0 > 10: 
+        if t3 - t0 > 10 and not sync: 
             print_log("block", block_height, 
                       "parse:%0.2f "%(t00 - t0), 
                       "read:%0.2f "%(t1 - t00), 
                       "proc:%.2f "%(t2-t1), 
                       "write:%.2f "%(t3-t2), 
-                      "max:", max_len, max_addr)
+                      "max:", max_len, hash_160_to_bc_address(max_addr))
 
-        for addr in self.batch_list.keys(): self.invalidate_cache(addr)
+        for h160 in self.batch_list.keys(): 
+            addr = hash_160_to_bc_address(h160)
+            self.invalidate_cache(addr)
 
 
 
@@ -492,16 +538,22 @@ class BlockchainProcessor(Processor):
                 error = str(e) + ': ' + address
                 print_log( "error:", error )
 
-        elif method == 'blockchain.address.subscribe2':
+        elif method == 'blockchain.address.unsubscribe':
             try:
-                address = params[0]
-                result = self.get_status(address, cache_only)
-                self.watch_address(address)
+                password = params[0]
+                address = params[1]
+                if password == self.config.get('server','password'):
+                    self.watched_addresses.remove(address)
+                    print_log('unsubscribed', address)
+                    result = "ok"
+                else:
+                    print_log('incorrect password')
+                    result = "authentication error"
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print_log( "error:", error )
 
-        elif method == 'blockchain.address.get_history2':
+        elif method == 'blockchain.address.get_history':
             try:
                 address = params[0]
                 result = self.get_history( address, cache_only )
@@ -576,58 +628,58 @@ class BlockchainProcessor(Processor):
 
 
 
-    def last_hash(self):
-        return self.block_hashes[-1]
-
-
     def catch_up(self, sync = True):
+
         t1 = time.time()
 
         while not self.shared.stopped():
 
             # are we done yet?
             info = self.bitcoind('getinfo')
-            bitcoind_height = info.get('blocks')
-            bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
-            if self.last_hash() == bitcoind_block_hash: 
+            self.bitcoind_height = info.get('blocks')
+            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
+            if self.last_hash == bitcoind_block_hash: 
                 self.up_to_date = True
                 break
 
             # not done..
             self.up_to_date = False
-            block_hash = self.bitcoind('getblockhash', [self.height+1])
-            block = self.bitcoind('getblock', [block_hash, 1])
+            next_block_hash = self.bitcoind('getblockhash', [self.height+1])
+            next_block = self.bitcoind('getblock', [next_block_hash, 1])
 
-            if block.get('previousblockhash') == self.last_hash():
+            # fixme: this is unsafe, if we revert when the undo info is not yet written 
+            revert = (random.randint(1, 100)==1) if self.is_test else False        
 
-                self.import_block(block, block_hash, self.height+1, sync)
-                self.height = self.height + 1
-                self.write_header(self.block2header(block), sync)
+            if (next_block.get('previousblockhash') == self.last_hash) and not revert:
 
-                self.block_hashes.append(block_hash)
-                self.block_hashes = self.block_hashes[-10:]
+                self.import_block(next_block, next_block_hash, self.height+1, sync)
+                self.height = self.height + 1
+                self.write_header(self.block2header(next_block), sync)
+                self.last_hash = next_block_hash
 
-                if (self.height+1)%100 == 0 and not sync: 
+                if (self.height)%100 == 0 and not sync: 
                     t2 = time.time()
                     print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
                     t1 = t2
-
                     
             else:
                 # revert current block
-                print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
-                block_hash = self.last_hash()
-                block = self.bitcoind('getblock', [block_hash, 1])
-                self.height = self.height -1
+                block = self.bitcoind('getblock', [self.last_hash, 1])
+                print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash )
+                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                 self.pop_header()
+                self.flush_headers()
 
-                self.block_hashes.remove(block_hash)
-                self.import_block(block, self.last_hash(), self.height, revert=True)
+                self.height = self.height -1
+
+                # read previous header from disk
+                self.header = self.read_header(self.height)
+                self.last_hash = self.hash_header(self.header)
         
 
-        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
+        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
+
 
-        
 
             
     def memorypool_update(self):
@@ -643,7 +695,8 @@ class BlockchainProcessor(Processor):
             for x in tx.get('inputs'):
                 txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                 try:
-                    addr = self.db.Get(txi)    
+                    h160 = self.db.Get(txi)
+                    addr = hash_160_to_bc_address(h160)
                 except:
                     continue
                 l = self.mempool_addresses.get(tx_hash, [])
@@ -733,8 +786,6 @@ class BlockchainProcessor(Processor):
             if addr in self.watched_addresses:
                 status = self.get_status( addr )
                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
-                self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] })
-
 
         if not self.shared.stopped(): 
             threading.Timer(10, self.main_iteration).start()
index 395e152..60af334 100644 (file)
@@ -289,8 +289,13 @@ def parse_Transaction(vds, is_coinbase):
   d['outputs'] = []
   for i in xrange(n_vout):
       o = parse_TxOut(vds, i)
-      if o['address'] is not None:
-          d['outputs'].append(o)
+
+      #if o['address'] == "None" and o['value']==0:
+      #    print("skipping strange tx output with zero value")
+      #    continue
+      # if o['address'] != "None":
+      d['outputs'].append(o)
+
   d['lockTime'] = vds.read_uint32()
   return d
 
@@ -386,11 +391,22 @@ def extract_public_key(bytes):
   if match_decoded(decoded, match):
     return public_key_to_bc_address(decoded[0][1])
 
+  # coins sent to black hole
+  # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG
+  match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_0, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ]
+  if match_decoded(decoded, match):
+    return "None"
+
   # Pay-by-Bitcoin-address TxOuts look like:
   # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG
   match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ]
   if match_decoded(decoded, match):
     return hash_160_to_bc_address(decoded[2][1])
 
+  # strange tx
+  match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG, opcodes.OP_NOP ]
+  if match_decoded(decoded, match):
+    return hash_160_to_bc_address(decoded[2][1])
+
   #raise BaseException("address not found in script") see ce35795fb64c268a52324b884793b3165233b1e6d678ccaadf760628ec34d76b
-  return "(None)"
+  return "None"
diff --git a/backends/bitcoind/util.py b/backends/bitcoind/util.py
new file mode 100644 (file)
index 0000000..f9b9ddc
--- /dev/null
@@ -0,0 +1,183 @@
+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2011 thomasv@gitorious
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import hashlib, base64, re
+
+
+def rev_hex(s):
+    return s.decode('hex')[::-1].encode('hex')
+
+def int_to_hex(i, length=1):
+    s = hex(i)[2:].rstrip('L')
+    s = "0"*(2*length - len(s)) + s
+    return rev_hex(s)
+
+def var_int(i):
+    if i<0xfd:
+        return int_to_hex(i)
+    elif i<=0xffff:
+        return "fd"+int_to_hex(i,2)
+    elif i<=0xffffffff:
+        return "fe"+int_to_hex(i,4)
+    else:
+        return "ff"+int_to_hex(i,8)
+
+
+Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
+hash_encode = lambda x: x[::-1].encode('hex')
+hash_decode = lambda x: x.decode('hex')[::-1]
+
+
+def header_to_string(res):
+    pbh = res.get('prev_block_hash')
+    if pbh is None: pbh = '0'*64
+    s = int_to_hex(res.get('version'),4) \
+        + rev_hex(pbh) \
+        + rev_hex(res.get('merkle_root')) \
+        + int_to_hex(int(res.get('timestamp')),4) \
+        + int_to_hex(int(res.get('bits')),4) \
+        + int_to_hex(int(res.get('nonce')),4)
+    return s
+
+def header_from_string( s):
+    hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex'))
+    h = {}
+    h['version'] = hex_to_int(s[0:4])
+    h['prev_block_hash'] = hash_encode(s[4:36])
+    h['merkle_root'] = hash_encode(s[36:68])
+    h['timestamp'] = hex_to_int(s[68:72])
+    h['bits'] = hex_to_int(s[72:76])
+    h['nonce'] = hex_to_int(s[76:80])
+    return h
+
+
+############ functions from pywallet ##################### 
+
+addrtype = 0
+
+def hash_160(public_key):
+    try:
+        md = hashlib.new('ripemd160')
+        md.update(hashlib.sha256(public_key).digest())
+        return md.digest()
+    except:
+        import ripemd
+        md = ripemd.new(hashlib.sha256(public_key).digest())
+        return md.digest()
+
+
+def public_key_to_bc_address(public_key):
+    h160 = hash_160(public_key)
+    return hash_160_to_bc_address(h160)
+
+def hash_160_to_bc_address(h160):
+    if h160 == 'None': return 'None'
+    vh160 = chr(addrtype) + h160
+    h = Hash(vh160)
+    addr = vh160 + h[0:4]
+    return b58encode(addr)
+
+def bc_address_to_hash_160(addr):
+    if addr == 'None': return 'None'
+    bytes = b58decode(addr, 25)
+    return bytes[1:21]
+
+
+__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
+__b58base = len(__b58chars)
+
+def b58encode(v):
+    """ encode v, which is a string of bytes, to base58."""
+
+    long_value = 0L
+    for (i, c) in enumerate(v[::-1]):
+        long_value += (256**i) * ord(c)
+
+    result = ''
+    while long_value >= __b58base:
+        div, mod = divmod(long_value, __b58base)
+        result = __b58chars[mod] + result
+        long_value = div
+    result = __b58chars[long_value] + result
+
+    # Bitcoin does a little leading-zero-compression:
+    # leading 0-bytes in the input become leading-1s
+    nPad = 0
+    for c in v:
+        if c == '\0': nPad += 1
+        else: break
+
+    return (__b58chars[0]*nPad) + result
+
+def b58decode(v, length):
+    """ decode v into a string of len bytes."""
+    long_value = 0L
+    for (i, c) in enumerate(v[::-1]):
+        long_value += __b58chars.find(c) * (__b58base**i)
+
+    result = ''
+    while long_value >= 256:
+        div, mod = divmod(long_value, 256)
+        result = chr(mod) + result
+        long_value = div
+    result = chr(long_value) + result
+
+    nPad = 0
+    for c in v:
+        if c == __b58chars[0]: nPad += 1
+        else: break
+
+    result = chr(0)*nPad + result
+    if length is not None and len(result) != length:
+        return None
+
+    return result
+
+
+def EncodeBase58Check(vchIn):
+    hash = Hash(vchIn)
+    return b58encode(vchIn + hash[0:4])
+
+def DecodeBase58Check(psz):
+    vchRet = b58decode(psz, None)
+    key = vchRet[0:-4]
+    csum = vchRet[-4:]
+    hash = Hash(key)
+    cs32 = hash[0:4]
+    if cs32 != csum:
+        return None
+    else:
+        return key
+
+def PrivKeyToSecret(privkey):
+    return privkey[9:9+32]
+
+def SecretToASecret(secret):
+    vchIn = chr(addrtype+128) + secret
+    return EncodeBase58Check(vchIn)
+
+def ASecretToSecret(key):
+    vch = DecodeBase58Check(key)
+    if vch and vch[0] == chr(addrtype+128):
+        return vch[1:]
+    else:
+        return False
+
+########### end pywallet functions #######################
+
index e70746c..d800a4a 100644 (file)
@@ -24,6 +24,7 @@ class IrcThread(threading.Thread):
         self.prepend = 'E_'
         if config.get('server', 'coin') == 'litecoin':
             self.prepend = 'EL_'
+        self.pruning = config.get('server', 'backend') == 'leveldb'
         self.nick = self.prepend + self.nick
 
     def get_peers(self):
@@ -32,6 +33,7 @@ class IrcThread(threading.Thread):
 
     def getname(self):
         s = 'v' + VERSION + ' '
+        if self.pruning: s += 'p '
         if self.stratum_tcp_port:
             s += 't' + self.stratum_tcp_port + ' ' 
         if self.stratum_http_port:
index c6972f8..8d1788d 100644 (file)
@@ -17,16 +17,17 @@ print_lock = threading.Lock()
 def print_log(*args):
     args = [str(item) for item in args]
     with print_lock:
-        sys.stderr.write(" ".join(args) + "\n")
+        sys.stderr.write(timestr() + " " + " ".join(args) + "\n")
         sys.stderr.flush()
 
 
 
 class Shared:
 
-    def __init__(self):
+    def __init__(self, config):
         self.lock = threading.Lock()
         self._stopped = False
+        self.config = config
 
     def stop(self):
         print_log( "Stopping Stratum" )
@@ -70,8 +71,8 @@ class Processor(threading.Thread):
 
 class Dispatcher:
 
-    def __init__(self):
-        self.shared = Shared()
+    def __init__(self, config):
+        self.shared = Shared(config)
         self.request_dispatcher = RequestDispatcher(self.shared)
         self.request_dispatcher.start()
         self.response_dispatcher = \
@@ -151,18 +152,10 @@ class RequestDispatcher(threading.Thread):
         params = request.get('params',[])
         suffix = method.split('.')[-1]
 
-        is_new = session.protocol_version >= 0.5
-
-        if is_new and method == 'blockchain.address.get_history': 
-            method = 'blockchain.address.get_history2'
-            request['method'] = method
-
-        if suffix == 'subscribe':
-            if is_new and method == 'blockchain.address.subscribe': 
-                method = 'blockchain.address.subscribe2'
-                request['method'] = method
-
-            session.subscribe_to_service(method, params)
+        if session is not None:
+            is_new = session.protocol_version >= 0.5
+            if suffix == 'subscribe':
+                session.subscribe_to_service(method, params)
 
         # store session and id locally
         request['id'] = self.store_session_id(session, request['id'])
@@ -234,7 +227,7 @@ class Session:
             addr = None
 
         if self.subscriptions:
-            print_log( timestr(), self.name, self.address, addr, len(self.subscriptions), self.version )
+            print_log( "%4s"%self.name, "%14s"%self.address, "%35s"%addr, "%3d"%len(self.subscriptions), self.version )
 
     def stopped(self):
         with self.lock:
@@ -253,7 +246,7 @@ class Session:
             return method,
         elif method == "blockchain.headers.subscribe":
             return method,
-        elif method in ["blockchain.address.subscribe", "blockchain.address.subscribe2"]:
+        elif method in ["blockchain.address.subscribe"]:
             if not params:
                 return None
             else:
@@ -268,9 +261,9 @@ class Session:
 
 class ResponseDispatcher(threading.Thread):
 
-    def __init__(self, shared, processor):
+    def __init__(self, shared, request_dispatcher):
         self.shared = shared
-        self.processor = processor
+        self.request_dispatcher = request_dispatcher
         threading.Thread.__init__(self)
         self.daemon = True
 
@@ -279,7 +272,7 @@ class ResponseDispatcher(threading.Thread):
             self.update()
 
     def update(self):
-        response = self.processor.pop_response()
+        response = self.request_dispatcher.pop_response()
         #print "pop response", response
         internal_id = response.get('id')
         method = response.get('method')
@@ -287,28 +280,34 @@ class ResponseDispatcher(threading.Thread):
 
         # A notification
         if internal_id is None: # and method is not None and params is not None:
-            self.notification(method, params, response)
+            found = self.notification(method, params, response)
+            if not found and method == 'blockchain.address.subscribe':
+                params2 = [self.shared.config.get('server','password')] + params
+                self.request_dispatcher.push_request(None,{'method':method.replace('.subscribe', '.unsubscribe'), 'params':params2, 'id':None})
+
         # A response
-        elif internal_id is not None: # and method is None and params is None:
+        elif internal_id is not None: 
             self.send_response(internal_id, response)
         else:
             print_log( "no method", response)
 
     def notification(self, method, params, response):
         subdesc = Session.build_subdesc(method, params)
-        for session in self.processor.sessions:
+        found = False
+        for session in self.request_dispatcher.sessions:
             if session.stopped():
                 continue
             if session.contains_subscription(subdesc):
-                if response.get('method') == "blockchain.address.subscribe2":
-                    response['method'] = "blockchain.address.subscribe"
                 session.send_response(response)
+                found = True
+        # if not found: print_log( "no subscriber for", subdesc)
+        return found
 
     def send_response(self, internal_id, response):
-        session, message_id = self.processor.get_session_id(internal_id)
+        session, message_id = self.request_dispatcher.get_session_id(internal_id)
         if session:
             response['id'] = message_id
             session.send_response(response)
-        else:
-            print_log( "send_response: no session", message_id, internal_id, response )
+        #else:
+        #    print_log( "send_response: no session", message_id, internal_id, response )
 
index 6ada775..f874cfe 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -114,7 +114,7 @@ if __name__ == '__main__':
         run_rpc_command(sys.argv[1], stratum_tcp_port)
         sys.exit(0)
 
-    from processor import Dispatcher
+    from processor import Dispatcher, print_log
     from backends.irc import ServerProcessor
 
     backend_name = config.get('server', 'backend')
@@ -122,16 +122,17 @@ if __name__ == '__main__':
         from backends.abe import BlockchainProcessor
     elif backend_name == 'libbitcoin':
         from backends.libbitcoin import BlockchainProcessor
-    elif backend_name == 'bitcoind':
+    elif backend_name == 'leveldb':
         from backends.bitcoind import BlockchainProcessor
     else:
         print "Unknown backend '%s' specified\n" % backend_name
         sys.exit(1)
 
-    print "\n\n\n\nStarting Electrum server on", host
+    for i in range(5): print ""
+    print_log( "Starting Electrum server on", host)
 
     # Create hub
-    dispatcher = Dispatcher()
+    dispatcher = Dispatcher(config)
     shared = dispatcher.shared
 
     # Create and register processors
@@ -172,5 +173,5 @@ if __name__ == '__main__':
         except:
             shared.stop()
 
-    print "Electrum Server stopped"
+    print_log( "Electrum Server stopped")