fix
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 import time, json, socket, operator, thread, ast, sys,re
29
30
31 import ConfigParser
32 from json import dumps, loads
33 import urllib
34
35 # we need to import electrum
36 sys.path.append('../client/')
37 from wallet import Wallet
38 from interface import Interface
39
40
41 config = ConfigParser.ConfigParser()
42 # set some defaults, which will be overwritten by the config file
43 config.add_section('server')
44 config.set('server','banner', 'Welcome to Electrum!')
45 config.set('server', 'host', 'localhost')
46 config.set('server', 'port', '50000')
47 config.set('server', 'password', '')
48 config.set('server', 'irc', 'yes')
49 config.set('server', 'ircname', 'Electrum server')
50 config.add_section('database')
51 config.set('database', 'type', 'psycopg2')
52 config.set('database', 'database', 'abe')
53
54 try:
55     f = open('/etc/electrum.conf','r')
56     config.readfp(f)
57     f.close()
58 except:
59     print "Could not read electrum.conf. I will use the default values."
60
61 try:
62     f = open('/etc/electrum.banner','r')
63     config.set('server','banner', f.read())
64     f.close()
65 except:
66     pass
67
68
# Administrative password compared against the one supplied to the
# 'stop' and 'load' commands.
password = config.get('server','password')

# Set to True by cmd_stop; every worker loop in this file polls it.
stopping = False
# Latest value returned by store.main_iteration() (-1 until the first pass).
block_number = -1
# Previous value of block_number, used by the main loop to detect a change.
old_block_number = -1
# Sessions of the native and TCP protocols, keyed by session id.
sessions = {}
sessions_sub_numblocks = {} # session id -> message id, for sessions that have subscribed to numblocks

m_sessions = [{}] # one-element list holding the dict of sessions served by http

# Filled by irc_thread: name -> (ip, host).
peer_list = {}

wallets = {} # wallet id -> Wallet, for ultra-light clients such as bccapi

from Queue import Queue
# (session_id, message_id, method, params) tuples put by tcp_client_thread.
input_queue = Queue()
# (session_id, serialized reply) tuples consumed by process_output_queue.
output_queue = Queue()
# Handed to db.MyStore at startup; the main loop reads addresses from it
# and passes each one to do_update_address.
address_queue = Queue()
87
88
89
90
91
class Direct_Interface(Interface):
    """Interface subclass whose handler calls this module's functions directly."""

    def __init__(self):
        # No initialisation is performed; the base-class constructor is not called.
        pass

    def handler(self, method, params = ''):
        """Dispatch `method` to the matching server function.

        The selected function is called with `params` as its single
        argument and its return value is passed back unchanged.
        A method name missing from the table raises KeyError.
        """
        dispatch = {
            'session.new': new_session,
            'session.poll': poll_session,
            'session.update': update_session,
            'transaction.broadcast': send_tx,
            'address.get_history': store.get_history,
        }
        return dispatch[method]( params )
105
106
107
def send_tx(tx):
    """Submit transaction `tx` through the 'importtransaction' JSON-RPC call.

    Returns the 'result' field of the reply on success, otherwise an error
    string that ends with the rejected transaction.
    """
    # NOTE(review): bitcoind_url is not defined in the part of the file
    # visible here -- confirm where it is supposed to come from.
    postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
    respdata = urllib.urlopen(bitcoind_url, postdata).read()
    r = loads(respdata)
    if r['error'] is not None:
        return "error: transaction rejected by memorypool\n"+tx
    return r['result']
117
118
119
def random_string(N):
    """Return a random string of N characters drawn from A-Z and 0-9.

    The result is used as session and wallet identifier, so the characters
    are drawn from the operating system's entropy source
    (random.SystemRandom) rather than the default, predictable generator.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
123
124     
125
def cmd_stop(_,__,pw):
    """Request server shutdown when `pw` matches the configured password.

    The first two arguments are ignored. Returns 'ok' after setting the
    global `stopping` flag, or 'wrong password' without touching it.
    """
    global stopping
    if password != pw:
        return 'wrong password'
    stopping = True
    return 'ok'
133
def cmd_load(_,__,pw):
    """Return repr() of the number of entries in `sessions`.

    The first two arguments are ignored. If `pw` does not match the
    configured password, 'wrong password' is returned instead.
    """
    if password != pw:
        return 'wrong password'
    session_count = len(sessions)
    return repr( session_count )
139
140
141
142
143
def modified_addresses(session):
    """Refresh the status of every address in `session`.

    Updates session['last_time'], recomputes the status of each subscribed
    address and stores it back whenever it differs from the recorded one.

    Returns (ret, addresses): `ret` maps each changed address to its new
    status, `addresses` is the session's (updated) address dict, whose
    values are (message id, status) pairs.
    """
    addresses = session['addresses']
    session['last_time'] = time.time()
    ret = {}
    # Iterate over a snapshot: other threads may subscribe new addresses
    # to this session while the (possibly slow) status lookups run.
    for addr, (msg_id, last_status) in list(addresses.items()):
        status = get_address_status( addr )
        if last_status != status:
            addresses[addr] = msg_id, status
            ret[addr] = status
    return ret, addresses
161
162
163 def poll_session(session_id): 
164     # native
165     session = sessions.get(session_id)
166     if session is None:
167         print time.asctime(), "session not found", session_id
168         return -1, {}
169     else:
170         ret, addresses = modified_addresses(session)
171         if ret: sessions[session_id]['addresses'] = addresses
172         return repr( (block_number,ret))
173
174
def poll_session_json(session_id, message_id):
    """Collect pending notifications for an http session.

    Returns a list of {'id': ..., 'result': ...} dicts: one per address
    whose status changed, plus one for the block number when the session
    recorded a block number that differs from the current one.
    Raises BaseException if the session id is unknown. `message_id` is
    not used.
    """
    session = m_sessions[0].get(session_id)
    if session is None:
        raise BaseException("session not found %s"%session_id)

    notifications = []
    changed, addresses = modified_addresses(session)
    if changed:
        m_sessions[0][session_id]['addresses'] = addresses
        for address in changed:
            sub_id, new_status = addresses[address]
            notifications.append( { 'id':sub_id, 'result':new_status } )

    # 'numblocks' holds (message id, last block number reported).
    sub_id, reported = session.get('numblocks')
    if reported and reported != block_number:
        m_sessions[0][session_id]['numblocks'] = sub_id, block_number
        notifications.append( {'id':sub_id, 'result':block_number} )

    return notifications
195
196
def do_update_address(addr):
    """Push a status notification to every persistent session watching `addr`.

    An address was involved in a transaction; we check whether it was
    subscribed to in a session. The address can be subscribed in several
    sessions; the cache should ensure that we don't do redundant requests.
    """
    # Work on a snapshot of (id, session) pairs: client threads add and
    # remove sessions concurrently, and looking a session up again by id
    # could fail if it was closed in the meantime.
    for session_id, session in list(sessions.items()):
        if session.get('type') != 'persistent': continue
        subscriptions = session['addresses']

        # Direct dict membership instead of scanning a list of keys.
        if addr not in subscriptions:
            continue

        status = get_address_status( addr )
        message_id, last_status = subscriptions[addr]
        if last_status != status:
            #print "sending new status for %s:"%addr, status
            send_status(session_id,message_id,addr,status)
            subscriptions[addr] = (message_id,status)
213
def get_address_status(addr):
    """Return the status of `addr`: the 'blk_hash' of its last history entry.

    Returns None when the store has no history for the address. When the
    last entry is in the mempool, the history length is appended
    ('mempool:<n>').
    """
    history = store.get_history(addr)
    if not history:
        return None

    status = history[-1]['blk_hash']
    # this is a temporary hack; move it up once old clients have disappeared
    if status == 'mempool': # and session['version'] != "old":
        status = status + ':%d'% len(history)
    return status
226
227
def send_numblocks(session_id):
    """Queue a block-number notification for a session subscribed to numblocks.

    Raises KeyError if the session has no numblocks subscription.
    """
    reply = { 'id': sessions_sub_numblocks[session_id], 'result': block_number }
    output_queue.put((session_id, json.dumps( reply )))
232
def send_status(session_id, message_id, address, status):
    """Queue a status notification for `session_id`.

    The message carries `message_id` as its id and `status` as its result;
    `address` is not part of the message.
    """
    reply = { 'id':message_id, 'result':status }
    output_queue.put((session_id, json.dumps( reply )))
236
def address_get_history_json(_,message_id,address):
    """Return the store's history for `address`; the other arguments are ignored."""
    history = store.get_history(address)
    return history
239
def subscribe_to_numblocks(session_id, message_id):
    """Record a numblocks subscription and immediately queue the current value."""
    # Remember which message id the notifications must carry ...
    sessions_sub_numblocks.update({session_id: message_id})
    # ... then send the first notification right away.
    send_numblocks(session_id)
243
def subscribe_to_numblocks_json(session_id, message_id):
    """Subscribe an http session to numblocks and return the current block number.

    The session's 'numblocks' entry is set to (message_id, block_number).
    """
    http_session = m_sessions[0][session_id]
    http_session['numblocks'] = (message_id, block_number)
    return block_number
248
def subscribe_to_address(session_id, message_id, address):
    """Subscribe a session to `address` and queue its current status.

    Stores (message_id, status) for the address, refreshes the session's
    'last_time' and sends the status to the client.
    """
    status = get_address_status(address)
    session = sessions[session_id]
    session['addresses'][address] = (message_id, status)
    session['last_time'] = time.time()
    send_status(session_id, message_id, address, status)
254
def add_address_to_session_json(session_id, message_id, address):
    """Subscribe an http session to `address` and return its current status.

    Stores (message_id, status) for the address and refreshes the
    session's 'last_time'.
    """
    http_sessions = m_sessions[0]
    status = get_address_status(address)
    entry = http_sessions[session_id]
    entry['addresses'][address] = (message_id, status)
    entry['last_time'] = time.time()
    m_sessions[0] = http_sessions
    return status
263
def add_address_to_session(session_id, address):
    """Subscribe a native session to `address` and return its current status.

    The address is stored with an empty message id, and the session's
    'last_time' is refreshed. Raises KeyError for an unknown session id.
    """
    status = get_address_status(address)
    # Key on the `address` parameter (the original referenced an undefined
    # local name here).
    sessions[session_id]['addresses'][address] = ("", status)
    sessions[session_id]['last_time'] = time.time()
    return status
269
def new_session(version, addresses):
    """Create a native session watching `addresses`.

    Every address starts with an empty message id and an empty status.
    Returns the repr() of (session id, banner), with literal backslash-n
    sequences in the banner turned into newlines.
    """
    session_id = random_string(10)
    # Build the session completely before publishing it in `sessions`:
    # clean_session_thread reads 'last_time' from every non-persistent
    # session it finds there.
    session = {
        'addresses': dict((a, ('','')) for a in addresses),
        'version': version,
        'last_time': time.time(),
    }
    sessions[session_id] = session
    banner = config.get('server','banner').replace('\\n','\n')
    return repr( (session_id, banner) )
278
279
def client_version_json(session_id, _, version):
    """Record the client version in an http session. Returns None."""
    http_sessions = m_sessions[0]
    http_sessions[session_id]['version'] = version
    m_sessions[0] = http_sessions
285
286 def create_session_json(_, __):
287     sessions = m_sessions[0]
288     session_id = random_string(10)
289     print "creating session", session_id
290     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
291     sessions[session_id]['last_time'] = time.time()
292     m_sessions[0] = sessions
293     return session_id
294
295
296
def get_banner(_,__):
    """Return the configured banner; both arguments are ignored.

    Literal backslash-n sequences in the configured text are turned into
    newlines.
    """
    banner = config.get('server','banner')
    return banner.replace('\\n','\n')
299
def update_session(session_id,addresses):
    """deprecated in 0.42

    Replace the address list of a native session and refresh its
    'last_time'. Returns 'ok'. Raises KeyError for an unknown session id.
    """
    # Each entry is a (message id, status) pair, as created by new_session;
    # modified_addresses unpacks the values as such.
    sessions[session_id]['addresses'] = dict((a, ('','')) for a in addresses)
    sessions[session_id]['last_time'] = time.time()
    return 'ok'
307
def native_server_thread():
    """Accept connections for the native protocol until `stopping` is set.

    Listens on the configured host and port and starts one
    native_client_thread per accepted connection. The listening socket is
    closed when the loop ends, however it ends.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((config.get('server','host'), config.getint('server','port')))
        s.listen(1)
        while not stopping:
            conn, addr = s.accept()
            try:
                thread.start_new_thread(native_client_thread, (addr, conn,))
            except Exception:
                # can't start new thread if there is no memory..
                traceback.print_exc(file=sys.stdout)
                # nobody will serve this connection; release it
                conn.close()
    finally:
        s.close()
320
321
322 def native_client_thread(ipaddr,conn):
323     #print "client thread", ipaddr
324     try:
325         ipaddr = ipaddr[0]
326         msg = ''
327         while 1:
328             d = conn.recv(1024)
329             msg += d
330             if not d: 
331                 break
332             if '#' in msg:
333                 msg = msg.split('#', 1)[0]
334                 break
335         try:
336             cmd, data = ast.literal_eval(msg)
337         except:
338             print "syntax error", repr(msg), ipaddr
339             conn.close()
340             return
341
342         out = do_command(cmd, data, ipaddr)
343         if out:
344             #print ipaddr, cmd, len(out)
345             try:
346                 conn.send(out)
347             except:
348                 print "error, could not send"
349
350     finally:
351         conn.close()
352
353
def timestr():
    """Return the current local time formatted as [dd/mm/YYYY-HH:MM:SS]."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format)
356
357 # used by the native handler
358 def do_command(cmd, data, ipaddr):
359
360     if cmd=='b':
361         out = "%d"%block_number
362
363     elif cmd in ['session','new_session']:
364         try:
365             if cmd == 'session':
366                 addresses = ast.literal_eval(data)
367                 version = "old"
368             else:
369                 version, addresses = ast.literal_eval(data)
370                 if version[0]=="0": version = "v" + version
371         except:
372             print "error", data
373             return None
374         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
375         out = new_session(version, addresses)
376
377     elif cmd=='address.subscribe':
378         try:
379             session_id, addr = ast.literal_eval(data)
380         except:
381             print "error"
382             return None
383         out = add_address_to_session(session_id,addr)
384
385     elif cmd=='update_session':
386         try:
387             session_id, addresses = ast.literal_eval(data)
388         except:
389             print "error"
390             return None
391         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
392         out = update_session(session_id,addresses)
393
394     elif cmd == 'bccapi_login':
395         import electrum
396         print "data",data
397         v, k = ast.literal_eval(data)
398         master_public_key = k.decode('hex') # todo: sanitize. no need to decode twice...
399         print master_public_key
400         wallet_id = random_string(10)
401         w = Wallet( Direct_Interface() )
402         w.master_public_key = master_public_key.decode('hex')
403         w.synchronize()
404         wallets[wallet_id] = w
405         out = wallet_id
406         print "wallets", wallets
407
408     elif cmd == 'bccapi_getAccountInfo':
409         from wallet import int_to_hex
410         v, wallet_id = ast.literal_eval(data)
411         w = wallets.get(wallet_id)
412         if w is not None:
413             num = len(w.addresses)
414             c, u = w.get_balance()
415             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 )
416             out = out.decode('hex')
417         else:
418             print "error",data
419             out = "error"
420
421     elif cmd == 'bccapi_getAccountStatement':
422         from wallet import int_to_hex
423         v, wallet_id = ast.literal_eval(data)
424         w = wallets.get(wallet_id)
425         if w is not None:
426             num = len(w.addresses)
427             c, u = w.get_balance()
428             total_records = num_records = 0
429             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 ) + int_to_hex( total_records ) + int_to_hex( num_records )
430             out = out.decode('hex')
431         else:
432             print "error",data
433             out = "error"
434
435     elif cmd == 'bccapi_getSendCoinForm':
436         out = ''
437
438     elif cmd == 'bccapi_submitTransaction':
439         out = ''
440             
441     elif cmd=='poll': 
442         out = poll_session(data)
443
444     elif cmd == 'h': 
445         # history
446         address = data
447         out = repr( store.get_history( address ) )
448
449     elif cmd == 'load': 
450         out = cmd_load(None,None,data)
451
452     elif cmd =='tx':
453         out = send_tx(data)
454         print timestr(), "sent tx:", ipaddr, out
455
456     elif cmd == 'stop':
457         out = cmd_stop(data)
458
459     elif cmd == 'peers':
460         out = repr(peer_list.values())
461
462     else:
463         out = None
464
465     return out
466
467
468
469 ####################################################################
470
def tcp_server_thread():
    """Accept persistent TCP connections on port 50001 until `stopping` is set.

    Starts the input- and output-queue worker threads first, then one
    tcp_client_thread per accepted connection. The listening socket is
    closed when the loop ends, however it ends.
    """
    thread.start_new_thread(process_input_queue, ())
    thread.start_new_thread(process_output_queue, ())

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((config.get('server','host'), 50001))
        s.listen(1)
        while not stopping:
            conn, addr = s.accept()
            try:
                thread.start_new_thread(tcp_client_thread, (addr, conn,))
            except Exception:
                # can't start new thread if there is no memory..
                traceback.print_exc(file=sys.stdout)
                # nobody will serve this connection; release it
                conn.close()
    finally:
        s.close()
486
487
def close_session(session_id):
    """Forget a session and its numblocks subscription, if any.

    Safe to call more than once for the same id: both the client thread
    and the output-queue thread may close the same session.
    """
    #print "lost connection", session_id
    sessions.pop(session_id, None)
    sessions_sub_numblocks.pop(session_id, None)
493
494
495 # one thread per client. put requests in a queue.
496 def tcp_client_thread(ipaddr,conn):
497     """ use a persistent connection. put commands in a queue."""
498
499     print timestr(), "TCP session", ipaddr
500     global sessions
501
502     session_id = random_string(10)
503     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
504
505     ipaddr = ipaddr[0]
506     msg = ''
507
508     while not stopping:
509         try:
510             d = conn.recv(1024)
511         except socket.error:
512             d = ''
513         if not d:
514             close_session(session_id)
515             break
516
517         msg += d
518         while True:
519             s = msg.find('\n')
520             if s ==-1:
521                 break
522             else:
523                 c = msg[0:s].strip()
524                 msg = msg[s+1:]
525                 if c == 'quit': 
526                     conn.close()
527                     close_session(session_id)
528                     return
529                 try:
530                     c = json.loads(c)
531                 except:
532                     print "json error", repr(c)
533                     continue
534                 try:
535                     message_id = c.get('id')
536                     method = c.get('method')
537                     params = c.get('params')
538                 except:
539                     print "syntax error", repr(c), ipaddr
540                     continue
541
542                 # add to queue
543                 input_queue.put((session_id, message_id, method, params))
544
545
546
547 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
548 def process_input_queue():
549     while not stopping:
550         session_id, message_id, method, data = input_queue.get()
551         if session_id not in sessions.keys():
552             continue
553         out = None
554         if method == 'address.subscribe':
555             address = data[0]
556             subscribe_to_address(session_id,message_id,address)
557         elif method == 'numblocks.subscribe':
558             subscribe_to_numblocks(session_id,message_id)
559         elif method == 'client.version':
560             sessions[session_id]['version'] = data[0]
561         elif method == 'server.banner':
562             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
563         elif method == 'server.peers':
564             out = { 'result':peer_list.values() } 
565         elif method == 'address.get_history':
566             address = data[0]
567             out = { 'result':store.get_history( address ) } 
568         elif method == 'transaction.broadcast':
569             postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
570             txo = urllib.urlopen(bitcoind_url, postdata).read()
571             print "sent tx:", txo
572             out = json.loads(txo)
573         else:
574             print "unknown command", method
575         if out:
576             out['id'] = message_id
577             out = json.dumps( out )
578             output_queue.put((session_id, out))
579
580 # this is a separate thread
def process_output_queue():
    """Deliver queued replies to their sessions until `stopping` is set.

    Each item is (session_id, message); the message is written to the
    session's connection followed by a newline. Messages for sessions
    that no longer exist are dropped. If writing fails, the session is
    closed.
    """
    while not stopping:
        session_id, out = output_queue.get()
        session = sessions.get(session_id)
        if session:
            try:
                conn = session.get('conn')
                # sendall: send() may transmit only part of a large message
                conn.sendall(out+'\n')
            except Exception:
                close_session(session_id)
589             except:
590                 close_session(session_id)
591                 
592
593
594
595 ####################################################################
596
597
598
599
600 def clean_session_thread():
601     while not stopping:
602         time.sleep(30)
603         t = time.time()
604         for k,s in sessions.items():
605             if s.get('type') == 'persistent': continue
606             t0 = s['last_time']
607             if t - t0 > 5*60:
608                 sessions.pop(k)
609                 print "lost session", k
610             
611
def irc_thread():
    """Maintain `peer_list` from the #electrum channel on irc.freenode.net.

    Joins the channel under a random nick starting with 'E_', asks for
    the channel's names every five minutes (clearing peer_list each
    time), sends a WHO for every name starting with 'E_' and records
    name -> (ip, host) from the replies. On any error the connection is
    closed and, after a short pause, re-established, until `stopping`
    is set.
    """
    global peer_list
    NICK = 'E_'+random_string(10)
    while not stopping:
        # Initialised up front so the cleanup below is valid even when
        # the connection attempt itself fails.
        s = None
        sf = None
        try:
            s = socket.socket()
            s.connect(('irc.freenode.net', 6667))
            s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
            s.send('NICK '+NICK+'\n')
            s.send('JOIN #electrum\n')
            sf = s.makefile('r', 0)
            t = 0
            while not stopping:
                line = sf.readline()
                line = line.rstrip('\r\n')
                line = line.split()
                if line[0]=='PING':
                    s.send('PONG '+line[1]+'\n')
                elif '353' in line: # answer to /names
                    k = line.index('353')
                    for item in line[k+1:]:
                        if item[0:2] == 'E_':
                            s.send('WHO %s\n'%item)
                elif '352' in line: # answer to /who
                    # warning: this is a horrible hack which apparently works
                    k = line.index('352')
                    ip = line[k+4]
                    ip = socket.gethostbyname(ip)
                    name = line[k+6]
                    host = line[k+9]
                    peer_list[name] = (ip,host)
                if time.time() - t > 5*60:
                    s.send('NAMES #electrum\n')
                    t = time.time()
                    peer_list = {}
        except Exception:
            traceback.print_exc(file=sys.stdout)
            # pause before reconnecting so a persistent failure does not
            # turn into a busy loop
            time.sleep(10)
        finally:
            if sf is not None:
                sf.close()
            if s is not None:
                s.close()
652
653
def get_peers_json(_,__):
    """Return the values of `peer_list`; both arguments are ignored."""
    peers = peer_list.values()
    return peers
656
def http_server_thread():
    """Serve the JSON-RPC protocol over http on port 8081, forever."""
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer

    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer):
        pass

    def broadcast(a, b, c):
        # only the third argument (the transaction) is used
        return send_tx(c)

    endpoint = ( config.get('server','host'), 8081)
    server = StratumThreadedJSONRPCServer(endpoint)

    # (handler, protocol method name), registered in this order
    handlers = [
        (get_peers_json, 'server.peers'),
        (cmd_stop, 'stop'),
        (cmd_load, 'load'),
        (get_banner, 'server.banner'),
        (broadcast, 'transaction.broadcast'),
        (address_get_history_json, 'address.get_history'),
        (add_address_to_session_json, 'address.subscribe'),
        (subscribe_to_numblocks_json, 'numblocks.subscribe'),
        (client_version_json, 'client.version'),
        (create_session_json, 'session.create'),   # internal message (not part of protocol)
        (poll_session_json, 'session.poll'),       # internal message (not part of protocol)
    ]
    for func, method_name in handlers:
        server.register_function(func, method_name)

    server.serve_forever()
675
676
677 import traceback
678
679
# Entry point. With a command-line argument the script acts as a client of
# a running server (through its http interface on port 8081), prints the
# reply and exits; without arguments it starts the server itself.
if __name__ == '__main__':

    if len(sys.argv)>1:
        import jsonrpclib
        server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
        cmd = sys.argv[1]
        if cmd == 'load':
            out = server.load(password)
        elif cmd == 'peers':
            out = server.server.peers()
        elif cmd == 'stop':
            out = server.stop(password)
        # NOTE(review): http_server_thread in this file registers no
        # 'clear_cache' or 'get_cache' method -- confirm these still exist.
        elif cmd == 'clear_cache':
            out = server.clear_cache(password)
        elif cmd == 'get_cache':
            out = server.get_cache(password,sys.argv[2])
        elif cmd == 'h':
            out = server.address.get_history(sys.argv[2])
        elif cmd == 'tx':
            out = server.transaction.broadcast(sys.argv[2])
        elif cmd == 'b':
            out = server.numblocks.subscribe()
        else:
            out = "Unknown command: '%s'" % cmd
        print out
        sys.exit(0)


    # backend
    # `store` becomes a module-level name used by the request handlers above.
    import db
    store = db.MyStore(config,address_queue)


    # supported protocols
    thread.start_new_thread(native_server_thread, ())
    thread.start_new_thread(tcp_server_thread, ())
    thread.start_new_thread(http_server_thread, ())
    thread.start_new_thread(clean_session_thread, ())

    # peer discovery over IRC is optional
    if (config.get('server','irc') == 'yes' ):
        thread.start_new_thread(irc_thread, ())

    print "starting Electrum server"


    # Main loop: runs in the main thread until cmd_stop sets `stopping`.
    while not stopping:
        block_number = store.main_iteration()

        # notify numblocks subscribers when the block number has changed
        if block_number != old_block_number:
            old_block_number = block_number
            for session_id in sessions_sub_numblocks.keys():
                send_numblocks(session_id)
        # do addresses
        # drain address_queue without blocking; get(False) raises once it is empty
        while True:
            try:
                addr = address_queue.get(False)
            except:
                break
            do_update_address(addr)

        time.sleep(10)
    print "server stopped"
742