2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
* server should check and return bitcoind status.
* improve txpoint sorting
* add a command to check the cache

mempool transactions do not need to be added to the database; doing so slows it down
28 from Abe.abe import hash_to_address, decode_check_address
29 from Abe.DataStore import DataStore as Datastore_class
30 from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
32 import psycopg2, binascii
34 import thread, traceback, sys, urllib, operator
35 from json import dumps, loads
class MyStore(Datastore_class):
    # Abe DataStore subclass that adds an address->history cache, a view of
    # bitcoind's memory pool, and JSON-RPC access to bitcoind.

    def __init__(self, config):
        """Build Abe DataStore arguments from `config` and connect.

        NOTE(review): this excerpt appears to be missing some lines here
        (e.g. the `self.tx_cache = {}` initialisation that other methods
        rely on) -- confirm against the full file.
        """
        conf = DataStore.CONFIG_DEFAULTS
        # parse_argv with an empty argv just applies Abe's defaults
        args, argv = readconf.parse_argv( [], conf)
        args.dbtype = config.get('database','type')
        # per-driver connection arguments
        if args.dbtype == 'sqlite3':
            args.connect_args = { 'database' : config.get('database','database') }
        elif args.dbtype == 'MySQLdb':
            args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
        elif args.dbtype == 'psycopg2':
            args.connect_args = { 'database' : config.get('database','database') }
        Datastore_class.__init__(self,args)
        # tx hashes currently believed to be in bitcoind's memory pool
        self.mempool_keys = {}
        self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
        # addresses whose cached history was invalidated; drained by the main loop
        self.address_queue = Queue()
        # serialises all database access across the server threads
        self.dblock = thread.allocate_lock()
def import_block(self, b, chain_ids=frozenset()):
    """Import a block via Abe, then invalidate cached history for its txs.

    NOTE(review): lines are missing from this excerpt (the guards that
    decide between the cache update and the error print); indentation
    below is approximate -- confirm against the full file.
    """
    block_id = super(MyStore, self).import_block(b, chain_ids)
    for pos in xrange(len(b['transactions'])):
        tx = b['transactions'][pos]
        tx['hash'] = util.double_sha256(tx['tx'])
        # NOTE(review): uses the module-level `store` instead of `self`;
        # this only works because the script binds a global `store` --
        # confirm whether `self` was intended.
        tx_id = store.tx_find_id_and_value(tx)
        self.update_tx_cache(tx_id)
        print "error: import_block: no tx_id"
def update_tx_cache(self, txid):
    """Drop cached history for every address touched by transaction `txid`.

    NOTE(review): the `for row in inrows:` / `for row in outrows:` loop
    headers appear to be missing from this excerpt; `row` is otherwise
    unbound. Confirm against the full file.
    """
    inrows = self.get_tx_inputs(txid, False)
    # row[6] is presumably the pubkey-hash column -- TODO confirm column order
    _hash = self.binout(row[6])
    # chr(0): mainnet address version byte
    address = hash_to_address(chr(0), _hash)
    if self.tx_cache.has_key(address):
        print "cache: invalidating", address
        self.tx_cache.pop(address)
    # let subscribed sessions know this address changed
    self.address_queue.put(address)
    outrows = self.get_tx_outputs(txid, False)
    _hash = self.binout(row[6])
    address = hash_to_address(chr(0), _hash)
    if self.tx_cache.has_key(address):
        print "cache: invalidating", address
        self.tx_cache.pop(address)
    self.address_queue.put(address)
def safe_sql(self,sql, params=(), lock=True):
    """Run `selectall` under the shared db lock; log the SQL on error.

    NOTE(review): the try/except wrapper around the query and the
    `return ret` statement appear to be missing from this excerpt --
    the final print is presumably the except branch. Confirm.
    """
    if lock: self.dblock.acquire()
    ret = self.selectall(sql,params)
    if lock: self.dblock.release()
    print "sql error", sql
def get_tx_outputs(self, tx_id, lock=True):
    """Return the output rows of transaction `tx_id`.

    NOTE(review): several SELECT column lines (and the FROM clause) are
    missing from this excerpt. `%d` interpolation is acceptable only
    because tx_id is an integer database id, never user input.
    """
    return self.safe_sql("""SELECT
txout.txout_scriptPubKey,
LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
WHERE txout.tx_id = %d
ORDER BY txout.txout_pos
"""%(tx_id), (), lock)
def get_tx_inputs(self, tx_id, lock=True):
    """Return the input rows of transaction `tx_id` (linked or unlinked).

    NOTE(review): several SELECT column lines (including the FROM
    clause) are missing from this excerpt; COALESCE falls back to the
    unlinked_txin table when the previous output is not linked yet.
    """
    return self.safe_sql(""" SELECT
COALESCE(prevtx.tx_hash, u.txout_tx_hash),
COALESCE(txout.txout_pos, u.txout_pos),
LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
WHERE txin.tx_id = %d
ORDER BY txin.txin_pos
"""%(tx_id,), (), lock)
def get_address_out_rows(self, dbhash):
    """Confirmed spends from pubkey hash `dbhash` (longest chain only).

    NOTE(review): the SELECT column list is missing from this excerpt.
    Uses '?' placeholders (translated by Abe for the active driver),
    unlike the %d-interpolated queries above.
    """
    return self.safe_sql(""" SELECT
FROM chain_candidate cc
JOIN block b ON (b.block_id = cc.block_id)
JOIN block_tx ON (block_tx.block_id = b.block_id)
JOIN tx ON (tx.tx_id = block_tx.tx_id)
JOIN txin ON (txin.tx_id = tx.tx_id)
JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
WHERE pubkey.pubkey_hash = ?
AND cc.in_longest = 1""", (dbhash,))
def get_address_out_rows_memorypool(self, dbhash):
    """Unconfirmed spends from pubkey hash `dbhash` (no chain join).

    NOTE(review): the SELECT column list and FROM clause are missing
    from this excerpt.
    """
    return self.safe_sql(""" SELECT
JOIN txin ON (txin.tx_id = tx.tx_id)
JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
WHERE pubkey.pubkey_hash = ? """, (dbhash,))
def get_address_in_rows(self, dbhash):
    """Confirmed payments to pubkey hash `dbhash` (longest chain only).

    NOTE(review): the SELECT column list is missing from this excerpt.
    """
    return self.safe_sql(""" SELECT
FROM chain_candidate cc
JOIN block b ON (b.block_id = cc.block_id)
JOIN block_tx ON (block_tx.block_id = b.block_id)
JOIN tx ON (tx.tx_id = block_tx.tx_id)
JOIN txout ON (txout.tx_id = tx.tx_id)
JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
WHERE pubkey.pubkey_hash = ?
AND cc.in_longest = 1""", (dbhash,))
def get_address_in_rows_memorypool(self, dbhash):
    """Unconfirmed payments to pubkey hash `dbhash` (no chain join).

    NOTE(review): the SELECT column list and FROM clause are missing
    from this excerpt.
    """
    return self.safe_sql( """ SELECT
JOIN txout ON (txout.tx_id = tx.tx_id)
JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
WHERE pubkey.pubkey_hash = ? """, (dbhash,))
def get_history(self, addr):
    """Return the list of txpoints (confirmed + mempool) for `addr`.

    The result is cached in self.tx_cache unless the address is touched
    by mempool transactions (that state changes too quickly to cache).

    NOTE(review): many lines are missing from this excerpt -- the
    `rows = []` / `txpoints = []` / `known_tx = []` initialisations,
    the `for row in rows:` loop headers, the try/except around row
    unpacking, and the bulk of the txpoint dict literals. Indentation
    below is therefore approximate; confirm against the full file.
    """
    cached_version = self.tx_cache.get( addr )
    if cached_version is not None:
        return cached_version
    version, binaddr = decode_check_address(addr)
    dbhash = self.binin(binaddr)
    # confirmed history, longest chain only
    rows += self.get_address_out_rows( dbhash )
    rows += self.get_address_in_rows( dbhash )
    # per-row unpack; hashes come back in db binary form
    nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
    print "cannot unpack row", row
    tx_hash = self.hashout_hex(tx_hash)
    "height": int(height),
    "blk_hash": self.hashout_hex(blk_hash),
    txpoints.append(txpoint)
    known_tx.append(self.hashout_hex(tx_hash))
    # todo: sort them really...
    txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
    # now add what the database thinks is in the memory pool
    rows += self.get_address_in_rows_memorypool( dbhash )
    rows += self.get_address_out_rows_memorypool( dbhash )
    address_has_mempool = False
    is_in, tx_hash, tx_id, pos, value = row
    tx_hash = self.hashout_hex(tx_hash)
    if tx_hash in known_tx:
        # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
        address_has_mempool = True
        # this means pending transactions are returned by getmemorypool
        if tx_hash not in self.mempool_keys:
            #print "mempool", tx_hash
            "blk_hash": 'mempool',
            txpoints.append(txpoint)
    # annotate each txpoint with its input/output addresses
    for txpoint in txpoints:
        tx_id = txpoint['tx_id']
        inrows = self.get_tx_inputs(tx_id)
        _hash = self.binout(row[6])
        address = hash_to_address(chr(0), _hash)
        txinputs.append(address)
        txpoint['inputs'] = txinputs
        outrows = self.get_tx_outputs(tx_id)
        _hash = self.binout(row[6])
        address = hash_to_address(chr(0), _hash)
        txoutputs.append(address)
        txpoint['outputs'] = txoutputs
        # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
        if not txpoint['is_in']:
            # detect if already redeemed...
            if row[6] == dbhash: break
            #row = self.get_tx_output(tx_id,dbhash)
            # pos, script, value, o_hash, o_id, o_pos, binaddr = row
            # if not redeemed, we add the script
            if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
    # cache only when no mempool tx is involved
    if not address_has_mempool:
        self.tx_cache[addr] = txpoints
def memorypool_update(store):
    """Fetch bitcoind's memory pool and import any new transactions.

    `store` plays the role of `self` here. NOTE(review): several lines
    are missing from this excerpt: the json decode of the RPC response
    (`r = loads(respdata)` -- `r` is otherwise unbound), the loop over
    the returned transactions (`hextx` unbound), and the branch
    structure around import_tx. Confirm against the full file.
    """
    ds = BCDataStream.BCDataStream()
    previous_transactions = store.mempool_keys
    # NOTE(review): mempool_keys is initialised as a dict in __init__ but
    # rebuilt as a list here; membership tests work either way, but
    # confirm which type is intended.
    store.mempool_keys = []
    postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
    respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
    if r['error'] != None:
        v = r['result'].get('transactions')
    # hex-decode and deserialize each raw transaction
    ds.write(hextx.decode('hex'))
    tx = deserialize.parse_Transaction(ds)
    tx['hash'] = util.double_sha256(tx['tx'])
    tx_hash = store.hashin(tx['hash'])
    store.mempool_keys.append(tx_hash)
    # import only transactions the database does not know yet
    if store.tx_find_id_and_value(tx):
        tx_id = store.import_tx(tx, False)
        store.update_tx_cache(tx_id)
def send_tx(self,tx):
    """Push raw tx hex to bitcoind via 'importtransaction'; return a status string.

    NOTE(review): the response decode (`r = loads(respdata)` -- `r` is
    otherwise unbound), the success branch and the final `return out`
    are missing from this excerpt.
    """
    postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
    respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
    if r['error'] != None:
        out = "error: transaction rejected by memorypool\n"+tx
def main_iteration(store):
    """One pass of the main loop: refresh mempool, return the best block number.

    NOTE(review): the try/except (the IOError print below is presumably
    an except branch), the `return block_number`, and the finally around
    the lock release are missing from this excerpt.
    """
    store.dblock.acquire()
    store.memorypool_update()
    block_number = store.get_block_number(1)
    print "IOError: cannot reach bitcoind"
    traceback.print_exc(file=sys.stdout)
    store.dblock.release()
384 import time, json, socket, operator, thread, ast, sys, re, traceback
386 from json import dumps, loads
# --- server configuration: defaults first, then /etc/electrum.conf ---
# NOTE(review): the `import ConfigParser` line, the try/except wrappers
# around the file reads (with readfp/close), and the initialisations of
# the global `sessions` and `peer_list` dicts used throughout the file
# are missing from this excerpt.
config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
config.add_section('server')
config.set('server','banner', 'Welcome to Electrum!')
config.set('server', 'host', 'localhost')
config.set('server', 'port', '50000')
config.set('server', 'password', '')
config.set('server', 'irc', 'yes')
config.set('server', 'ircname', 'Electrum server')
config.add_section('database')
config.set('database', 'type', 'psycopg2')
config.set('database', 'database', 'abe')

f = open('/etc/electrum.conf','r')
print "Could not read electrum.conf. I will use the default values."
f = open('/etc/electrum.banner','r')
config.set('server','banner', f.read())

# admin password gating the stop/load commands
password = config.get('server','password')

# global session state, keyed by session id
sessions_sub_numblocks = {} # sessions that have subscribed to the service
m_sessions = [{}] # served by http
wallets = {} # for ultra-light clients such as bccapi

from Queue import Queue
# requests from / replies to persistent tcp clients
input_queue = Queue()
output_queue = Queue()
def random_string(N):
    """Return a random identifier of length N drawn from A-Z and 0-9."""
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    picked = [random.choice(alphabet) for _ in range(N)]
    return ''.join(picked)
def cmd_stop(_,__,pw):
    """RPC 'stop': shut the server down if `pw` matches the admin password.

    NOTE(review): the password check and the shutdown branch are missing
    from this excerpt; only the failure return is visible.
    """
    return 'wrong password'
def cmd_load(_,__,pw):
    """RPC 'load': report the number of active sessions (password-gated).

    NOTE(review): the password check separating the two returns is
    missing from this excerpt.
    """
    return repr( len(sessions) )
    return 'wrong password'
def modified_addresses(a_session):
    """Return (changed, refreshed_addresses) for one session.

    Operates on a deep copy so the shared session dict is not mutated
    while being polled. NOTE(review): the initialisation of `ret` (and
    the append recording changed addresses) is missing from this
    excerpt -- `ret` is otherwise unbound at the return.
    """
    session = copy.deepcopy(a_session)
    addresses = session['addresses']
    session['last_time'] = time.time()
    for addr in addresses:
        status = get_address_status( addr )
        msg_id, last_status = addresses.get( addr )
        if last_status != status:
            addresses[addr] = msg_id, status
    #t2 = time.time() - t1
    #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
    return ret, addresses
def poll_session(session_id):
    """Native-protocol poll: return repr((block_number, changed)) for a session.

    NOTE(review): the None-check branch around the "session not found"
    print (and its early return) is missing from this excerpt.
    """
    session = sessions.get(session_id)
    print time.asctime(), "session not found", session_id
    sessions[session_id]['last_time'] = time.time()
    ret, addresses = modified_addresses(session)
    # only persist the refreshed statuses if something actually changed
    if ret: sessions[session_id]['addresses'] = addresses
    return repr( (block_number,ret))
def poll_session_json(session_id, message_id):
    """HTTP/JSON poll: return queued status updates for one m_session.

    NOTE(review): several lines are missing from this excerpt -- the
    None-guard before the raise, the `out = []` initialisation, the
    loop over changed addresses (`addr` unbound), and the final
    `return out`. Indentation is approximate.
    """
    session = m_sessions[0].get(session_id)
    raise BaseException("session not found %s"%session_id)
    m_sessions[0][session_id]['last_time'] = time.time()
    ret, addresses = modified_addresses(session)
    m_sessions[0][session_id]['addresses'] = addresses
    msg_id, status = addresses[addr]
    out.append( { 'id':msg_id, 'result':status } )
    # also report a new block height if it changed since the last poll
    msg_id, last_nb = session.get('numblocks')
    if last_nb != block_number:
        m_sessions[0][session_id]['numblocks'] = msg_id, block_number
        out.append( {'id':msg_id, 'result':block_number} )
def do_update_address(addr):
    """Push a fresh status for `addr` to every persistent session watching it.

    Called when a transaction touched `addr`. The address may be
    subscribed in several sessions; the history cache keeps the repeated
    status lookups from doing redundant database work.
    """
    for sid in sessions.keys():
        sess = sessions[sid]
        # only persistent (tcp) sessions get pushes; polled sessions pull
        if sess.get('type') != 'persistent':
            continue
        watched = sess['addresses'].keys()
        if addr in watched:
            new_status = get_address_status( addr )
            msg_id, old_status = sess['addresses'][addr]
            if old_status != new_status:
                send_status(sid, msg_id, addr, new_status)
                sessions[sid]['addresses'][addr] = (msg_id, new_status)
def get_address_status(addr):
    # get address status, i.e. the last block for that address.
    # NOTE(review): the empty-history branch and the final
    # `return status` are missing from this excerpt.
    tx_points = store.get_history(addr)
    lastpoint = tx_points[-1]
    status = lastpoint['blk_hash']
    # this is a temporary hack; move it up once old clients have disappeared
    if status == 'mempool': # and session['version'] != "old":
        # disambiguate successive mempool states by the tx count
        status = status + ':%d'% len(tx_points)
def send_numblocks(session_id):
    """Queue a numblocks notification for a subscribed tcp session."""
    msg_id = sessions_sub_numblocks[session_id]
    payload = {'id':msg_id, 'result':block_number}
    output_queue.put((session_id, json.dumps( payload )))
def send_status(session_id, message_id, address, status):
    """Queue an address-status notification for a tcp session.

    `address` is part of the call signature but is not included in the
    wire payload; the client correlates by `message_id`.
    """
    reply = json.dumps( { 'id':message_id, 'result':status } )
    output_queue.put((session_id, reply))
def address_get_history_json(_,message_id,address):
    """JSON-RPC 'address.get_history': return the stored history for `address`."""
    history = store.get_history(address)
    return history
def subscribe_to_numblocks(session_id, message_id):
    """Register a tcp session for block-count updates and push the current one.

    The subscription must be recorded before sending, because
    send_numblocks reads the message id back out of the registry.
    """
    sessions_sub_numblocks[session_id] = message_id
    send_numblocks(session_id)
def subscribe_to_numblocks_json(session_id, message_id):
    """HTTP flavour of numblocks.subscribe: remember the msg id per session.

    NOTE(review): surrounding lines are missing from this excerpt
    (most likely a `return block_number`).
    """
    m_sessions[0][session_id]['numblocks'] = message_id,block_number
def subscribe_to_address(session_id, message_id, address):
    """Register `address` in a tcp session and push its current status."""
    current = get_address_status(address)
    session = sessions[session_id]
    session['addresses'][address] = (message_id, current)
    session['last_time'] = time.time()
    send_status(session_id, message_id, address, current)
def add_address_to_session_json(session_id, message_id, address):
    """HTTP flavour of address.subscribe: record the address with its status.

    NOTE(review): the final `return status` appears to be missing from
    this excerpt.
    """
    # local name deliberately shadows the global `sessions`
    sessions = m_sessions[0]
    status = get_address_status(address)
    sessions[session_id]['addresses'][address] = (message_id, status)
    sessions[session_id]['last_time'] = time.time()
    m_sessions[0] = sessions
def add_address_to_session(session_id, address):
    """Native-protocol address.subscribe: record the address (empty msg id).

    NOTE(review): the final `return status` appears to be missing from
    this excerpt -- do_command uses this function's return value.
    """
    status = get_address_status(address)
    sessions[session_id]['addresses'][address] = ("", status)
    sessions[session_id]['last_time'] = time.time()
def new_session(version, addresses):
    """Create a native-protocol session pre-populated with `addresses`.

    NOTE(review): the `for a in addresses:` loop header (`a` is
    otherwise unbound) and the final `return out` are missing from this
    excerpt.
    """
    session_id = random_string(10)
    sessions[session_id] = { 'addresses':{}, 'version':version }
    sessions[session_id]['addresses'][a] = ('','')
    # the banner stores newlines escaped in the config file
    out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
    sessions[session_id]['last_time'] = time.time()
def client_version_json(session_id, _, version):
    """HTTP 'client.version': record the client's reported version string."""
    # local name deliberately shadows the global `sessions`
    sessions = m_sessions[0]
    sessions[session_id]['version'] = version
    m_sessions[0] = sessions
def create_session_json(_, __):
    """HTTP 'session.create': allocate a new polled session.

    NOTE(review): the final `return session_id` appears to be missing
    from this excerpt.
    """
    # local name deliberately shadows the global `sessions`
    sessions = m_sessions[0]
    session_id = random_string(10)
    print "creating session", session_id
    sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
    sessions[session_id]['last_time'] = time.time()
    m_sessions[0] = sessions
def get_banner(_,__):
    """JSON-RPC 'server.banner': return the banner with escaped newlines expanded."""
    raw = config.get('server','banner')
    return raw.replace('\\n','\n')
def update_session(session_id,addresses):
    """deprecated in 0.42"""
    # Reset the session's watch list to exactly `addresses`.
    # NOTE(review): the `for a in addresses:` loop header (`a` is
    # otherwise unbound) and any return value are missing from this
    # excerpt.
    sessions[session_id]['addresses'] = {}
    sessions[session_id]['addresses'][a] = ''
    sessions[session_id]['last_time'] = time.time()
def native_server_thread():
    """Accept loop for the legacy native (repr/ast) protocol socket.

    NOTE(review): s.listen(...), the `while True:` accept-loop header
    and the try/except around thread creation are missing from this
    excerpt.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), config.getint('server','port')))
    conn, addr = s.accept()
    # one worker thread per client connection
    thread.start_new_thread(native_client_thread, (addr, conn,))
    # can't start new thread if there is no memory..
    traceback.print_exc(file=sys.stdout)
def native_client_thread(ipaddr,conn):
    #print "client thread", ipaddr
    """Handle one native-protocol request: read, parse, dispatch, reply.

    NOTE(review): the recv loop that assembles `msg`, the try/except
    wrappers around parsing and sending, and the conn.close() are
    missing from this excerpt; indentation is approximate.
    """
    # strip the trailing '#'-terminated framing from the wire message
    msg = msg.split('#', 1)[0]
    # requests are python literals: (cmd, data)
    cmd, data = ast.literal_eval(msg)
    print "syntax error", repr(msg), ipaddr
    out = do_command(cmd, data, ipaddr)
    #print ipaddr, cmd, len(out)
    print "error, could not send"
# NOTE(review): the enclosing `def timestr():` line is missing from this
# excerpt; this is the body of the log-timestamp helper used below.
return time.strftime("[%d/%m/%Y-%H:%M:%S]")

# used by the native handler
def do_command(cmd, data, ipaddr):
    """Dispatch one native-protocol command; return the reply string.

    NOTE(review): many lines are missing from this excerpt -- the
    opening `if` of the chain (the bare `elif`s below), the try/except
    wrappers whose traceback prints remain, the branch headers for
    poll/history/load/tx/peers, and the final `return out`. Indentation
    is approximate.
    """
    out = "%d"%block_number
    elif cmd in ['session','new_session']:
        # old format: data is just a list of addresses;
        # new format: data is (version, addresses)
        addresses = ast.literal_eval(data)
        version, addresses = ast.literal_eval(data)
        # normalise bare "0.x" versions to "v0.x"
        if version[0]=="0": version = "v" + version
        print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
        out = new_session(version, addresses)
    elif cmd=='address.subscribe':
        session_id, addr = ast.literal_eval(data)
        traceback.print_exc(file=sys.stdout)
        out = add_address_to_session(session_id,addr)
    elif cmd=='update_session':
        session_id, addresses = ast.literal_eval(data)
        traceback.print_exc(file=sys.stdout)
        print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
        out = update_session(session_id,addresses)
    out = poll_session(data)
    out = repr( store.get_history( address ) )
    out = cmd_load(None,None,data)
    out = store.send_tx(data)
    print timestr(), "sent tx:", ipaddr, out
    out = repr(peer_list.values())
744 ####################################################################
def tcp_server_thread():
    """Accept loop for the Stratum-style tcp protocol.

    NOTE(review): s.listen(...), the `while True:` accept-loop header
    and the try/except around thread creation are missing from this
    excerpt. The port is hard-coded to 50001, unlike the native server
    which reads it from config -- confirm whether that is intended.
    """
    # the queue worker threads are owned by this server
    thread.start_new_thread(process_input_queue, ())
    thread.start_new_thread(process_output_queue, ())
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), 50001))
    conn, addr = s.accept()
    thread.start_new_thread(tcp_client_thread, (addr, conn,))
    # can't start new thread if there is no memory..
    traceback.print_exc(file=sys.stdout)
def close_session(session_id):
    """Forget a tcp session and any numblocks subscription it held.

    This can race: both the client thread's error paths and the
    output-queue thread may tear down the same dead session, so both
    removals tolerate an already-missing key instead of raising
    KeyError inside a daemon thread.
    """
    #print "lost connection", session_id
    sessions.pop(session_id, None)
    sessions_sub_numblocks.pop(session_id, None)
# one thread per client. put requests in a queue.
def tcp_client_thread(ipaddr,conn):
    """ use a persistent connection. put commands in a queue."""
    # NOTE(review): the recv loop, newline framing of `c`, the json
    # decode, and the try/except wrappers are missing from this excerpt;
    # the two close_session calls below are its error-path cleanups and
    # indentation is approximate.
    print timestr(), "TCP session", ipaddr
    session_id = random_string(10)
    sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
    close_session(session_id)
    close_session(session_id)
    print "json error", repr(c)
    # extract the JSON-RPC request fields
    message_id = c.get('id')
    method = c.get('method')
    params = c.get('params')
    print "syntax error", repr(c), ipaddr
    # hand the request to the main processing thread
    input_queue.put((session_id, message_id, method, params))
# read commands from the input queue. perform requests, etc. this should be called from the main thread.
def process_input_queue():
    """Dispatcher for queued tcp requests (runs forever in its own thread).

    NOTE(review): the `while True:` loop header, the default `out`
    initialisation, the `address = data[0]` extraction in the address
    branches, and the guard structure around the session check are
    missing from this excerpt; indentation is approximate.
    """
    session_id, message_id, method, data = input_queue.get()
    # the session may have been closed while the request sat in the queue
    if session_id not in sessions.keys():
    if method == 'address.subscribe':
        subscribe_to_address(session_id,message_id,address)
    elif method == 'numblocks.subscribe':
        subscribe_to_numblocks(session_id,message_id)
    elif method == 'client.version':
        sessions[session_id]['version'] = data[0]
    elif method == 'server.banner':
        out = { 'result':config.get('server','banner').replace('\\n','\n') }
    elif method == 'server.peers':
        out = { 'result':peer_list.values() }
    elif method == 'address.get_history':
        out = { 'result':store.get_history( address ) }
    elif method == 'transaction.broadcast':
        txo = store.send_tx(data[0])
        print "sent tx:", txo
        out = {'result':txo }
    print "unknown command", method
    # tag the reply with the request id and queue it for sending
    out['id'] = message_id
    out = json.dumps( out )
    output_queue.put((session_id, out))
# this is a separate thread
def process_output_queue():
    """Write queued replies to their sessions' sockets.

    NOTE(review): the `while True:` loop header, the None-check on the
    session, the actual conn.send call and its try/except are missing
    from this excerpt; close_session below is presumably the
    send-failure cleanup.
    """
    session_id, out = output_queue.get()
    session = sessions.get(session_id)
    conn = session.get('conn')
    close_session(session_id)
869 ####################################################################
def clean_session_thread():
    """Periodically drop polled sessions that have gone idle.

    NOTE(review): the sleep loop, the idle-time computation against
    `last_time`, and the actual removal are missing from this excerpt.
    """
    for k,s in sessions.items():
        # persistent (tcp) sessions are cleaned up by close_session instead
        if s.get('type') == 'persistent': continue
        print "lost session", k
# NOTE(review): the enclosing `def irc_thread():` line and many lines
# (socket creation for `s`, the read loop producing `line`, the
# splitting that binds `name`/`host`/`ip`, the timer init for `t`, and
# the try/except) are missing from this excerpt. This is the IRC
# presence/peer-discovery worker: servers announce themselves on
# #electrum as E_-prefixed nicks and collect each other via WHO replies.
# Indentation below is kept flat because the nesting cannot be
# reconstructed reliably.
NICK = 'E_'+random_string(10)
s.connect(('irc.freenode.net', 6667))
s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
s.send('NICK '+NICK+'\n')
s.send('JOIN #electrum\n')
sf = s.makefile('r', 0)
line = line.rstrip('\r\n')
# keepalive reply to server PING
s.send('PONG '+line[1]+'\n')
elif '353' in line: # answer to /names
k = line.index('353')
for item in line[k+1:]:
if item[0:2] == 'E_':
s.send('WHO %s\n'%item)
elif '352' in line: # answer to /who
# warning: this is a horrible hack which apparently works
k = line.index('352')
ip = socket.gethostbyname(ip)
peer_list[name] = (ip,host)
# refresh the peer list every five minutes
if time.time() - t > 5*60:
s.send('NAMES #electrum\n')
traceback.print_exc(file=sys.stdout)
def get_peers_json(_,__):
    """JSON-RPC 'server.peers': return the (ip, host) pairs learned from IRC."""
    peers = peer_list.values()
    return peers
def http_server_thread():
    """Serve the JSON-RPC (http) flavour of the protocol on port 8081."""
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer
    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
    server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
    # handler table: (callable, exported method name)
    handlers = [
        (get_peers_json, 'server.peers'),
        (cmd_stop, 'stop'),
        (cmd_load, 'load'),
        (get_banner, 'server.banner'),
        (lambda a,b,c: store.send_tx(c), 'transaction.broadcast'),
        (address_get_history_json, 'address.get_history'),
        (add_address_to_session_json, 'address.subscribe'),
        (subscribe_to_numblocks_json, 'numblocks.subscribe'),
        (client_version_json, 'client.version'),
        (create_session_json, 'session.create'),  # internal message (not part of protocol)
        (poll_session_json, 'session.poll'),      # internal message (not part of protocol)
    ]
    for func, rpc_name in handlers:
        server.register_function(func, rpc_name)
    server.serve_forever()
if __name__ == '__main__':
    # NOTE(review): many lines are missing from this excerpt -- the
    # argv handling that selects admin mode vs server mode, the branch
    # headers pairing with the bare `elif`s below, the jsonrpclib
    # import, the `while True:` main-loop header, the queue-drain loop,
    # and the try/except wrappers. Indentation is approximate.

    # Admin mode: forward a command to an already-running server.
    server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
    out = server.load(password)
    out = server.server.peers()
    out = server.stop(password)
    elif cmd == 'clear_cache':
        out = server.clear_cache(password)
    elif cmd == 'get_cache':
        out = server.get_cache(password,sys.argv[2])
    out = server.address.get_history(sys.argv[2])
    out = server.transaction.broadcast(sys.argv[2])
    out = server.numblocks.subscribe()
    out = "Unknown command: '%s'" % cmd

    # Server mode: create the datastore and start the worker threads.
    # from db import MyStore
    store = MyStore(config)

    # supported protocols
    thread.start_new_thread(native_server_thread, ())
    thread.start_new_thread(tcp_server_thread, ())
    thread.start_new_thread(http_server_thread, ())
    thread.start_new_thread(clean_session_thread, ())

    if (config.get('server','irc') == 'yes' ):
        thread.start_new_thread(irc_thread, ())

    print "starting Electrum server"

    old_block_number = None
    # main loop: poll the store, notify block subscribers on change,
    # and push address updates from the cache-invalidation queue
    block_number = store.main_iteration()
    if block_number != old_block_number:
        old_block_number = block_number
        for session_id in sessions_sub_numblocks.keys():
            send_numblocks(session_id)
    # non-blocking get: drain invalidated addresses queued by the store
    addr = store.address_queue.get(False)
    do_update_address(addr)

    print "server stopped"