not used
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27 import abe_backend
28
29
30
31
32 import time, json, socket, operator, thread, ast, sys, re, traceback
33 import ConfigParser
34 from json import dumps, loads
35 import urllib
36
37
38 config = ConfigParser.ConfigParser()
39 # set some defaults, which will be overwritten by the config file
40 config.add_section('server')
41 config.set('server','banner', 'Welcome to Electrum!')
42 config.set('server', 'host', 'localhost')
43 config.set('server', 'port', '50000')
44 config.set('server', 'password', '')
45 config.set('server', 'irc', 'yes')
46 config.set('server', 'ircname', 'Electrum server')
47 config.add_section('database')
48 config.set('database', 'type', 'psycopg2')
49 config.set('database', 'database', 'abe')
50
51 try:
52     f = open('/etc/electrum.conf','r')
53     config.readfp(f)
54     f.close()
55 except:
56     print "Could not read electrum.conf. I will use the default values."
57
58 try:
59     f = open('/etc/electrum.banner','r')
60     config.set('server','banner', f.read())
61     f.close()
62 except:
63     pass
64
65
66 password = config.get('server','password')
67
68 stopping = False
69 block_number = -1
70 sessions = {}
71 sessions_sub_numblocks = {} # sessions that have subscribed to the service
72
73 m_sessions = [{}] # served by http
74
75 peer_list = {}
76
77 from Queue import Queue
78 input_queue = Queue()
79 output_queue = Queue()
80
81
82
83
def random_string(N):
    """Return a random string of N upper-case ASCII letters and digits.

    The result is used as a session id, which is the only thing a client
    presents to identify its session on later requests.  It is therefore
    drawn from the operating system's random source (random.SystemRandom)
    instead of the default, predictable Mersenne Twister generator.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
87
88     
89
def cmd_stop(_,__,pw):
    """Ask the server to shut down if pw equals the configured password.

    The first two arguments are ignored.  On a match the module-level
    `stopping` flag is set and 'ok' is returned; otherwise the flag is left
    alone and 'wrong password' is returned.
    """
    global stopping
    if password != pw:
        return 'wrong password'
    stopping = True
    return 'ok'
97
def cmd_load(_,__,pw):
    """Return the number of entries in `sessions`, as a repr'd integer.

    The first two arguments are ignored.  Returns 'wrong password' when pw
    does not equal the configured password.
    """
    if password != pw:
        return 'wrong password'
    session_count = len(sessions)
    return repr(session_count)
103
104
105
106
107
def modified_addresses(a_session):
    """Find the subscribed addresses whose status changed since last recorded.

    a_session is a session dict whose 'addresses' entry maps each address to
    a (message_id, last_status) pair.  The session is not modified.

    Returns a pair (changed, addresses):
      changed   -- maps every address whose current store.get_status() value
                   differs from the recorded one to that new status;
      addresses -- a new dict equal to the session's address map with the
                   changed entries updated to (message_id, new_status).
    """
    # Only the address map is rebuilt, so copy just that map rather than
    # deep-copying the whole session on every poll.
    addresses = dict(a_session['addresses'])
    changed = {}
    for addr, (msg_id, last_status) in a_session['addresses'].items():
        status = store.get_status(addr)
        if last_status != status:
            addresses[addr] = msg_id, status
            changed[addr] = status
    return changed, addresses
126
127
128 def poll_session(session_id): 
129     # native
130     session = sessions.get(session_id)
131     if session is None:
132         print time.asctime(), "session not found", session_id
133         return -1, {}
134     else:
135         sessions[session_id]['last_time'] = time.time()
136         ret, addresses = modified_addresses(session)
137         if ret: sessions[session_id]['addresses'] = addresses
138         return repr( (block_number,ret))
139
140
def poll_session_json(session_id, message_id):
    """HTTP protocol: collect the pending notifications for a session.

    Returns a list of {'id': ..., 'result': ...} dicts: one per subscribed
    address whose status changed, followed by one for the block number if the
    session subscribed to it and it changed.  message_id is not used.

    Raises BaseException when session_id is unknown.
    """
    http_sessions = m_sessions[0]
    session = http_sessions.get(session_id)
    if session is None:
        raise BaseException("session not found %s"%session_id)

    http_sessions[session_id]['last_time'] = time.time()
    out = []

    ret, addresses = modified_addresses(session)
    if ret:
        http_sessions[session_id]['addresses'] = addresses
        for addr in ret:
            # each entry carries the message id the subscription was made with
            msg_id, status = addresses[addr]
            out.append({'id': msg_id, 'result': status})

    msg_id, last_nb = session.get('numblocks')
    # last_nb is falsy until the session has subscribed to numblocks
    if last_nb and last_nb != block_number:
        http_sessions[session_id]['numblocks'] = msg_id, block_number
        out.append({'id': msg_id, 'result': block_number})

    return out
162
163
def do_update_address(addr):
    """Notify persistent sessions subscribed to addr that its status changed.

    An address was involved in a transaction; for every session of type
    'persistent' that subscribed to it, the current status is compared with
    the one last sent and, if different, queued for sending and recorded.
    The address can be subscribed in several sessions; the cache should
    ensure that we don't do redundant requests.
    """
    # items() takes a snapshot, so a session closed by a client thread while
    # we iterate cannot raise KeyError here.
    for session_id, session in sessions.items():
        if session.get('type') != 'persistent':
            continue
        # direct dict lookup instead of scanning a list of all keys
        subscription = session['addresses'].get(addr)
        if subscription is None:
            continue
        message_id, last_status = subscription
        status = store.get_status(addr)
        if last_status != status:
            send_status(session_id, message_id, addr, status)
            session['addresses'][addr] = (message_id, status)
180
181
182
def send_numblocks(session_id):
    """Queue the current block number for a session subscribed to numblocks.

    The reply reuses the message id stored at subscription time.  Raises
    KeyError if the session has no numblocks subscription.
    """
    reply = {'id': sessions_sub_numblocks[session_id], 'result': block_number}
    output_queue.put((session_id, json.dumps(reply)))
187
def send_status(session_id, message_id, address, status):
    """Queue a JSON reply carrying `status` under `message_id` for a session.

    `address` is accepted for symmetry with the callers but is not part of
    the message.
    """
    reply = {'id': message_id, 'result': status}
    encoded = json.dumps(reply)
    output_queue.put((session_id, encoded))
191
def address_get_history_json(_,message_id,address):
    """HTTP handler: return store.get_history(address).

    The first argument and message_id are not used.
    """
    history = store.get_history(address)
    return history
194
def subscribe_to_numblocks(session_id, message_id):
    """Subscribe a session to block-number updates and send the current one.

    message_id is remembered so that every later notification to this
    session is sent under the same id.
    """
    # record the subscription first: send_numblocks reads the id back from it
    sessions_sub_numblocks[session_id] = message_id
    send_numblocks(session_id)
198
def subscribe_to_numblocks_json(session_id, message_id):
    """HTTP handler: subscribe a session to block-number updates.

    Stores (message_id, block_number) in the session's 'numblocks' entry and
    returns the current block number.  Raises KeyError for an unknown
    session_id.
    """
    session = m_sessions[0][session_id]
    session['numblocks'] = (message_id, block_number)
    return block_number
203
def subscribe_to_address(session_id, message_id, address):
    """Subscribe a session to an address and send its current status.

    The (message_id, status) pair is recorded in the session so later
    changes can be detected and reported under the same message id.
    """
    status = store.get_status(address)
    session = sessions[session_id]
    session['addresses'][address] = (message_id, status)
    session['last_time'] = time.time()
    send_status(session_id, message_id, address, status)
209
def add_address_to_session_json(session_id, message_id, address):
    """HTTP handler: subscribe an HTTP session to an address.

    Records (message_id, status) for the address, refreshes the session's
    'last_time' and returns the address's current status.  Raises KeyError
    for an unknown session_id.
    """
    status = store.get_status(address)
    session = m_sessions[0][session_id]
    session['addresses'][address] = (message_id, status)
    session['last_time'] = time.time()
    return status
218
def add_address_to_session(session_id, address):
    """Native protocol: subscribe a session to an address.

    The address is stored with an empty message id.  Returns the address's
    current status.  Raises KeyError for an unknown session_id.
    """
    status = store.get_status(address)
    session = sessions[session_id]
    session['addresses'][address] = ("", status)
    session['last_time'] = time.time()
    return status
224
def new_session(version, addresses):
    """Native protocol: create a session watching the given addresses.

    Every address starts with an empty message id and an empty status.
    Returns repr((session_id, banner)), where literal backslash-n sequences
    in the configured banner are turned into newlines.
    """
    session_id = random_string(10)
    session = {'addresses': {}, 'version': version}
    sessions[session_id] = session

    watched = session['addresses']
    for a in addresses:
        watched[a] = ('', '')

    banner = config.get('server','banner').replace('\\n','\n')
    out = repr((session_id, banner))
    session['last_time'] = time.time()
    return out
233
234
def client_version_json(session_id, _, version):
    """HTTP handler: record the client version in the session.

    The second argument is ignored.  Raises KeyError for an unknown
    session_id.  Returns None.
    """
    session = m_sessions[0][session_id]
    session['version'] = version
240
241 def create_session_json(_, __):
242     sessions = m_sessions[0]
243     session_id = random_string(10)
244     print "creating session", session_id
245     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
246     sessions[session_id]['last_time'] = time.time()
247     m_sessions[0] = sessions
248     return session_id
249
250
251
def get_banner(_,__):
    """Return the configured banner with literal backslash-n turned into newlines.

    Both arguments are ignored.
    """
    banner = config.get('server','banner')
    return banner.replace('\\n','\n')
254
def update_session(session_id,addresses):
    """deprecated in 0.42

    Replace the set of addresses watched by a native session and return 'ok'.
    Raises KeyError for an unknown session_id.
    """
    session = sessions[session_id]
    # Every entry must be a (message_id, status) pair, as created by
    # new_session: modified_addresses unpacks each value into two names and
    # would fail on a bare '' at the next poll.
    session['addresses'] = dict((a, ('', '')) for a in addresses)
    session['last_time'] = time.time()
    return 'ok'
262
def native_server_thread():
    """Accept native-protocol connections and serve each in its own thread.

    Listens on the configured server host and port until `stopping` is set
    (checked after each accepted connection), handing every connection to
    native_client_thread.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((config.get('server','host'), config.getint('server','port')))
        s.listen(1)
        while not stopping:
            conn, addr = s.accept()
            try:
                thread.start_new_thread(native_client_thread, (addr, conn,))
            except Exception:
                # can't start new thread if there is no memory..
                traceback.print_exc(file=sys.stdout)
                # nobody else owns the connection, so release it here
                conn.close()
    finally:
        # release the listening socket on shutdown or on a bind/accept error
        s.close()
275
276
277 def native_client_thread(ipaddr,conn):
278     #print "client thread", ipaddr
279     try:
280         ipaddr = ipaddr[0]
281         msg = ''
282         while 1:
283             d = conn.recv(1024)
284             msg += d
285             if not d: 
286                 break
287             if '#' in msg:
288                 msg = msg.split('#', 1)[0]
289                 break
290         try:
291             cmd, data = ast.literal_eval(msg)
292         except:
293             print "syntax error", repr(msg), ipaddr
294             conn.close()
295             return
296
297         out = do_command(cmd, data, ipaddr)
298         if out:
299             #print ipaddr, cmd, len(out)
300             try:
301                 conn.send(out)
302             except:
303                 print "error, could not send"
304
305     finally:
306         conn.close()
307
308
def timestr():
    """Return the current local time as a log prefix like "[31/12/2012-23:59:59]"."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format)
311
312 # used by the native handler
313 def do_command(cmd, data, ipaddr):
314
315     if cmd=='b':
316         out = "%d"%block_number
317
318     elif cmd in ['session','new_session']:
319         try:
320             if cmd == 'session':
321                 addresses = ast.literal_eval(data)
322                 version = "old"
323             else:
324                 version, addresses = ast.literal_eval(data)
325                 if version[0]=="0": version = "v" + version
326         except:
327             print "error", data
328             return None
329         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
330         out = new_session(version, addresses)
331
332     elif cmd=='address.subscribe':
333         try:
334             session_id, addr = ast.literal_eval(data)
335         except:
336             traceback.print_exc(file=sys.stdout)
337             print data
338             return None
339         out = add_address_to_session(session_id,addr)
340
341     elif cmd=='update_session':
342         try:
343             session_id, addresses = ast.literal_eval(data)
344         except:
345             traceback.print_exc(file=sys.stdout)
346             return None
347         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
348         out = update_session(session_id,addresses)
349             
350     elif cmd=='poll': 
351         out = poll_session(data)
352
353     elif cmd == 'h': 
354         # history
355         address = data
356         out = repr( store.get_history( address ) )
357
358     elif cmd == 'load': 
359         out = cmd_load(None,None,data)
360
361     elif cmd =='tx':
362         out = store.send_tx(data)
363         print timestr(), "sent tx:", ipaddr, out
364
365     elif cmd == 'stop':
366         out = cmd_stop(data)
367
368     elif cmd == 'peers':
369         out = repr(peer_list.values())
370
371     else:
372         out = None
373
374     return out
375
376
377
378 ####################################################################
379
def tcp_server_thread():
    """Accept persistent TCP connections and serve each in its own thread.

    Starts the input- and output-queue worker threads, then listens on the
    configured host, port 50001, until `stopping` is set (checked after each
    accepted connection), handing every connection to tcp_client_thread.
    """
    thread.start_new_thread(process_input_queue, ())
    thread.start_new_thread(process_output_queue, ())

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((config.get('server','host'), 50001))
        s.listen(1)
        while not stopping:
            conn, addr = s.accept()
            try:
                thread.start_new_thread(tcp_client_thread, (addr, conn,))
            except Exception:
                # can't start new thread if there is no memory..
                traceback.print_exc(file=sys.stdout)
                # nobody else owns the connection, so release it here
                conn.close()
    finally:
        # release the listening socket on shutdown or on a bind/accept error
        s.close()
395
396
def close_session(session_id):
    """Forget a session and its numblocks subscription, if they exist.

    Safe to call more than once for the same id: both the client thread
    (on disconnect) and the output thread (on a failed send) call this, so
    the session may already be gone by the time the second caller arrives.
    """
    #print "lost connection", session_id
    sessions.pop(session_id, None)
    sessions_sub_numblocks.pop(session_id, None)
402
403
404 # one thread per client. put requests in a queue.
405 def tcp_client_thread(ipaddr,conn):
406     """ use a persistent connection. put commands in a queue."""
407
408     print timestr(), "TCP session", ipaddr
409     global sessions
410
411     session_id = random_string(10)
412     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
413
414     ipaddr = ipaddr[0]
415     msg = ''
416
417     while not stopping:
418         try:
419             d = conn.recv(1024)
420         except socket.error:
421             d = ''
422         if not d:
423             close_session(session_id)
424             break
425
426         msg += d
427         while True:
428             s = msg.find('\n')
429             if s ==-1:
430                 break
431             else:
432                 c = msg[0:s].strip()
433                 msg = msg[s+1:]
434                 if c == 'quit': 
435                     conn.close()
436                     close_session(session_id)
437                     return
438                 try:
439                     c = json.loads(c)
440                 except:
441                     print "json error", repr(c)
442                     continue
443                 try:
444                     message_id = c.get('id')
445                     method = c.get('method')
446                     params = c.get('params')
447                 except:
448                     print "syntax error", repr(c), ipaddr
449                     continue
450
451                 # add to queue
452                 input_queue.put((session_id, message_id, method, params))
453
454
455
456 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
457 def process_input_queue():
458     while not stopping:
459         session_id, message_id, method, data = input_queue.get()
460         if session_id not in sessions.keys():
461             continue
462         out = None
463         if method == 'address.subscribe':
464             address = data[0]
465             subscribe_to_address(session_id,message_id,address)
466         elif method == 'numblocks.subscribe':
467             subscribe_to_numblocks(session_id,message_id)
468         elif method == 'client.version':
469             sessions[session_id]['version'] = data[0]
470         elif method == 'server.banner':
471             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
472         elif method == 'server.peers':
473             out = { 'result':peer_list.values() } 
474         elif method == 'address.get_history':
475             address = data[0]
476             out = { 'result':store.get_history( address ) } 
477         elif method == 'transaction.broadcast':
478             txo = store.send_tx(data[0])
479             print "sent tx:", txo
480             out = {'result':txo }
481         else:
482             print "unknown command", method
483         if out:
484             out['id'] = message_id
485             out = json.dumps( out )
486             output_queue.put((session_id, out))
487
# this is a separate thread
def process_output_queue():
    """Send queued replies to their TCP sessions, one line per message.

    Each queue item is (session_id, text).  Messages for sessions that no
    longer exist are dropped; a session whose send fails is closed.
    """
    while not stopping:
        session_id, out = output_queue.get()
        session = sessions.get(session_id)
        if not session:
            continue
        try:
            conn = session.get('conn')
            # sendall: a plain send() may transmit only part of the message
            conn.sendall(out+'\n')
        except Exception:
            try:
                close_session(session_id)
            except KeyError:
                # the client thread closed the session first; this thread
                # serves every client and must keep running
                pass
497             except:
498                 close_session(session_id)
499                 
500
501
502
503 ####################################################################
504
505
506
507
508 def clean_session_thread():
509     while not stopping:
510         time.sleep(30)
511         t = time.time()
512         for k,s in sessions.items():
513             if s.get('type') == 'persistent': continue
514             t0 = s['last_time']
515             if t - t0 > 5*60:
516                 sessions.pop(k)
517                 print "lost session", k
518             
519
def irc_thread():
    """Maintain `peer_list` from the members of the #electrum IRC channel.

    Connects to irc.freenode.net, joins #electrum under a random 'E_...'
    nick and asks for the channel's member names every 5 minutes.  Each
    member whose nick starts with 'E_' is queried with WHO and the answer is
    stored as peer_list[nick] = (ip, host).  Reconnects after any error or
    when the server closes the connection, until `stopping` is set.
    """
    global peer_list
    NICK = 'E_'+random_string(10)
    while not stopping:
        # Pre-set so the cleanup below works even when the connection
        # attempt itself fails before these are created.
        s = None
        sf = None
        try:
            s = socket.socket()
            s.connect(('irc.freenode.net', 6667))
            s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
            s.send('NICK '+NICK+'\n')
            s.send('JOIN #electrum\n')
            sf = s.makefile('r', 0)
            t = 0
            while not stopping:
                line = sf.readline()
                if not line:
                    # end of stream: the server closed the connection
                    break
                line = line.rstrip('\r\n')
                line = line.split()
                if not line:
                    # blank line: nothing to parse
                    pass
                elif line[0]=='PING':
                    s.send('PONG '+line[1]+'\n')
                elif '353' in line: # answer to /names
                    k = line.index('353')
                    for item in line[k+1:]:
                        if item[0:2] == 'E_':
                            s.send('WHO %s\n'%item)
                elif '352' in line: # answer to /who
                    # warning: this is a horrible hack which apparently works
                    k = line.index('352')
                    ip = line[k+4]
                    ip = socket.gethostbyname(ip)
                    name = line[k+6]
                    host = line[k+9]
                    peer_list[name] = (ip,host)
                if time.time() - t > 5*60:
                    s.send('NAMES #electrum\n')
                    t = time.time()
                    # start a fresh list; the answers to NAMES/WHO refill it
                    peer_list = {}
        except Exception:
            traceback.print_exc(file=sys.stdout)
        finally:
            if sf is not None:
                sf.close()
            if s is not None:
                s.close()
        if not stopping:
            # pause before reconnecting so a persistent network failure
            # does not turn into a busy loop
            time.sleep(10)
560
561
def get_peers_json(_,__):
    """HTTP handler: return the values of `peer_list`.  Both arguments are ignored."""
    peers = peer_list.values()
    return peers
564
def http_server_thread():
    """Serve the JSON-RPC interface over HTTP on the configured host, port 8081.

    Registers one handler per method name and then serves requests until
    the process exits.
    """
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer

    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer):
        pass

    server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))

    # (handler, method name) pairs, registered in this order
    handlers = [
        (get_peers_json, 'server.peers'),
        (cmd_stop, 'stop'),
        (cmd_load, 'load'),
        (get_banner, 'server.banner'),
        (lambda a,b,c: store.send_tx(c), 'transaction.broadcast'),
        (address_get_history_json, 'address.get_history'),
        (add_address_to_session_json, 'address.subscribe'),
        (subscribe_to_numblocks_json, 'numblocks.subscribe'),
        (client_version_json, 'client.version'),
        (create_session_json, 'session.create'),   # internal message (not part of protocol)
        (poll_session_json, 'session.poll'),       # internal message (not part of protocol)
    ]
    for handler, method_name in handlers:
        server.register_function(handler, method_name)

    server.serve_forever()
583
584
585 if __name__ == '__main__':
586
587     if len(sys.argv)>1:
588         import jsonrpclib
589         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
590         cmd = sys.argv[1]
591         if cmd == 'load':
592             out = server.load(password)
593         elif cmd == 'peers':
594             out = server.server.peers()
595         elif cmd == 'stop':
596             out = server.stop(password)
597         elif cmd == 'clear_cache':
598             out = server.clear_cache(password)
599         elif cmd == 'get_cache':
600             out = server.get_cache(password,sys.argv[2])
601         elif cmd == 'h':
602             out = server.address.get_history(sys.argv[2])
603         elif cmd == 'tx':
604             out = server.transaction.broadcast(sys.argv[2])
605         elif cmd == 'b':
606             out = server.numblocks.subscribe()
607         else:
608             out = "Unknown command: '%s'" % cmd
609         print out
610         sys.exit(0)
611
612     # backend
613     store = abe_backend.AbeStore(config)
614
615     # supported protocols
616     thread.start_new_thread(native_server_thread, ())
617     thread.start_new_thread(tcp_server_thread, ())
618     thread.start_new_thread(http_server_thread, ())
619     thread.start_new_thread(clean_session_thread, ())
620
621     if (config.get('server','irc') == 'yes' ):
622         thread.start_new_thread(irc_thread, ())
623
624     print "starting Electrum server"
625
626     old_block_number = None
627     while not stopping:
628         block_number = store.main_iteration()
629
630         if block_number != old_block_number:
631             old_block_number = block_number
632             for session_id in sessions_sub_numblocks.keys():
633                 send_numblocks(session_id)
634         while True:
635             try:
636                 addr = store.address_queue.get(False)
637             except:
638                 break
639             do_update_address(addr)
640
641         time.sleep(10)
642     print "server stopped"
643