From: ThomasV Date: Wed, 21 Nov 2012 14:02:22 +0000 (-0800) Subject: Merge pull request #7 from Ramblurr/master X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=10d7d4077c1c07874a413c8b929b3597d8630075;hp=4986b66bb602c6044e33cb6bada5b2278837a2ce Merge pull request #7 from Ramblurr/master Update patch to work with latest git sources --- diff --git a/README.leveldb b/README.leveldb new file mode 100644 index 0000000..a06d5a8 --- /dev/null +++ b/README.leveldb @@ -0,0 +1,63 @@ +How to run a pruning node with leveldb + +Pruning nodes use a lightweight database to store address histories. +Only unspent coins are kept in that database; spent outputs are +pruned. + + +__________________________________________________________ +1. patch and compile bitcoind. + +Install version 0.8 or equivalent. +Patch it with the patch distributed with Electrum. + +Note: Even though Electrum's database uses pruning, you cannot use it +with a pruning bitcoind. A full bitcoin node is required in order to +know for each address if it has been used. Pruning occurs only at the +level of the Electrum database. + +__________________________________________________________ + +2. Install python-leveldb: + +sudo apt-get install python-leveldb + +__________________________________________________________ + +3. edit /etc/electrum.conf : + +[server] +backend = leveldb + +[leveldb] +path = /path/to/your/database + +______________________________________________________________ + +4. catch up with the blockchain. + +In order to speed up the initial catch_up phase, it is recommended to +locate your database in shared memory: + + path = /run/shm/electrum_db + +Once your server has finished catching up, copy your database to disk +and update the path in /etc/electrum.conf + +During the catch_up phase, you can interrupt the server with Ctrl-C; +it will safely write the current status in the database and exit. + +_________________________________ + +5. enjoy! + +Once the server is synchronized, it will listen to ports, and the +normal way to stop it is to type: ./server.py stop + +Other commands are available: + +./server info : view connections +./server load : view the size of the queue + + + diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index ae3d590..5e7901a 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -3,50 +3,11 @@ import leveldb, urllib import deserialize import ast, time, threading, hashlib from Queue import Queue -import traceback, sys, os - - - -Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest() -hash_encode = lambda x: x[::-1].encode('hex') -hash_decode = lambda x: x.decode('hex')[::-1] - - - -def rev_hex(s): - return s.decode('hex')[::-1].encode('hex') - - -def int_to_hex(i, length=1): - s = hex(i)[2:].rstrip('L') - s = "0"*(2*length - len(s)) + s - return rev_hex(s) - -def header_to_string(res): - pbh = res.get('prev_block_hash') - if pbh is None: pbh = '0'*64 - s = int_to_hex(res.get('version'),4) \ - + rev_hex(pbh) \ - + rev_hex(res.get('merkle_root')) \ - + int_to_hex(int(res.get('timestamp')),4) \ - + int_to_hex(int(res.get('bits')),4) \ - + int_to_hex(int(res.get('nonce')),4) - return s - -def header_from_string( s): - hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex')) - h = {} - h['version'] = hex_to_int(s[0:4]) - h['prev_block_hash'] = hash_encode(s[4:36]) - h['merkle_root'] = hash_encode(s[36:68]) - h['timestamp'] = hex_to_int(s[68:72]) - h['bits'] = hex_to_int(s[72:76]) - h['nonce'] = hex_to_int(s[76:80]) - return h - - +import traceback, sys, os, random +from util import Hash, hash_encode, hash_decode, rev_hex, int_to_hex +from util import bc_address_to_hash_160, hash_160_to_bc_address, header_to_string, header_from_string from processor import Processor, print_log class BlockchainProcessor(Processor): @@ -55,6 +16,7 @@ class BlockchainProcessor(Processor): Processor.__init__(self) self.shared = shared + self.config = config self.up_to_date = False self.watched_addresses = [] self.history_cache = {} @@ -84,20 +46,20 @@ class BlockchainProcessor(Processor): config.get('bitcoind','port')) self.height = 0 + self.is_test = False self.sent_height = 0 self.sent_header = None try: - hist = self.deserialize(self.db.Get('0')) - hh, self.height, _ = hist[0] - self.block_hashes = [hh] + hist = self.deserialize(self.db.Get('height')) + self.last_hash, self.height, _ = hist[0] print_log( "hist", hist ) except: #traceback.print_exc(file=sys.stdout) print_log('initializing database') self.height = 0 - self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ] + self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' # catch_up headers self.init_headers(self.height) @@ -111,7 +73,7 @@ class BlockchainProcessor(Processor): shared.stop() sys.exit(0) - print "blockchain is up to date." + print_log( "blockchain is up to date." ) threading.Timer(10, self.main_iteration).start() @@ -159,28 +121,29 @@ class BlockchainProcessor(Processor): self.chunk_cache = {} self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers') - height = 0 if os.path.exists(self.headers_filename): - height = os.path.getsize(self.headers_filename)/80 - - if height: - prev_header = self.read_header(height -1) - prev_hash = self.hash_header(prev_header) + height = os.path.getsize(self.headers_filename)/80 - 1 # the current height + if height > 0: + prev_hash = self.hash_header(self.read_header(height)) + else: + prev_hash = None else: open(self.headers_filename,'wb').close() prev_hash = None + height = -1 - if height != db_height: + if height < db_height: print_log( "catching up missing headers:", height, db_height) - s = '' try: - for i in range(height, db_height): - header = self.get_header(i) - assert prev_hash == header.get('prev_block_hash') + while height < db_height: + height = height + 1 + header = self.get_header(height) + if height>1: + assert prev_hash == header.get('prev_block_hash') self.write_header(header, sync=False) prev_hash = self.hash_header(header) - if i%1000==0: print_log("headers file:",i) + if height%1000==0: print_log("headers file:",height) except KeyboardInterrupt: self.flush_headers() sys.exit() @@ -214,6 +177,7 @@ class BlockchainProcessor(Processor): def write_header(self, header, sync=True): if not self.headers_data: self.headers_offset = header.get('block_height') + self.headers_data += header_to_string(header).decode('hex') if sync or len(self.headers_data) > 40*100: self.flush_headers() @@ -256,7 +220,8 @@ class BlockchainProcessor(Processor): with self.dblock: try: - hist = self.deserialize(self.db.Get(addr)) + hash_160 = bc_address_to_hash_160(addr) + hist = self.deserialize(self.db.Get(hash_160)) is_known = True except: hist = [] @@ -319,37 +284,57 @@ class BlockchainProcessor(Processor): return {"block_height":height, "merkle":s, "pos":tx_pos} - def add_to_batch(self, addr, tx_hash, tx_pos, tx_height): - # we do it chronologically, so nothing wrong can happen... + + def add_to_history(self, addr, tx_hash, tx_pos, tx_height): + + # keep it sorted s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex') - self.batch_list[addr] += s + + serialized_hist = self.batch_list[addr] + + l = len(serialized_hist)/40 + for i in range(l-1, -1, -1): + item = serialized_hist[40*i:40*(i+1)] + item_height = int( rev_hex( item[36:40].encode('hex') ), 16 ) + if item_height < tx_height: + serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):] + break + else: + serialized_hist = s + serialized_hist + + self.batch_list[addr] = serialized_hist # backlink txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex') self.batch_txio[txo] = addr - def remove_from_batch(self, tx_hash, tx_pos): + def remove_from_history(self, addr, tx_hash, tx_pos): txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex') - try: - addr = self.batch_txio[txi] - except: - #raise BaseException(tx_hash, tx_pos) - print "WARNING: cannot find address for", (tx_hash, tx_pos) - return + if addr is None: + try: + addr = self.batch_txio[txi] + except: + raise BaseException(tx_hash, tx_pos) + serialized_hist = self.batch_list[addr] l = len(serialized_hist)/40 for i in range(l): - if serialized_hist[40*i:40*i+36] == txi: + item = serialized_hist[40*i:40*(i+1)] + if item[0:36] == txi: + height = int( rev_hex( item[36:40].encode('hex') ), 16 ) serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):] break else: + hist = self.deserialize(serialized_hist) raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos) + self.batch_list[addr] = serialized_hist + return height, addr def deserialize_block(self, block): @@ -367,40 +352,59 @@ class BlockchainProcessor(Processor): is_coinbase = False return tx_hashes, txdict + def get_undo_info(self, height): + s = self.db.Get("undo%d"%(height%100)) + return eval(s) + + def write_undo_info(self, batch, height, undo_info): + if self.is_test or height > self.bitcoind_height - 100: + batch.Put("undo%d"%(height%100), repr(undo_info)) + def import_block(self, block, block_hash, block_height, sync, revert=False): self.batch_list = {} # address -> history self.batch_txio = {} # transaction i/o -> address - inputs_to_read = [] + block_inputs = [] + block_outputs = [] addr_to_read = [] # deserialize transactions t0 = time.time() tx_hashes, txdict = self.deserialize_block(block) - # read addresses of tx inputs t00 = time.time() - for tx in txdict.values(): - for x in tx.get('inputs'): - txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') - inputs_to_read.append(txi) - inputs_to_read.sort() - for txi in inputs_to_read: - try: - addr = self.db.Get(txi) - except: - # the input could come from the same block - continue - self.batch_txio[txi] = addr - addr_to_read.append(addr) + if not revert: + # read addresses of tx inputs + for tx in txdict.values(): + for x in tx.get('inputs'): + txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') + block_inputs.append(txi) + + block_inputs.sort() + for txi in block_inputs: + try: + addr = self.db.Get(txi) + except: + # the input could come from the same block + continue + self.batch_txio[txi] = addr + addr_to_read.append(addr) + + else: + for txid, tx in txdict.items(): + for x in tx.get('outputs'): + txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex') + block_outputs.append(txo) + # read histories of addresses for txid, tx in txdict.items(): for x in tx.get('outputs'): - addr_to_read.append(x.get('address')) + hash_160 = bc_address_to_hash_160(x.get('address')) + addr_to_read.append(hash_160) addr_to_read.sort() for addr in addr_to_read: @@ -408,22 +412,48 @@ class BlockchainProcessor(Processor): self.batch_list[addr] = self.db.Get(addr) except: self.batch_list[addr] = '' - + + + if revert: + undo_info = self.get_undo_info(block_height) + # print "undo", block_height, undo_info + else: undo_info = {} + # process t1 = time.time() + if revert: tx_hashes = tx_hashes[::-1] for txid in tx_hashes: # must be ordered tx = txdict[txid] if not revert: + + undo = [] for x in tx.get('inputs'): - self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n')) + prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n')) + undo.append( (prevout_height, prevout_addr) ) + undo_info[txid] = undo + for x in tx.get('outputs'): - self.add_to_batch( x.get('address'), txid, x.get('index'), block_height) + hash_160 = bc_address_to_hash_160(x.get('address')) + self.add_to_history( hash_160, txid, x.get('index'), block_height) + else: for x in tx.get('outputs'): - self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n')) + hash_160 = bc_address_to_hash_160(x.get('address')) + self.remove_from_history( hash_160, txid, x.get('index')) + + i = 0 for x in tx.get('inputs'): - self.add_to_batch( x.get('address'), txid, x.get('index'), block_height) + prevout_height, prevout_addr = undo_info.get(txid)[i] + i += 1 + + # read the history into batch list + if self.batch_list.get(prevout_addr) is None: + self.batch_list[prevout_addr] = self.db.Get(prevout_addr) + + # re-add them to the history + self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height) + print_log( "new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) ) # write max_len = 0 @@ -438,26 +468,42 @@ class BlockchainProcessor(Processor): max_len = l max_addr = addr - for txio, addr in self.batch_txio.items(): - batch.Put(txio, addr) - # delete spent inputs - for txi in inputs_to_read: - batch.Delete(txi) - batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) ) + if not revert: + # add new created outputs + for txio, addr in self.batch_txio.items(): + batch.Put(txio, addr) + # delete spent inputs + for txi in block_inputs: + batch.Delete(txi) + # add undo info + self.write_undo_info(batch, block_height, undo_info) + else: + # restore spent inputs + for txio, addr in self.batch_txio.items(): + batch.Put(txio, addr) + # delete spent outputs + for txo in block_outputs: + batch.Delete(txo) + + + # add the max + batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) ) # actual write self.db.Write(batch, sync = sync) t3 = time.time() - if t3 - t0 > 10: + if t3 - t0 > 10 and not sync: print_log("block", block_height, "parse:%0.2f "%(t00 - t0), "read:%0.2f "%(t1 - t00), "proc:%.2f "%(t2-t1), "write:%.2f "%(t3-t2), - "max:", max_len, max_addr) + "max:", max_len, hash_160_to_bc_address(max_addr)) - for addr in self.batch_list.keys(): self.invalidate_cache(addr) + for h160 in self.batch_list.keys(): + addr = hash_160_to_bc_address(h160) + self.invalidate_cache(addr) @@ -492,16 +538,22 @@ class BlockchainProcessor(Processor): error = str(e) + ': ' + address print_log( "error:", error ) - elif method == 'blockchain.address.subscribe2': + elif method == 'blockchain.address.unsubscribe': try: - address = params[0] - result = self.get_status(address, cache_only) - self.watch_address(address) + password = params[0] + address = params[1] + if password == self.config.get('server','password'): + self.watched_addresses.remove(address) + print_log('unsubscribed', address) + result = "ok" + else: + print_log('incorrect password') + result = "authentication error" except BaseException, e: error = str(e) + ': ' + address print_log( "error:", error ) - elif method == 'blockchain.address.get_history2': + elif method == 'blockchain.address.get_history': try: address = params[0] result = self.get_history( address, cache_only ) @@ -576,58 +628,58 @@ class BlockchainProcessor(Processor): - def last_hash(self): - return self.block_hashes[-1] - - def catch_up(self, sync = True): + t1 = time.time() while not self.shared.stopped(): # are we done yet? info = self.bitcoind('getinfo') - bitcoind_height = info.get('blocks') - bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height]) - if self.last_hash() == bitcoind_block_hash: + self.bitcoind_height = info.get('blocks') + bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height]) + if self.last_hash == bitcoind_block_hash: self.up_to_date = True break # not done.. self.up_to_date = False - block_hash = self.bitcoind('getblockhash', [self.height+1]) - block = self.bitcoind('getblock', [block_hash, 1]) + next_block_hash = self.bitcoind('getblockhash', [self.height+1]) + next_block = self.bitcoind('getblock', [next_block_hash, 1]) - if block.get('previousblockhash') == self.last_hash(): + # fixme: this is unsafe, if we revert when the undo info is not yet written + revert = (random.randint(1, 100)==1) if self.is_test else False - self.import_block(block, block_hash, self.height+1, sync) - self.height = self.height + 1 - self.write_header(self.block2header(block), sync) + if (next_block.get('previousblockhash') == self.last_hash) and not revert: - self.block_hashes.append(block_hash) - self.block_hashes = self.block_hashes[-10:] + self.import_block(next_block, next_block_hash, self.height+1, sync) + self.height = self.height + 1 + self.write_header(self.block2header(next_block), sync) + self.last_hash = next_block_hash - if (self.height+1)%100 == 0 and not sync: + if (self.height)%100 == 0 and not sync: t2 = time.time() print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) ) t1 = t2 - else: # revert current block - print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() ) - block_hash = self.last_hash() - block = self.bitcoind('getblock', [block_hash, 1]) - self.height = self.height -1 + block = self.bitcoind('getblock', [self.last_hash, 1]) + print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash ) + self.import_block(block, self.last_hash, self.height, sync, revert=True) self.pop_header() + self.flush_headers() - self.block_hashes.remove(block_hash) - self.import_block(block, self.last_hash(), self.height, revert=True) + self.height = self.height -1 + + # read previous header from disk + self.header = self.read_header(self.height) + self.last_hash = self.hash_header(self.header) - self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()])) + self.header = self.block2header(self.bitcoind('getblock', [self.last_hash])) + - def memorypool_update(self): @@ -643,7 +695,8 @@ class BlockchainProcessor(Processor): for x in tx.get('inputs'): txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') try: - addr = self.db.Get(txi) + h160 = self.db.Get(txi) + addr = hash_160_to_bc_address(h160) except: continue l = self.mempool_addresses.get(tx_hash, []) @@ -733,8 +786,6 @@ class BlockchainProcessor(Processor): if addr in self.watched_addresses: status = self.get_status( addr ) self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] }) - self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] }) - if not self.shared.stopped(): threading.Timer(10, self.main_iteration).start() diff --git a/backends/bitcoind/deserialize.py b/backends/bitcoind/deserialize.py index 395e152..60af334 100644 --- a/backends/bitcoind/deserialize.py +++ b/backends/bitcoind/deserialize.py @@ -289,8 +289,13 @@ def parse_Transaction(vds, is_coinbase): d['outputs'] = [] for i in xrange(n_vout): o = parse_TxOut(vds, i) - if o['address'] is not None: - d['outputs'].append(o) + + #if o['address'] == "None" and o['value']==0: + # print("skipping strange tx output with zero value") + # continue + # if o['address'] != "None": + d['outputs'].append(o) + d['lockTime'] = vds.read_uint32() return d @@ -386,11 +391,22 @@ def extract_public_key(bytes): if match_decoded(decoded, match): return public_key_to_bc_address(decoded[0][1]) + # coins sent to black hole + # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG + match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_0, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ] + if match_decoded(decoded, match): + return "None" + # Pay-by-Bitcoin-address TxOuts look like: # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ] if match_decoded(decoded, match): return hash_160_to_bc_address(decoded[2][1]) + # strange tx + match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG, opcodes.OP_NOP ] + if match_decoded(decoded, match): + return hash_160_to_bc_address(decoded[2][1]) + #raise BaseException("address not found in script") see ce35795fb64c268a52324b884793b3165233b1e6d678ccaadf760628ec34d76b - return "(None)" + return "None" diff --git a/backends/bitcoind/util.py b/backends/bitcoind/util.py new file mode 100644 index 0000000..f9b9ddc --- /dev/null +++ b/backends/bitcoind/util.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python +# +# Electrum - lightweight Bitcoin client +# Copyright (C) 2011 thomasv@gitorious +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import hashlib, base64, re + + +def rev_hex(s): + return s.decode('hex')[::-1].encode('hex') + +def int_to_hex(i, length=1): + s = hex(i)[2:].rstrip('L') + s = "0"*(2*length - len(s)) + s + return rev_hex(s) + +def var_int(i): + if i<0xfd: + return int_to_hex(i) + elif i<=0xffff: + return "fd"+int_to_hex(i,2) + elif i<=0xffffffff: + return "fe"+int_to_hex(i,4) + else: + return "ff"+int_to_hex(i,8) + + +Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest() +hash_encode = lambda x: x[::-1].encode('hex') +hash_decode = lambda x: x.decode('hex')[::-1] + + +def header_to_string(res): + pbh = res.get('prev_block_hash') + if pbh is None: pbh = '0'*64 + s = int_to_hex(res.get('version'),4) \ + + rev_hex(pbh) \ + + rev_hex(res.get('merkle_root')) \ + + int_to_hex(int(res.get('timestamp')),4) \ + + int_to_hex(int(res.get('bits')),4) \ + + int_to_hex(int(res.get('nonce')),4) + return s + +def header_from_string( s): + hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex')) + h = {} + h['version'] = hex_to_int(s[0:4]) + h['prev_block_hash'] = hash_encode(s[4:36]) + h['merkle_root'] = hash_encode(s[36:68]) + h['timestamp'] = hex_to_int(s[68:72]) + h['bits'] = hex_to_int(s[72:76]) + h['nonce'] = hex_to_int(s[76:80]) + return h + + +############ functions from pywallet ##################### + +addrtype = 0 + +def hash_160(public_key): + try: + md = hashlib.new('ripemd160') + md.update(hashlib.sha256(public_key).digest()) + return md.digest() + except: + import ripemd + md = ripemd.new(hashlib.sha256(public_key).digest()) + return md.digest() + + +def public_key_to_bc_address(public_key): + h160 = hash_160(public_key) + return hash_160_to_bc_address(h160) + +def hash_160_to_bc_address(h160): + if h160 == 'None': return 'None' + vh160 = chr(addrtype) + h160 + h = Hash(vh160) + addr = vh160 + h[0:4] + return b58encode(addr) + +def bc_address_to_hash_160(addr): + if addr == 'None': return 'None' + bytes = b58decode(addr, 25) + return bytes[1:21] + + +__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' +__b58base = len(__b58chars) + +def b58encode(v): + """ encode v, which is a string of bytes, to base58.""" + + long_value = 0L + for (i, c) in enumerate(v[::-1]): + long_value += (256**i) * ord(c) + + result = '' + while long_value >= __b58base: + div, mod = divmod(long_value, __b58base) + result = __b58chars[mod] + result + long_value = div + result = __b58chars[long_value] + result + + # Bitcoin does a little leading-zero-compression: + # leading 0-bytes in the input become leading-1s + nPad = 0 + for c in v: + if c == '\0': nPad += 1 + else: break + + return (__b58chars[0]*nPad) + result + +def b58decode(v, length): + """ decode v into a string of len bytes.""" + long_value = 0L + for (i, c) in enumerate(v[::-1]): + long_value += __b58chars.find(c) * (__b58base**i) + + result = '' + while long_value >= 256: + div, mod = divmod(long_value, 256) + result = chr(mod) + result + long_value = div + result = chr(long_value) + result + + nPad = 0 + for c in v: + if c == __b58chars[0]: nPad += 1 + else: break + + result = chr(0)*nPad + result + if length is not None and len(result) != length: + return None + + return result + + +def EncodeBase58Check(vchIn): + hash = Hash(vchIn) + return b58encode(vchIn + hash[0:4]) + +def DecodeBase58Check(psz): + vchRet = b58decode(psz, None) + key = vchRet[0:-4] + csum = vchRet[-4:] + hash = Hash(key) + cs32 = hash[0:4] + if cs32 != csum: + return None + else: + return key + +def PrivKeyToSecret(privkey): + return privkey[9:9+32] + +def SecretToASecret(secret): + vchIn = chr(addrtype+128) + secret + return EncodeBase58Check(vchIn) + +def ASecretToSecret(key): + vch = DecodeBase58Check(key) + if vch and vch[0] == chr(addrtype+128): + return vch[1:] + else: + return False + +########### end pywallet functions ####################### + diff --git a/backends/irc/__init__.py b/backends/irc/__init__.py index e70746c..d800a4a 100644 --- a/backends/irc/__init__.py +++ b/backends/irc/__init__.py @@ -24,6 +24,7 @@ class IrcThread(threading.Thread): self.prepend = 'E_' if config.get('server', 'coin') == 'litecoin': self.prepend = 'EL_' + self.pruning = config.get('server', 'backend') == 'leveldb' self.nick = self.prepend + self.nick def get_peers(self): @@ -32,6 +33,7 @@ class IrcThread(threading.Thread): def getname(self): s = 'v' + VERSION + ' ' + if self.pruning: s += 'p ' if self.stratum_tcp_port: s += 't' + self.stratum_tcp_port + ' ' if self.stratum_http_port: diff --git a/processor.py b/processor.py index c6972f8..8d1788d 100644 --- a/processor.py +++ b/processor.py @@ -17,16 +17,17 @@ print_lock = threading.Lock() def print_log(*args): args = [str(item) for item in args] with print_lock: - sys.stderr.write(" ".join(args) + "\n") + sys.stderr.write(timestr() + " " + " ".join(args) + "\n") sys.stderr.flush() class Shared: - def __init__(self): + def __init__(self, config): self.lock = threading.Lock() self._stopped = False + self.config = config def stop(self): print_log( "Stopping Stratum" ) @@ -70,8 +71,8 @@ class Processor(threading.Thread): class Dispatcher: - def __init__(self): - self.shared = Shared() + def __init__(self, config): + self.shared = Shared(config) self.request_dispatcher = RequestDispatcher(self.shared) self.request_dispatcher.start() self.response_dispatcher = \ @@ -151,18 +152,10 @@ class RequestDispatcher(threading.Thread): params = request.get('params',[]) suffix = method.split('.')[-1] - is_new = session.protocol_version >= 0.5 - - if is_new and method == 'blockchain.address.get_history': - method = 'blockchain.address.get_history2' - request['method'] = method - - if suffix == 'subscribe': - if is_new and method == 'blockchain.address.subscribe': - method = 'blockchain.address.subscribe2' - request['method'] = method - - session.subscribe_to_service(method, params) + if session is not None: + is_new = session.protocol_version >= 0.5 + if suffix == 'subscribe': + session.subscribe_to_service(method, params) # store session and id locally request['id'] = self.store_session_id(session, request['id']) @@ -234,7 +227,7 @@ class Session: addr = None if self.subscriptions: - print_log( timestr(), self.name, self.address, addr, len(self.subscriptions), self.version ) + print_log( "%4s"%self.name, "%14s"%self.address, "%35s"%addr, "%3d"%len(self.subscriptions), self.version ) def stopped(self): with self.lock: @@ -253,7 +246,7 @@ class Session: return method, elif method == "blockchain.headers.subscribe": return method, - elif method in ["blockchain.address.subscribe", "blockchain.address.subscribe2"]: + elif method in ["blockchain.address.subscribe"]: if not params: return None else: @@ -268,9 +261,9 @@ class Session: class ResponseDispatcher(threading.Thread): - def __init__(self, shared, processor): + def __init__(self, shared, request_dispatcher): self.shared = shared - self.processor = processor + self.request_dispatcher = request_dispatcher threading.Thread.__init__(self) self.daemon = True @@ -279,7 +272,7 @@ class ResponseDispatcher(threading.Thread): self.update() def update(self): - response = self.processor.pop_response() + response = self.request_dispatcher.pop_response() #print "pop response", response internal_id = response.get('id') method = response.get('method') @@ -287,28 +280,34 @@ class ResponseDispatcher(threading.Thread): # A notification if internal_id is None: # and method is not None and params is not None: - self.notification(method, params, response) + found = self.notification(method, params, response) + if not found and method == 'blockchain.address.subscribe': + params2 = [self.shared.config.get('server','password')] + params + self.request_dispatcher.push_request(None,{'method':method.replace('.subscribe', '.unsubscribe'), 'params':params2, 'id':None}) + # A response - elif internal_id is not None: # and method is None and params is None: + elif internal_id is not None: self.send_response(internal_id, response) else: print_log( "no method", response) def notification(self, method, params, response): subdesc = Session.build_subdesc(method, params) - for session in self.processor.sessions: + found = False + for session in self.request_dispatcher.sessions: if session.stopped(): continue if session.contains_subscription(subdesc): - if response.get('method') == "blockchain.address.subscribe2": - response['method'] = "blockchain.address.subscribe" session.send_response(response) + found = True + # if not found: print_log( "no subscriber for", subdesc) + return found def send_response(self, internal_id, response): - session, message_id = self.processor.get_session_id(internal_id) + session, message_id = self.request_dispatcher.get_session_id(internal_id) if session: response['id'] = message_id session.send_response(response) - else: - print_log( "send_response: no session", message_id, internal_id, response ) + #else: + # print_log( "send_response: no session", message_id, internal_id, response ) diff --git a/server.py b/server.py index 6ada775..f874cfe 100755 --- a/server.py +++ b/server.py @@ -114,7 +114,7 @@ if __name__ == '__main__': run_rpc_command(sys.argv[1], stratum_tcp_port) sys.exit(0) - from processor import Dispatcher + from processor import Dispatcher, print_log from backends.irc import ServerProcessor backend_name = config.get('server', 'backend') @@ -122,16 +122,17 @@ if __name__ == '__main__': from backends.abe import BlockchainProcessor elif backend_name == 'libbitcoin': from backends.libbitcoin import BlockchainProcessor - elif backend_name == 'bitcoind': + elif backend_name == 'leveldb': from backends.bitcoind import BlockchainProcessor else: print "Unknown backend '%s' specified\n" % backend_name sys.exit(1) - print "\n\n\n\nStarting Electrum server on", host + for i in range(5): print "" + print_log( "Starting Electrum server on", host) # Create hub - dispatcher = Dispatcher() + dispatcher = Dispatcher(config) shared = dispatcher.shared # Create and register processors @@ -172,5 +173,5 @@ if __name__ == '__main__': except: shared.stop() - print "Electrum Server stopped" + print_log( "Electrum Server stopped")