9c2e2b857b110f701bf07ba9c2f77159d1712992
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27 import abe_backend
28
29
30
31
32 import time, json, socket, operator, thread, ast, sys, re, traceback
33 import ConfigParser
34 from json import dumps, loads
35 import urllib
36
37
38 config = ConfigParser.ConfigParser()
39 # set some defaults, which will be overwritten by the config file
40 config.add_section('server')
41 config.set('server','banner', 'Welcome to Electrum!')
42 config.set('server', 'host', 'localhost')
43 config.set('server', 'port', '50000')
44 config.set('server', 'password', '')
45 config.set('server', 'irc', 'yes')
46 config.set('server', 'ircname', 'Electrum server')
47 config.add_section('database')
48 config.set('database', 'type', 'psycopg2')
49 config.set('database', 'database', 'abe')
50
51 try:
52     f = open('/etc/electrum.conf','r')
53     config.readfp(f)
54     f.close()
55 except:
56     print "Could not read electrum.conf. I will use the default values."
57
58 try:
59     f = open('/etc/electrum.banner','r')
60     config.set('server','banner', f.read())
61     f.close()
62 except:
63     pass
64
65
66 password = config.get('server','password')
67
68 stopping = False
69 block_number = -1
70 sessions = {}
71
72 m_sessions = [{}] # served by http
73
74 peer_list = {}
75
76 from Queue import Queue
77 input_queue = Queue()
78 output_queue = Queue()
79
80
81
82
83 def random_string(N):
84     import random, string
85     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
86
87     
88
89 def cmd_stop(_,__,pw):
90     global stopping
91     if password == pw:
92         stopping = True
93         return 'ok'
94     else:
95         return 'wrong password'
96
97 def cmd_load(_,__,pw):
98     if password == pw:
99         return repr( len(sessions) )
100     else:
101         return 'wrong password'
102
103
104
105
106
107 def modified_addresses(a_session):
108     #t1 = time.time()
109     import copy
110     session = copy.deepcopy(a_session)
111     addresses = session['addresses']
112     session['last_time'] = time.time()
113     ret = {}
114     k = 0
115     for addr in addresses:
116         status = store.get_status( addr )
117         msg_id, last_status = addresses.get( addr )
118         if last_status != status:
119             addresses[addr] = msg_id, status
120             ret[addr] = status
121
122     #t2 = time.time() - t1 
123     #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
124     return ret, addresses
125
126
127 def poll_session(session_id): 
128     # native
129     session = sessions.get(session_id)
130     if session is None:
131         print time.asctime(), "session not found", session_id
132         return -1, {}
133     else:
134         sessions[session_id]['last_time'] = time.time()
135         ret, addresses = modified_addresses(session)
136         if ret: sessions[session_id]['addresses'] = addresses
137         return repr( (block_number,ret))
138
139
140 def poll_session_json(session_id, message_id):
141     session = m_sessions[0].get(session_id)
142     if session is None:
143         raise BaseException("session not found %s"%session_id)
144     else:
145         m_sessions[0][session_id]['last_time'] = time.time()
146         out = []
147         ret, addresses = modified_addresses(session)
148         if ret: 
149             m_sessions[0][session_id]['addresses'] = addresses
150             for addr in ret:
151                 msg_id, status = addresses[addr]
152                 out.append(  { 'id':msg_id, 'result':status } )
153
154         msg_id, last_nb = session.get('numblocks')
155         if last_nb:
156             if last_nb != block_number:
157                 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
158                 out.append( {'id':msg_id, 'result':block_number} )
159
160         return out
161
162
163
164
165 def address_get_history_json(_,message_id,address):
166     return store.get_history(address)
167
168 def subscribe_to_numblocks_json(session_id, message_id):
169     global m_sessions
170     m_sessions[0][session_id]['numblocks'] = message_id,block_number
171     return block_number
172
173 def add_address_to_session_json(session_id, message_id, address):
174     global m_sessions
175     sessions = m_sessions[0]
176     status = store.get_status(address)
177     sessions[session_id]['addresses'][address] = (message_id, status)
178     sessions[session_id]['last_time'] = time.time()
179     m_sessions[0] = sessions
180     return status
181
182 def add_address_to_session(session_id, address):
183     status = store.get_status(address)
184     sessions[session_id]['addresses'][address] = ("", status)
185     sessions[session_id]['last_time'] = time.time()
186     return status
187
188 def new_session(version, addresses):
189     session_id = random_string(10)
190     sessions[session_id] = { 'addresses':{}, 'version':version }
191     for a in addresses:
192         sessions[session_id]['addresses'][a] = ('','')
193     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
194     sessions[session_id]['last_time'] = time.time()
195     return out
196
197
198 def client_version_json(session_id, _, version):
199     global m_sessions
200     sessions = m_sessions[0]
201     sessions[session_id]['version'] = version
202     m_sessions[0] = sessions
203
204 def create_session_json(_, __):
205     sessions = m_sessions[0]
206     session_id = random_string(10)
207     print "creating session", session_id
208     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
209     sessions[session_id]['last_time'] = time.time()
210     m_sessions[0] = sessions
211     return session_id
212
213
214
215 def get_banner(_,__):
216     return config.get('server','banner').replace('\\n','\n')
217
218 def update_session(session_id,addresses):
219     """deprecated in 0.42, wad replaced by add_address_to_session"""
220     sessions[session_id]['addresses'] = {}
221     for a in addresses:
222         sessions[session_id]['addresses'][a] = ''
223     sessions[session_id]['last_time'] = time.time()
224     return 'ok'
225
226 def native_server_thread():
227     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
228     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
229     s.bind((config.get('server','host'), config.getint('server','port')))
230     s.listen(1)
231     while not stopping:
232         conn, addr = s.accept()
233         try:
234             thread.start_new_thread(native_client_thread, (addr, conn,))
235         except:
236             # can't start new thread if there is no memory..
237             traceback.print_exc(file=sys.stdout)
238
239
240 def native_client_thread(ipaddr,conn):
241     #print "client thread", ipaddr
242     try:
243         ipaddr = ipaddr[0]
244         msg = ''
245         while 1:
246             d = conn.recv(1024)
247             msg += d
248             if not d: 
249                 break
250             if '#' in msg:
251                 msg = msg.split('#', 1)[0]
252                 break
253         try:
254             cmd, data = ast.literal_eval(msg)
255         except:
256             print "syntax error", repr(msg), ipaddr
257             conn.close()
258             return
259
260         out = do_command(cmd, data, ipaddr)
261         if out:
262             #print ipaddr, cmd, len(out)
263             try:
264                 conn.send(out)
265             except:
266                 print "error, could not send"
267
268     finally:
269         conn.close()
270
271
272 def timestr():
273     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
274
275 # used by the native handler
276 def do_command(cmd, data, ipaddr):
277
278     if cmd=='b':
279         out = "%d"%block_number
280
281     elif cmd in ['session','new_session']:
282         try:
283             if cmd == 'session':
284                 addresses = ast.literal_eval(data)
285                 version = "old"
286             else:
287                 version, addresses = ast.literal_eval(data)
288                 if version[0]=="0": version = "v" + version
289         except:
290             print "error", data
291             return None
292         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
293         out = new_session(version, addresses)
294
295     elif cmd=='address.subscribe':
296         try:
297             session_id, addr = ast.literal_eval(data)
298         except:
299             traceback.print_exc(file=sys.stdout)
300             print data
301             return None
302         out = add_address_to_session(session_id,addr)
303
304     elif cmd=='update_session':
305         try:
306             session_id, addresses = ast.literal_eval(data)
307         except:
308             traceback.print_exc(file=sys.stdout)
309             return None
310         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
311         out = update_session(session_id,addresses)
312             
313     elif cmd=='poll': 
314         out = poll_session(data)
315
316     elif cmd == 'h': 
317         # history
318         address = data
319         out = repr( store.get_history( address ) )
320
321     elif cmd == 'load': 
322         out = cmd_load(None,None,data)
323
324     elif cmd =='tx':
325         out = store.send_tx(data)
326         print timestr(), "sent tx:", ipaddr, out
327
328     elif cmd == 'stop':
329         out = cmd_stop(data)
330
331     elif cmd == 'peers':
332         out = repr(peer_list.values())
333
334     else:
335         out = None
336
337     return out
338
339
340 def clean_session_thread():
341     while not stopping:
342         time.sleep(30)
343         t = time.time()
344         for k,s in sessions.items():
345             if s.get('type') == 'persistent': continue
346             t0 = s['last_time']
347             if t - t0 > 5*60:
348                 sessions.pop(k)
349                 print "lost session", k
350             
351
352 ####################################################################
353
354
355 import stratum
356
357 class AbeProcessor(stratum.Processor):
358     def process(self,session,request):
359         message_id = request['id']
360         method = request['method']
361         params = request.get('params',[])
362         #print request
363
364         result = ''
365         if method == 'numblocks.subscribe':
366             session.subscribe_to_numblocks(message_id)
367             result = block_number
368         elif method == 'address.subscribe':
369             address = params[0]
370             status = store.get_status(address)
371             session.subscribe_to_address(address,message_id,status)
372             result = status
373         elif method == 'client.version':
374             session.version = params[0]
375         elif method == 'server.banner':
376             result = config.get('server','banner').replace('\\n','\n')
377         elif method == 'server.peers':
378             result = peer_list.values()
379         elif method == 'address.get_history':
380             address = params[0]
381             result = store.get_history( address ) 
382         elif method == 'transaction.broadcast':
383             txo = store.send_tx(params[0])
384             print "sent tx:", txo
385             result = txo 
386         else:
387             print "unknown method", request
388
389         if result!='':
390             response = { 'id':message_id, 'result':result }
391             self.push_response(session,response)
392
393     def get_status(self,addr):
394         return store.get_status(addr)
395
396
397
398 ####################################################################
399
400
401
402
403
404 def irc_thread():
405     global peer_list
406     NICK = 'E_'+random_string(10)
407     while not stopping:
408         try:
409             s = socket.socket()
410             s.connect(('irc.freenode.net', 6667))
411             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
412             s.send('NICK '+NICK+'\n')
413             s.send('JOIN #electrum\n')
414             sf = s.makefile('r', 0)
415             t = 0
416             while not stopping:
417                 line = sf.readline()
418                 line = line.rstrip('\r\n')
419                 line = line.split()
420                 if line[0]=='PING': 
421                     s.send('PONG '+line[1]+'\n')
422                 elif '353' in line: # answer to /names
423                     k = line.index('353')
424                     for item in line[k+1:]:
425                         if item[0:2] == 'E_':
426                             s.send('WHO %s\n'%item)
427                 elif '352' in line: # answer to /who
428                     # warning: this is a horrible hack which apparently works
429                     k = line.index('352')
430                     ip = line[k+4]
431                     ip = socket.gethostbyname(ip)
432                     name = line[k+6]
433                     host = line[k+9]
434                     peer_list[name] = (ip,host)
435                 if time.time() - t > 5*60:
436                     s.send('NAMES #electrum\n')
437                     t = time.time()
438                     peer_list = {}
439         except:
440             traceback.print_exc(file=sys.stdout)
441         finally:
442             sf.close()
443             s.close()
444
445
446 def get_peers_json(_,__):
447     return peer_list.values()
448
449 def http_server_thread():
450     # see http://code.google.com/p/jsonrpclib/
451     from SocketServer import ThreadingMixIn
452     from StratumJSONRPCServer import StratumJSONRPCServer
453     class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
454     server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
455     server.register_function(get_peers_json, 'server.peers')
456     server.register_function(cmd_stop, 'stop')
457     server.register_function(cmd_load, 'load')
458     server.register_function(get_banner, 'server.banner')
459     server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
460     server.register_function(address_get_history_json, 'address.get_history')
461     server.register_function(add_address_to_session_json, 'address.subscribe')
462     server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
463     server.register_function(client_version_json, 'client.version')
464     server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
465     server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
466     server.serve_forever()
467
468
469 if __name__ == '__main__':
470
471     if len(sys.argv)>1:
472         import jsonrpclib
473         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
474         cmd = sys.argv[1]
475         if cmd == 'load':
476             out = server.load(password)
477         elif cmd == 'peers':
478             out = server.server.peers()
479         elif cmd == 'stop':
480             out = server.stop(password)
481         elif cmd == 'clear_cache':
482             out = server.clear_cache(password)
483         elif cmd == 'get_cache':
484             out = server.get_cache(password,sys.argv[2])
485         elif cmd == 'h':
486             out = server.address.get_history(sys.argv[2])
487         elif cmd == 'tx':
488             out = server.transaction.broadcast(sys.argv[2])
489         elif cmd == 'b':
490             out = server.numblocks.subscribe()
491         else:
492             out = "Unknown command: '%s'" % cmd
493         print out
494         sys.exit(0)
495
496     # backend
497     store = abe_backend.AbeStore(config)
498
499     # supported protocols
500     thread.start_new_thread(native_server_thread, ())
501
502     thread.start_new_thread(http_server_thread, ())
503     thread.start_new_thread(clean_session_thread, ())
504
505     #tcp stratum
506     stratum_processor = AbeProcessor()
507     shared = stratum.Shared()
508     # Bind shared to processor since constructor is user defined
509     stratum_processor.shared = shared
510     stratum_processor.start()
511     # Create various transports we need
512     server = stratum.TcpServer(shared, stratum_processor, "ecdsa.org",50001)
513     server.start()
514
515     if (config.get('server','irc') == 'yes' ):
516         thread.start_new_thread(irc_thread, ())
517
518     print "starting Electrum server"
519
520     old_block_number = None
521     while not stopping:
522         block_number = store.main_iteration()
523
524         if block_number != old_block_number:
525             old_block_number = block_number
526             stratum_processor.update_from_blocknum(block_number)
527
528         while True:
529             try:
530                 addr = store.address_queue.get(False)
531             except:
532                 break
533
534             stratum_processor.update_from_address(addr)
535
536         time.sleep(10)
537     print "server stopped"
538