2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
20 * server should check and return bitcoind status..
21 * improve txpoint sorting
22 * command to check cache
24 mempool transactions do not need to be added to the database; it slows it down
28 from Abe.abe import hash_to_address, decode_check_address
29 from Abe.DataStore import DataStore as Datastore_class
30 from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
32 import psycopg2, binascii
34 import thread, traceback, sys, urllib, operator
35 from json import dumps, loads
38 class MyStore(Datastore_class):
def __init__(self, config, address_queue):
    """Open the Abe data store selected by `config` and set up server state.

    config        -- object with a ConfigParser-style get(section, option);
                     the 'database' section selects the backend and the
                     'bitcoind' section supplies the JSON-RPC credentials.
    address_queue -- queue on which update_tx_cache() puts every address
                     touched by an imported transaction.
    """
    conf = DataStore.CONFIG_DEFAULTS
    args, argv = readconf.parse_argv([], conf)
    args.dbtype = config.get('database', 'type')
    if args.dbtype == 'sqlite3':
        args.connect_args = {'database': config.get('database', 'database')}
    elif args.dbtype == 'MySQLdb':
        args.connect_args = {
            'db': config.get('database', 'database'),
            'user': config.get('database', 'username'),
            'passwd': config.get('database', 'password'),
        }
    elif args.dbtype == 'psycopg2':
        args.connect_args = {'database': config.get('database', 'database')}

    Datastore_class.__init__(self, args)

    # Per-address history cache. get_history() and update_tx_cache() read it,
    # so it has to exist before either of them runs.
    self.tx_cache = {}
    self.mempool_keys = {}
    self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
        config.get('bitcoind', 'user'),
        config.get('bitcoind', 'password'),
        config.get('bitcoind', 'host'),
        config.get('bitcoind', 'port'),
    )
    self.address_queue = address_queue
    # Serialises database access between the server threads (see safe_sql).
    self.dblock = thread.allocate_lock()
def import_block(self, b, chain_ids=frozenset()):
    """Import block `b` through the parent store, then refresh address caches.

    For every transaction of the block the cached histories of the addresses
    it touches are invalidated via update_tx_cache().

    Returns the value produced by the parent import_block().
    """
    block_id = super(MyStore, self).import_block(b, chain_ids)
    for tx in b['transactions']:
        if 'hash' not in tx:
            tx['hash'] = util.double_sha256(tx['tx'])
        # Look the transaction up on this instance; the module-level `store`
        # global is not necessarily bound when this method runs.
        tx_id = self.tx_find_id_and_value(tx)
        if tx_id:
            self.update_tx_cache(tx_id)
        else:
            print("error: import_block: no tx_id")
    return block_id
def update_tx_cache(self, txid):
    """Invalidate cached histories of every address used by transaction `txid`.

    Both the inputs and the outputs of the transaction are examined. Each
    address found is removed from tx_cache (if present) and put on
    address_queue so subscribed sessions can be notified.
    """
    # lock=False makes safe_sql() skip dblock for these queries.
    # NOTE(review): presumably the caller already holds dblock - confirm.
    for fetch_rows in (self.get_tx_inputs, self.get_tx_outputs):
        for row in fetch_rows(txid, False):
            # column 6 of these rows is the binary address hash
            _hash = self.binout(row[6])
            address = hash_to_address(chr(0), _hash)
            if address in self.tx_cache:
                print("cache: invalidating %s" % (address,))
                self.tx_cache.pop(address)
            # queued whether or not it was cached: a subscriber may be
            # watching an address whose history was never requested
            self.address_queue.put(address)
def safe_sql(self, sql, params=(), lock=True):
    """Run a query through selectall(), optionally serialised by dblock.

    sql    -- statement text
    params -- bound parameters passed to selectall()
    lock   -- when true, dblock is held for the duration of the query

    Returns the rows from selectall(), or an empty list if the query raises.
    The lock is always released, including on the error path, so one failing
    query cannot block every later database access.
    """
    if lock:
        self.dblock.acquire()
    try:
        return self.selectall(sql, params)
    except Exception:
        print("sql error %s" % (sql,))
        return []
    finally:
        if lock:
            self.dblock.release()
105 def get_tx_outputs(self, tx_id, lock=True):
106 return self.safe_sql("""SELECT
108 txout.txout_scriptPubKey,
115 LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
116 LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
117 LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
118 WHERE txout.tx_id = %d
119 ORDER BY txout.txout_pos
120 """%(tx_id), (), lock)
122 def get_tx_inputs(self, tx_id, lock=True):
123 return self.safe_sql(""" SELECT
127 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
129 COALESCE(txout.txout_pos, u.txout_pos),
132 LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
133 LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
134 LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
135 LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
136 WHERE txin.tx_id = %d
137 ORDER BY txin.txin_pos
138 """%(tx_id,), (), lock)
140 def get_address_out_rows(self, dbhash):
141 return self.safe_sql(""" SELECT
151 FROM chain_candidate cc
152 JOIN block b ON (b.block_id = cc.block_id)
153 JOIN block_tx ON (block_tx.block_id = b.block_id)
154 JOIN tx ON (tx.tx_id = block_tx.tx_id)
155 JOIN txin ON (txin.tx_id = tx.tx_id)
156 JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
157 JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
158 WHERE pubkey.pubkey_hash = ?
159 AND cc.in_longest = 1""", (dbhash,))
161 def get_address_out_rows_memorypool(self, dbhash):
162 return self.safe_sql(""" SELECT
169 JOIN txin ON (txin.tx_id = tx.tx_id)
170 JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
171 JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
172 WHERE pubkey.pubkey_hash = ? """, (dbhash,))
174 def get_address_in_rows(self, dbhash):
175 return self.safe_sql(""" SELECT
185 FROM chain_candidate cc
186 JOIN block b ON (b.block_id = cc.block_id)
187 JOIN block_tx ON (block_tx.block_id = b.block_id)
188 JOIN tx ON (tx.tx_id = block_tx.tx_id)
189 JOIN txout ON (txout.tx_id = tx.tx_id)
190 JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
191 WHERE pubkey.pubkey_hash = ?
192 AND cc.in_longest = 1""", (dbhash,))
194 def get_address_in_rows_memorypool(self, dbhash):
195 return self.safe_sql( """ SELECT
202 JOIN txout ON (txout.tx_id = tx.tx_id)
203 JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
204 WHERE pubkey.pubkey_hash = ? """, (dbhash,))
206 def get_history(self, addr):
208 cached_version = self.tx_cache.get( addr )
209 if cached_version is not None:
210 return cached_version
212 version, binaddr = decode_check_address(addr)
216 dbhash = self.binin(binaddr)
218 rows += self.get_address_out_rows( dbhash )
219 rows += self.get_address_in_rows( dbhash )
226 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
228 print "cannot unpack row", row
230 tx_hash = self.hashout_hex(tx_hash)
233 "height": int(height),
235 "blk_hash": self.hashout_hex(blk_hash),
242 txpoints.append(txpoint)
243 known_tx.append(self.hashout_hex(tx_hash))
246 # todo: sort them really...
247 txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
251 rows += self.get_address_in_rows_memorypool( dbhash )
252 rows += self.get_address_out_rows_memorypool( dbhash )
253 address_has_mempool = False
256 is_in, tx_hash, tx_id, pos, value = row
257 tx_hash = self.hashout_hex(tx_hash)
258 if tx_hash in known_tx:
261 # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
262 address_has_mempool = True
264 # this means pending transactions are returned by getmemorypool
265 if tx_hash not in self.mempool_keys:
268 #print "mempool", tx_hash
273 "blk_hash": 'mempool',
279 txpoints.append(txpoint)
282 for txpoint in txpoints:
283 tx_id = txpoint['tx_id']
286 inrows = self.get_tx_inputs(tx_id)
288 _hash = self.binout(row[6])
289 address = hash_to_address(chr(0), _hash)
290 txinputs.append(address)
291 txpoint['inputs'] = txinputs
293 outrows = self.get_tx_outputs(tx_id)
295 _hash = self.binout(row[6])
296 address = hash_to_address(chr(0), _hash)
297 txoutputs.append(address)
298 txpoint['outputs'] = txoutputs
300 # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
301 if not txpoint['is_in']:
302 # detect if already redeemed...
304 if row[6] == dbhash: break
307 #row = self.get_tx_output(tx_id,dbhash)
308 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
309 # if not redeemed, we add the script
311 if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
314 if not address_has_mempool:
315 self.tx_cache[addr] = txpoints
321 def memorypool_update(store):
323 ds = BCDataStream.BCDataStream()
324 previous_transactions = store.mempool_keys
325 store.mempool_keys = []
327 postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
329 respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
331 if r['error'] != None:
334 v = r['result'].get('transactions')
337 ds.write(hextx.decode('hex'))
338 tx = deserialize.parse_Transaction(ds)
339 tx['hash'] = util.double_sha256(tx['tx'])
340 tx_hash = store.hashin(tx['hash'])
def send_tx(self, tx):
    """Submit transaction `tx` to bitcoind with the 'importtransaction' RPC.

    tx -- transaction as a string (it is concatenated into the error text).

    Returns the 'result' member of the JSON-RPC reply, or an error string
    beginning with "error: transaction rejected by memorypool" when the
    reply carries a non-null 'error'.
    """
    postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id': 'jsonrpc'})
    respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
    r = loads(respdata)
    # JSON null decodes to None, so an identity test is the exact check
    if r['error'] is not None:
        return "error: transaction rejected by memorypool\n" + tx
    return r['result']
353 def main_iteration(store):
355 store.dblock.acquire()
357 store.memorypool_update()
358 block_number = store.get_block_number(1)
361 print "IOError: cannot reach bitcoind"
364 traceback.print_exc(file=sys.stdout)
367 store.dblock.release()
373 import time, json, socket, operator, thread, ast, sys, re, traceback
375 from json import dumps, loads
379 config = ConfigParser.ConfigParser()
380 # set some defaults, which will be overwritten by the config file
381 config.add_section('server')
382 config.set('server','banner', 'Welcome to Electrum!')
383 config.set('server', 'host', 'localhost')
384 config.set('server', 'port', '50000')
385 config.set('server', 'password', '')
386 config.set('server', 'irc', 'yes')
387 config.set('server', 'ircname', 'Electrum server')
388 config.add_section('database')
389 config.set('database', 'type', 'psycopg2')
390 config.set('database', 'database', 'abe')
393 f = open('/etc/electrum.conf','r')
397 print "Could not read electrum.conf. I will use the default values."
400 f = open('/etc/electrum.banner','r')
401 config.set('server','banner', f.read())
407 password = config.get('server','password')
412 sessions_sub_numblocks = {} # sessions that have subscribed to the service
414 m_sessions = [{}] # served by http
418 wallets = {} # for ultra-light clients such as bccapi
420 from Queue import Queue
421 input_queue = Queue()
422 output_queue = Queue()
423 address_queue = Queue()
def random_string(N):
    """Return a string of N characters drawn from A-Z and 0-9.

    The result is used as a session identifier, which is the only thing a
    client presents to reach its session. The characters therefore come from
    the operating system's entropy source rather than the predictable
    default generator.
    """
    import random
    import string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
437 def cmd_stop(_,__,pw):
443 return 'wrong password'
def cmd_load(_, __, pw):
    """Report the number of native sessions, guarded by the server password.

    The first two arguments are ignored. Returns repr(len(sessions)) when
    `pw` equals the configured password, otherwise 'wrong password'.
    """
    if pw != password:
        return 'wrong password'
    return repr(len(sessions))
455 def modified_addresses(session):
458 addresses = session['addresses']
459 session['last_time'] = time.time()
462 for addr in addresses:
463 status = get_address_status( addr )
464 msg_id, last_status = addresses.get( addr )
465 if last_status != status:
466 addresses[addr] = msg_id, status
469 t2 = time.time() - t1
470 #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
471 return ret, addresses
474 def poll_session(session_id):
476 session = sessions.get(session_id)
478 print time.asctime(), "session not found", session_id
481 ret, addresses = modified_addresses(session)
482 if ret: sessions[session_id]['addresses'] = addresses
483 return repr( (block_number,ret))
486 def poll_session_json(session_id, message_id):
487 session = m_sessions[0].get(session_id)
489 raise BaseException("session not found %s"%session_id)
492 ret, addresses = modified_addresses(session)
494 m_sessions[0][session_id]['addresses'] = addresses
496 msg_id, status = addresses[addr]
497 out.append( { 'id':msg_id, 'result':status } )
499 msg_id, last_nb = session.get('numblocks')
501 if last_nb != block_number:
502 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
503 out.append( {'id':msg_id, 'result':block_number} )
def do_update_address(addr):
    """Notify persistent sessions subscribed to `addr` when its status changed.

    An address was involved in a transaction; check whether any session
    subscribed to it. The same address may be subscribed in several
    sessions; the history cache should keep the repeated status lookups
    from turning into redundant requests.
    """
    for sid in sessions.keys():
        entry = sessions[sid]
        if entry.get('type') != 'persistent':
            continue
        watched = entry['addresses']
        if addr not in watched.keys():
            continue
        new_status = get_address_status(addr)
        msg_id, old_status = watched[addr]
        if old_status == new_status:
            continue
        send_status(sid, msg_id, addr, new_status)
        sessions[sid]['addresses'][addr] = (msg_id, new_status)
def get_address_status(addr):
    """Return the status of `addr`, i.e. the last block for that address.

    The status is the 'blk_hash' of the final entry of store.get_history().
    When that entry is 'mempool', the number of history entries is appended
    ('mempool:<n>') so that successive pending transactions produce
    different statuses. An address with no history yields None.
    """
    tx_points = store.get_history(addr)
    if not tx_points:
        # nothing to index: tx_points[-1] would raise IndexError
        return None
    status = tx_points[-1]['blk_hash']
    # this is a temporary hack; move it up once old clients have disappeared
    if status == 'mempool':
        status = status + ':%d' % len(tx_points)
    return status
def send_numblocks(session_id):
    """Queue the current block number for a session subscribed to numblocks.

    The reply reuses the message id recorded when the session subscribed.
    """
    reply = {'id': sessions_sub_numblocks[session_id], 'result': block_number}
    output_queue.put((session_id, json.dumps(reply)))
def send_status(session_id, message_id, address, status):
    """Queue a JSON status message for `session_id`.

    The message carries only the message id and the status; `address` is
    accepted but not included in the payload.
    """
    payload = {'id': message_id, 'result': status}
    output_queue.put((session_id, json.dumps(payload)))
def address_get_history_json(_, message_id, address):
    """JSON-RPC handler: return store.get_history() for `address`.

    The first argument and `message_id` are not used.
    """
    history = store.get_history(address)
    return history
def subscribe_to_numblocks(session_id, message_id):
    """Register a numblocks subscription and send the current value at once.

    `message_id` is remembered so later notifications reuse it.
    """
    sessions_sub_numblocks.update({session_id: message_id})
    send_numblocks(session_id)
def subscribe_to_numblocks_json(session_id, message_id):
    """Record a numblocks subscription on an HTTP session.

    Stores (message_id, current block number) under the session's
    'numblocks' key.
    """
    # NOTE(review): no return statement is visible for this handler; confirm
    # what the HTTP client expects as the reply.
    subscription = (message_id, block_number)
    m_sessions[0][session_id]['numblocks'] = subscription
def subscribe_to_address(session_id, message_id, address):
    """Subscribe a session to `address` and send its current status.

    The (message_id, status) pair is stored on the session and the
    session's 'last_time' is refreshed before the status is queued.
    """
    current = get_address_status(address)
    entry = sessions[session_id]
    entry['addresses'][address] = (message_id, current)
    entry['last_time'] = time.time()
    send_status(session_id, message_id, address, current)
def add_address_to_session_json(session_id, message_id, address):
    """Add `address` to an HTTP session together with its current status.

    Stores (message_id, status) for the address and refreshes the session's
    'last_time'. The session table is written back into m_sessions[0] after
    the update.
    """
    # NOTE(review): no return statement is visible for this handler; confirm
    # what the HTTP client expects as the reply.
    http_sessions = m_sessions[0]
    current = get_address_status(address)
    entry = http_sessions[session_id]
    entry['addresses'][address] = (message_id, current)
    entry['last_time'] = time.time()
    m_sessions[0] = http_sessions
def add_address_to_session(session_id, address):
    """Add `address` to a native session together with its current status.

    The address is stored with an empty message id and the session's
    'last_time' is refreshed.
    """
    # NOTE(review): do_command() assigns this function's result to `out`, but
    # no return statement is visible here; confirm the intended reply.
    current = get_address_status(address)
    entry = sessions[session_id]
    entry['addresses'][address] = ("", current)
    entry['last_time'] = time.time()
def new_session(version, addresses):
    """Create a native session watching `addresses`.

    version   -- client version string stored on the session
    addresses -- iterable of addresses; each starts with an empty
                 (message_id, status) pair, so the first poll reports its
                 real status as a change

    Returns repr((session_id, banner)), where the banner has its literal
    backslash-n sequences turned into newlines.
    """
    session_id = random_string(10)
    sessions[session_id] = {
        'addresses': dict((a, ('', '')) for a in addresses),
        'version': version,
        'last_time': time.time(),
    }
    banner = config.get('server', 'banner').replace('\\n', '\n')
    return repr((session_id, banner))
def client_version_json(session_id, _, version):
    """Store the version reported by an HTTP client on its session.

    The session table is written back into m_sessions[0] after the update.
    """
    http_sessions = m_sessions[0]
    entry = http_sessions[session_id]
    entry['version'] = version
    m_sessions[0] = http_sessions
def create_session_json(_, __):
    """Create an empty HTTP session and return its identifier.

    Both arguments are ignored. The new session starts with no addresses
    and an empty numblocks subscription. The identifier is returned because
    the caller has no other way to address the session afterwards.
    """
    http_sessions = m_sessions[0]
    session_id = random_string(10)
    print("creating session %s" % (session_id,))
    http_sessions[session_id] = {
        'addresses': {},
        'numblocks': ('', ''),
        'last_time': time.time(),
    }
    m_sessions[0] = http_sessions
    return session_id
def get_banner(_, __):
    """Return the configured server banner.

    Literal backslash-n sequences in the configured text are turned into
    real newlines. Both arguments are ignored.
    """
    raw_banner = config.get('server', 'banner')
    return raw_banner.replace('\\n', '\n')
def update_session(session_id, addresses):
    """deprecated in 0.42

    Replace the set of addresses watched by a native session and refresh
    its 'last_time'.
    """
    entry = sessions[session_id]
    # Store the same empty (message_id, status) pair that new_session() uses.
    # modified_addresses() unpacks every stored value into two names, so a
    # bare '' here makes the next poll of this session fail.
    entry['addresses'] = dict((a, ('', '')) for a in addresses)
    entry['last_time'] = time.time()
    # NOTE(review): do_command() sends this function's result to the client,
    # but the original reply is not visible here; 'ok' is an assumption.
    return 'ok'
619 def native_server_thread():
620 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
621 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
622 s.bind((config.get('server','host'), config.getint('server','port')))
625 conn, addr = s.accept()
627 thread.start_new_thread(native_client_thread, (addr, conn,))
629 # can't start new thread if there is no memory..
630 traceback.print_exc(file=sys.stdout)
633 def native_client_thread(ipaddr,conn):
634 #print "client thread", ipaddr
644 msg = msg.split('#', 1)[0]
647 cmd, data = ast.literal_eval(msg)
649 print "syntax error", repr(msg), ipaddr
653 out = do_command(cmd, data, ipaddr)
655 #print ipaddr, cmd, len(out)
659 print "error, could not send"
def timestr():
    """Return the local time formatted as '[dd/mm/YYYY-HH:MM:SS]'.

    Used as the prefix of the server's log lines.
    """
    return time.strftime("[%d/%m/%Y-%H:%M:%S]")
668 # used by the native handler
669 def do_command(cmd, data, ipaddr):
672 out = "%d"%block_number
674 elif cmd in ['session','new_session']:
677 addresses = ast.literal_eval(data)
680 version, addresses = ast.literal_eval(data)
681 if version[0]=="0": version = "v" + version
685 print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
686 out = new_session(version, addresses)
688 elif cmd=='address.subscribe':
690 session_id, addr = ast.literal_eval(data)
692 traceback.print_exc(file=sys.stdout)
695 out = add_address_to_session(session_id,addr)
697 elif cmd=='update_session':
699 session_id, addresses = ast.literal_eval(data)
701 traceback.print_exc(file=sys.stdout)
703 print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
704 out = update_session(session_id,addresses)
707 out = poll_session(data)
712 out = repr( store.get_history( address ) )
715 out = cmd_load(None,None,data)
718 out = store.send_tx(data)
719 print timestr(), "sent tx:", ipaddr, out
725 out = repr(peer_list.values())
734 ####################################################################
736 def tcp_server_thread():
737 thread.start_new_thread(process_input_queue, ())
738 thread.start_new_thread(process_output_queue, ())
740 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
741 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
742 s.bind((config.get('server','host'), 50001))
745 conn, addr = s.accept()
747 thread.start_new_thread(tcp_client_thread, (addr, conn,))
749 # can't start new thread if there is no memory..
750 traceback.print_exc(file=sys.stdout)
def close_session(session_id):
    """Forget a session and its numblocks subscription, if any.

    Safe to call more than once for the same id. The client thread and the
    output thread can both detect a dead connection and call this, and the
    second call must not raise KeyError.
    """
    sessions.pop(session_id, None)
    sessions_sub_numblocks.pop(session_id, None)
760 # one thread per client. put requests in a queue.
761 def tcp_client_thread(ipaddr,conn):
762 """ use a persistent connection. put commands in a queue."""
764 print timestr(), "TCP session", ipaddr
767 session_id = random_string(10)
768 sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
779 close_session(session_id)
792 close_session(session_id)
797 print "json error", repr(c)
800 message_id = c.get('id')
801 method = c.get('method')
802 params = c.get('params')
804 print "syntax error", repr(c), ipaddr
808 input_queue.put((session_id, message_id, method, params))
# Read commands from the input queue and perform the requests. tcp_server_thread starts this in its own thread.
813 def process_input_queue():
815 session_id, message_id, method, data = input_queue.get()
816 if session_id not in sessions.keys():
819 if method == 'address.subscribe':
821 subscribe_to_address(session_id,message_id,address)
822 elif method == 'numblocks.subscribe':
823 subscribe_to_numblocks(session_id,message_id)
824 elif method == 'client.version':
825 sessions[session_id]['version'] = data[0]
826 elif method == 'server.banner':
827 out = { 'result':config.get('server','banner').replace('\\n','\n') }
828 elif method == 'server.peers':
829 out = { 'result':peer_list.values() }
830 elif method == 'address.get_history':
832 out = { 'result':store.get_history( address ) }
833 elif method == 'transaction.broadcast':
834 postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
835 txo = urllib.urlopen(bitcoind_url, postdata).read()
836 print "sent tx:", txo
837 out = json.loads(txo)
839 print "unknown command", method
841 out['id'] = message_id
842 out = json.dumps( out )
843 output_queue.put((session_id, out))
845 # this is a separate thread
846 def process_output_queue():
848 session_id, out = output_queue.get()
849 session = sessions.get(session_id)
852 conn = session.get('conn')
855 close_session(session_id)
860 ####################################################################
865 def clean_session_thread():
869 for k,s in sessions.items():
870 if s.get('type') == 'persistent': continue
874 print "lost session", k
879 NICK = 'E_'+random_string(10)
883 s.connect(('irc.freenode.net', 6667))
884 s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
885 s.send('NICK '+NICK+'\n')
886 s.send('JOIN #electrum\n')
887 sf = s.makefile('r', 0)
891 line = line.rstrip('\r\n')
894 s.send('PONG '+line[1]+'\n')
895 elif '353' in line: # answer to /names
896 k = line.index('353')
897 for item in line[k+1:]:
898 if item[0:2] == 'E_':
899 s.send('WHO %s\n'%item)
900 elif '352' in line: # answer to /who
901 # warning: this is a horrible hack which apparently works
902 k = line.index('352')
904 ip = socket.gethostbyname(ip)
907 peer_list[name] = (ip,host)
908 if time.time() - t > 5*60:
909 s.send('NAMES #electrum\n')
913 traceback.print_exc(file=sys.stdout)
def get_peers_json(_, __):
    """JSON-RPC handler for 'server.peers': return the values of peer_list.

    Both arguments are ignored.
    """
    peers = peer_list.values()
    return peers
def http_server_thread():
    """Run the JSON-RPC-over-HTTP interface on port 8081 of the server host.

    Registers the protocol methods plus the two internal session messages,
    then blocks in serve_forever().
    """
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer

    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer):
        pass

    def broadcast(a, b, c):
        # only the third argument (the transaction) is forwarded
        return store.send_tx(c)

    server = StratumThreadedJSONRPCServer((config.get('server', 'host'), 8081))

    rpc_table = (
        (get_peers_json, 'server.peers'),
        (cmd_stop, 'stop'),
        (cmd_load, 'load'),
        (get_banner, 'server.banner'),
        (broadcast, 'transaction.broadcast'),
        (address_get_history_json, 'address.get_history'),
        (add_address_to_session_json, 'address.subscribe'),
        (subscribe_to_numblocks_json, 'numblocks.subscribe'),
        (client_version_json, 'client.version'),
        # internal messages (not part of protocol)
        (create_session_json, 'session.create'),
        (poll_session_json, 'session.poll'),
    )
    for handler, rpc_name in rpc_table:
        server.register_function(handler, rpc_name)

    server.serve_forever()
942 if __name__ == '__main__':
946 server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
949 out = server.load(password)
951 out = server.server.peers()
953 out = server.stop(password)
954 elif cmd == 'clear_cache':
955 out = server.clear_cache(password)
956 elif cmd == 'get_cache':
957 out = server.get_cache(password,sys.argv[2])
959 out = server.address.get_history(sys.argv[2])
961 out = server.transaction.broadcast(sys.argv[2])
963 out = server.numblocks.subscribe()
965 out = "Unknown command: '%s'" % cmd
970 # from db import MyStore
971 store = MyStore(config,address_queue)
973 # supported protocols
974 thread.start_new_thread(native_server_thread, ())
975 thread.start_new_thread(tcp_server_thread, ())
976 thread.start_new_thread(http_server_thread, ())
977 thread.start_new_thread(clean_session_thread, ())
979 if (config.get('server','irc') == 'yes' ):
980 thread.start_new_thread(irc_thread, ())
982 print "starting Electrum server"
984 old_block_number = None
986 block_number = store.main_iteration()
988 if block_number != old_block_number:
989 old_block_number = block_number
990 for session_id in sessions_sub_numblocks.keys():
991 send_numblocks(session_id)
994 addr = address_queue.get(False)
997 do_update_address(addr)
1000 print "server stopped"