move abe backend into module
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27 import abe_backend
28
29
30
31
32 import time, json, socket, operator, thread, ast, sys, re, traceback
33 import ConfigParser
34 from json import dumps, loads
35 import urllib
36
37
38 config = ConfigParser.ConfigParser()
39 # set some defaults, which will be overwritten by the config file
40 config.add_section('server')
41 config.set('server','banner', 'Welcome to Electrum!')
42 config.set('server', 'host', 'localhost')
43 config.set('server', 'port', '50000')
44 config.set('server', 'password', '')
45 config.set('server', 'irc', 'yes')
46 config.set('server', 'ircname', 'Electrum server')
47 config.add_section('database')
48 config.set('database', 'type', 'psycopg2')
49 config.set('database', 'database', 'abe')
50
51 try:
52     f = open('/etc/electrum.conf','r')
53     config.readfp(f)
54     f.close()
55 except:
56     print "Could not read electrum.conf. I will use the default values."
57
58 try:
59     f = open('/etc/electrum.banner','r')
60     config.set('server','banner', f.read())
61     f.close()
62 except:
63     pass
64
65
# Secret that the administrative commands (cmd_stop, cmd_load) compare against.
password = config.get('server','password')

# Queues used by the persistent TCP protocol: requests parsed by the client
# threads go into input_queue, serialized replies go into output_queue.
from Queue import Queue
input_queue = Queue()
output_queue = Queue()

# Set to True by cmd_stop; the worker loops test it on every iteration.
stopping = False

# Last value returned by store.main_iteration(); -1 until the first iteration.
block_number = -1

# session_id -> session dict, for the native and persistent TCP protocols.
sessions = {}
# session_id -> message_id, for sessions that have subscribed to the service
sessions_sub_numblocks = {}

# served by http: a one-element list holding the session_id -> session dict
m_sessions = [{}]

# Peers collected by irc_thread.
peer_list = {}

# for ultra-light clients such as bccapi
wallets = {}
82
83
84
85
def random_string(N):
    """Return a random string of N uppercase ASCII letters and digits.

    The result is used as a session identifier, and a session id is all a
    client presents when polling a session.  The characters are therefore
    drawn from the operating system's entropy source (random.SystemRandom)
    rather than from the module-level, seedable generator.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
89
90     
91
def cmd_stop(_,__,pw):
    """Request a server shutdown when pw equals the configured password.

    The first two positional arguments are ignored.  Returns 'ok' after
    setting the global `stopping` flag, or 'wrong password' when pw does
    not match (the flag is then left untouched).
    """
    global stopping
    if pw != password:
        return 'wrong password'
    stopping = True
    return 'ok'
99
def cmd_load(_,__,pw):
    """Report the number of entries in `sessions`, guarded by the password.

    The first two positional arguments are ignored.  Returns the repr() of
    len(sessions) when pw equals the configured password, otherwise the
    string 'wrong password'.
    """
    if pw != password:
        return 'wrong password'
    session_count = len(sessions)
    return repr(session_count)
105
106
107
108
109
def modified_addresses(a_session):
    """Find the subscribed addresses whose status changed since the last check.

    a_session is a session dict whose 'addresses' entry maps
    address -> (message_id, last_status).  The session is not modified.

    Returns (changed, addresses):
      changed   -- address -> new status, for every address whose
                   store.get_status() differs from the recorded status
      addresses -- a copy of the subscription map with those entries
                   refreshed, for the caller to store back in the session
    """
    # Only the subscription map is needed, and its values are replaced
    # below rather than mutated, so a shallow copy is a sufficient
    # snapshot; deep-copying the whole session on every poll is not needed.
    addresses = dict(a_session['addresses'])
    changed = {}
    for addr, (msg_id, last_status) in addresses.items():
        status = store.get_status( addr )
        if last_status != status:
            addresses[addr] = msg_id, status
            changed[addr] = status
    return changed, addresses
128
129
130 def poll_session(session_id): 
131     # native
132     session = sessions.get(session_id)
133     if session is None:
134         print time.asctime(), "session not found", session_id
135         return -1, {}
136     else:
137         sessions[session_id]['last_time'] = time.time()
138         ret, addresses = modified_addresses(session)
139         if ret: sessions[session_id]['addresses'] = addresses
140         return repr( (block_number,ret))
141
142
def poll_session_json(session_id, message_id):
    """HTTP protocol: collect the pending notifications of a session.

    Returns a list of {'id': ..., 'result': ...} dicts: one per subscribed
    address whose status changed, plus one for the block number when the
    session holds a numblocks subscription and the number changed.
    Raises BaseException when session_id is not a known HTTP session.
    message_id is not used.
    """
    http_sessions = m_sessions[0]
    session = http_sessions.get(session_id)
    if session is None:
        raise BaseException("session not found %s"%session_id)

    http_sessions[session_id]['last_time'] = time.time()
    out = []

    changed, addresses = modified_addresses(session)
    if changed:
        http_sessions[session_id]['addresses'] = addresses
        for addr in changed:
            addr_msg_id, addr_status = addresses[addr]
            out.append({'id': addr_msg_id, 'result': addr_status})

    # 'numblocks' holds (message_id, last reported block number); a falsy
    # second element means there is nothing to compare against.
    msg_id, last_nb = session.get('numblocks')
    if last_nb and last_nb != block_number:
        http_sessions[session_id]['numblocks'] = msg_id, block_number
        out.append({'id': msg_id, 'result': block_number})

    return out
164
165
def do_update_address(addr):
    """Notify persistent sessions subscribed to addr when its status changed.

    An address was involved in a transaction; for every session of type
    'persistent' that subscribed to it, the current status is fetched from
    the store and, if it differs from the one last sent, queued with
    send_status() and recorded in the session.
    """
    # the address can be subscribed in several sessions; the cache should
    # ensure that we don't do redundant requests

    # items() yields (id, session) pairs taken in one pass, so a session
    # closed by a client thread while this loop runs cannot raise KeyError.
    for session_id, session in sessions.items():
        if session.get('type') != 'persistent':
            continue
        subscriptions = session['addresses']
        # direct dict membership instead of scanning a keys() list
        if addr not in subscriptions:
            continue

        status = store.get_status( addr )
        message_id, last_status = subscriptions[addr]
        if last_status != status:
            #print "sending new status for %s:"%addr, status
            send_status(session_id, message_id, addr, status)
            subscriptions[addr] = (message_id, status)
182
183
184
def send_numblocks(session_id):
    """Queue the current block number for a session subscribed to numblocks.

    The reply carries the message id stored in sessions_sub_numblocks when
    the session subscribed; a session without an entry raises KeyError.
    """
    notification = {
        'id': sessions_sub_numblocks[session_id],
        'result': block_number,
    }
    output_queue.put((session_id, json.dumps(notification)))
189
def send_status(session_id, message_id, address, status):
    """Queue a JSON reply carrying status under message_id for session_id.

    address is accepted for the caller's convenience but is not part of
    the message.
    """
    reply = {'id': message_id, 'result': status}
    output_queue.put((session_id, json.dumps(reply)))
193
def address_get_history_json(_,message_id,address):
    """HTTP handler: return store.get_history(address).

    The first argument and message_id are ignored.
    """
    history = store.get_history(address)
    return history
196
def subscribe_to_numblocks(session_id, message_id):
    """Register a persistent session for block-number notifications.

    Records message_id for the session and immediately queues the current
    block number through send_numblocks().
    """
    sessions_sub_numblocks.update({session_id: message_id})
    send_numblocks(session_id)
200
def subscribe_to_numblocks_json(session_id, message_id):
    """HTTP handler: subscribe a session to block-number changes.

    Stores (message_id, block_number) under the session's 'numblocks' key
    and returns the current block number.
    """
    http_session = m_sessions[0][session_id]
    http_session['numblocks'] = (message_id, block_number)
    return block_number
205
def subscribe_to_address(session_id, message_id, address):
    """Subscribe a session to address and queue its current status.

    The (message_id, status) pair is stored in the session, the session's
    'last_time' is refreshed, and the status is sent via send_status().
    """
    current_status = store.get_status(address)
    session = sessions[session_id]
    session['addresses'][address] = (message_id, current_status)
    session['last_time'] = time.time()
    send_status(session_id, message_id, address, current_status)
211
def add_address_to_session_json(session_id, message_id, address):
    """HTTP handler: subscribe an HTTP session to address.

    Stores (message_id, status) for the address, refreshes the session's
    'last_time' and returns the address's current status.
    """
    global m_sessions
    http_sessions = m_sessions[0]
    current_status = store.get_status(address)
    entry = http_sessions[session_id]
    entry['addresses'][address] = (message_id, current_status)
    entry['last_time'] = time.time()
    # write the table back into its slot, as the original code does
    m_sessions[0] = http_sessions
    return current_status
220
def add_address_to_session(session_id, address):
    """Native protocol: add address to a session and return its status.

    The subscription is stored with an empty message id, and the
    session's 'last_time' is refreshed.
    """
    current_status = store.get_status(address)
    session = sessions[session_id]
    session['addresses'][address] = ("", current_status)
    session['last_time'] = time.time()
    return current_status
226
def new_session(version, addresses):
    """Create a native-protocol session watching the given addresses.

    Every address starts with an empty message id and an empty status.
    Returns the repr() of (session_id, banner), the banner having its
    literal backslash-n sequences turned into newlines.
    """
    session_id = random_string(10)
    session = { 'addresses':{}, 'version':version }
    sessions[session_id] = session
    session['addresses'].update((a, ('','')) for a in addresses)
    banner = config.get('server','banner').replace('\\n','\n')
    out = repr( (session_id, banner) )
    session['last_time'] = time.time()
    return out
235
236
def client_version_json(session_id, _, version):
    """HTTP handler: record the client version in the session.

    The second argument is ignored; nothing is returned.
    """
    global m_sessions
    http_sessions = m_sessions[0]
    http_sessions[session_id]['version'] = version
    # write the table back into its slot, as the original code does
    m_sessions[0] = http_sessions
242
243 def create_session_json(_, __):
244     sessions = m_sessions[0]
245     session_id = random_string(10)
246     print "creating session", session_id
247     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
248     sessions[session_id]['last_time'] = time.time()
249     m_sessions[0] = sessions
250     return session_id
251
252
253
def get_banner(_,__):
    """Return the configured banner with literal '\\n' turned into newlines.

    Both arguments are ignored.
    """
    raw_banner = config.get('server','banner')
    return raw_banner.replace('\\n','\n')
256
def update_session(session_id,addresses):
    """deprecated in 0.42

    Replace the set of addresses watched by a native session and refresh
    its 'last_time'.  Returns 'ok'; an unknown session_id raises KeyError.
    """
    session = sessions[session_id]
    # Every entry has to be a (message_id, status) pair: modified_addresses()
    # unpacks it on the next poll, and new_session() stores the same
    # ('', '') placeholder.
    session['addresses'] = dict((a, ('', '')) for a in addresses)
    session['last_time'] = time.time()
    return 'ok'
264
def native_server_thread():
    """Listen on the configured host/port for native-protocol clients.

    Each accepted connection is handed to native_client_thread in a new
    thread.  The loop runs until the global `stopping` flag is set.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    bind_address = (config.get('server','host'), config.getint('server','port'))
    listener.bind(bind_address)
    listener.listen(1)
    while not stopping:
        conn, addr = listener.accept()
        try:
            thread.start_new_thread(native_client_thread, (addr, conn))
        except:
            # can't start new thread if there is no memory..
            traceback.print_exc(file=sys.stdout)
277
278
279 def native_client_thread(ipaddr,conn):
280     #print "client thread", ipaddr
281     try:
282         ipaddr = ipaddr[0]
283         msg = ''
284         while 1:
285             d = conn.recv(1024)
286             msg += d
287             if not d: 
288                 break
289             if '#' in msg:
290                 msg = msg.split('#', 1)[0]
291                 break
292         try:
293             cmd, data = ast.literal_eval(msg)
294         except:
295             print "syntax error", repr(msg), ipaddr
296             conn.close()
297             return
298
299         out = do_command(cmd, data, ipaddr)
300         if out:
301             #print ipaddr, cmd, len(out)
302             try:
303                 conn.send(out)
304             except:
305                 print "error, could not send"
306
307     finally:
308         conn.close()
309
310
def timestr():
    """Return the current local time formatted as '[DD/MM/YYYY-HH:MM:SS]'."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format, time.localtime())
313
314 # used by the native handler
315 def do_command(cmd, data, ipaddr):
316
317     if cmd=='b':
318         out = "%d"%block_number
319
320     elif cmd in ['session','new_session']:
321         try:
322             if cmd == 'session':
323                 addresses = ast.literal_eval(data)
324                 version = "old"
325             else:
326                 version, addresses = ast.literal_eval(data)
327                 if version[0]=="0": version = "v" + version
328         except:
329             print "error", data
330             return None
331         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
332         out = new_session(version, addresses)
333
334     elif cmd=='address.subscribe':
335         try:
336             session_id, addr = ast.literal_eval(data)
337         except:
338             traceback.print_exc(file=sys.stdout)
339             print data
340             return None
341         out = add_address_to_session(session_id,addr)
342
343     elif cmd=='update_session':
344         try:
345             session_id, addresses = ast.literal_eval(data)
346         except:
347             traceback.print_exc(file=sys.stdout)
348             return None
349         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
350         out = update_session(session_id,addresses)
351             
352     elif cmd=='poll': 
353         out = poll_session(data)
354
355     elif cmd == 'h': 
356         # history
357         address = data
358         out = repr( store.get_history( address ) )
359
360     elif cmd == 'load': 
361         out = cmd_load(None,None,data)
362
363     elif cmd =='tx':
364         out = store.send_tx(data)
365         print timestr(), "sent tx:", ipaddr, out
366
367     elif cmd == 'stop':
368         out = cmd_stop(data)
369
370     elif cmd == 'peers':
371         out = repr(peer_list.values())
372
373     else:
374         out = None
375
376     return out
377
378
379
380 ####################################################################
381
def tcp_server_thread():
    """Run the persistent TCP protocol on port 50001 of the configured host.

    Starts the input- and output-queue workers first, then accepts
    connections until `stopping` is set, handing each one to
    tcp_client_thread in a new thread.
    """
    for worker in (process_input_queue, process_output_queue):
        thread.start_new_thread(worker, ())

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((config.get('server','host'), 50001))
    listener.listen(1)
    while not stopping:
        conn, addr = listener.accept()
        try:
            thread.start_new_thread(tcp_client_thread, (addr, conn))
        except:
            # can't start new thread if there is no memory..
            traceback.print_exc(file=sys.stdout)
397
398
def close_session(session_id):
    """Forget a session and its numblocks subscription, if they exist.

    Safe to call more than once for the same id: the connection's own
    thread and the output-queue thread may both close a session, and the
    second call must not raise.
    """
    #print "lost connection", session_id
    sessions.pop(session_id, None)
    sessions_sub_numblocks.pop(session_id, None)
404
405
406 # one thread per client. put requests in a queue.
407 def tcp_client_thread(ipaddr,conn):
408     """ use a persistent connection. put commands in a queue."""
409
410     print timestr(), "TCP session", ipaddr
411     global sessions
412
413     session_id = random_string(10)
414     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
415
416     ipaddr = ipaddr[0]
417     msg = ''
418
419     while not stopping:
420         try:
421             d = conn.recv(1024)
422         except socket.error:
423             d = ''
424         if not d:
425             close_session(session_id)
426             break
427
428         msg += d
429         while True:
430             s = msg.find('\n')
431             if s ==-1:
432                 break
433             else:
434                 c = msg[0:s].strip()
435                 msg = msg[s+1:]
436                 if c == 'quit': 
437                     conn.close()
438                     close_session(session_id)
439                     return
440                 try:
441                     c = json.loads(c)
442                 except:
443                     print "json error", repr(c)
444                     continue
445                 try:
446                     message_id = c.get('id')
447                     method = c.get('method')
448                     params = c.get('params')
449                 except:
450                     print "syntax error", repr(c), ipaddr
451                     continue
452
453                 # add to queue
454                 input_queue.put((session_id, message_id, method, params))
455
456
457
458 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
459 def process_input_queue():
460     while not stopping:
461         session_id, message_id, method, data = input_queue.get()
462         if session_id not in sessions.keys():
463             continue
464         out = None
465         if method == 'address.subscribe':
466             address = data[0]
467             subscribe_to_address(session_id,message_id,address)
468         elif method == 'numblocks.subscribe':
469             subscribe_to_numblocks(session_id,message_id)
470         elif method == 'client.version':
471             sessions[session_id]['version'] = data[0]
472         elif method == 'server.banner':
473             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
474         elif method == 'server.peers':
475             out = { 'result':peer_list.values() } 
476         elif method == 'address.get_history':
477             address = data[0]
478             out = { 'result':store.get_history( address ) } 
479         elif method == 'transaction.broadcast':
480             txo = store.send_tx(data[0])
481             print "sent tx:", txo
482             out = {'result':txo }
483         else:
484             print "unknown command", method
485         if out:
486             out['id'] = message_id
487             out = json.dumps( out )
488             output_queue.put((session_id, out))
489
# this is a separate thread
def process_output_queue():
    """Send queued replies to their sessions' connections.

    Each queue item is (session_id, text); the text is sent followed by a
    newline.  Items for unknown sessions are dropped, and a session whose
    send fails is closed.
    """
    while not stopping:
        session_id, out = output_queue.get()
        session = sessions.get(session_id)
        if not session:
            continue
        try:
            session.get('conn').send(out+'\n')
        except:
            close_session(session_id)
501                 
502
503
504
505 ####################################################################
506
507
508
509
510 def clean_session_thread():
511     while not stopping:
512         time.sleep(30)
513         t = time.time()
514         for k,s in sessions.items():
515             if s.get('type') == 'persistent': continue
516             t0 = s['last_time']
517             if t - t0 > 5*60:
518                 sessions.pop(k)
519                 print "lost session", k
520             
521
def irc_thread():
    """Stay connected to #electrum on irc.freenode.net and collect peers.

    Nicknames starting with 'E_' seen in NAMES replies (353) are queried
    with WHO; each WHO reply (352) adds a (ip, host) entry to the global
    peer_list.  The list is reset and NAMES is requested again every five
    minutes.  Any error tears the connection down and a new one is made
    until `stopping` is set.
    """
    global peer_list
    NICK = 'E_'+random_string(10)
    while not stopping:
        # Bound before the try block so that the cleanup below works even
        # when connecting fails before the socket or file exists.
        s = None
        sf = None
        try:
            s = socket.socket()
            s.connect(('irc.freenode.net', 6667))
            s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
            s.send('NICK '+NICK+'\n')
            s.send('JOIN #electrum\n')
            sf = s.makefile('r', 0)
            t = 0
            while not stopping:
                line = sf.readline()
                if not line:
                    # empty read: the server closed the connection
                    break
                line = line.rstrip('\r\n')
                line = line.split()
                if not line:
                    # blank line, nothing to interpret
                    pass
                elif line[0]=='PING':
                    s.send('PONG '+line[1]+'\n')
                elif '353' in line: # answer to /names
                    k = line.index('353')
                    for item in line[k+1:]:
                        if item[0:2] == 'E_':
                            s.send('WHO %s\n'%item)
                elif '352' in line: # answer to /who
                    # warning: this is a horrible hack which apparently works
                    k = line.index('352')
                    ip = line[k+4]
                    ip = socket.gethostbyname(ip)
                    name = line[k+6]
                    host = line[k+9]
                    peer_list[name] = (ip,host)
                if time.time() - t > 5*60:
                    s.send('NAMES #electrum\n')
                    t = time.time()
                    peer_list = {}
        except:
            traceback.print_exc(file=sys.stdout)
        finally:
            if sf is not None:
                sf.close()
            if s is not None:
                s.close()

        if not stopping:
            # pause before reconnecting so a persistent failure does not
            # turn into a tight loop of connection attempts and tracebacks
            time.sleep(10)
562
563
def get_peers_json(_,__):
    """HTTP handler: return the values of peer_list; arguments are ignored."""
    known_peers = peer_list.values()
    return known_peers
566
def http_server_thread():
    """Serve the JSON-RPC (HTTP) protocol on port 8081 of the configured host.

    Registers the protocol methods and the administrative commands on a
    threaded StratumJSONRPCServer, then serves requests forever.
    """
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer

    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer):
        pass

    handlers = [
        (get_peers_json, 'server.peers'),
        (cmd_stop, 'stop'),
        (cmd_load, 'load'),
        (get_banner, 'server.banner'),
        (lambda a,b,c: store.send_tx(c), 'transaction.broadcast'),
        (address_get_history_json, 'address.get_history'),
        (add_address_to_session_json, 'address.subscribe'),
        (subscribe_to_numblocks_json, 'numblocks.subscribe'),
        (client_version_json, 'client.version'),
        # internal messages (not part of protocol)
        (create_session_json, 'session.create'),
        (poll_session_json, 'session.poll'),
    ]

    server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
    for function, method_name in handlers:
        server.register_function(function, method_name)
    server.serve_forever()
585
586
587 if __name__ == '__main__':
588
589     if len(sys.argv)>1:
590         import jsonrpclib
591         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
592         cmd = sys.argv[1]
593         if cmd == 'load':
594             out = server.load(password)
595         elif cmd == 'peers':
596             out = server.server.peers()
597         elif cmd == 'stop':
598             out = server.stop(password)
599         elif cmd == 'clear_cache':
600             out = server.clear_cache(password)
601         elif cmd == 'get_cache':
602             out = server.get_cache(password,sys.argv[2])
603         elif cmd == 'h':
604             out = server.address.get_history(sys.argv[2])
605         elif cmd == 'tx':
606             out = server.transaction.broadcast(sys.argv[2])
607         elif cmd == 'b':
608             out = server.numblocks.subscribe()
609         else:
610             out = "Unknown command: '%s'" % cmd
611         print out
612         sys.exit(0)
613
614     # backend
615     store = abe_backend.AbeStore(config)
616
617     # supported protocols
618     thread.start_new_thread(native_server_thread, ())
619     thread.start_new_thread(tcp_server_thread, ())
620     thread.start_new_thread(http_server_thread, ())
621     thread.start_new_thread(clean_session_thread, ())
622
623     if (config.get('server','irc') == 'yes' ):
624         thread.start_new_thread(irc_thread, ())
625
626     print "starting Electrum server"
627
628     old_block_number = None
629     while not stopping:
630         block_number = store.main_iteration()
631
632         if block_number != old_block_number:
633             old_block_number = block_number
634             for session_id in sessions_sub_numblocks.keys():
635                 send_numblocks(session_id)
636         while True:
637             try:
638                 addr = store.address_queue.get(False)
639             except:
640                 break
641             do_update_address(addr)
642
643         time.sleep(10)
644     print "server stopped"
645