separate backend
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 import time, json, socket, operator, thread, ast, sys,re
29
30
31 import ConfigParser
32 from json import dumps, loads
33 import urllib
34
35 # we need to import electrum
36 sys.path.append('../client/')
37 from wallet import Wallet
38 from interface import Interface
39
40
41 config = ConfigParser.ConfigParser()
42 # set some defaults, which will be overwritten by the config file
43 config.add_section('server')
44 config.set('server','banner', 'Welcome to Electrum!')
45 config.set('server', 'host', 'localhost')
46 config.set('server', 'port', '50000')
47 config.set('server', 'password', '')
48 config.set('server', 'irc', 'yes')
49 config.set('server', 'ircname', 'Electrum server')
50 config.add_section('database')
51 config.set('database', 'type', 'psycopg2')
52 config.set('database', 'database', 'abe')
53
54 try:
55     f = open('/etc/electrum.conf','r')
56     config.readfp(f)
57     f.close()
58 except:
59     print "Could not read electrum.conf. I will use the default values."
60
61 try:
62     f = open('/etc/electrum.banner','r')
63     config.set('server','banner', f.read())
64     f.close()
65 except:
66     pass
67
68
# Password compared against the `pw` argument of the administrative
# commands (cmd_stop, cmd_load, clear_cache, get_cache).
password = config.get('server','password')

# Set to True by cmd_stop; the server loops in this file poll it.
stopping = False
# Latest value returned by store.main_iteration() in the main loop.
block_number = -1
# Value of block_number at the last main-loop pass, used to detect changes.
old_block_number = -1
# session_id -> session dict, for native and persistent TCP sessions.
sessions = {}
sessions_sub_numblocks = {} # sessions that have subscribed to the service

m_sessions = [{}] # served by http

# Filled in by irc_thread: name field of a WHO reply -> (ip, host).
peer_list = {}

wallets = {} # for ultra-light clients such as bccapi

from Queue import Queue
# (session_id, message_id, method, params) tuples from tcp_client_thread.
input_queue = Queue()
# (session_id, json_string) tuples consumed by process_output_queue.
output_queue = Queue()
# Passed to db.MyStore; the main loop drains addresses from it and calls
# do_update_address on each.
address_queue = Queue()
87
88
89
90
91
class Direct_Interface(Interface):
    """Interface that dispatches requests directly to this module's handlers.

    The constructor does nothing and does not call Interface.__init__.
    """

    def __init__(self):
        pass

    def handler(self, method, params = ''):
        """Call the function registered for `method` with `params`.

        Returns whatever that function returns. An unknown method name
        raises KeyError.
        """
        dispatch = {
            'session.new': new_session,
            'session.poll': poll_session,
            'session.update': update_session,
            'transaction.broadcast': send_tx,
            'address.get_history': store.get_history,
        }
        return dispatch[method]( params )
105
106
107
def send_tx(tx):
    """Submit transaction `tx` with an 'importtransaction' JSON-RPC request.

    Returns the 'result' field of the reply, or an error string that
    includes the transaction when the reply carries a non-null 'error'.
    """
    # NOTE(review): bitcoind_url is not defined in this file as shown --
    # confirm where it is supposed to come from.
    request = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
    reply = loads(urllib.urlopen(bitcoind_url, request).read())
    if reply['error'] is None:
        return reply['result']
    return "error: transaction rejected by memorypool\n"+tx
117
118
119
def random_string(N):
    """Return a random string of N characters drawn from A-Z and 0-9.

    The result is used in this file as session id, wallet id and IRC nick
    suffix. Session and wallet ids are all a client presents to reach its
    state, so the characters come from the operating system's entropy
    source (random.SystemRandom) rather than the seedable default
    generator.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
123
124     
125
def cmd_stop(_,__,pw):
    """Set the global `stopping` flag when `pw` matches the server password.

    Returns 'ok' on success and 'wrong password' otherwise. The first two
    arguments are ignored.
    """
    global stopping
    if password != pw:
        return 'wrong password'
    stopping = True
    return 'ok'
133
def cmd_load(_,__,pw):
    """Return the number of entries in `sessions`, as a repr string.

    Returns 'wrong password' when `pw` does not match the server password.
    The first two arguments are ignored.
    """
    if password != pw:
        return 'wrong password'
    session_count = len(sessions)
    return repr( session_count )
139
140
def clear_cache(_,__,pw):
    """Replace store.tx_cache with an empty dict when `pw` is correct.

    Returns 'ok' on success and 'wrong password' otherwise. The first two
    arguments are ignored.
    """
    if password != pw:
        return 'wrong password'
    store.tx_cache = {}
    return 'ok'
147
def get_cache(_,__,pw,addr):
    """Return the store.tx_cache entry for `addr` (None when absent).

    Returns 'wrong password' when `pw` does not match the server password.
    The first two arguments are ignored.
    """
    if password != pw:
        return 'wrong password'
    cached = store.tx_cache.get(addr)
    return cached
153
154
155
156
def modified_addresses(session):
    """Refresh the status of every address in `session`.

    `session['addresses']` maps an address to a (message_id, last_status)
    pair. Each address whose current status differs from the stored one is
    updated in place. The session's 'last_time' is set to the current time.

    Returns (ret, addresses): `ret` maps each changed address to its new
    status, and `addresses` is the session's (mutated) address dict.
    """
    addresses = session['addresses']
    session['last_time'] = time.time()
    ret = {}
    # Only values of existing keys are replaced, so iterating the dict
    # while assigning into it is safe.
    for addr in addresses:
        status = get_address_status( addr )
        msg_id, last_status = addresses[addr]
        if last_status != status:
            addresses[addr] = msg_id, status
            ret[addr] = status
    return ret, addresses
175
176
177 def poll_session(session_id): 
178     # native
179     session = sessions.get(session_id)
180     if session is None:
181         print time.asctime(), "session not found", session_id
182         return -1, {}
183     else:
184         ret, addresses = modified_addresses(session)
185         if ret: sessions[session_id]['addresses'] = addresses
186         return repr( (block_number,ret))
187
188
def poll_session_json(session_id, message_id):
    """Collect pending notifications for an HTTP session.

    Returns a list of {'id': ..., 'result': ...} dicts: one per address
    whose status changed, plus one for the block number when the session's
    stored 'numblocks' value is truthy and differs from block_number.
    Raises BaseException when the session id is unknown.
    """
    session = m_sessions[0].get(session_id)
    if session is None:
        raise BaseException("session not found %s"%session_id)

    out = []
    ret, addresses = modified_addresses(session)
    if ret: 
        m_sessions[0][session_id]['addresses'] = addresses
        for changed in ret:
            entry_id, entry_status = addresses[changed]
            out.append(  { 'id':entry_id, 'result':entry_status } )

    msg_id, last_nb = session.get('numblocks')
    if last_nb and last_nb != block_number:
        m_sessions[0][session_id]['numblocks'] = msg_id, block_number
        out.append( {'id':msg_id, 'result':block_number} )

    return out
209
210
def do_update_address(addr):
    """Notify persistent sessions subscribed to `addr` of a status change.

    For every session of type 'persistent' that has `addr` among its
    addresses, the current status is compared with the stored one; when it
    differs, a status message is queued and the stored status updated.
    """
    # an address was involved in a transaction; we check if it was subscribed to in a session
    # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests

    # items() returns a list in Python 2, so sessions closed by other
    # threads during the loop cannot raise KeyError here.
    for session_id, session in sessions.items():
        if session.get('type') != 'persistent': continue
        # dict lookup instead of scanning a keys() list
        entry = session['addresses'].get(addr)
        if entry is None: continue

        message_id, last_status = entry
        status = get_address_status( addr )
        if last_status != status:
            #print "sending new status for %s:"%addr, status
            send_status(session_id,message_id,addr,status)
            session['addresses'][addr] = (message_id,status)
225                 send_status(session_id,message_id,addr,status)
226                 sessions[session_id]['addresses'][addr] = (message_id,status)
227
def get_address_status(addr):
    """Return the status of `addr`: the 'blk_hash' of its last history entry.

    Returns None when store.get_history(addr) is empty. When the last entry
    is in the mempool, the status is 'mempool:<number of history entries>'.
    """
    tx_points = store.get_history(addr)
    if not tx_points:
        return None

    status = tx_points[-1]['blk_hash']
    # this is a temporary hack; move it up once old clients have disappeared
    if status == 'mempool': # and session['version'] != "old":
        status = status + ':%d'% len(tx_points)
    return status
240
241
def send_numblocks(session_id):
    """Queue the current block number for a session subscribed to it.

    The message id is the one recorded in sessions_sub_numblocks; an
    unsubscribed session id raises KeyError.
    """
    payload = {'id':sessions_sub_numblocks[session_id], 'result':block_number}
    output_queue.put((session_id, json.dumps( payload )))
246
def send_status(session_id, message_id, address, status):
    """Queue a JSON message carrying `status` under `message_id`.

    `address` is accepted but not included in the message.
    """
    message = { 'id':message_id, 'result':status }
    output_queue.put((session_id, json.dumps( message )))
250
def address_get_history_json(_,message_id,address):
    """Return store.get_history(address); the other arguments are unused."""
    history = store.get_history(address)
    return history
253
def subscribe_to_numblocks(session_id, message_id):
    """Record the block-number subscription of a session and send the
    current block number to it right away."""
    sessions_sub_numblocks.update({session_id: message_id})
    send_numblocks(session_id)
257
def subscribe_to_numblocks_json(session_id, message_id):
    """Store (message_id, block_number) as the HTTP session's 'numblocks'
    entry and return the current block number."""
    http_session = m_sessions[0][session_id]
    http_session['numblocks'] = (message_id, block_number)
    return block_number
262
def subscribe_to_address(session_id, message_id, address):
    """Subscribe a session to `address` and send its current status.

    Stores (message_id, status) for the address, refreshes the session's
    'last_time', then queues the status message.
    """
    current = get_address_status(address)
    session = sessions[session_id]
    session['addresses'][address] = (message_id, current)
    session['last_time'] = time.time()
    send_status(session_id, message_id, address, current)
268
def add_address_to_session_json(session_id, message_id, address):
    """Subscribe an HTTP session to `address` and return its current status.

    Stores (message_id, status) for the address and refreshes the
    session's 'last_time'.
    """
    http_session = m_sessions[0][session_id]
    current = get_address_status(address)
    http_session['addresses'][address] = (message_id, current)
    http_session['last_time'] = time.time()
    return current
277
def add_address_to_session(session_id, address):
    """Add `address` to a native session and return its current status.

    The entry is stored as ("", status) and the session's 'last_time' is
    refreshed. An unknown session id raises KeyError.
    """
    status = get_address_status(address)
    # The key is the `address` parameter; the previous code referenced an
    # undefined name `addr` and always raised NameError.
    sessions[session_id]['addresses'][address] = ("", status)
    sessions[session_id]['last_time'] = time.time()
    return status
283
def new_session(version, addresses):
    """Create a native session tracking `addresses`.

    Each address starts with the entry ('', ''). Returns the repr of
    (session_id, banner), where literal backslash-n sequences in the
    configured banner are turned into newlines.
    """
    session_id = random_string(10)
    session = { 'addresses':{}, 'version':version }
    sessions[session_id] = session
    for address in addresses:
        session['addresses'][address] = ('','')
    banner = config.get('server','banner').replace('\\n','\n')
    session['last_time'] = time.time()
    return repr( (session_id, banner) )
292
293
def client_version_json(session_id, _, version):
    """Record the client version on an HTTP session; returns None."""
    http_session = m_sessions[0][session_id]
    http_session['version'] = version
299
300 def create_session_json(_, __):
301     sessions = m_sessions[0]
302     session_id = random_string(10)
303     print "creating session", session_id
304     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
305     sessions[session_id]['last_time'] = time.time()
306     m_sessions[0] = sessions
307     return session_id
308
309
310
def get_banner(_,__):
    """Return the configured banner with literal backslash-n sequences
    replaced by newlines. Both arguments are ignored."""
    banner = config.get('server','banner')
    return banner.replace('\\n','\n')
313
def update_session(session_id,addresses):
    """deprecated in 0.42

    Replace the address set of a native session with `addresses` and
    refresh its 'last_time'. Returns 'ok'. An unknown session id raises
    KeyError.
    """
    # Each entry must be a (message_id, status) pair: modified_addresses
    # unpacks it as such, and a bare '' made the next poll raise ValueError.
    # ('', '') is the same initial value new_session uses.
    sessions[session_id]['addresses'] = {}
    for a in addresses:
        sessions[session_id]['addresses'][a] = ('','')
    sessions[session_id]['last_time'] = time.time()
    return 'ok'
321
def native_server_thread():
    """Accept connections for the native protocol, one thread per client.

    Listens on the configured host and port until `stopping` is set. The
    listening socket is closed when the loop ends or an error escapes.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((config.get('server','host'), config.getint('server','port')))
        s.listen(1)
        while not stopping:
            conn, addr = s.accept()
            try:
                thread.start_new_thread(native_client_thread, (addr, conn,))
            except Exception:
                # can't start new thread if there is no memory..
                traceback.print_exc(file=sys.stdout)
                # nobody will serve this client; do not leak the socket
                conn.close()
    finally:
        s.close()
334
335
336 def native_client_thread(ipaddr,conn):
337     #print "client thread", ipaddr
338     try:
339         ipaddr = ipaddr[0]
340         msg = ''
341         while 1:
342             d = conn.recv(1024)
343             msg += d
344             if not d: 
345                 break
346             if '#' in msg:
347                 msg = msg.split('#', 1)[0]
348                 break
349         try:
350             cmd, data = ast.literal_eval(msg)
351         except:
352             print "syntax error", repr(msg), ipaddr
353             conn.close()
354             return
355
356         out = do_command(cmd, data, ipaddr)
357         if out:
358             #print ipaddr, cmd, len(out)
359             try:
360                 conn.send(out)
361             except:
362                 print "error, could not send"
363
364     finally:
365         conn.close()
366
367
def timestr():
    """Return the local time formatted as [dd/mm/YYYY-HH:MM:SS]."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format)
370
371 # used by the native handler
372 def do_command(cmd, data, ipaddr):
373
374     if cmd=='b':
375         out = "%d"%block_number
376
377     elif cmd in ['session','new_session']:
378         try:
379             if cmd == 'session':
380                 addresses = ast.literal_eval(data)
381                 version = "old"
382             else:
383                 version, addresses = ast.literal_eval(data)
384                 if version[0]=="0": version = "v" + version
385         except:
386             print "error", data
387             return None
388         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
389         out = new_session(version, addresses)
390
391     elif cmd=='address.subscribe':
392         try:
393             session_id, addr = ast.literal_eval(data)
394         except:
395             print "error"
396             return None
397         out = add_address_to_session(session_id,addr)
398
399     elif cmd=='update_session':
400         try:
401             session_id, addresses = ast.literal_eval(data)
402         except:
403             print "error"
404             return None
405         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
406         out = update_session(session_id,addresses)
407
408     elif cmd == 'bccapi_login':
409         import electrum
410         print "data",data
411         v, k = ast.literal_eval(data)
412         master_public_key = k.decode('hex') # todo: sanitize. no need to decode twice...
413         print master_public_key
414         wallet_id = random_string(10)
415         w = Wallet( Direct_Interface() )
416         w.master_public_key = master_public_key.decode('hex')
417         w.synchronize()
418         wallets[wallet_id] = w
419         out = wallet_id
420         print "wallets", wallets
421
422     elif cmd == 'bccapi_getAccountInfo':
423         from wallet import int_to_hex
424         v, wallet_id = ast.literal_eval(data)
425         w = wallets.get(wallet_id)
426         if w is not None:
427             num = len(w.addresses)
428             c, u = w.get_balance()
429             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 )
430             out = out.decode('hex')
431         else:
432             print "error",data
433             out = "error"
434
435     elif cmd == 'bccapi_getAccountStatement':
436         from wallet import int_to_hex
437         v, wallet_id = ast.literal_eval(data)
438         w = wallets.get(wallet_id)
439         if w is not None:
440             num = len(w.addresses)
441             c, u = w.get_balance()
442             total_records = num_records = 0
443             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 ) + int_to_hex( total_records ) + int_to_hex( num_records )
444             out = out.decode('hex')
445         else:
446             print "error",data
447             out = "error"
448
449     elif cmd == 'bccapi_getSendCoinForm':
450         out = ''
451
452     elif cmd == 'bccapi_submitTransaction':
453         out = ''
454             
455     elif cmd=='poll': 
456         out = poll_session(data)
457
458     elif cmd == 'h': 
459         # history
460         address = data
461         out = repr( store.get_history( address ) )
462
463     elif cmd == 'load': 
464         out = cmd_load(data)
465
466     elif cmd =='tx':
467         out = send_tx(data)
468         print timestr(), "sent tx:", ipaddr, out
469
470     elif cmd == 'stop':
471         out = cmd_stop(data)
472
473     elif cmd == 'peers':
474         out = repr(peer_list.values())
475
476     else:
477         out = None
478
479     return out
480
481
482
483 ####################################################################
484
def tcp_server_thread():
    """Accept persistent TCP connections, one thread per client.

    Starts the input and output queue workers, then listens on the
    configured host, port 50001, until `stopping` is set. The listening
    socket is closed when the loop ends or an error escapes.
    """
    thread.start_new_thread(process_input_queue, ())
    thread.start_new_thread(process_output_queue, ())

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((config.get('server','host'), 50001))
        s.listen(1)
        while not stopping:
            conn, addr = s.accept()
            try:
                thread.start_new_thread(tcp_client_thread, (addr, conn,))
            except Exception:
                # can't start new thread if there is no memory..
                traceback.print_exc(file=sys.stdout)
                # nobody will serve this client; do not leak the socket
                conn.close()
    finally:
        s.close()
498             # can't start new thread if there is no memory..
499             traceback.print_exc(file=sys.stdout)
500
501
def close_session(session_id):
    """Forget a session and its block-number subscription.

    Safe to call more than once for the same id: both the client thread
    and the output-queue thread call it when a connection fails.
    """
    #print "lost connection", session_id
    sessions.pop(session_id, None)
    sessions_sub_numblocks.pop(session_id, None)
507
508
509 # one thread per client. put requests in a queue.
510 def tcp_client_thread(ipaddr,conn):
511     """ use a persistent connection. put commands in a queue."""
512
513     print timestr(), "TCP session", ipaddr
514     global sessions
515
516     session_id = random_string(10)
517     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
518
519     ipaddr = ipaddr[0]
520     msg = ''
521
522     while not stopping:
523         try:
524             d = conn.recv(1024)
525         except socket.error:
526             d = ''
527         if not d:
528             close_session(session_id)
529             break
530
531         msg += d
532         while True:
533             s = msg.find('\n')
534             if s ==-1:
535                 break
536             else:
537                 c = msg[0:s].strip()
538                 msg = msg[s+1:]
539                 if c == 'quit': 
540                     conn.close()
541                     close_session(session_id)
542                     return
543                 try:
544                     c = json.loads(c)
545                 except:
546                     print "json error", repr(c)
547                     continue
548                 try:
549                     message_id = c.get('id')
550                     method = c.get('method')
551                     params = c.get('params')
552                 except:
553                     print "syntax error", repr(c), ipaddr
554                     continue
555
556                 # add to queue
557                 input_queue.put((session_id, message_id, method, params))
558
559
560
561 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
562 def process_input_queue():
563     while not stopping:
564         session_id, message_id, method, data = input_queue.get()
565         if session_id not in sessions.keys():
566             continue
567         out = None
568         if method == 'address.subscribe':
569             address = data[0]
570             subscribe_to_address(session_id,message_id,address)
571         elif method == 'numblocks.subscribe':
572             subscribe_to_numblocks(session_id,message_id)
573         elif method == 'client.version':
574             sessions[session_id]['version'] = data[0]
575         elif method == 'server.banner':
576             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
577         elif method == 'server.peers':
578             out = { 'result':peer_list.values() } 
579         elif method == 'address.get_history':
580             address = data[0]
581             out = { 'result':store.get_history( address ) } 
582         elif method == 'transaction.broadcast':
583             postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
584             txo = urllib.urlopen(bitcoind_url, postdata).read()
585             print "sent tx:", txo
586             out = json.loads(txo)
587         else:
588             print "unknown command", method
589         if out:
590             out['id'] = message_id
591             out = json.dumps( out )
592             output_queue.put((session_id, out))
593
594 # this is a separate thread
# this is a separate thread
def process_output_queue():
    """Send queued messages to their sessions until `stopping` is set.

    Each message is written to the session's connection followed by a
    newline. Messages for unknown sessions are dropped; a session whose
    connection cannot be written to is closed.
    """
    while not stopping:
        session_id, out = output_queue.get()
        session = sessions.get(session_id)
        if session: 
            try:
                conn = session.get('conn')
                # send() may write only part of a large message
                conn.sendall(out+'\n')
            except Exception:
                try:
                    close_session(session_id)
                except KeyError:
                    # the client thread already removed the session
                    pass
603             except:
604                 close_session(session_id)
605                 
606
607
608
609 ####################################################################
610
611
612
613
614 def clean_session_thread():
615     while not stopping:
616         time.sleep(30)
617         t = time.time()
618         for k,s in sessions.items():
619             if s.get('type') == 'persistent': continue
620             t0 = s['last_time']
621             if t - t0 > 5*60:
622                 sessions.pop(k)
623                 print "lost session", k
624             
625
def irc_thread():
    """Maintain `peer_list` from the #electrum channel on irc.freenode.net.

    Connects with a random 'E_' nick, requests NAMES at most every five
    minutes (clearing peer_list each time), sends WHO for every listed
    name starting with 'E_', and records each WHO reply in peer_list.
    Reconnects after any error or disconnect until `stopping` is set.
    """
    global peer_list
    NICK = 'E_'+random_string(10)
    while not stopping:
        # Defined before the try block so that the cleanup below works even
        # when creating or connecting the socket fails.
        s = None
        sf = None
        try:
            s = socket.socket()
            s.connect(('irc.freenode.net', 6667))
            s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
            s.send('NICK '+NICK+'\n')
            s.send('JOIN #electrum\n')
            sf = s.makefile('r', 0)
            t = 0
            while not stopping:
                line = sf.readline()
                if not line:
                    # empty read: the server closed the connection
                    break
                line = line.rstrip('\r\n')
                line = line.split()
                if not line:
                    # blank line: nothing to dispatch
                    pass
                elif line[0]=='PING': 
                    s.send('PONG '+line[1]+'\n')
                elif '353' in line: # answer to /names
                    k = line.index('353')
                    for item in line[k+1:]:
                        if item[0:2] == 'E_':
                            s.send('WHO %s\n'%item)
                elif '352' in line: # answer to /who
                    # warning: this is a horrible hack which apparently works
                    k = line.index('352')
                    ip = line[k+4]
                    ip = socket.gethostbyname(ip)
                    name = line[k+6]
                    host = line[k+9]
                    peer_list[name] = (ip,host)
                if time.time() - t > 5*60:
                    s.send('NAMES #electrum\n')
                    t = time.time()
                    peer_list = {}
        except Exception:
            traceback.print_exc(file=sys.stdout)
        finally:
            if sf is not None:
                sf.close()
            if s is not None:
                s.close()
664             sf.close()
665             s.close()
666
667
def get_peers_json(_,__):
    """Return the values of peer_list; both arguments are ignored."""
    peers = peer_list.values()
    return peers
670
def http_server_thread(store):
    """Run the threaded JSON-RPC server on the configured host, port 8081.

    Registers the handlers listed below under their method names and then
    serves forever. The `store` argument is not used inside this function.
    """
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer

    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass

    server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
    handlers = [
        (get_peers_json, 'server.peers'),
        (cmd_stop, 'stop'),
        (cmd_load, 'load'),
        (clear_cache, 'clear_cache'),
        (get_cache, 'get_cache'),
        (get_banner, 'server.banner'),
        (lambda a,b,c: send_tx(c), 'transaction.broadcast'),
        (address_get_history_json, 'address.get_history'),
        (add_address_to_session_json, 'address.subscribe'),
        (subscribe_to_numblocks_json, 'numblocks.subscribe'),
        (client_version_json, 'client.version'),
        (create_session_json, 'session.create'),   # internal message (not part of protocol)
        (poll_session_json, 'session.poll'),       # internal message (not part of protocol)
    ]
    for function, method_name in handlers:
        server.register_function(function, method_name)
    server.serve_forever()
691
692
693 import traceback
694
695
696 if __name__ == '__main__':
697
698     if len(sys.argv)>1:
699         import jsonrpclib
700         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
701         cmd = sys.argv[1]
702         if cmd == 'load':
703             out = server.load(password)
704         elif cmd == 'peers':
705             out = server.server.peers()
706         elif cmd == 'stop':
707             out = server.stop(password)
708         elif cmd == 'clear_cache':
709             out = server.clear_cache(password)
710         elif cmd == 'get_cache':
711             out = server.get_cache(password,sys.argv[2])
712         elif cmd == 'h':
713             out = server.address.get_history(sys.argv[2])
714         elif cmd == 'tx':
715             out = server.transaction.broadcast(sys.argv[2])
716         elif cmd == 'b':
717             out = server.numblocks.subscribe()
718         else:
719             out = "Unknown command: '%s'" % cmd
720         print out
721         sys.exit(0)
722
723
724     # backend
725     import db
726     store = db.MyStore(config,address_queue)
727
728
729     # supported protocols
730     thread.start_new_thread(native_server_thread, ())
731     thread.start_new_thread(tcp_server_thread, ())
732     thread.start_new_thread(http_server_thread, (store,))
733     thread.start_new_thread(clean_session_thread, ())
734
735     if (config.get('server','irc') == 'yes' ):
736         thread.start_new_thread(irc_thread, ())
737
738     print "starting Electrum server"
739
740
741     while not stopping:
742         block_number = store.main_iteration()
743
744         if block_number != old_block_number:
745             old_block_number = block_number
746             for session_id in sessions_sub_numblocks.keys():
747                 send_numblocks(session_id)
748         # do addresses
749         while True:
750             try:
751                 addr = address_queue.get(False)
752             except:
753                 break
754             do_update_address(addr)
755
756         time.sleep(10)
757     print "server stopped"
758