2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
20 * server should check and return bitcoind status..
21 * improve txpoint sorting
22 * command to check cache
24 mempool transactions do not need to be added to the database; it slows it down
28 from Abe.abe import hash_to_address, decode_check_address
29 from Abe.DataStore import DataStore as Datastore_class
30 from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
32 import psycopg2, binascii
34 import thread, traceback, sys, urllib, operator
35 from json import dumps, loads
38 class MyStore(Datastore_class):
40 def __init__(self, config):
41 conf = DataStore.CONFIG_DEFAULTS
42 args, argv = readconf.parse_argv( [], conf)
43 args.dbtype = config.get('database','type')
44 if args.dbtype == 'sqlite3':
45 args.connect_args = { 'database' : config.get('database','database') }
46 elif args.dbtype == 'MySQLdb':
47 args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
48 elif args.dbtype == 'psycopg2':
49 args.connect_args = { 'database' : config.get('database','database') }
51 Datastore_class.__init__(self,args)
54 self.mempool_keys = {}
55 self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
57 self.address_queue = Queue()
59 self.dblock = thread.allocate_lock()
63 def import_block(self, b, chain_ids=frozenset()):
64 block_id = super(MyStore, self).import_block(b, chain_ids)
65 for pos in xrange(len(b['transactions'])):
66 tx = b['transactions'][pos]
68 tx['hash'] = util.double_sha256(tx['tx'])
69 tx_id = store.tx_find_id_and_value(tx)
71 self.update_tx_cache(tx_id)
73 print "error: import_block: no tx_id"
77 def update_tx_cache(self, txid):
78 inrows = self.get_tx_inputs(txid, False)
80 _hash = self.binout(row[6])
81 address = hash_to_address(chr(0), _hash)
82 if self.tx_cache.has_key(address):
83 print "cache: invalidating", address
84 self.tx_cache.pop(address)
85 self.address_queue.put(address)
87 outrows = self.get_tx_outputs(txid, False)
89 _hash = self.binout(row[6])
90 address = hash_to_address(chr(0), _hash)
91 if self.tx_cache.has_key(address):
92 print "cache: invalidating", address
93 self.tx_cache.pop(address)
94 self.address_queue.put(address)
96 def safe_sql(self,sql, params=(), lock=True):
98 if lock: self.dblock.acquire()
99 ret = self.selectall(sql,params)
100 if lock: self.dblock.release()
103 print "sql error", sql
106 def get_tx_outputs(self, tx_id, lock=True):
107 return self.safe_sql("""SELECT
109 txout.txout_scriptPubKey,
116 LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
117 LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
118 LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
119 WHERE txout.tx_id = %d
120 ORDER BY txout.txout_pos
121 """%(tx_id), (), lock)
123 def get_tx_inputs(self, tx_id, lock=True):
124 return self.safe_sql(""" SELECT
128 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
130 COALESCE(txout.txout_pos, u.txout_pos),
133 LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
134 LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
135 LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
136 LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
137 WHERE txin.tx_id = %d
138 ORDER BY txin.txin_pos
139 """%(tx_id,), (), lock)
141 def get_address_out_rows(self, dbhash):
142 return self.safe_sql(""" SELECT
152 FROM chain_candidate cc
153 JOIN block b ON (b.block_id = cc.block_id)
154 JOIN block_tx ON (block_tx.block_id = b.block_id)
155 JOIN tx ON (tx.tx_id = block_tx.tx_id)
156 JOIN txin ON (txin.tx_id = tx.tx_id)
157 JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
158 JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
159 WHERE pubkey.pubkey_hash = ?
160 AND cc.in_longest = 1""", (dbhash,))
162 def get_address_out_rows_memorypool(self, dbhash):
163 return self.safe_sql(""" SELECT
170 JOIN txin ON (txin.tx_id = tx.tx_id)
171 JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
172 JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
173 WHERE pubkey.pubkey_hash = ? """, (dbhash,))
175 def get_address_in_rows(self, dbhash):
176 return self.safe_sql(""" SELECT
186 FROM chain_candidate cc
187 JOIN block b ON (b.block_id = cc.block_id)
188 JOIN block_tx ON (block_tx.block_id = b.block_id)
189 JOIN tx ON (tx.tx_id = block_tx.tx_id)
190 JOIN txout ON (txout.tx_id = tx.tx_id)
191 JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
192 WHERE pubkey.pubkey_hash = ?
193 AND cc.in_longest = 1""", (dbhash,))
195 def get_address_in_rows_memorypool(self, dbhash):
196 return self.safe_sql( """ SELECT
203 JOIN txout ON (txout.tx_id = tx.tx_id)
204 JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
205 WHERE pubkey.pubkey_hash = ? """, (dbhash,))
207 def get_history(self, addr):
209 cached_version = self.tx_cache.get( addr )
210 if cached_version is not None:
211 return cached_version
213 version, binaddr = decode_check_address(addr)
217 dbhash = self.binin(binaddr)
219 rows += self.get_address_out_rows( dbhash )
220 rows += self.get_address_in_rows( dbhash )
227 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
229 print "cannot unpack row", row
231 tx_hash = self.hashout_hex(tx_hash)
234 "height": int(height),
236 "blk_hash": self.hashout_hex(blk_hash),
243 txpoints.append(txpoint)
244 known_tx.append(self.hashout_hex(tx_hash))
247 # todo: sort them really...
248 txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
252 rows += self.get_address_in_rows_memorypool( dbhash )
253 rows += self.get_address_out_rows_memorypool( dbhash )
254 address_has_mempool = False
257 is_in, tx_hash, tx_id, pos, value = row
258 tx_hash = self.hashout_hex(tx_hash)
259 if tx_hash in known_tx:
262 # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
263 address_has_mempool = True
265 # this means pending transactions are returned by getmemorypool
266 if tx_hash not in self.mempool_keys:
269 #print "mempool", tx_hash
274 "blk_hash": 'mempool',
280 txpoints.append(txpoint)
283 for txpoint in txpoints:
284 tx_id = txpoint['tx_id']
287 inrows = self.get_tx_inputs(tx_id)
289 _hash = self.binout(row[6])
290 address = hash_to_address(chr(0), _hash)
291 txinputs.append(address)
292 txpoint['inputs'] = txinputs
294 outrows = self.get_tx_outputs(tx_id)
296 _hash = self.binout(row[6])
297 address = hash_to_address(chr(0), _hash)
298 txoutputs.append(address)
299 txpoint['outputs'] = txoutputs
301 # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
302 if not txpoint['is_in']:
303 # detect if already redeemed...
305 if row[6] == dbhash: break
308 #row = self.get_tx_output(tx_id,dbhash)
309 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
310 # if not redeemed, we add the script
312 if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
315 if not address_has_mempool:
316 self.tx_cache[addr] = txpoints
322 def memorypool_update(store):
324 ds = BCDataStream.BCDataStream()
325 previous_transactions = store.mempool_keys
326 store.mempool_keys = []
328 postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
330 respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
332 if r['error'] != None:
335 v = r['result'].get('transactions')
338 ds.write(hextx.decode('hex'))
339 tx = deserialize.parse_Transaction(ds)
340 tx['hash'] = util.double_sha256(tx['tx'])
341 tx_hash = store.hashin(tx['hash'])
343 store.mempool_keys.append(tx_hash)
344 if store.tx_find_id_and_value(tx):
347 tx_id = store.import_tx(tx, False)
348 store.update_tx_cache(tx_id)
353 def send_tx(self,tx):
354 postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
355 respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
357 if r['error'] != None:
358 out = "error: transaction rejected by memorypool\n"+tx
364 def main_iteration(store):
366 store.dblock.acquire()
368 store.memorypool_update()
369 block_number = store.get_block_number(1)
372 print "IOError: cannot reach bitcoind"
375 traceback.print_exc(file=sys.stdout)
378 store.dblock.release()
384 import time, json, socket, operator, thread, ast, sys, re, traceback
386 from json import dumps, loads
390 config = ConfigParser.ConfigParser()
391 # set some defaults, which will be overwritten by the config file
392 config.add_section('server')
393 config.set('server','banner', 'Welcome to Electrum!')
394 config.set('server', 'host', 'localhost')
395 config.set('server', 'port', '50000')
396 config.set('server', 'password', '')
397 config.set('server', 'irc', 'yes')
398 config.set('server', 'ircname', 'Electrum server')
399 config.add_section('database')
400 config.set('database', 'type', 'psycopg2')
401 config.set('database', 'database', 'abe')
404 f = open('/etc/electrum.conf','r')
408 print "Could not read electrum.conf. I will use the default values."
411 f = open('/etc/electrum.banner','r')
412 config.set('server','banner', f.read())
418 password = config.get('server','password')
423 sessions_sub_numblocks = {} # sessions that have subscribed to the service
425 m_sessions = [{}] # served by http
429 wallets = {} # for ultra-light clients such as bccapi
431 from Queue import Queue
432 input_queue = Queue()
433 output_queue = Queue()
438 def random_string(N):
439 import random, string
440 return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
444 def cmd_stop(_,__,pw):
450 return 'wrong password'
452 def cmd_load(_,__,pw):
454 return repr( len(sessions) )
456 return 'wrong password'
462 def modified_addresses(a_session):
465 session = copy.deepcopy(a_session)
466 addresses = session['addresses']
467 session['last_time'] = time.time()
470 for addr in addresses:
471 status = get_address_status( addr )
472 msg_id, last_status = addresses.get( addr )
473 if last_status != status:
474 addresses[addr] = msg_id, status
477 #t2 = time.time() - t1
478 #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
479 return ret, addresses
482 def poll_session(session_id):
484 session = sessions.get(session_id)
486 print time.asctime(), "session not found", session_id
489 ret, addresses = modified_addresses(session)
490 if ret: sessions[session_id]['addresses'] = addresses
491 return repr( (block_number,ret))
494 def poll_session_json(session_id, message_id):
495 session = m_sessions[0].get(session_id)
497 raise BaseException("session not found %s"%session_id)
500 ret, addresses = modified_addresses(session)
502 m_sessions[0][session_id]['addresses'] = addresses
504 msg_id, status = addresses[addr]
505 out.append( { 'id':msg_id, 'result':status } )
507 msg_id, last_nb = session.get('numblocks')
509 if last_nb != block_number:
510 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
511 out.append( {'id':msg_id, 'result':block_number} )
516 def do_update_address(addr):
517 # an address was involved in a transaction; we check if it was subscribed to in a session
518 # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
520 for session_id in sessions.keys():
521 session = sessions[session_id]
522 if session.get('type') != 'persistent': continue
523 addresses = session['addresses'].keys()
525 if addr in addresses:
526 status = get_address_status( addr )
527 message_id, last_status = session['addresses'][addr]
528 if last_status != status:
529 #print "sending new status for %s:"%addr, status
530 send_status(session_id,message_id,addr,status)
531 sessions[session_id]['addresses'][addr] = (message_id,status)
533 def get_address_status(addr):
534 # get address status, i.e. the last block for that address.
535 tx_points = store.get_history(addr)
539 lastpoint = tx_points[-1]
540 status = lastpoint['blk_hash']
541 # this is a temporary hack; move it up once old clients have disappeared
542 if status == 'mempool': # and session['version'] != "old":
543 status = status + ':%d'% len(tx_points)
547 def send_numblocks(session_id):
548 message_id = sessions_sub_numblocks[session_id]
549 out = json.dumps( {'id':message_id, 'result':block_number} )
550 output_queue.put((session_id, out))
552 def send_status(session_id, message_id, address, status):
553 out = json.dumps( { 'id':message_id, 'result':status } )
554 output_queue.put((session_id, out))
556 def address_get_history_json(_,message_id,address):
557 return store.get_history(address)
559 def subscribe_to_numblocks(session_id, message_id):
560 sessions_sub_numblocks[session_id] = message_id
561 send_numblocks(session_id)
563 def subscribe_to_numblocks_json(session_id, message_id):
565 m_sessions[0][session_id]['numblocks'] = message_id,block_number
568 def subscribe_to_address(session_id, message_id, address):
569 status = get_address_status(address)
570 sessions[session_id]['addresses'][address] = (message_id, status)
571 sessions[session_id]['last_time'] = time.time()
572 send_status(session_id, message_id, address, status)
574 def add_address_to_session_json(session_id, message_id, address):
576 sessions = m_sessions[0]
577 status = get_address_status(address)
578 sessions[session_id]['addresses'][address] = (message_id, status)
579 sessions[session_id]['last_time'] = time.time()
580 m_sessions[0] = sessions
583 def add_address_to_session(session_id, address):
584 status = get_address_status(address)
585 sessions[session_id]['addresses'][address] = ("", status)
586 sessions[session_id]['last_time'] = time.time()
589 def new_session(version, addresses):
590 session_id = random_string(10)
591 sessions[session_id] = { 'addresses':{}, 'version':version }
593 sessions[session_id]['addresses'][a] = ('','')
594 out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
595 sessions[session_id]['last_time'] = time.time()
599 def client_version_json(session_id, _, version):
601 sessions = m_sessions[0]
602 sessions[session_id]['version'] = version
603 m_sessions[0] = sessions
605 def create_session_json(_, __):
606 sessions = m_sessions[0]
607 session_id = random_string(10)
608 print "creating session", session_id
609 sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
610 sessions[session_id]['last_time'] = time.time()
611 m_sessions[0] = sessions
616 def get_banner(_,__):
617 return config.get('server','banner').replace('\\n','\n')
619 def update_session(session_id,addresses):
620 """deprecated in 0.42"""
621 sessions[session_id]['addresses'] = {}
623 sessions[session_id]['addresses'][a] = ''
624 sessions[session_id]['last_time'] = time.time()
627 def native_server_thread():
628 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
629 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
630 s.bind((config.get('server','host'), config.getint('server','port')))
633 conn, addr = s.accept()
635 thread.start_new_thread(native_client_thread, (addr, conn,))
637 # can't start new thread if there is no memory..
638 traceback.print_exc(file=sys.stdout)
641 def native_client_thread(ipaddr,conn):
642 #print "client thread", ipaddr
652 msg = msg.split('#', 1)[0]
655 cmd, data = ast.literal_eval(msg)
657 print "syntax error", repr(msg), ipaddr
661 out = do_command(cmd, data, ipaddr)
663 #print ipaddr, cmd, len(out)
667 print "error, could not send"
674 return time.strftime("[%d/%m/%Y-%H:%M:%S]")
676 # used by the native handler
677 def do_command(cmd, data, ipaddr):
680 out = "%d"%block_number
682 elif cmd in ['session','new_session']:
685 addresses = ast.literal_eval(data)
688 version, addresses = ast.literal_eval(data)
689 if version[0]=="0": version = "v" + version
693 print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
694 out = new_session(version, addresses)
696 elif cmd=='address.subscribe':
698 session_id, addr = ast.literal_eval(data)
700 traceback.print_exc(file=sys.stdout)
703 out = add_address_to_session(session_id,addr)
705 elif cmd=='update_session':
707 session_id, addresses = ast.literal_eval(data)
709 traceback.print_exc(file=sys.stdout)
711 print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
712 out = update_session(session_id,addresses)
715 out = poll_session(data)
720 out = repr( store.get_history( address ) )
723 out = cmd_load(None,None,data)
726 out = store.send_tx(data)
727 print timestr(), "sent tx:", ipaddr, out
733 out = repr(peer_list.values())
742 ####################################################################
744 def tcp_server_thread():
745 thread.start_new_thread(process_input_queue, ())
746 thread.start_new_thread(process_output_queue, ())
748 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
749 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
750 s.bind((config.get('server','host'), 50001))
753 conn, addr = s.accept()
755 thread.start_new_thread(tcp_client_thread, (addr, conn,))
757 # can't start new thread if there is no memory..
758 traceback.print_exc(file=sys.stdout)
761 def close_session(session_id):
762 #print "lost connection", session_id
763 sessions.pop(session_id)
764 if session_id in sessions_sub_numblocks:
765 sessions_sub_numblocks.pop(session_id)
768 # one thread per client. put requests in a queue.
769 def tcp_client_thread(ipaddr,conn):
770 """ use a persistent connection. put commands in a queue."""
772 print timestr(), "TCP session", ipaddr
775 session_id = random_string(10)
776 sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
787 close_session(session_id)
800 close_session(session_id)
805 print "json error", repr(c)
808 message_id = c.get('id')
809 method = c.get('method')
810 params = c.get('params')
812 print "syntax error", repr(c), ipaddr
816 input_queue.put((session_id, message_id, method, params))
820 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
821 def process_input_queue():
823 session_id, message_id, method, data = input_queue.get()
824 if session_id not in sessions.keys():
827 if method == 'address.subscribe':
829 subscribe_to_address(session_id,message_id,address)
830 elif method == 'numblocks.subscribe':
831 subscribe_to_numblocks(session_id,message_id)
832 elif method == 'client.version':
833 sessions[session_id]['version'] = data[0]
834 elif method == 'server.banner':
835 out = { 'result':config.get('server','banner').replace('\\n','\n') }
836 elif method == 'server.peers':
837 out = { 'result':peer_list.values() }
838 elif method == 'address.get_history':
840 out = { 'result':store.get_history( address ) }
841 elif method == 'transaction.broadcast':
842 txo = store.send_tx(data[0])
843 print "sent tx:", txo
844 out = {'result':txo }
846 print "unknown command", method
848 out['id'] = message_id
849 out = json.dumps( out )
850 output_queue.put((session_id, out))
852 # this is a separate thread
853 def process_output_queue():
855 session_id, out = output_queue.get()
856 session = sessions.get(session_id)
859 conn = session.get('conn')
862 close_session(session_id)
867 ####################################################################
872 def clean_session_thread():
876 for k,s in sessions.items():
877 if s.get('type') == 'persistent': continue
881 print "lost session", k
886 NICK = 'E_'+random_string(10)
890 s.connect(('irc.freenode.net', 6667))
891 s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
892 s.send('NICK '+NICK+'\n')
893 s.send('JOIN #electrum\n')
894 sf = s.makefile('r', 0)
898 line = line.rstrip('\r\n')
901 s.send('PONG '+line[1]+'\n')
902 elif '353' in line: # answer to /names
903 k = line.index('353')
904 for item in line[k+1:]:
905 if item[0:2] == 'E_':
906 s.send('WHO %s\n'%item)
907 elif '352' in line: # answer to /who
908 # warning: this is a horrible hack which apparently works
909 k = line.index('352')
911 ip = socket.gethostbyname(ip)
914 peer_list[name] = (ip,host)
915 if time.time() - t > 5*60:
916 s.send('NAMES #electrum\n')
920 traceback.print_exc(file=sys.stdout)
926 def get_peers_json(_,__):
927 return peer_list.values()
929 def http_server_thread():
930 # see http://code.google.com/p/jsonrpclib/
931 from SocketServer import ThreadingMixIn
932 from StratumJSONRPCServer import StratumJSONRPCServer
933 class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
934 server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
935 server.register_function(get_peers_json, 'server.peers')
936 server.register_function(cmd_stop, 'stop')
937 server.register_function(cmd_load, 'load')
938 server.register_function(get_banner, 'server.banner')
939 server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
940 server.register_function(address_get_history_json, 'address.get_history')
941 server.register_function(add_address_to_session_json, 'address.subscribe')
942 server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
943 server.register_function(client_version_json, 'client.version')
944 server.register_function(create_session_json, 'session.create') # internal message (not part of protocol)
945 server.register_function(poll_session_json, 'session.poll') # internal message (not part of protocol)
946 server.serve_forever()
949 if __name__ == '__main__':
953 server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
956 out = server.load(password)
958 out = server.server.peers()
960 out = server.stop(password)
961 elif cmd == 'clear_cache':
962 out = server.clear_cache(password)
963 elif cmd == 'get_cache':
964 out = server.get_cache(password,sys.argv[2])
966 out = server.address.get_history(sys.argv[2])
968 out = server.transaction.broadcast(sys.argv[2])
970 out = server.numblocks.subscribe()
972 out = "Unknown command: '%s'" % cmd
977 # from db import MyStore
978 store = MyStore(config)
980 # supported protocols
981 thread.start_new_thread(native_server_thread, ())
982 thread.start_new_thread(tcp_server_thread, ())
983 thread.start_new_thread(http_server_thread, ())
984 thread.start_new_thread(clean_session_thread, ())
986 if (config.get('server','irc') == 'yes' ):
987 thread.start_new_thread(irc_thread, ())
989 print "starting Electrum server"
991 old_block_number = None
993 block_number = store.main_iteration()
995 if block_number != old_block_number:
996 old_block_number = block_number
997 for session_id in sessions_sub_numblocks.keys():
998 send_numblocks(session_id)
1001 addr = store.address_queue.get(False)
1004 do_update_address(addr)
1007 print "server stopped"