handle socket error
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 import time, json, socket, operator, thread, ast, sys,re
29 import psycopg2, binascii
30
31 from Abe.abe import hash_to_address, decode_check_address
32 from Abe.DataStore import DataStore as Datastore_class
33 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
34
35 import ConfigParser
36 from json import dumps, loads
37 import urllib
38
39 # we need to import electrum
40 sys.path.append('../client/')
41 from wallet import Wallet
42 from interface import Interface
43
44
45 config = ConfigParser.ConfigParser()
46 # set some defaults, which will be overwritten by the config file
47 config.add_section('server')
48 config.set('server','banner', 'Welcome to Electrum!')
49 config.set('server', 'host', 'localhost')
50 config.set('server', 'port', 50000)
51 config.set('server', 'password', '')
52 config.set('server', 'irc', 'yes')
53 config.set('server', 'cache', 'no') 
54 config.set('server', 'ircname', 'Electrum server')
55 config.add_section('database')
56 config.set('database', 'type', 'psycopg2')
57 config.set('database', 'database', 'abe')
58
59 try:
60     f = open('/etc/electrum.conf','r')
61     config.readfp(f)
62     f.close()
63 except:
64     print "Could not read electrum.conf. I will use the default values."
65
66 try:
67     f = open('/etc/electrum.banner','r')
68     config.set('server','banner', f.read())
69     f.close()
70 except:
71     pass
72
73 password = config.get('server','password')
74 bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
75
76 stopping = False
77 block_number = -1
78 old_block_number = -1
79 sessions = {}
80 sessions_sub_numblocks = [] # sessions that have subscribed to the service
81
82 dblock = thread.allocate_lock()
83 peer_list = {}
84
85 wallets = {} # for ultra-light clients such as bccapi
86
87 from Queue import Queue
88 input_queue = Queue()
89 output_queue = Queue()
90 address_queue = Queue()
91
92 class MyStore(Datastore_class):
93
94     def import_tx(self, tx, is_coinbase):
95         tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
96         if config.get('server', 'cache') == 'yes': self.update_tx_cache(tx_id)
97
98     def update_tx_cache(self, txid):
99         inrows = self.get_tx_inputs(txid, False)
100         for row in inrows:
101             _hash = store.binout(row[6])
102             address = hash_to_address(chr(0), _hash)
103             if self.tx_cache.has_key(address):
104                 print "cache: invalidating", address
105                 self.tx_cache.pop(address)
106             address_queue.put(address)
107
108         outrows = self.get_tx_outputs(txid, False)
109         for row in outrows:
110             _hash = store.binout(row[6])
111             address = hash_to_address(chr(0), _hash)
112             if self.tx_cache.has_key(address):
113                 print "cache: invalidating", address
114                 self.tx_cache.pop(address)
115             address_queue.put(address)
116
117     def safe_sql(self,sql, params=(), lock=True):
118         try:
119             if lock: dblock.acquire()
120             ret = self.selectall(sql,params)
121             if lock: dblock.release()
122             return ret
123         except:
124             print "sql error", sql
125             return []
126
127     def get_tx_outputs(self, tx_id, lock=True):
128         return self.safe_sql("""SELECT
129                 txout.txout_pos,
130                 txout.txout_scriptPubKey,
131                 txout.txout_value,
132                 nexttx.tx_hash,
133                 nexttx.tx_id,
134                 txin.txin_pos,
135                 pubkey.pubkey_hash
136               FROM txout
137               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
138               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
139               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
140              WHERE txout.tx_id = %d 
141              ORDER BY txout.txout_pos
142         """%(tx_id), (), lock)
143
144     def get_tx_inputs(self, tx_id, lock=True):
145         return self.safe_sql(""" SELECT
146                 txin.txin_pos,
147                 txin.txin_scriptSig,
148                 txout.txout_value,
149                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
150                 prevtx.tx_id,
151                 COALESCE(txout.txout_pos, u.txout_pos),
152                 pubkey.pubkey_hash
153               FROM txin
154               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
155               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
156               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
157               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
158              WHERE txin.tx_id = %d
159              ORDER BY txin.txin_pos
160              """%(tx_id,), (), lock)
161
162     def get_address_out_rows(self, dbhash):
163         return self.safe_sql(""" SELECT
164                 b.block_nTime,
165                 cc.chain_id,
166                 b.block_height,
167                 1,
168                 b.block_hash,
169                 tx.tx_hash,
170                 tx.tx_id,
171                 txin.txin_pos,
172                 -prevout.txout_value
173               FROM chain_candidate cc
174               JOIN block b ON (b.block_id = cc.block_id)
175               JOIN block_tx ON (block_tx.block_id = b.block_id)
176               JOIN tx ON (tx.tx_id = block_tx.tx_id)
177               JOIN txin ON (txin.tx_id = tx.tx_id)
178               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
179               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
180              WHERE pubkey.pubkey_hash = ?
181                AND cc.in_longest = 1""", (dbhash,))
182
183     def get_address_out_rows_memorypool(self, dbhash):
184         return self.safe_sql(""" SELECT
185                 1,
186                 tx.tx_hash,
187                 tx.tx_id,
188                 txin.txin_pos,
189                 -prevout.txout_value
190               FROM tx 
191               JOIN txin ON (txin.tx_id = tx.tx_id)
192               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
193               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
194              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
195
196     def get_address_in_rows(self, dbhash):
197         return self.safe_sql(""" SELECT
198                 b.block_nTime,
199                 cc.chain_id,
200                 b.block_height,
201                 0,
202                 b.block_hash,
203                 tx.tx_hash,
204                 tx.tx_id,
205                 txout.txout_pos,
206                 txout.txout_value
207               FROM chain_candidate cc
208               JOIN block b ON (b.block_id = cc.block_id)
209               JOIN block_tx ON (block_tx.block_id = b.block_id)
210               JOIN tx ON (tx.tx_id = block_tx.tx_id)
211               JOIN txout ON (txout.tx_id = tx.tx_id)
212               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
213              WHERE pubkey.pubkey_hash = ?
214                AND cc.in_longest = 1""", (dbhash,))
215
216     def get_address_in_rows_memorypool(self, dbhash):
217         return self.safe_sql( """ SELECT
218                 0,
219                 tx.tx_hash,
220                 tx.tx_id,
221                 txout.txout_pos,
222                 txout.txout_value
223               FROM tx
224               JOIN txout ON (txout.tx_id = tx.tx_id)
225               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
226              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
227
228     def get_history(self, addr):
229         
230         if config.get('server','cache') == 'yes':
231             cached_version = self.tx_cache.get( addr )
232             if cached_version is not None:
233                 return cached_version
234
235         version, binaddr = decode_check_address(addr)
236         if binaddr is None:
237             return None
238
239         dbhash = self.binin(binaddr)
240         rows = []
241         rows += self.get_address_out_rows( dbhash )
242         rows += self.get_address_in_rows( dbhash )
243
244         txpoints = []
245         known_tx = []
246
247         for row in rows:
248             try:
249                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
250             except:
251                 print "cannot unpack row", row
252                 break
253             tx_hash = self.hashout_hex(tx_hash)
254             txpoint = {
255                     "nTime":    int(nTime),
256                     "height":   int(height),
257                     "is_in":    int(is_in),
258                     "blk_hash": self.hashout_hex(blk_hash),
259                     "tx_hash":  tx_hash,
260                     "tx_id":    int(tx_id),
261                     "pos":      int(pos),
262                     "value":    int(value),
263                     }
264
265             txpoints.append(txpoint)
266             known_tx.append(self.hashout_hex(tx_hash))
267
268
269         # todo: sort them really...
270         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
271
272         # read memory pool
273         rows = []
274         rows += self.get_address_in_rows_memorypool( dbhash )
275         rows += self.get_address_out_rows_memorypool( dbhash )
276         address_has_mempool = False
277
278         for row in rows:
279             is_in, tx_hash, tx_id, pos, value = row
280             tx_hash = self.hashout_hex(tx_hash)
281             if tx_hash in known_tx:
282                 continue
283
284             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
285             address_has_mempool = True
286
287             # this means pending transactions are returned by getmemorypool
288             if tx_hash not in self.mempool_keys:
289                 continue
290
291             #print "mempool", tx_hash
292             txpoint = {
293                     "nTime":    0,
294                     "height":   0,
295                     "is_in":    int(is_in),
296                     "blk_hash": 'mempool', 
297                     "tx_hash":  tx_hash,
298                     "tx_id":    int(tx_id),
299                     "pos":      int(pos),
300                     "value":    int(value),
301                     }
302             txpoints.append(txpoint)
303
304
305         for txpoint in txpoints:
306             tx_id = txpoint['tx_id']
307             
308             txinputs = []
309             inrows = self.get_tx_inputs(tx_id)
310             for row in inrows:
311                 _hash = self.binout(row[6])
312                 address = hash_to_address(chr(0), _hash)
313                 txinputs.append(address)
314             txpoint['inputs'] = txinputs
315             txoutputs = []
316             outrows = self.get_tx_outputs(tx_id)
317             for row in outrows:
318                 _hash = self.binout(row[6])
319                 address = hash_to_address(chr(0), _hash)
320                 txoutputs.append(address)
321             txpoint['outputs'] = txoutputs
322
323             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
324             if not txpoint['is_in']:
325                 # detect if already redeemed...
326                 for row in outrows:
327                     if row[6] == dbhash: break
328                 else:
329                     raise
330                 #row = self.get_tx_output(tx_id,dbhash)
331                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
332                 # if not redeemed, we add the script
333                 if row:
334                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
335
336         # cache result
337         if config.get('server','cache') == 'yes' and not address_has_mempool:
338             self.tx_cache[addr] = txpoints
339         
340         return txpoints
341
342
343
344 class Direct_Interface(Interface):
345     def __init__(self):
346         pass
347
348     def handler(self, method, params = ''):
349         cmds = {'session.new':new_session,
350                 'session.poll':poll_session,
351                 'session.update':update_session,
352                 'blockchain.transaction.broadcast':send_tx,
353                 'blockchain.address.get_history':store.get_history
354                 }
355         func = cmds[method]
356         return func( params )
357
358
359
360 def send_tx(tx):
361     postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
362     respdata = urllib.urlopen(bitcoind_url, postdata).read()
363     r = loads(respdata)
364     if r['error'] != None:
365         out = "error: transaction rejected by memorypool\n"+tx
366     else:
367         out = r['result']
368     return out
369
370
371
372 def random_string(N):
373     import random, string
374     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
375
376     
377
378 def cmd_stop(data):
379     global stopping
380     if password == data:
381         stopping = True
382         return 'ok'
383     else:
384         return 'wrong password'
385
386 def cmd_load(pw):
387     if password == pw:
388         return repr( len(sessions) )
389     else:
390         return 'wrong password'
391
392
393 def clear_cache(pw):
394     if password == pw:
395         store.tx_cache = {}
396         return 'ok'
397     else:
398         return 'wrong password'
399
400 def get_cache(pw,addr):
401     if password == pw:
402         return store.tx_cache.get(addr)
403     else:
404         return 'wrong password'
405
406
407 def poll_session(session_id):
408     session = sessions.get(session_id)
409     if session is None:
410         print time.asctime(), "session not found", session_id
411         out = repr( (-1, {}))
412     else:
413         t1 = time.time()
414         addresses = session['addresses']
415         session['last_time'] = time.time()
416         ret = {}
417         k = 0
418         for addr in addresses:
419             if store.tx_cache.get( addr ) is not None: k += 1
420             status = get_address_status( addr )
421             last_status = addresses.get( addr )
422             if last_status != status:
423                 addresses[addr] = status
424                 ret[addr] = status
425         if ret:
426             sessions[session_id]['addresses'] = addresses
427         out = repr( (block_number, ret ) )
428         t2 = time.time() - t1 
429         if t2 > 10:
430             print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
431
432         return out
433
434
435 def do_update_address(addr):
436     # an address was involved in a transaction; we check if it was subscribed to in a session
437     # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
438     for session_id in sessions.keys():
439         session = sessions[session_id]
440         if session.get('type') != 'subscribe': continue
441         addresses = session['addresses'].keys()
442
443         if addr in addresses:
444             print "address ", addr, "found in session", session_id
445             status = get_address_status( addr )
446             print "new_status:", status
447             last_status = session['addresses'][addr]
448             print "last_status", last_status
449             if last_status != status:
450                 print "status is new", addr
451                 send_status(session_id,addr,status)
452                 sessions[session_id]['addresses'][addr] = status
453
454
455 def get_address_status(addr):
456     # get address status, i.e. the last block for that address.
457     tx_points = store.get_history(addr)
458     if not tx_points:
459         status = None
460     else:
461         lastpoint = tx_points[-1]
462         status = lastpoint['blk_hash']
463         # this is a temporary hack; move it up once old clients have disappeared
464         if status == 'mempool': # and session['version'] != "old":
465             status = status + ':%d'% len(tx_points)
466     return status
467
468
469 def send_numblocks(session_id):
470     out = json.dumps( {'method':'numblocks.subscribe', 'result':block_number} )
471     output_queue.put((session_id, out))
472
473 def send_status(session_id, address, status):
474     out = json.dumps( { 'method':'address.subscribe', 'address':address, 'status':status } )
475     output_queue.put((session_id, out))
476
477 def subscribe_to_numblocks(session_id):
478     sessions_sub_numblocks.append(session_id)
479     send_numblocks(session_id)
480
481 def subscribe_to_address(session_id, address):
482     status = get_address_status(address)
483     sessions[session_id]['type'] = 'subscribe'
484     sessions[session_id]['addresses'][address] = status
485     sessions[session_id]['last_time'] = time.time()
486     send_status(session_id, address, status)
487
488 def new_session(version, addresses):
489     session_id = random_string(10)
490     sessions[session_id] = { 'addresses':{}, 'version':version }
491     for a in addresses:
492         sessions[session_id]['addresses'][a] = ''
493     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
494     sessions[session_id]['last_time'] = time.time()
495     return out
496
497 def update_session(session_id,addresses):
498     sessions[session_id]['addresses'] = {}
499     for a in addresses:
500         sessions[session_id]['addresses'][a] = ''
501     sessions[session_id]['last_time'] = time.time()
502     return 'ok'
503
504 def native_server_thread():
505     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
506     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
507     s.bind((config.get('server','host'), config.getint('server','port')))
508     s.listen(1)
509     while not stopping:
510         conn, addr = s.accept()
511         try:
512             thread.start_new_thread(native_client_thread, (addr, conn,))
513         except:
514             # can't start new thread if there is no memory..
515             traceback.print_exc(file=sys.stdout)
516
517
518 def native_client_thread(ipaddr,conn):
519     #print "client thread", ipaddr
520     try:
521         ipaddr = ipaddr[0]
522         msg = ''
523         while 1:
524             d = conn.recv(1024)
525             msg += d
526             if not d: 
527                 break
528             if '#' in msg:
529                 msg = msg.split('#', 1)[0]
530                 break
531         try:
532             cmd, data = ast.literal_eval(msg)
533         except:
534             print "syntax error", repr(msg), ipaddr
535             conn.close()
536             return
537
538         out = do_command(cmd, data, ipaddr)
539         if out:
540             #print ipaddr, cmd, len(out)
541             try:
542                 conn.send(out)
543             except:
544                 print "error, could not send"
545
546     finally:
547         conn.close()
548
549
550
551 # used by the native handler
552 def do_command(cmd, data, ipaddr):
553
554     timestr = time.strftime("[%d/%m/%Y-%H:%M:%S]")
555
556     if cmd=='b':
557         out = "%d"%block_number
558
559     elif cmd in ['session','new_session']:
560         try:
561             if cmd == 'session':
562                 addresses = ast.literal_eval(data)
563                 version = "old"
564             else:
565                 version, addresses = ast.literal_eval(data)
566                 if version[0]=="0": version = "v" + version
567         except:
568             print "error", data
569             return None
570         print timestr, "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
571         out = new_session(version, addresses)
572
573     elif cmd=='update_session':
574         try:
575             session_id, addresses = ast.literal_eval(data)
576         except:
577             print "error"
578             return None
579         print timestr, "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
580         out = update_session(session_id,addresses)
581
582     elif cmd == 'bccapi_login':
583         import electrum
584         print "data",data
585         v, k = ast.literal_eval(data)
586         master_public_key = k.decode('hex') # todo: sanitize. no need to decode twice...
587         print master_public_key
588         wallet_id = random_string(10)
589         w = Wallet( Direct_Interface() )
590         w.master_public_key = master_public_key.decode('hex')
591         w.synchronize()
592         wallets[wallet_id] = w
593         out = wallet_id
594         print "wallets", wallets
595
596     elif cmd == 'bccapi_getAccountInfo':
597         from wallet import int_to_hex
598         v, wallet_id = ast.literal_eval(data)
599         w = wallets.get(wallet_id)
600         if w is not None:
601             num = len(w.addresses)
602             c, u = w.get_balance()
603             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 )
604             out = out.decode('hex')
605         else:
606             print "error",data
607             out = "error"
608
609     elif cmd == 'bccapi_getAccountStatement':
610         from wallet import int_to_hex
611         v, wallet_id = ast.literal_eval(data)
612         w = wallets.get(wallet_id)
613         if w is not None:
614             num = len(w.addresses)
615             c, u = w.get_balance()
616             total_records = num_records = 0
617             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 ) + int_to_hex( total_records ) + int_to_hex( num_records )
618             out = out.decode('hex')
619         else:
620             print "error",data
621             out = "error"
622
623     elif cmd == 'bccapi_getSendCoinForm':
624         out = ''
625
626     elif cmd == 'bccapi_submitTransaction':
627         out = ''
628             
629     elif cmd=='poll': 
630         out = poll_session(data)
631
632     elif cmd == 'h': 
633         # history
634         address = data
635         out = repr( store.get_history( address ) )
636
637     elif cmd == 'load': 
638         out = cmd_load(data)
639
640     elif cmd =='tx':
641         out = send_tx(data)
642         print timestr, "sent tx:", ipaddr, out
643
644     elif cmd == 'stop':
645         out = cmd_stop(data)
646
647     elif cmd == 'peers':
648         out = repr(peer_list.values())
649
650     else:
651         out = None
652
653     return out
654
655
656
657 ####################################################################
658
659 def tcp_server_thread():
660     thread.start_new_thread(process_input_queue, ())
661     thread.start_new_thread(process_output_queue, ())
662
663     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
664     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
665     s.bind((config.get('server','host'), 50001))
666     s.listen(1)
667     while not stopping:
668         conn, addr = s.accept()
669         try:
670             thread.start_new_thread(tcp_client_thread, (addr, conn,))
671         except:
672             # can't start new thread if there is no memory..
673             traceback.print_exc(file=sys.stdout)
674
675
676 def close_session(session_id):
677     print "lost connection", session_id
678     sessions.pop(session_id)
679     if session_id in sessions_sub_numblocks:
680         sessions_sub_numblocks.remove(session_id)
681
682
683 # one thread per client. put requests in a queue.
684 def tcp_client_thread(ipaddr,conn):
685     """ use a persistent connection. put commands in a queue."""
686     print "persistent client thread", ipaddr
687     global sessions
688
689     session_id = random_string(10)
690     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown' }
691
692     ipaddr = ipaddr[0]
693     msg = ''
694
695     while not stopping:
696         try:
697             d = conn.recv(1024)
698         except socket.error:
699             d = ''
700         if not d:
701             close_session(session_id)
702             break
703
704         msg += d
705         while True:
706             s = msg.find('\n')
707             if s ==-1:
708                 break
709             else:
710                 c = msg[0:s].strip()
711                 msg = msg[s+1:]
712                 if c == 'quit': 
713                     conn.close()
714                     close_session(session_id)
715                     return
716                 try:
717                     c = json.loads(c)
718                 except:
719                     print "json error", repr(c)
720                     continue
721                 try:
722                     cmd = c['method']
723                     data = c['params']
724                 except:
725                     print "syntax error", repr(c), ipaddr
726                     continue
727
728                 # add to queue
729                 input_queue.put((session_id, cmd, data))
730
731
732
733 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
734 def process_input_queue():
735     while not stopping:
736         session_id, cmd, data = input_queue.get()
737         if session_id not in sessions.keys():
738             continue
739         out = None
740         if cmd == 'address.subscribe':
741             subscribe_to_address(session_id,data)
742         elif cmd == 'numblocks.subscribe':
743             subscribe_to_numblocks(session_id)
744         elif cmd == 'client.version':
745             sessions[session_id]['version'] = data
746         elif cmd == 'server.banner':
747             out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } )
748         elif cmd == 'server.peers':
749             out = json.dumps( { 'method':'server.peers', 'result':peer_list.values() } )
750         elif cmd == 'address.get_history':
751             address = data
752             out = json.dumps( { 'method':'address.get_history', 'address':address, 'result':store.get_history( address ) } )
753         elif cmd == 'transaction.broadcast':
754             txo = send_tx(data)
755             print "sent tx:", txo
756             out = json.dumps( { 'method':'transaction.broadcast', 'result':txo } )
757         else:
758             print "unknown command", cmd
759         if out:
760             output_queue.put((session_id, out))
761
762 # this is a separate thread
763 def process_output_queue():
764     while not stopping:
765         session_id, out = output_queue.get()
766         session = sessions.get(session_id)
767         if session: 
768             try:
769                 conn = session.get('conn')
770                 conn.send(out+'\n')
771             except:
772                 close_session(session_id)
773                 
774
775
776
777 ####################################################################
778
779
780 def memorypool_update(store):
781     ds = BCDataStream.BCDataStream()
782     store.mempool_keys = []
783
784     postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
785     respdata = urllib.urlopen(bitcoind_url, postdata).read()
786     r = loads(respdata)
787     if r['error'] != None:
788         return
789
790     v = r['result'].get('transactions')
791     for hextx in v:
792         ds.clear()
793         ds.write(hextx.decode('hex'))
794         tx = deserialize.parse_Transaction(ds)
795         tx['hash'] = util.double_sha256(tx['tx'])
796         tx_hash = tx['hash'][::-1].encode('hex')
797         store.mempool_keys.append(tx_hash)
798         if store.tx_find_id_and_value(tx):
799             pass
800         else:
801             store.import_tx(tx, False)
802
803     store.commit()
804
805
806
807 def clean_session_thread():
808     while not stopping:
809         time.sleep(30)
810         t = time.time()
811         for k,s in sessions.items():
812             if s.get('type') == 'subscribe': continue
813             t0 = s['last_time']
814             if t - t0 > 5*60:
815                 sessions.pop(k)
816                 print "lost session", k
817             
818
819 def irc_thread():
820     global peer_list
821     NICK = 'E_'+random_string(10)
822     while not stopping:
823         try:
824             s = socket.socket()
825             s.connect(('irc.freenode.net', 6667))
826             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
827             s.send('NICK '+NICK+'\n')
828             s.send('JOIN #electrum\n')
829             sf = s.makefile('r', 0)
830             t = 0
831             while not stopping:
832                 line = sf.readline()
833                 line = line.rstrip('\r\n')
834                 line = line.split()
835                 if line[0]=='PING': 
836                     s.send('PONG '+line[1]+'\n')
837                 elif '353' in line: # answer to /names
838                     k = line.index('353')
839                     for item in line[k+1:]:
840                         if item[0:2] == 'E_':
841                             s.send('WHO %s\n'%item)
842                 elif '352' in line: # answer to /who
843                     # warning: this is a horrible hack which apparently works
844                     k = line.index('352')
845                     ip = line[k+4]
846                     ip = socket.gethostbyname(ip)
847                     name = line[k+6]
848                     host = line[k+9]
849                     peer_list[name] = (ip,host)
850                 if time.time() - t > 5*60:
851                     s.send('NAMES #electrum\n')
852                     t = time.time()
853                     peer_list = {}
854         except:
855             traceback.print_exc(file=sys.stdout)
856         finally:
857             sf.close()
858             s.close()
859
860
861
862 def http_server_thread(store):
863     # see http://code.google.com/p/jsonrpclib/
864     from SocketServer import ThreadingMixIn
865     from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
866     class SimpleThreadedJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer): pass
867     server = SimpleThreadedJSONRPCServer(( config.get('server','host'), 8081))
868     server.register_function(lambda : peer_list.values(), 'peers')
869     server.register_function(cmd_stop, 'stop')
870     server.register_function(cmd_load, 'load')
871     server.register_function(lambda : block_number, 'blocks')
872     server.register_function(clear_cache, 'clear_cache')
873     server.register_function(get_cache, 'get_cache')
874     server.register_function(send_tx, 'blockchain.transaction.broadcast')
875     server.register_function(store.get_history, 'blockchain.address.get_history')
876     server.register_function(new_session, 'session.new')
877     server.register_function(update_session, 'session.update')
878     server.register_function(poll_session, 'session.poll')
879     server.serve_forever()
880
881
882 import traceback
883
884
885 if __name__ == '__main__':
886
887     if len(sys.argv)>1:
888         import jsonrpclib
889         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
890         cmd = sys.argv[1]
891         if cmd == 'load':
892             out = server.load(password)
893         elif cmd == 'peers':
894             out = server.peers()
895         elif cmd == 'stop':
896             out = server.stop(password)
897         elif cmd == 'clear_cache':
898             out = server.clear_cache(password)
899         elif cmd == 'get_cache':
900             out = server.get_cache(password,sys.argv[2])
901         elif cmd == 'h':
902             out = server.blockchain.address.get_history(sys.argv[2])
903         elif cmd == 'tx':
904             out = server.blockchain.transaction.broadcast(sys.argv[2])
905         elif cmd == 'b':
906             out = server.blocks()
907         else:
908             out = "Unknown command: '%s'" % cmd
909         print out
910         sys.exit(0)
911
912
913     print "starting Electrum server"
914     print "cache:", config.get('server', 'cache')
915
916     conf = DataStore.CONFIG_DEFAULTS
917     args, argv = readconf.parse_argv( [], conf)
918     args.dbtype= config.get('database','type')
919     if args.dbtype == 'sqlite3':
920         args.connect_args = { 'database' : config.get('database','database') }
921     elif args.dbtype == 'MySQLdb':
922         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
923     elif args.dbtype == 'psycopg2':
924         args.connect_args = { 'database' : config.get('database','database') }
925     store = MyStore(args)
926     store.tx_cache = {}
927     store.mempool_keys = {}
928
929     # supported protocols
930     thread.start_new_thread(native_server_thread, ())
931     thread.start_new_thread(tcp_server_thread, ())
932     thread.start_new_thread(http_server_thread, (store,))
933
934     thread.start_new_thread(clean_session_thread, ())
935
936     if (config.get('server','irc') == 'yes' ):
937         thread.start_new_thread(irc_thread, ())
938
939     while not stopping:
940         try:
941             dblock.acquire()
942             store.catch_up()
943             memorypool_update(store)
944             block_number = store.get_block_number(1)
945
946             if block_number != old_block_number:
947                 old_block_number = block_number
948                 for session_id in sessions_sub_numblocks:
949                     send_numblocks(session_id)
950
951         except IOError:
952             print "IOError: cannot reach bitcoind"
953             block_number = 0
954         except:
955             traceback.print_exc(file=sys.stdout)
956             block_number = 0
957         finally:
958             dblock.release()
959
960         # do addresses
961         while True:
962             try:
963                 addr = address_queue.get(False)
964             except:
965                 break
966             do_update_address(addr)
967
968         time.sleep(10)
969
970     print "server stopped"
971