fccf5d20263f3b2857735e4de35963b45823a78f
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 import time, json, socket, operator, thread, ast, sys,re
29
30
31 import ConfigParser
32 from json import dumps, loads
33 import urllib
34
35
36 config = ConfigParser.ConfigParser()
37 # set some defaults, which will be overwritten by the config file
38 config.add_section('server')
39 config.set('server','banner', 'Welcome to Electrum!')
40 config.set('server', 'host', 'localhost')
41 config.set('server', 'port', '50000')
42 config.set('server', 'password', '')
43 config.set('server', 'irc', 'yes')
44 config.set('server', 'ircname', 'Electrum server')
45 config.add_section('database')
46 config.set('database', 'type', 'psycopg2')
47 config.set('database', 'database', 'abe')
48
49 try:
50     f = open('/etc/electrum.conf','r')
51     config.readfp(f)
52     f.close()
53 except:
54     print "Could not read electrum.conf. I will use the default values."
55
56 try:
57     f = open('/etc/electrum.banner','r')
58     config.set('server','banner', f.read())
59     f.close()
60 except:
61     pass
62
63
64 password = config.get('server','password')
65
66 stopping = False
67 block_number = -1
68 old_block_number = -1
69 sessions = {}
70 sessions_sub_numblocks = {} # sessions that have subscribed to the service
71
72 m_sessions = [{}] # served by http
73
74 peer_list = {}
75
76 wallets = {} # for ultra-light clients such as bccapi
77
78 from Queue import Queue
79 input_queue = Queue()
80 output_queue = Queue()
81 address_queue = Queue()
82
83
84
85
86
87
88
89 def random_string(N):
90     import random, string
91     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
92
93     
94
95 def cmd_stop(_,__,pw):
96     global stopping
97     if password == pw:
98         stopping = True
99         return 'ok'
100     else:
101         return 'wrong password'
102
103 def cmd_load(_,__,pw):
104     if password == pw:
105         return repr( len(sessions) )
106     else:
107         return 'wrong password'
108
109
110
111
112
113 def modified_addresses(session):
114     if 1:
115         t1 = time.time()
116         addresses = session['addresses']
117         session['last_time'] = time.time()
118         ret = {}
119         k = 0
120         for addr in addresses:
121             status = get_address_status( addr )
122             msg_id, last_status = addresses.get( addr )
123             if last_status != status:
124                 addresses[addr] = msg_id, status
125                 ret[addr] = status
126
127         t2 = time.time() - t1 
128         #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
129         return ret, addresses
130
131
132 def poll_session(session_id): 
133     # native
134     session = sessions.get(session_id)
135     if session is None:
136         print time.asctime(), "session not found", session_id
137         return -1, {}
138     else:
139         ret, addresses = modified_addresses(session)
140         if ret: sessions[session_id]['addresses'] = addresses
141         return repr( (block_number,ret))
142
143
144 def poll_session_json(session_id, message_id):
145     session = m_sessions[0].get(session_id)
146     if session is None:
147         raise BaseException("session not found %s"%session_id)
148     else:
149         out = []
150         ret, addresses = modified_addresses(session)
151         if ret: 
152             m_sessions[0][session_id]['addresses'] = addresses
153             for addr in ret:
154                 msg_id, status = addresses[addr]
155                 out.append(  { 'id':msg_id, 'result':status } )
156
157         msg_id, last_nb = session.get('numblocks')
158         if last_nb:
159             if last_nb != block_number:
160                 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
161                 out.append( {'id':msg_id, 'result':block_number} )
162
163         return out
164
165
166 def do_update_address(addr):
167     # an address was involved in a transaction; we check if it was subscribed to in a session
168     # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
169
170     for session_id in sessions.keys():
171         session = sessions[session_id]
172         if session.get('type') != 'persistent': continue
173         addresses = session['addresses'].keys()
174
175         if addr in addresses:
176             status = get_address_status( addr )
177             message_id, last_status = session['addresses'][addr]
178             if last_status != status:
179                 #print "sending new status for %s:"%addr, status
180                 send_status(session_id,message_id,addr,status)
181                 sessions[session_id]['addresses'][addr] = (message_id,status)
182
183 def get_address_status(addr):
184     # get address status, i.e. the last block for that address.
185     tx_points = store.get_history(addr)
186     if not tx_points:
187         status = None
188     else:
189         lastpoint = tx_points[-1]
190         status = lastpoint['blk_hash']
191         # this is a temporary hack; move it up once old clients have disappeared
192         if status == 'mempool': # and session['version'] != "old":
193             status = status + ':%d'% len(tx_points)
194     return status
195
196
197 def send_numblocks(session_id):
198     message_id = sessions_sub_numblocks[session_id]
199     out = json.dumps( {'id':message_id, 'result':block_number} )
200     output_queue.put((session_id, out))
201
202 def send_status(session_id, message_id, address, status):
203     out = json.dumps( { 'id':message_id, 'result':status } )
204     output_queue.put((session_id, out))
205
206 def address_get_history_json(_,message_id,address):
207     return store.get_history(address)
208
209 def subscribe_to_numblocks(session_id, message_id):
210     sessions_sub_numblocks[session_id] = message_id
211     send_numblocks(session_id)
212
213 def subscribe_to_numblocks_json(session_id, message_id):
214     global m_sessions
215     m_sessions[0][session_id]['numblocks'] = message_id,block_number
216     return block_number
217
218 def subscribe_to_address(session_id, message_id, address):
219     status = get_address_status(address)
220     sessions[session_id]['addresses'][address] = (message_id, status)
221     sessions[session_id]['last_time'] = time.time()
222     send_status(session_id, message_id, address, status)
223
224 def add_address_to_session_json(session_id, message_id, address):
225     global m_sessions
226     sessions = m_sessions[0]
227     status = get_address_status(address)
228     sessions[session_id]['addresses'][address] = (message_id, status)
229     sessions[session_id]['last_time'] = time.time()
230     m_sessions[0] = sessions
231     return status
232
233 def add_address_to_session(session_id, address):
234     status = get_address_status(address)
235     sessions[session_id]['addresses'][addr] = ("", status)
236     sessions[session_id]['last_time'] = time.time()
237     return status
238
239 def new_session(version, addresses):
240     session_id = random_string(10)
241     sessions[session_id] = { 'addresses':{}, 'version':version }
242     for a in addresses:
243         sessions[session_id]['addresses'][a] = ('','')
244     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
245     sessions[session_id]['last_time'] = time.time()
246     return out
247
248
249 def client_version_json(session_id, _, version):
250     global m_sessions
251     sessions = m_sessions[0]
252     sessions[session_id]['version'] = version
253     m_sessions[0] = sessions
254
255 def create_session_json(_, __):
256     sessions = m_sessions[0]
257     session_id = random_string(10)
258     print "creating session", session_id
259     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
260     sessions[session_id]['last_time'] = time.time()
261     m_sessions[0] = sessions
262     return session_id
263
264
265
266 def get_banner(_,__):
267     return config.get('server','banner').replace('\\n','\n')
268
269 def update_session(session_id,addresses):
270     """deprecated in 0.42"""
271     sessions[session_id]['addresses'] = {}
272     for a in addresses:
273         sessions[session_id]['addresses'][a] = ''
274     sessions[session_id]['last_time'] = time.time()
275     return 'ok'
276
277 def native_server_thread():
278     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
279     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
280     s.bind((config.get('server','host'), config.getint('server','port')))
281     s.listen(1)
282     while not stopping:
283         conn, addr = s.accept()
284         try:
285             thread.start_new_thread(native_client_thread, (addr, conn,))
286         except:
287             # can't start new thread if there is no memory..
288             traceback.print_exc(file=sys.stdout)
289
290
291 def native_client_thread(ipaddr,conn):
292     #print "client thread", ipaddr
293     try:
294         ipaddr = ipaddr[0]
295         msg = ''
296         while 1:
297             d = conn.recv(1024)
298             msg += d
299             if not d: 
300                 break
301             if '#' in msg:
302                 msg = msg.split('#', 1)[0]
303                 break
304         try:
305             cmd, data = ast.literal_eval(msg)
306         except:
307             print "syntax error", repr(msg), ipaddr
308             conn.close()
309             return
310
311         out = do_command(cmd, data, ipaddr)
312         if out:
313             #print ipaddr, cmd, len(out)
314             try:
315                 conn.send(out)
316             except:
317                 print "error, could not send"
318
319     finally:
320         conn.close()
321
322
323 def timestr():
324     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
325
326 # used by the native handler
327 def do_command(cmd, data, ipaddr):
328
329     if cmd=='b':
330         out = "%d"%block_number
331
332     elif cmd in ['session','new_session']:
333         try:
334             if cmd == 'session':
335                 addresses = ast.literal_eval(data)
336                 version = "old"
337             else:
338                 version, addresses = ast.literal_eval(data)
339                 if version[0]=="0": version = "v" + version
340         except:
341             print "error", data
342             return None
343         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
344         out = new_session(version, addresses)
345
346     elif cmd=='address.subscribe':
347         try:
348             session_id, addr = ast.literal_eval(data)
349         except:
350             traceback.print_exc(file=sys.stdout)
351             return None
352         out = add_address_to_session(session_id,addr)
353
354     elif cmd=='update_session':
355         try:
356             session_id, addresses = ast.literal_eval(data)
357         except:
358             traceback.print_exc(file=sys.stdout)
359             return None
360         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
361         out = update_session(session_id,addresses)
362             
363     elif cmd=='poll': 
364         out = poll_session(data)
365
366     elif cmd == 'h': 
367         # history
368         address = data
369         out = repr( store.get_history( address ) )
370
371     elif cmd == 'load': 
372         out = cmd_load(None,None,data)
373
374     elif cmd =='tx':
375         out = store.send_tx(data)
376         print timestr(), "sent tx:", ipaddr, out
377
378     elif cmd == 'stop':
379         out = cmd_stop(data)
380
381     elif cmd == 'peers':
382         out = repr(peer_list.values())
383
384     else:
385         out = None
386
387     return out
388
389
390
391 ####################################################################
392
393 def tcp_server_thread():
394     thread.start_new_thread(process_input_queue, ())
395     thread.start_new_thread(process_output_queue, ())
396
397     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
398     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
399     s.bind((config.get('server','host'), 50001))
400     s.listen(1)
401     while not stopping:
402         conn, addr = s.accept()
403         try:
404             thread.start_new_thread(tcp_client_thread, (addr, conn,))
405         except:
406             # can't start new thread if there is no memory..
407             traceback.print_exc(file=sys.stdout)
408
409
410 def close_session(session_id):
411     #print "lost connection", session_id
412     sessions.pop(session_id)
413     if session_id in sessions_sub_numblocks:
414         sessions_sub_numblocks.pop(session_id)
415
416
417 # one thread per client. put requests in a queue.
418 def tcp_client_thread(ipaddr,conn):
419     """ use a persistent connection. put commands in a queue."""
420
421     print timestr(), "TCP session", ipaddr
422     global sessions
423
424     session_id = random_string(10)
425     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
426
427     ipaddr = ipaddr[0]
428     msg = ''
429
430     while not stopping:
431         try:
432             d = conn.recv(1024)
433         except socket.error:
434             d = ''
435         if not d:
436             close_session(session_id)
437             break
438
439         msg += d
440         while True:
441             s = msg.find('\n')
442             if s ==-1:
443                 break
444             else:
445                 c = msg[0:s].strip()
446                 msg = msg[s+1:]
447                 if c == 'quit': 
448                     conn.close()
449                     close_session(session_id)
450                     return
451                 try:
452                     c = json.loads(c)
453                 except:
454                     print "json error", repr(c)
455                     continue
456                 try:
457                     message_id = c.get('id')
458                     method = c.get('method')
459                     params = c.get('params')
460                 except:
461                     print "syntax error", repr(c), ipaddr
462                     continue
463
464                 # add to queue
465                 input_queue.put((session_id, message_id, method, params))
466
467
468
469 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
470 def process_input_queue():
471     while not stopping:
472         session_id, message_id, method, data = input_queue.get()
473         if session_id not in sessions.keys():
474             continue
475         out = None
476         if method == 'address.subscribe':
477             address = data[0]
478             subscribe_to_address(session_id,message_id,address)
479         elif method == 'numblocks.subscribe':
480             subscribe_to_numblocks(session_id,message_id)
481         elif method == 'client.version':
482             sessions[session_id]['version'] = data[0]
483         elif method == 'server.banner':
484             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
485         elif method == 'server.peers':
486             out = { 'result':peer_list.values() } 
487         elif method == 'address.get_history':
488             address = data[0]
489             out = { 'result':store.get_history( address ) } 
490         elif method == 'transaction.broadcast':
491             postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
492             txo = urllib.urlopen(bitcoind_url, postdata).read()
493             print "sent tx:", txo
494             out = json.loads(txo)
495         else:
496             print "unknown command", method
497         if out:
498             out['id'] = message_id
499             out = json.dumps( out )
500             output_queue.put((session_id, out))
501
502 # this is a separate thread
503 def process_output_queue():
504     while not stopping:
505         session_id, out = output_queue.get()
506         session = sessions.get(session_id)
507         if session: 
508             try:
509                 conn = session.get('conn')
510                 conn.send(out+'\n')
511             except:
512                 close_session(session_id)
513                 
514
515
516
517 ####################################################################
518
519
520
521
522 def clean_session_thread():
523     while not stopping:
524         time.sleep(30)
525         t = time.time()
526         for k,s in sessions.items():
527             if s.get('type') == 'persistent': continue
528             t0 = s['last_time']
529             if t - t0 > 5*60:
530                 sessions.pop(k)
531                 print "lost session", k
532             
533
534 def irc_thread():
535     global peer_list
536     NICK = 'E_'+random_string(10)
537     while not stopping:
538         try:
539             s = socket.socket()
540             s.connect(('irc.freenode.net', 6667))
541             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
542             s.send('NICK '+NICK+'\n')
543             s.send('JOIN #electrum\n')
544             sf = s.makefile('r', 0)
545             t = 0
546             while not stopping:
547                 line = sf.readline()
548                 line = line.rstrip('\r\n')
549                 line = line.split()
550                 if line[0]=='PING': 
551                     s.send('PONG '+line[1]+'\n')
552                 elif '353' in line: # answer to /names
553                     k = line.index('353')
554                     for item in line[k+1:]:
555                         if item[0:2] == 'E_':
556                             s.send('WHO %s\n'%item)
557                 elif '352' in line: # answer to /who
558                     # warning: this is a horrible hack which apparently works
559                     k = line.index('352')
560                     ip = line[k+4]
561                     ip = socket.gethostbyname(ip)
562                     name = line[k+6]
563                     host = line[k+9]
564                     peer_list[name] = (ip,host)
565                 if time.time() - t > 5*60:
566                     s.send('NAMES #electrum\n')
567                     t = time.time()
568                     peer_list = {}
569         except:
570             traceback.print_exc(file=sys.stdout)
571         finally:
572             sf.close()
573             s.close()
574
575
576 def get_peers_json(_,__):
577     return peer_list.values()
578
579 def http_server_thread():
580     # see http://code.google.com/p/jsonrpclib/
581     from SocketServer import ThreadingMixIn
582     from StratumJSONRPCServer import StratumJSONRPCServer
583     class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
584     server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
585     server.register_function(get_peers_json, 'server.peers')
586     server.register_function(cmd_stop, 'stop')
587     server.register_function(cmd_load, 'load')
588     server.register_function(get_banner, 'server.banner')
589     server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
590     server.register_function(address_get_history_json, 'address.get_history')
591     server.register_function(add_address_to_session_json, 'address.subscribe')
592     server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
593     server.register_function(client_version_json, 'client.version')
594     server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
595     server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
596     server.serve_forever()
597
598
599 import traceback
600
601
602 if __name__ == '__main__':
603
604     if len(sys.argv)>1:
605         import jsonrpclib
606         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
607         cmd = sys.argv[1]
608         if cmd == 'load':
609             out = server.load(password)
610         elif cmd == 'peers':
611             out = server.server.peers()
612         elif cmd == 'stop':
613             out = server.stop(password)
614         elif cmd == 'clear_cache':
615             out = server.clear_cache(password)
616         elif cmd == 'get_cache':
617             out = server.get_cache(password,sys.argv[2])
618         elif cmd == 'h':
619             out = server.address.get_history(sys.argv[2])
620         elif cmd == 'tx':
621             out = server.transaction.broadcast(sys.argv[2])
622         elif cmd == 'b':
623             out = server.numblocks.subscribe()
624         else:
625             out = "Unknown command: '%s'" % cmd
626         print out
627         sys.exit(0)
628
629
630     # backend
631     import db
632     store = db.MyStore(config,address_queue)
633
634     # supported protocols
635     thread.start_new_thread(native_server_thread, ())
636     thread.start_new_thread(tcp_server_thread, ())
637     thread.start_new_thread(http_server_thread, ())
638     thread.start_new_thread(clean_session_thread, ())
639
640     if (config.get('server','irc') == 'yes' ):
641         thread.start_new_thread(irc_thread, ())
642
643     print "starting Electrum server"
644
645
646     while not stopping:
647         block_number = store.main_iteration()
648
649         if block_number != old_block_number:
650             old_block_number = block_number
651             for session_id in sessions_sub_numblocks.keys():
652                 send_numblocks(session_id)
653         # do addresses
654         while True:
655             try:
656                 addr = address_queue.get(False)
657             except:
658                 break
659             do_update_address(addr)
660
661         time.sleep(10)
662     print "server stopped"
663