import deserialize
import ast, time, threading, hashlib
from Queue import Queue
-import traceback, sys, os
-
-
-
-Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
-hash_encode = lambda x: x[::-1].encode('hex')
-hash_decode = lambda x: x.decode('hex')[::-1]
-
-
-
-def rev_hex(s):
- return s.decode('hex')[::-1].encode('hex')
-
-
-def int_to_hex(i, length=1):
- s = hex(i)[2:].rstrip('L')
- s = "0"*(2*length - len(s)) + s
- return rev_hex(s)
-
-def header_to_string(res):
- pbh = res.get('prev_block_hash')
- if pbh is None: pbh = '0'*64
- s = int_to_hex(res.get('version'),4) \
- + rev_hex(pbh) \
- + rev_hex(res.get('merkle_root')) \
- + int_to_hex(int(res.get('timestamp')),4) \
- + int_to_hex(int(res.get('bits')),4) \
- + int_to_hex(int(res.get('nonce')),4)
- return s
-
-def header_from_string( s):
- hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex'))
- h = {}
- h['version'] = hex_to_int(s[0:4])
- h['prev_block_hash'] = hash_encode(s[4:36])
- h['merkle_root'] = hash_encode(s[36:68])
- h['timestamp'] = hex_to_int(s[68:72])
- h['bits'] = hex_to_int(s[72:76])
- h['nonce'] = hex_to_int(s[76:80])
- return h
-
-
+import traceback, sys, os, random
+from util import Hash, hash_encode, hash_decode, rev_hex, int_to_hex
+from util import bc_address_to_hash_160, hash_160_to_bc_address, header_to_string, header_from_string
from processor import Processor, print_log
class BlockchainProcessor(Processor):
Processor.__init__(self)
self.shared = shared
+ self.config = config
self.up_to_date = False
self.watched_addresses = []
self.history_cache = {}
config.get('bitcoind','port'))
self.height = 0
+ self.is_test = False
self.sent_height = 0
self.sent_header = None
try:
- hist = self.deserialize(self.db.Get('0'))
- hh, self.height, _ = hist[0]
- self.block_hashes = [hh]
+ hist = self.deserialize(self.db.Get('height'))
+ self.last_hash, self.height, _ = hist[0]
print_log( "hist", hist )
except:
#traceback.print_exc(file=sys.stdout)
print_log('initializing database')
self.height = 0
- self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
+ self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
# catch_up headers
self.init_headers(self.height)
shared.stop()
sys.exit(0)
- print "blockchain is up to date."
+ print_log( "blockchain is up to date." )
threading.Timer(10, self.main_iteration).start()
self.chunk_cache = {}
self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')
- height = 0
if os.path.exists(self.headers_filename):
- height = os.path.getsize(self.headers_filename)/80
-
- if height:
- prev_header = self.read_header(height -1)
- prev_hash = self.hash_header(prev_header)
+ height = os.path.getsize(self.headers_filename)/80 - 1 # the current height
+ if height > 0:
+ prev_hash = self.hash_header(self.read_header(height))
+ else:
+ prev_hash = None
else:
open(self.headers_filename,'wb').close()
prev_hash = None
+ height = -1
- if height != db_height:
+ if height < db_height:
print_log( "catching up missing headers:", height, db_height)
- s = ''
try:
- for i in range(height, db_height):
- header = self.get_header(i)
- assert prev_hash == header.get('prev_block_hash')
+ while height < db_height:
+ height = height + 1
+ header = self.get_header(height)
+ if height>1:
+ assert prev_hash == header.get('prev_block_hash')
self.write_header(header, sync=False)
prev_hash = self.hash_header(header)
- if i%1000==0: print_log("headers file:",i)
+ if height%1000==0: print_log("headers file:",height)
except KeyboardInterrupt:
self.flush_headers()
sys.exit()
def write_header(self, header, sync=True):
if not self.headers_data:
self.headers_offset = header.get('block_height')
+
self.headers_data += header_to_string(header).decode('hex')
if sync or len(self.headers_data) > 40*100:
self.flush_headers()
with self.dblock:
try:
- hist = self.deserialize(self.db.Get(addr))
+ hash_160 = bc_address_to_hash_160(addr)
+ hist = self.deserialize(self.db.Get(hash_160))
is_known = True
except:
hist = []
return {"block_height":height, "merkle":s, "pos":tx_pos}
- def add_to_batch(self, addr, tx_hash, tx_pos, tx_height):
- # we do it chronologically, so nothing wrong can happen...
+
+ def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
+
+ # keep it sorted
s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
- self.batch_list[addr] += s
+
+ serialized_hist = self.batch_list[addr]
+
+ l = len(serialized_hist)/40
+ for i in range(l-1, -1, -1):
+ item = serialized_hist[40*i:40*(i+1)]
+ item_height = int( rev_hex( item[36:40].encode('hex') ), 16 )
+ if item_height < tx_height:
+ serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
+ break
+ else:
+ serialized_hist = s + serialized_hist
+
+ self.batch_list[addr] = serialized_hist
# backlink
txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
self.batch_txio[txo] = addr
- def remove_from_batch(self, tx_hash, tx_pos):
+ def remove_from_history(self, addr, tx_hash, tx_pos):
txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
- try:
- addr = self.batch_txio[txi]
- except:
- #raise BaseException(tx_hash, tx_pos)
- print "WARNING: cannot find address for", (tx_hash, tx_pos)
- return
+ if addr is None:
+ try:
+ addr = self.batch_txio[txi]
+ except:
+ raise BaseException(tx_hash, tx_pos)
+
serialized_hist = self.batch_list[addr]
l = len(serialized_hist)/40
for i in range(l):
- if serialized_hist[40*i:40*i+36] == txi:
+ item = serialized_hist[40*i:40*(i+1)]
+ if item[0:36] == txi:
+ height = int( rev_hex( item[36:40].encode('hex') ), 16 )
serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
break
else:
+ hist = self.deserialize(serialized_hist)
raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
+
self.batch_list[addr] = serialized_hist
+ return height, addr
def deserialize_block(self, block):
is_coinbase = False
return tx_hashes, txdict
+ def get_undo_info(self, height):
+ s = self.db.Get("undo%d"%(height%100))
+ return eval(s)
+
+ def write_undo_info(self, batch, height, undo_info):
+ if self.is_test or height > self.bitcoind_height - 100:
+ batch.Put("undo%d"%(height%100), repr(undo_info))
+
def import_block(self, block, block_hash, block_height, sync, revert=False):
self.batch_list = {} # address -> history
self.batch_txio = {} # transaction i/o -> address
- inputs_to_read = []
+ block_inputs = []
+ block_outputs = []
addr_to_read = []
# deserialize transactions
t0 = time.time()
tx_hashes, txdict = self.deserialize_block(block)
- # read addresses of tx inputs
t00 = time.time()
- for tx in txdict.values():
- for x in tx.get('inputs'):
- txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
- inputs_to_read.append(txi)
- inputs_to_read.sort()
- for txi in inputs_to_read:
- try:
- addr = self.db.Get(txi)
- except:
- # the input could come from the same block
- continue
- self.batch_txio[txi] = addr
- addr_to_read.append(addr)
+ if not revert:
+ # read addresses of tx inputs
+ for tx in txdict.values():
+ for x in tx.get('inputs'):
+ txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+ block_inputs.append(txi)
+
+ block_inputs.sort()
+ for txi in block_inputs:
+ try:
+ addr = self.db.Get(txi)
+ except:
+ # the input could come from the same block
+ continue
+ self.batch_txio[txi] = addr
+ addr_to_read.append(addr)
+
+ else:
+ for txid, tx in txdict.items():
+ for x in tx.get('outputs'):
+ txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
+ block_outputs.append(txo)
+
# read histories of addresses
for txid, tx in txdict.items():
for x in tx.get('outputs'):
- addr_to_read.append(x.get('address'))
+ hash_160 = bc_address_to_hash_160(x.get('address'))
+ addr_to_read.append(hash_160)
addr_to_read.sort()
for addr in addr_to_read:
self.batch_list[addr] = self.db.Get(addr)
except:
self.batch_list[addr] = ''
-
+
+
+ if revert:
+ undo_info = self.get_undo_info(block_height)
+ # print "undo", block_height, undo_info
+ else: undo_info = {}
+
# process
t1 = time.time()
+ if revert: tx_hashes = tx_hashes[::-1]
for txid in tx_hashes: # must be ordered
tx = txdict[txid]
if not revert:
+
+ undo = []
for x in tx.get('inputs'):
- self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
+ prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n'))
+ undo.append( (prevout_height, prevout_addr) )
+ undo_info[txid] = undo
+
for x in tx.get('outputs'):
- self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
+ hash_160 = bc_address_to_hash_160(x.get('address'))
+ self.add_to_history( hash_160, txid, x.get('index'), block_height)
+
else:
for x in tx.get('outputs'):
- self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
+ hash_160 = bc_address_to_hash_160(x.get('address'))
+ self.remove_from_history( hash_160, txid, x.get('index'))
+
+ i = 0
for x in tx.get('inputs'):
- self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
+ prevout_height, prevout_addr = undo_info.get(txid)[i]
+ i += 1
+
+ # read the history into batch list
+ if self.batch_list.get(prevout_addr) is None:
+ self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
+
+ # re-add them to the history
+ self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
+ print_log( "new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) )
# write
max_len = 0
max_len = l
max_addr = addr
- for txio, addr in self.batch_txio.items():
- batch.Put(txio, addr)
- # delete spent inputs
- for txi in inputs_to_read:
- batch.Delete(txi)
- batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) )
+ if not revert:
+ # add new created outputs
+ for txio, addr in self.batch_txio.items():
+ batch.Put(txio, addr)
+ # delete spent inputs
+ for txi in block_inputs:
+ batch.Delete(txi)
+ # add undo info
+ self.write_undo_info(batch, block_height, undo_info)
+ else:
+ # restore spent inputs
+ for txio, addr in self.batch_txio.items():
+ batch.Put(txio, addr)
+ # delete spent outputs
+ for txo in block_outputs:
+ batch.Delete(txo)
+
+
+ # add the max
+ batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
# actual write
self.db.Write(batch, sync = sync)
t3 = time.time()
- if t3 - t0 > 10:
+ if t3 - t0 > 10 and not sync:
print_log("block", block_height,
"parse:%0.2f "%(t00 - t0),
"read:%0.2f "%(t1 - t00),
"proc:%.2f "%(t2-t1),
"write:%.2f "%(t3-t2),
- "max:", max_len, max_addr)
+ "max:", max_len, hash_160_to_bc_address(max_addr))
- for addr in self.batch_list.keys(): self.invalidate_cache(addr)
+ for h160 in self.batch_list.keys():
+ addr = hash_160_to_bc_address(h160)
+ self.invalidate_cache(addr)
error = str(e) + ': ' + address
print_log( "error:", error )
- elif method == 'blockchain.address.subscribe2':
+ elif method == 'blockchain.address.unsubscribe':
try:
- address = params[0]
- result = self.get_status(address, cache_only)
- self.watch_address(address)
+ password = params[0]
+ address = params[1]
+ if password == self.config.get('server','password'):
+ self.watched_addresses.remove(address)
+ print_log('unsubscribed', address)
+ result = "ok"
+ else:
+ print_log('incorrect password')
+ result = "authentication error"
except BaseException, e:
error = str(e) + ': ' + address
print_log( "error:", error )
- elif method == 'blockchain.address.get_history2':
+ elif method == 'blockchain.address.get_history':
try:
address = params[0]
result = self.get_history( address, cache_only )
- def last_hash(self):
- return self.block_hashes[-1]
-
-
def catch_up(self, sync = True):
+
t1 = time.time()
while not self.shared.stopped():
# are we done yet?
info = self.bitcoind('getinfo')
- bitcoind_height = info.get('blocks')
- bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
- if self.last_hash() == bitcoind_block_hash:
+ self.bitcoind_height = info.get('blocks')
+ bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
+ if self.last_hash == bitcoind_block_hash:
self.up_to_date = True
break
# not done..
self.up_to_date = False
- block_hash = self.bitcoind('getblockhash', [self.height+1])
- block = self.bitcoind('getblock', [block_hash, 1])
+ next_block_hash = self.bitcoind('getblockhash', [self.height+1])
+ next_block = self.bitcoind('getblock', [next_block_hash, 1])
- if block.get('previousblockhash') == self.last_hash():
+ # fixme: this is unsafe, if we revert when the undo info is not yet written
+ revert = (random.randint(1, 100)==1) if self.is_test else False
- self.import_block(block, block_hash, self.height+1, sync)
- self.height = self.height + 1
- self.write_header(self.block2header(block), sync)
+ if (next_block.get('previousblockhash') == self.last_hash) and not revert:
- self.block_hashes.append(block_hash)
- self.block_hashes = self.block_hashes[-10:]
+ self.import_block(next_block, next_block_hash, self.height+1, sync)
+ self.height = self.height + 1
+ self.write_header(self.block2header(next_block), sync)
+ self.last_hash = next_block_hash
- if (self.height+1)%100 == 0 and not sync:
+ if (self.height)%100 == 0 and not sync:
t2 = time.time()
print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
t1 = t2
-
else:
# revert current block
- print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
- block_hash = self.last_hash()
- block = self.bitcoind('getblock', [block_hash, 1])
- self.height = self.height -1
+ block = self.bitcoind('getblock', [self.last_hash, 1])
+ print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash )
+ self.import_block(block, self.last_hash, self.height, sync, revert=True)
self.pop_header()
+ self.flush_headers()
- self.block_hashes.remove(block_hash)
- self.import_block(block, self.last_hash(), self.height, revert=True)
+ self.height = self.height -1
+
+ # read previous header from disk
+ self.header = self.read_header(self.height)
+ self.last_hash = self.hash_header(self.header)
- self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
+ self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
+
-
def memorypool_update(self):
for x in tx.get('inputs'):
txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
try:
- addr = self.db.Get(txi)
+ h160 = self.db.Get(txi)
+ addr = hash_160_to_bc_address(h160)
except:
continue
l = self.mempool_addresses.get(tx_hash, [])
if addr in self.watched_addresses:
status = self.get_status( addr )
self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
- self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] })
-
if not self.shared.stopped():
threading.Timer(10, self.main_iteration).start()
--- /dev/null
+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2011 thomasv@gitorious
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import hashlib, base64, re
+
+
+# Reverse the byte order of a hex string (big-endian <-> little-endian view).
+def rev_hex(s):
+ return s.decode('hex')[::-1].encode('hex')
+
+# Serialize integer i as a little-endian hex string zero-padded to `length`
+# bytes. NOTE(review): if i needs more than `length` bytes the padding term
+# is empty and the result is simply longer -- callers must respect the range.
+def int_to_hex(i, length=1):
+ s = hex(i)[2:].rstrip('L')
+ s = "0"*(2*length - len(s)) + s
+ return rev_hex(s)
+
+# Encode integer i as a Bitcoin variable-length integer ("CompactSize"),
+# returned as hex: one byte for values below 0xfd, otherwise a marker byte
+# (fd/fe/ff) followed by 2/4/8 little-endian bytes.
+def var_int(i):
+ if i<0xfd:
+ return int_to_hex(i)
+ elif i<=0xffff:
+ return "fd"+int_to_hex(i,2)
+ elif i<=0xffffffff:
+ return "fe"+int_to_hex(i,4)
+ else:
+ return "ff"+int_to_hex(i,8)
+
+
+# Hash: Bitcoin's double-SHA256 over a byte string.
+Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
+# hash_encode/hash_decode: convert between internal (binary) byte order and
+# the byte-reversed hex form used for tx/block hashes over RPC.
+hash_encode = lambda x: x[::-1].encode('hex')
+hash_decode = lambda x: x.decode('hex')[::-1]
+
+
+# Serialize a block-header dict (version, prev_block_hash, merkle_root,
+# timestamp, bits, nonce) into the canonical 80-byte header as a hex string,
+# all fields little-endian. A missing prev_block_hash is treated as the
+# genesis parent (64 zero hex digits).
+def header_to_string(res):
+ pbh = res.get('prev_block_hash')
+ if pbh is None: pbh = '0'*64
+ s = int_to_hex(res.get('version'),4) \
+ + rev_hex(pbh) \
+ + rev_hex(res.get('merkle_root')) \
+ + int_to_hex(int(res.get('timestamp')),4) \
+ + int_to_hex(int(res.get('bits')),4) \
+ + int_to_hex(int(res.get('nonce')),4)
+ return s
+
+# Parse an 80-byte raw header (binary string) back into a dict; inverse of
+# header_to_string.
+# SECURITY NOTE(review): hex_to_int feeds eval() with data derived from the
+# input. The hex encoding restricts it to [0-9a-f], so arbitrary code cannot
+# run, but int(s[::-1].encode('hex'), 16) would be the safer equivalent.
+def header_from_string( s):
+ hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex'))
+ h = {}
+ h['version'] = hex_to_int(s[0:4])
+ h['prev_block_hash'] = hash_encode(s[4:36])
+ h['merkle_root'] = hash_encode(s[36:68])
+ h['timestamp'] = hex_to_int(s[68:72])
+ h['bits'] = hex_to_int(s[72:76])
+ h['nonce'] = hex_to_int(s[76:80])
+ return h
+
+
+############ functions from pywallet #####################
+
+# Address version byte (0 = Bitcoin main net addresses starting with '1').
+addrtype = 0
+
+# Bitcoin's HASH160: RIPEMD160(SHA256(public_key)), returned as 20 raw bytes.
+# Falls back to the bundled pure-python ripemd module when the OpenSSL
+# backing hashlib does not provide ripemd160.
+def hash_160(public_key):
+ try:
+ md = hashlib.new('ripemd160')
+ md.update(hashlib.sha256(public_key).digest())
+ return md.digest()
+ except:
+ import ripemd
+ md = ripemd.new(hashlib.sha256(public_key).digest())
+ return md.digest()
+
+
+# Convenience wrapper: public key bytes -> base58 Bitcoin address.
+def public_key_to_bc_address(public_key):
+ h160 = hash_160(public_key)
+ return hash_160_to_bc_address(h160)
+
+# 20-byte hash160 -> Base58Check address: version byte + payload + first
+# 4 bytes of the double-SHA256 checksum.
+# NOTE(review): the literal string 'None' is passed through unchanged --
+# apparently a sentinel used by callers for unparsable outputs; confirm
+# against the processor code before relying on it.
+def hash_160_to_bc_address(h160):
+ if h160 == 'None': return 'None'
+ vh160 = chr(addrtype) + h160
+ h = Hash(vh160)
+ addr = vh160 + h[0:4]
+ return b58encode(addr)
+
+# Base58 address -> 20-byte hash160 (strips version byte and checksum).
+# The checksum is NOT verified here; b58decode only checks the length (25).
+def bc_address_to_hash_160(addr):
+ if addr == 'None': return 'None'
+ bytes = b58decode(addr, 25)
+ return bytes[1:21]
+
+
+# Base58 alphabet: like base64 but without 0, O, I, l, '+' and '/' to avoid
+# visual ambiguity.
+__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
+__b58base = len(__b58chars)
+
+def b58encode(v):
+ """ encode v, which is a string of bytes, to base58."""
+
+ # interpret v as one big-endian unsigned integer
+ long_value = 0L
+ for (i, c) in enumerate(v[::-1]):
+ long_value += (256**i) * ord(c)
+
+ # repeated divmod converts the integer to base-58 digits
+ result = ''
+ while long_value >= __b58base:
+ div, mod = divmod(long_value, __b58base)
+ result = __b58chars[mod] + result
+ long_value = div
+ result = __b58chars[long_value] + result
+
+ # Bitcoin does a little leading-zero-compression:
+ # leading 0-bytes in the input become leading-1s
+ nPad = 0
+ for c in v:
+ if c == '\0': nPad += 1
+ else: break
+
+ return (__b58chars[0]*nPad) + result
+
+def b58decode(v, length):
+ """ decode v into a string of len bytes."""
+ # interpret the base58 digits as one big unsigned integer
+ long_value = 0L
+ for (i, c) in enumerate(v[::-1]):
+ long_value += __b58chars.find(c) * (__b58base**i)
+
+ # convert the integer back to big-endian bytes
+ result = ''
+ while long_value >= 256:
+ div, mod = divmod(long_value, 256)
+ result = chr(mod) + result
+ long_value = div
+ result = chr(long_value) + result
+
+ # leading '1' characters encode leading zero bytes (see b58encode)
+ nPad = 0
+ for c in v:
+ if c == __b58chars[0]: nPad += 1
+ else: break
+
+ result = chr(0)*nPad + result
+ # when a length is requested, a mismatch signals corrupt input
+ if length is not None and len(result) != length:
+ return None
+
+ return result
+
+
+# Append the 4-byte double-SHA256 checksum and base58-encode (Base58Check).
+def EncodeBase58Check(vchIn):
+ hash = Hash(vchIn)
+ return b58encode(vchIn + hash[0:4])
+
+# Inverse of EncodeBase58Check: decode and verify the trailing 4-byte
+# checksum; returns the payload, or None when the checksum does not match.
+def DecodeBase58Check(psz):
+ vchRet = b58decode(psz, None)
+ key = vchRet[0:-4]
+ csum = vchRet[-4:]
+ hash = Hash(key)
+ cs32 = hash[0:4]
+ if cs32 != csum:
+ return None
+ else:
+ return key
+
+# Extract the 32-byte secret from a serialized private key.
+# NOTE(review): the offset 9 assumes pywallet's DER private-key layout --
+# confirm against the key format actually stored before reuse.
+def PrivKeyToSecret(privkey):
+ return privkey[9:9+32]
+
+# 32-byte secret -> Wallet Import Format (version byte addrtype+128,
+# Base58Check-encoded).
+def SecretToASecret(secret):
+ vchIn = chr(addrtype+128) + secret
+ return EncodeBase58Check(vchIn)
+
+# WIF string -> 32-byte secret; returns False on bad checksum or wrong
+# version byte (note: False, not None, is the failure sentinel here).
+def ASecretToSecret(key):
+ vch = DecodeBase58Check(key)
+ if vch and vch[0] == chr(addrtype+128):
+ return vch[1:]
+ else:
+ return False
+
+########### end pywallet functions #######################
+
def print_log(*args):
args = [str(item) for item in args]
with print_lock:
- sys.stderr.write(" ".join(args) + "\n")
+ sys.stderr.write(timestr() + " " + " ".join(args) + "\n")
sys.stderr.flush()
class Shared:
- def __init__(self):
+ def __init__(self, config):
self.lock = threading.Lock()
self._stopped = False
+ self.config = config
def stop(self):
print_log( "Stopping Stratum" )
class Dispatcher:
- def __init__(self):
- self.shared = Shared()
+ def __init__(self, config):
+ self.shared = Shared(config)
self.request_dispatcher = RequestDispatcher(self.shared)
self.request_dispatcher.start()
self.response_dispatcher = \
params = request.get('params',[])
suffix = method.split('.')[-1]
- is_new = session.protocol_version >= 0.5
-
- if is_new and method == 'blockchain.address.get_history':
- method = 'blockchain.address.get_history2'
- request['method'] = method
-
- if suffix == 'subscribe':
- if is_new and method == 'blockchain.address.subscribe':
- method = 'blockchain.address.subscribe2'
- request['method'] = method
-
- session.subscribe_to_service(method, params)
+ if session is not None:
+ is_new = session.protocol_version >= 0.5
+ if suffix == 'subscribe':
+ session.subscribe_to_service(method, params)
# store session and id locally
request['id'] = self.store_session_id(session, request['id'])
addr = None
if self.subscriptions:
- print_log( timestr(), self.name, self.address, addr, len(self.subscriptions), self.version )
+ print_log( "%4s"%self.name, "%14s"%self.address, "%35s"%addr, "%3d"%len(self.subscriptions), self.version )
def stopped(self):
with self.lock:
return method,
elif method == "blockchain.headers.subscribe":
return method,
- elif method in ["blockchain.address.subscribe", "blockchain.address.subscribe2"]:
+ elif method in ["blockchain.address.subscribe"]:
if not params:
return None
else:
class ResponseDispatcher(threading.Thread):
- def __init__(self, shared, processor):
+ def __init__(self, shared, request_dispatcher):
self.shared = shared
- self.processor = processor
+ self.request_dispatcher = request_dispatcher
threading.Thread.__init__(self)
self.daemon = True
self.update()
def update(self):
- response = self.processor.pop_response()
+ response = self.request_dispatcher.pop_response()
#print "pop response", response
internal_id = response.get('id')
method = response.get('method')
# A notification
if internal_id is None: # and method is not None and params is not None:
- self.notification(method, params, response)
+ found = self.notification(method, params, response)
+ if not found and method == 'blockchain.address.subscribe':
+ params2 = [self.shared.config.get('server','password')] + params
+ self.request_dispatcher.push_request(None,{'method':method.replace('.subscribe', '.unsubscribe'), 'params':params2, 'id':None})
+
# A response
- elif internal_id is not None: # and method is None and params is None:
+ elif internal_id is not None:
self.send_response(internal_id, response)
else:
print_log( "no method", response)
def notification(self, method, params, response):
subdesc = Session.build_subdesc(method, params)
- for session in self.processor.sessions:
+ found = False
+ for session in self.request_dispatcher.sessions:
if session.stopped():
continue
if session.contains_subscription(subdesc):
- if response.get('method') == "blockchain.address.subscribe2":
- response['method'] = "blockchain.address.subscribe"
session.send_response(response)
+ found = True
+ # if not found: print_log( "no subscriber for", subdesc)
+ return found
def send_response(self, internal_id, response):
- session, message_id = self.processor.get_session_id(internal_id)
+ session, message_id = self.request_dispatcher.get_session_id(internal_id)
if session:
response['id'] = message_id
session.send_response(response)
- else:
- print_log( "send_response: no session", message_id, internal_id, response )
+ #else:
+ # print_log( "send_response: no session", message_id, internal_id, response )