2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
20 * server should check and return bitcoind status..
21 * improve txpoint sorting
22 * command to check cache
24 mempool transactions do not need to be added to the database; it slows it down
28 from Abe.abe import hash_to_address, decode_check_address
29 from Abe.DataStore import DataStore as Datastore_class
30 from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
32 import psycopg2, binascii
34 import thread, traceback, sys, urllib, operator
35 from json import dumps, loads
class MyStore(Datastore_class):
    # Abe DataStore specialised for the Electrum server: adds an address
    # history cache, a mempool snapshot and a lock serialising DB access.

    def __init__(self, config):
        """Build Abe DataStore arguments from the electrum config and connect.

        NOTE(review): this excerpt elides a few original lines (likely the
        ``self.tx_cache = {}`` initialisation that the rest of the class
        relies on) -- confirm against the full source.
        """
        conf = DataStore.CONFIG_DEFAULTS
        args, argv = readconf.parse_argv( [], conf)
        args.dbtype = config.get('database','type')
        # translate the electrum config into the connect args Abe expects
        if args.dbtype == 'sqlite3':
            args.connect_args = { 'database' : config.get('database','database') }
        elif args.dbtype == 'MySQLdb':
            args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
        elif args.dbtype == 'psycopg2':
            args.connect_args = { 'database' : config.get('database','database') }
        Datastore_class.__init__(self,args)
        # tx hashes currently believed to be in bitcoind's memory pool
        self.mempool_keys = {}
        # JSON-RPC endpoint of the local bitcoind
        self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
        # addresses whose cached history was invalidated; drained by the main loop
        self.address_queue = Queue()
        # serialises all database access across server threads
        self.dblock = thread.allocate_lock()
    def import_block(self, b, chain_ids=frozenset()):
        """Import block *b* via Abe, then invalidate cached histories for the
        addresses touched by each of its transactions.

        NOTE(review): several original lines are elided in this excerpt (the
        hash-presence guard and the if/else around ``tx_id``), so the control
        flow below is shown incomplete.
        """
        block_id = super(MyStore, self).import_block(b, chain_ids)
        for pos in xrange(len(b['transactions'])):
            tx = b['transactions'][pos]
            tx['hash'] = util.double_sha256(tx['tx'])
            # NOTE(review): uses the module-global ``store`` instead of
            # ``self``; works only because the server creates exactly one
            # MyStore -- consider changing to ``self``.
            tx_id = store.tx_find_id_and_value(tx)
            self.update_tx_cache(tx_id)
            print "error: import_block: no tx_id"
    def update_tx_cache(self, txid):
        """Drop the cached history (and queue a client notification) for every
        address appearing in the inputs or outputs of transaction *txid*.

        NOTE(review): the ``for row in inrows:`` / ``for row in outrows:``
        loop headers are elided in this excerpt; ``row`` below comes from
        those loops.
        """
        inrows = self.get_tx_inputs(txid, False)
        _hash = self.binout(row[6])               # row[6]: pubkey hash
        address = hash_to_address(chr(0), _hash)  # chr(0): mainnet version byte
        if self.tx_cache.has_key(address):
            print "cache: invalidating", address
            self.tx_cache.pop(address)
        self.address_queue.put(address)
        outrows = self.get_tx_outputs(txid, False)
        _hash = self.binout(row[6])
        address = hash_to_address(chr(0), _hash)
        if self.tx_cache.has_key(address):
            print "cache: invalidating", address
            self.tx_cache.pop(address)
        self.address_queue.put(address)
96 def safe_sql(self,sql, params=(), lock=True):
98 if lock: self.dblock.acquire()
99 ret = self.selectall(sql,params)
100 if lock: self.dblock.release()
103 print "sql error", sql
    def get_tx_outputs(self, tx_id, lock=True):
        """Return the outputs of *tx_id*, each joined with the transaction
        that spends it (NULL columns when unspent).

        NOTE(review): part of the SELECT column list is elided in this
        excerpt; downstream code indexes row[1]=scriptPubKey, row[4]=spending
        tx id, row[6]=pubkey hash -- confirm against the full source.
        """
        return self.safe_sql("""SELECT
                txout.txout_scriptPubKey,
              LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
              LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
              LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
             WHERE txout.tx_id = %d
             ORDER BY txout.txout_pos
             """%(tx_id), (), lock)

    def get_tx_inputs(self, tx_id, lock=True):
        """Return the inputs of *tx_id* joined with the outputs they spend
        (falling back to the ``unlinked_txin`` table for orphans).

        NOTE(review): part of the SELECT column list is elided in this
        excerpt.  Note the %d interpolation: safe only because tx_id is an
        internal integer, unlike the '?'-parameterised address queries below.
        """
        return self.safe_sql(""" SELECT
              COALESCE(prevtx.tx_hash, u.txout_tx_hash),
              COALESCE(txout.txout_pos, u.txout_pos),
              LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
              LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
              LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
              LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
             WHERE txin.tx_id = %d
             ORDER BY txin.txin_pos
             """%(tx_id,), (), lock)
    def get_address_out_rows(self, dbhash):
        """Confirmed spends *from* the address with pubkey hash *dbhash*
        (main chain only).  NOTE(review): SELECT column list elided here.
        """
        return self.safe_sql(""" SELECT
             FROM chain_candidate cc
             JOIN block b ON (b.block_id = cc.block_id)
             JOIN block_tx ON (block_tx.block_id = b.block_id)
             JOIN tx ON (tx.tx_id = block_tx.tx_id)
             JOIN txin ON (txin.tx_id = tx.tx_id)
             JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
             JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
             WHERE pubkey.pubkey_hash = ?
               AND cc.in_longest = 1""", (dbhash,))

    def get_address_out_rows_memorypool(self, dbhash):
        """Unconfirmed (mempool) spends from *dbhash*.
        NOTE(review): SELECT column list and FROM clause partly elided."""
        return self.safe_sql(""" SELECT
             JOIN txin ON (txin.tx_id = tx.tx_id)
             JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
             JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
             WHERE pubkey.pubkey_hash = ? """, (dbhash,))

    def get_address_in_rows(self, dbhash):
        """Confirmed outputs paying *to* the address with pubkey hash
        *dbhash* (main chain only).  NOTE(review): column list elided."""
        return self.safe_sql(""" SELECT
             FROM chain_candidate cc
             JOIN block b ON (b.block_id = cc.block_id)
             JOIN block_tx ON (block_tx.block_id = b.block_id)
             JOIN tx ON (tx.tx_id = block_tx.tx_id)
             JOIN txout ON (txout.tx_id = tx.tx_id)
             JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
             WHERE pubkey.pubkey_hash = ?
               AND cc.in_longest = 1""", (dbhash,))

    def get_address_in_rows_memorypool(self, dbhash):
        """Unconfirmed (mempool) outputs paying to *dbhash*.
        NOTE(review): column list and FROM clause partly elided."""
        return self.safe_sql( """ SELECT
             JOIN txout ON (txout.tx_id = tx.tx_id)
             JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
             WHERE pubkey.pubkey_hash = ? """, (dbhash,))
    def get_history(self, addr):
        """Return the full history of *addr* as a list of "txpoint" dicts
        (confirmed points sorted by nTime, then mempool points), resolving
        input/output addresses and, for unredeemed outputs, the raw
        scriptPubKey.  Results are cached unless the address has unindexed
        mempool activity.

        NOTE(review): many original lines are elided in this excerpt (loop
        headers, dict literals, guards); the visible lines are annotated
        with the structure they belong to.
        """
        # serve from the cache when possible
        cached_version = self.tx_cache.get( addr )
        if cached_version is not None:
            return cached_version
        version, binaddr = decode_check_address(addr)
        # (guard for an undecodable address is elided here)
        dbhash = self.binin(binaddr)
        # confirmed history: both spends and receipts (rows init elided)
        rows += self.get_address_out_rows( dbhash )
        rows += self.get_address_in_rows( dbhash )
        # (for row in rows: / try: headers elided)
        nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
        print "cannot unpack row", row
        tx_hash = self.hashout_hex(tx_hash)
        # (txpoint dict literal partly elided)
        "height": int(height),
        "blk_hash": self.hashout_hex(blk_hash),
        txpoints.append(txpoint)
        known_tx.append(self.hashout_hex(tx_hash))
        # todo: sort them really...
        txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
        # unconfirmed history from the memory pool (rows re-init elided)
        rows += self.get_address_in_rows_memorypool( dbhash )
        rows += self.get_address_out_rows_memorypool( dbhash )
        address_has_mempool = False
        # (for row in rows: header elided)
        is_in, tx_hash, tx_id, pos, value = row
        tx_hash = self.hashout_hex(tx_hash)
        if tx_hash in known_tx:
        # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
        address_has_mempool = True
        # this means pending transactions are returned by getmemorypool
        if tx_hash not in self.mempool_keys:
        #print "mempool", tx_hash
        # (mempool txpoint dict literal partly elided)
        "blk_hash": 'mempool',
        txpoints.append(txpoint)
        # second pass: resolve input/output addresses for every txpoint
        for txpoint in txpoints:
            tx_id = txpoint['tx_id']
            # (txinputs init elided)
            inrows = self.get_tx_inputs(tx_id)
            # (for row in inrows: header elided)
            _hash = self.binout(row[6])
            address = hash_to_address(chr(0), _hash)
            txinputs.append(address)
            txpoint['inputs'] = txinputs
            outrows = self.get_tx_outputs(tx_id)
            # (txoutputs init and loop header elided)
            _hash = self.binout(row[6])
            address = hash_to_address(chr(0), _hash)
            txoutputs.append(address)
            txpoint['outputs'] = txoutputs
            # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
            if not txpoint['is_in']:
                # detect if already redeemed...
                # (for row in outrows: header elided)
                if row[6] == dbhash: break
                #row = self.get_tx_output(tx_id,dbhash)
                # pos, script, value, o_hash, o_id, o_pos, binaddr = row
                # if not redeemed, we add the script
                # row[4]: id of the spending tx (NULL when unredeemed)
                if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
        # do not cache while the mempool view of this address is incomplete
        if not address_has_mempool:
            self.tx_cache[addr] = txpoints
        # (final ``return txpoints`` is elided from this excerpt)
    def memorypool_update(store):
        """Refresh ``store.mempool_keys`` from bitcoind's getmemorypool and
        import any transactions not seen before.

        ``store`` plays the role of ``self`` (original naming kept).
        NOTE(review): mempool_keys is reset to a *list* here but initialised
        as a *dict* in __init__; membership tests work on both, but the
        inconsistency is worth confirming upstream.  Several lines are
        elided (the json decode, error branch, tx loop header, import call).
        """
        ds = BCDataStream.BCDataStream()
        previous_transactions = store.mempool_keys
        store.mempool_keys = []
        postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
        respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
        # (r = loads(respdata) elided)
        if r['error'] != None:
        v = r['result'].get('transactions')
        # (for hextx in v: header elided)
        ds.write(hextx.decode('hex'))
        tx = deserialize.parse_Transaction(ds)
        tx['hash'] = util.double_sha256(tx['tx'])
        tx_hash = store.hashin(tx['hash'])
    def send_tx(self,tx):
        """Broadcast raw transaction *tx* through bitcoind's
        ``importtransaction`` RPC; returns an error string on rejection.

        NOTE(review): the json decode of the response and the success branch
        (returning the tx hash) are elided from this excerpt.
        """
        postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        if r['error'] != None:
            out = "error: transaction rejected by memorypool\n"+tx
    def main_iteration(store):
        """One pass of the server main loop: refresh the mempool and return
        the current block height, all under the DB lock.

        NOTE(review): the try/except/finally structure is elided in this
        excerpt; the release at the end strongly suggests the acquire/release
        pair wraps the two calls.
        """
        store.dblock.acquire()
        store.memorypool_update()
        block_number = store.get_block_number(1)
        print "IOError: cannot reach bitcoind"
        traceback.print_exc(file=sys.stdout)
        store.dblock.release()
        # (the ``return block_number`` is elided from this excerpt)
374 import time, json, socket, operator, thread, ast, sys, re, traceback
376 from json import dumps, loads
# --- module-level server bootstrap: defaults, config file, shared state ---
config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
config.add_section('server')
config.set('server','banner', 'Welcome to Electrum!')
config.set('server', 'host', 'localhost')
config.set('server', 'port', '50000')
config.set('server', 'password', '')
config.set('server', 'irc', 'yes')
config.set('server', 'ircname', 'Electrum server')
config.add_section('database')
config.set('database', 'type', 'psycopg2')
config.set('database', 'database', 'abe')
# NOTE(review): the try/except around these reads is elided in this excerpt
f = open('/etc/electrum.conf','r')
print "Could not read electrum.conf. I will use the default values."
f = open('/etc/electrum.banner','r')
config.set('server','banner', f.read())
password = config.get('server','password')
# shared mutable state, accessed from multiple threads
sessions_sub_numblocks = {} # sessions that have subscribed to the service
m_sessions = [{}] # served by http
wallets = {} # for ultra-light clients such as bccapi
from Queue import Queue
input_queue = Queue()   # requests from TCP clients, drained by process_input_queue
output_queue = Queue()  # replies/notifications, drained by process_output_queue
def random_string(N):
    """Return an N-character random string of uppercase letters and digits.

    Used to mint session ids, which gate access to a client's subscription
    state -- so they must be unpredictable.  Fix: draw from the OS entropy
    pool via random.SystemRandom instead of the seedable, predictable
    default Mersenne Twister generator.
    """
    import random, string
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(N))
def cmd_stop(_,__,pw):
    """RPC 'stop': shut the server down when *pw* matches the configured
    password.  NOTE(review): the password check and the shutdown lines are
    elided from this excerpt."""
    return 'wrong password'

def cmd_load(_,__,pw):
    """RPC 'load': report the number of active sessions (password-gated).
    NOTE(review): the if/else around the password check is elided, which is
    why two return statements appear at the same level here."""
    return repr( len(sessions) )
    return 'wrong password'
def modified_addresses(session):
    """Return (addresses whose status changed, updated address map) for
    *session*, refreshing its ``last_time`` stamp.

    NOTE(review): a few lines are elided in this excerpt (the ``t1`` and
    ``ret`` initialisations and the append of changed addresses to ``ret``).
    """
    addresses = session['addresses']
    session['last_time'] = time.time()
    for addr in addresses:
        status = get_address_status( addr )
        msg_id, last_status = addresses.get( addr )
        if last_status != status:
            addresses[addr] = msg_id, status
    t2 = time.time() - t1
    #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
    return ret, addresses
def poll_session(session_id):
    """Native-protocol poll: return repr((block_number, changed addresses))
    for *session_id*.  NOTE(review): the if/else around the session lookup
    is elided here."""
    session = sessions.get(session_id)
    print time.asctime(), "session not found", session_id
    ret, addresses = modified_addresses(session)
    if ret: sessions[session_id]['addresses'] = addresses
    return repr( (block_number,ret))

def poll_session_json(session_id, message_id):
    """HTTP-session poll: return a list of JSON-RPC results for every changed
    address plus, when it changed, the new block count.
    NOTE(review): the ``out`` initialisation, the address loop header and
    the final return are elided in this excerpt."""
    session = m_sessions[0].get(session_id)
    raise BaseException("session not found %s"%session_id)
    ret, addresses = modified_addresses(session)
    m_sessions[0][session_id]['addresses'] = addresses
    msg_id, status = addresses[addr]
    out.append( { 'id':msg_id, 'result':status } )
    msg_id, last_nb = session.get('numblocks')
    if last_nb != block_number:
        m_sessions[0][session_id]['numblocks'] = msg_id, block_number
        out.append( {'id':msg_id, 'result':block_number} )
def do_update_address(addr):
    """Push a fresh status for *addr* to every persistent session subscribed
    to it.

    Called from the main loop when a transaction touching *addr* was seen;
    the history cache keeps repeated get_address_status() calls cheap, even
    when the address is subscribed in several sessions.

    Fix: test membership against the session's address dict directly instead
    of materialising ``session['addresses'].keys()`` into a throwaway list
    for every session on every update (O(1) lookup, no copy).
    """
    for session_id in sessions.keys():
        session = sessions[session_id]
        # only push to persistent (TCP) sessions; polled sessions pull instead
        if session.get('type') != 'persistent':
            continue
        addresses = session['addresses']
        if addr in addresses:
            status = get_address_status( addr )
            message_id, last_status = addresses[addr]
            if last_status != status:
                send_status(session_id, message_id, addr, status)
                sessions[session_id]['addresses'][addr] = (message_id, status)
def get_address_status(addr):
    # get address status, i.e. the last block for that address.
    # NOTE(review): guards for an empty/None history and the final
    # ``return status`` are elided from this excerpt.
    tx_points = store.get_history(addr)
    lastpoint = tx_points[-1]
    status = lastpoint['blk_hash']
    # this is a temporary hack; move it up once old clients have disappeared
    if status == 'mempool': # and session['version'] != "old":
        status = status + ':%d'% len(tx_points)
def send_numblocks(session_id):
    """Queue a block-count notification for *session_id*, using the message
    id it registered at subscription time."""
    message_id = sessions_sub_numblocks[session_id]
    payload = json.dumps({'id': message_id, 'result': block_number})
    output_queue.put((session_id, payload))
def send_status(session_id, message_id, address, status):
    """Queue an address-status reply/notification for *session_id*.

    *address* is unused here; it is kept for call-site symmetry.
    """
    output_queue.put((session_id, json.dumps({'id': message_id, 'result': status})))
def address_get_history_json(_,message_id,address):
    """HTTP JSON-RPC handler: return the full history for *address*
    (first two arguments are part of the handler signature, unused here)."""
    return store.get_history(address)

def subscribe_to_numblocks(session_id, message_id):
    """Register *session_id* for block-count notifications and immediately
    push the current height."""
    sessions_sub_numblocks[session_id] = message_id
    send_numblocks(session_id)
def subscribe_to_numblocks_json(session_id, message_id):
    """HTTP-session variant: remember (message_id, height) so session.poll
    can report changes.  NOTE(review): one original line is elided here
    (presumably ``global m_sessions``) -- confirm against the full source."""
    m_sessions[0][session_id]['numblocks'] = message_id,block_number
def subscribe_to_address(session_id, message_id, address):
    """Subscribe a persistent session to *address* and immediately push its
    current status to the client."""
    current_status = get_address_status(address)
    session = sessions[session_id]
    session['addresses'][address] = (message_id, current_status)
    session['last_time'] = time.time()
    send_status(session_id, message_id, address, current_status)
def add_address_to_session_json(session_id, message_id, address):
    """HTTP-session subscribe: record (message_id, status) for *address*.
    NOTE(review): one elided line here (presumably ``global m_sessions``)
    and the trailing ``return status``."""
    sessions = m_sessions[0]
    status = get_address_status(address)
    sessions[session_id]['addresses'][address] = (message_id, status)
    sessions[session_id]['last_time'] = time.time()
    m_sessions[0] = sessions

def add_address_to_session(session_id, address):
    """Native-protocol subscribe (no per-message id).
    NOTE(review): the trailing ``return status`` is elided here."""
    status = get_address_status(address)
    sessions[session_id]['addresses'][address] = ("", status)
    sessions[session_id]['last_time'] = time.time()
def new_session(version, addresses):
    """Create a native-protocol session pre-subscribed to *addresses*;
    returns repr((session_id, banner)).
    NOTE(review): the ``for a in addresses:`` header and the final
    ``return out`` are elided from this excerpt."""
    session_id = random_string(10)
    sessions[session_id] = { 'addresses':{}, 'version':version }
    sessions[session_id]['addresses'][a] = ('','')
    out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
    sessions[session_id]['last_time'] = time.time()
def client_version_json(session_id, _, version):
    """Record the client's advertised version on its HTTP session.
    NOTE(review): one elided line (presumably ``global m_sessions``)."""
    sessions = m_sessions[0]
    sessions[session_id]['version'] = version
    m_sessions[0] = sessions

def create_session_json(_, __):
    """Create a fresh HTTP session keyed by a random id.
    NOTE(review): the final ``return session_id`` is elided here."""
    sessions = m_sessions[0]
    session_id = random_string(10)
    print "creating session", session_id
    sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
    sessions[session_id]['last_time'] = time.time()
    m_sessions[0] = sessions
def get_banner(_, __):
    """Return the configured banner, expanding literal '\\n' sequences into
    real newlines (handler args unused)."""
    banner = config.get('server', 'banner')
    return banner.replace('\\n', '\n')
def update_session(session_id,addresses):
    """deprecated in 0.42"""
    # Resubscribe the session to a new address list.
    # NOTE(review): the ``for a in addresses:`` header is elided here.
    sessions[session_id]['addresses'] = {}
    sessions[session_id]['addresses'][a] = ''
    sessions[session_id]['last_time'] = time.time()
def native_server_thread():
    """Accept loop for the legacy native protocol: one OS thread per client.
    NOTE(review): the listen()/while/try lines are elided in this excerpt."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), config.getint('server','port')))
    conn, addr = s.accept()
    thread.start_new_thread(native_client_thread, (addr, conn,))
    # can't start new thread if there is no memory..
    traceback.print_exc(file=sys.stdout)

def native_client_thread(ipaddr,conn):
    #print "client thread", ipaddr
    # NOTE(review): the recv loop and its try/except framing are elided;
    # requests are '#'-terminated repr() tuples parsed with literal_eval.
    msg = msg.split('#', 1)[0]
    cmd, data = ast.literal_eval(msg)
    print "syntax error", repr(msg), ipaddr
    out = do_command(cmd, data, ipaddr)
    #print ipaddr, cmd, len(out)
    print "error, could not send"
663 return time.strftime("[%d/%m/%Y-%H:%M:%S]")
# used by the native handler
def do_command(cmd, data, ipaddr):
    """Dispatch one native-protocol command and return the reply string.
    NOTE(review): many branch headers (``if cmd == ...``) and try/except
    lines are elided in this excerpt, so some assignments appear orphaned.
    """
    out = "%d"%block_number
    elif cmd in ['session','new_session']:
        # old clients send just an address list; new ones (version, list)
        addresses = ast.literal_eval(data)
        version, addresses = ast.literal_eval(data)
        if version[0]=="0": version = "v" + version
        print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
        out = new_session(version, addresses)
    elif cmd=='address.subscribe':
        session_id, addr = ast.literal_eval(data)
        traceback.print_exc(file=sys.stdout)
        out = add_address_to_session(session_id,addr)
    elif cmd=='update_session':
        session_id, addresses = ast.literal_eval(data)
        traceback.print_exc(file=sys.stdout)
        print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
        out = update_session(session_id,addresses)
    out = poll_session(data)
    out = repr( store.get_history( address ) )
    out = cmd_load(None,None,data)
    out = store.send_tx(data)
    print timestr(), "sent tx:", ipaddr, out
    out = repr(peer_list.values())
731 ####################################################################
def tcp_server_thread():
    """Accept loop for the Stratum TCP interface (fixed port 50001); starts
    the shared queue workers, then one thread per client.
    NOTE(review): listen()/while/try lines are elided in this excerpt."""
    thread.start_new_thread(process_input_queue, ())
    thread.start_new_thread(process_output_queue, ())
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), 50001))
    conn, addr = s.accept()
    thread.start_new_thread(tcp_client_thread, (addr, conn,))
    # can't start new thread if there is no memory..
    traceback.print_exc(file=sys.stdout)
def close_session(session_id):
    """Forget all per-session state for *session_id*.

    Fix: both removals now use ``pop`` with a default.  The reader and the
    writer thread of a connection can each detect the disconnect and call
    this concurrently; the old ``sessions.pop(session_id)`` plus the
    check-then-pop on ``sessions_sub_numblocks`` raised KeyError (or raced)
    on the second call.
    """
    sessions.pop(session_id, None)
    sessions_sub_numblocks.pop(session_id, None)
# one thread per client. put requests in a queue.
def tcp_client_thread(ipaddr,conn):
    """ use a persistent connection. put commands in a queue."""
    # NOTE(review): the newline-delimited recv/parse loop and its error
    # handling are elided in this excerpt; on socket/JSON failure the
    # session is closed.
    print timestr(), "TCP session", ipaddr
    session_id = random_string(10)
    sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
    close_session(session_id)
    close_session(session_id)
    print "json error", repr(c)
    # each decoded JSON-RPC request ``c`` is split into its parts
    message_id = c.get('id')
    method = c.get('method')
    params = c.get('params')
    print "syntax error", repr(c), ipaddr
    # hand the request to the single dispatcher thread
    input_queue.put((session_id, message_id, method, params))
# read commands from the input queue. perform requests, etc. this should be called from the main thread.
def process_input_queue():
    """Drain ``input_queue`` forever, dispatch each request and queue the
    JSON reply.  NOTE(review): the while loop, try headers and the ``out``
    initialisation are elided in this excerpt."""
    session_id, message_id, method, data = input_queue.get()
    if session_id not in sessions.keys():
        # stale session: drop the request (branch body elided)
    if method == 'address.subscribe':
        subscribe_to_address(session_id,message_id,address)
    elif method == 'numblocks.subscribe':
        subscribe_to_numblocks(session_id,message_id)
    elif method == 'client.version':
        sessions[session_id]['version'] = data[0]
    elif method == 'server.banner':
        out = { 'result':config.get('server','banner').replace('\\n','\n') }
    elif method == 'server.peers':
        out = { 'result':peer_list.values() }
    elif method == 'address.get_history':
        out = { 'result':store.get_history( address ) }
    elif method == 'transaction.broadcast':
        postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
        txo = urllib.urlopen(bitcoind_url, postdata).read()
        print "sent tx:", txo
        out = json.loads(txo)
    print "unknown command", method
    out['id'] = message_id
    out = json.dumps( out )
    output_queue.put((session_id, out))

# this is a separate thread
def process_output_queue():
    """Drain ``output_queue`` forever, writing each reply to its session's
    socket and closing the session on send failure.
    NOTE(review): the while loop and try/except lines are elided here."""
    session_id, out = output_queue.get()
    session = sessions.get(session_id)
    conn = session.get('conn')
    close_session(session_id)
857 ####################################################################
def clean_session_thread():
    """Periodically drop idle non-persistent sessions.
    NOTE(review): the sleep/while loop and the last_time idle test are
    elided from this excerpt."""
    for k,s in sessions.items():
        if s.get('type') == 'persistent': continue
        print "lost session", k
# NOTE(review): body of irc_thread() -- its ``def``, while and try lines are
# elided from this excerpt.  It announces this server as E_<nick> on
# irc.freenode.net #electrum and fills peer_list from /names + /who replies.
NICK = 'E_'+random_string(10)
s.connect(('irc.freenode.net', 6667))
s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
s.send('NICK '+NICK+'\n')
s.send('JOIN #electrum\n')
sf = s.makefile('r', 0)
# (per-line read loop elided)
line = line.rstrip('\r\n')
# keep the connection alive
s.send('PONG '+line[1]+'\n')
elif '353' in line: # answer to /names
    k = line.index('353')
    for item in line[k+1:]:
        if item[0:2] == 'E_':
            s.send('WHO %s\n'%item)
elif '352' in line: # answer to /who
    # warning: this is a horrible hack which apparently works
    k = line.index('352')
    ip = socket.gethostbyname(ip)
    peer_list[name] = (ip,host)
# re-poll the channel roster every five minutes
if time.time() - t > 5*60:
    s.send('NAMES #electrum\n')
traceback.print_exc(file=sys.stdout)
def get_peers_json(_, __):
    """HTTP JSON-RPC handler: list the peers discovered over IRC
    (handler args unused)."""
    return list(peer_list.values())
def http_server_thread():
    """Serve the Stratum HTTP/JSON-RPC interface on port 8081, one thread
    per request.  See http://code.google.com/p/jsonrpclib/"""
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer

    class ThreadedStratumServer(ThreadingMixIn, StratumJSONRPCServer):
        pass

    rpc = ThreadedStratumServer((config.get('server', 'host'), 8081))
    handlers = [
        (get_peers_json, 'server.peers'),
        (cmd_stop, 'stop'),
        (cmd_load, 'load'),
        (get_banner, 'server.banner'),
        (lambda a, b, c: store.send_tx(c), 'transaction.broadcast'),
        (address_get_history_json, 'address.get_history'),
        (add_address_to_session_json, 'address.subscribe'),
        (subscribe_to_numblocks_json, 'numblocks.subscribe'),
        (client_version_json, 'client.version'),
        # internal messages (not part of protocol)
        (create_session_json, 'session.create'),
        (poll_session_json, 'session.poll'),
    ]
    for handler, rpc_name in handlers:
        rpc.register_function(handler, rpc_name)
    rpc.serve_forever()
if __name__ == '__main__':
    # NOTE(review): the argv parsing, jsonrpclib import, command dispatch
    # (if/elif headers) and result printing of the client mode are elided
    # in this excerpt; the bare assignments below each belong to a branch.
    server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
    out = server.load(password)
    out = server.server.peers()
    out = server.stop(password)
    elif cmd == 'clear_cache':
        out = server.clear_cache(password)
    elif cmd == 'get_cache':
        out = server.get_cache(password,sys.argv[2])
    out = server.address.get_history(sys.argv[2])
    out = server.transaction.broadcast(sys.argv[2])
    out = server.numblocks.subscribe()
    out = "Unknown command: '%s'" % cmd
    # --- server mode ---
    # from db import MyStore
    store = MyStore(config)
    # supported protocols
    thread.start_new_thread(native_server_thread, ())
    thread.start_new_thread(tcp_server_thread, ())
    thread.start_new_thread(http_server_thread, ())
    thread.start_new_thread(clean_session_thread, ())
    if (config.get('server','irc') == 'yes' ):
        thread.start_new_thread(irc_thread, ())
    print "starting Electrum server"
    old_block_number = None
    # main loop: poll the store, then fan out block-count and address
    # notifications (the while header and sleep are elided here)
    block_number = store.main_iteration()
    if block_number != old_block_number:
        old_block_number = block_number
        for session_id in sessions_sub_numblocks.keys():
            send_numblocks(session_id)
    # drain the invalidated-address queue (non-blocking get; the
    # surrounding try/except Queue.Empty is elided here)
    addr = store.address_queue.get(False)
    do_update_address(addr)
    print "server stopped"