abstract services from processor; define server.peers as a service
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27 import abe_backend
28
29
30
31
32 import time, json, socket, operator, thread, ast, sys, re, traceback
33 import ConfigParser
34 from json import dumps, loads
35 import urllib
36 import threading
37
38 config = ConfigParser.ConfigParser()
39 # set some defaults, which will be overwritten by the config file
40 config.add_section('server')
41 config.set('server','banner', 'Welcome to Electrum!')
42 config.set('server', 'host', 'localhost')
43 config.set('server', 'port', '50000')
44 config.set('server', 'password', '')
45 config.set('server', 'irc', 'yes')
46 config.set('server', 'ircname', 'Electrum server')
47 config.add_section('database')
48 config.set('database', 'type', 'psycopg2')
49 config.set('database', 'database', 'abe')
50
51 try:
52     f = open('/etc/electrum.conf','r')
53     config.readfp(f)
54     f.close()
55 except:
56     print "Could not read electrum.conf. I will use the default values."
57
58 try:
59     f = open('/etc/electrum.banner','r')
60     config.set('server','banner', f.read())
61     f.close()
62 except:
63     pass
64
65
66 password = config.get('server','password')
67
68 stopping = False
69 sessions = {}
70
71 m_sessions = [{}] # served by http
72
73
74 from Queue import Queue
75 input_queue = Queue()
76 output_queue = Queue()
77
78
79
80
81 def random_string(N):
82     import random, string
83     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
84
85     
86
87 def cmd_stop(_,__,pw):
88     global stopping
89     if password == pw:
90         stopping = True
91         return 'ok'
92     else:
93         return 'wrong password'
94
95 def cmd_load(_,__,pw):
96     if password == pw:
97         return repr( len(sessions) )
98     else:
99         return 'wrong password'
100
101
102
103
104
105 def modified_addresses(a_session):
106     #t1 = time.time()
107     import copy
108     session = copy.deepcopy(a_session)
109     addresses = session['addresses']
110     session['last_time'] = time.time()
111     ret = {}
112     k = 0
113     for addr in addresses:
114         status = store.get_status( addr )
115         msg_id, last_status = addresses.get( addr )
116         if last_status != status:
117             addresses[addr] = msg_id, status
118             ret[addr] = status
119
120     #t2 = time.time() - t1 
121     #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
122     return ret, addresses
123
124
125 def poll_session(session_id): 
126     # native
127     session = sessions.get(session_id)
128     if session is None:
129         print time.asctime(), "session not found", session_id
130         return -1, {}
131     else:
132         sessions[session_id]['last_time'] = time.time()
133         ret, addresses = modified_addresses(session)
134         if ret: sessions[session_id]['addresses'] = addresses
135         return repr( (store.block_number,ret))
136
137
138 def poll_session_json(session_id, message_id):
139     session = m_sessions[0].get(session_id)
140     if session is None:
141         raise BaseException("session not found %s"%session_id)
142     else:
143         m_sessions[0][session_id]['last_time'] = time.time()
144         out = []
145         ret, addresses = modified_addresses(session)
146         if ret: 
147             m_sessions[0][session_id]['addresses'] = addresses
148             for addr in ret:
149                 msg_id, status = addresses[addr]
150                 out.append(  { 'id':msg_id, 'result':status } )
151
152         msg_id, last_nb = session.get('numblocks')
153         if last_nb:
154             if last_nb != block_number:
155                 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
156                 out.append( {'id':msg_id, 'result':block_number} )
157
158         return out
159
160
161
162
163 def address_get_history_json(_,message_id,address):
164     return store.get_history(address)
165
166 def subscribe_to_numblocks_json(session_id, message_id):
167     global m_sessions
168     m_sessions[0][session_id]['numblocks'] = message_id,block_number
169     return block_number
170
171 def add_address_to_session_json(session_id, message_id, address):
172     global m_sessions
173     sessions = m_sessions[0]
174     status = store.get_status(address)
175     sessions[session_id]['addresses'][address] = (message_id, status)
176     sessions[session_id]['last_time'] = time.time()
177     m_sessions[0] = sessions
178     return status
179
180 def add_address_to_session(session_id, address):
181     status = store.get_status(address)
182     sessions[session_id]['addresses'][address] = ("", status)
183     sessions[session_id]['last_time'] = time.time()
184     return status
185
186 def new_session(version, addresses):
187     session_id = random_string(10)
188     sessions[session_id] = { 'addresses':{}, 'version':version }
189     for a in addresses:
190         sessions[session_id]['addresses'][a] = ('','')
191     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
192     sessions[session_id]['last_time'] = time.time()
193     return out
194
195
196 def client_version_json(session_id, _, version):
197     global m_sessions
198     sessions = m_sessions[0]
199     sessions[session_id]['version'] = version
200     m_sessions[0] = sessions
201
202
203
204 def get_banner(_,__):
205     return config.get('server','banner').replace('\\n','\n')
206
207 def update_session(session_id,addresses):
208     """deprecated in 0.42, wad replaced by add_address_to_session"""
209     sessions[session_id]['addresses'] = {}
210     for a in addresses:
211         sessions[session_id]['addresses'][a] = ''
212     sessions[session_id]['last_time'] = time.time()
213     return 'ok'
214
215 def native_server_thread():
216     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
217     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
218     s.bind((config.get('server','host'), config.getint('server','port')))
219     s.listen(1)
220     while not stopping:
221         conn, addr = s.accept()
222         try:
223             thread.start_new_thread(native_client_thread, (addr, conn,))
224         except:
225             # can't start new thread if there is no memory..
226             traceback.print_exc(file=sys.stdout)
227
228
229 def native_client_thread(ipaddr,conn):
230     #print "client thread", ipaddr
231     try:
232         ipaddr = ipaddr[0]
233         msg = ''
234         while 1:
235             d = conn.recv(1024)
236             msg += d
237             if not d: 
238                 break
239             if '#' in msg:
240                 msg = msg.split('#', 1)[0]
241                 break
242         try:
243             cmd, data = ast.literal_eval(msg)
244         except:
245             print "syntax error", repr(msg), ipaddr
246             conn.close()
247             return
248
249         out = do_command(cmd, data, ipaddr)
250         if out:
251             #print ipaddr, cmd, len(out)
252             try:
253                 conn.send(out)
254             except:
255                 print "error, could not send"
256
257     finally:
258         conn.close()
259
260
261 def timestr():
262     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
263
264 # used by the native handler
265 def do_command(cmd, data, ipaddr):
266
267     if cmd=='b':
268         out = "%d"%block_number
269
270     elif cmd in ['session','new_session']:
271         try:
272             if cmd == 'session':
273                 addresses = ast.literal_eval(data)
274                 version = "old"
275             else:
276                 version, addresses = ast.literal_eval(data)
277                 if version[0]=="0": version = "v" + version
278         except:
279             print "error", data
280             return None
281         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
282         out = new_session(version, addresses)
283
284     elif cmd=='address.subscribe':
285         try:
286             session_id, addr = ast.literal_eval(data)
287         except:
288             traceback.print_exc(file=sys.stdout)
289             print data
290             return None
291         out = add_address_to_session(session_id,addr)
292
293     elif cmd=='update_session':
294         try:
295             session_id, addresses = ast.literal_eval(data)
296         except:
297             traceback.print_exc(file=sys.stdout)
298             return None
299         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
300         out = update_session(session_id,addresses)
301             
302     elif cmd=='poll': 
303         out = poll_session(data)
304
305     elif cmd == 'h': 
306         # history
307         address = data
308         out = repr( store.get_history( address ) )
309
310     elif cmd == 'load': 
311         out = cmd_load(None,None,data)
312
313     elif cmd =='tx':
314         out = store.send_tx(data)
315         print timestr(), "sent tx:", ipaddr, out
316
317     elif cmd == 'stop':
318         out = cmd_stop(data)
319
320     elif cmd == 'peers':
321         out = repr(irc.get_peers())
322
323     else:
324         out = None
325
326     return out
327
328
329 def clean_session_thread():
330     while not stopping:
331         time.sleep(30)
332         t = time.time()
333         for k,s in sessions.items():
334             if s.get('type') == 'persistent': continue
335             t0 = s['last_time']
336             if t - t0 > 5*60:
337                 sessions.pop(k)
338                 print "lost session", k
339             
340
341 ####################################################################
342
343
344 import stratum
345
346 class AbeProcessor(stratum.Processor):
347     def process(self,request):
348         message_id = request['id']
349         method = request['method']
350         params = request.get('params',[])
351         #print request
352
353         result = ''
354         if method == 'numblocks.subscribe':
355             result = store.block_number
356         elif method == 'address.subscribe':
357             address = params[0]
358             store.watch_address(address)
359             status = store.get_status(address)
360             result = status
361         elif method == 'client.version':
362             #session.version = params[0]
363             pass
364         elif method == 'server.banner':
365             result = config.get('server','banner').replace('\\n','\n')
366         elif method == 'server.peers':
367             result = irc.get_peers()
368         elif method == 'address.get_history':
369             address = params[0]
370             result = store.get_history( address ) 
371         elif method == 'transaction.broadcast':
372             txo = store.send_tx(params[0])
373             print "sent tx:", txo
374             result = txo 
375         else:
376             print "unknown method", request
377
378         if result!='':
379             response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
380             self.push_response(response)
381
382     def get_status(self,addr):
383         return store.get_status(addr)
384
385
386
387 ####################################################################
388
389
390
391 class Irc(threading.Thread):
392
393     def __init__(self, processor):
394         self.processor = processor
395         threading.Thread.__init__(self)
396         self.daemon = True
397         self.peers = {}
398
399     def get_peers(self):
400         return self.peers.values()
401
402     def run(self):
403         NICK = 'E_'+random_string(10)
404         while not self.processor.shared.stopped():
405             try:
406                 s = socket.socket()
407                 s.connect(('irc.freenode.net', 6667))
408                 s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
409                 s.send('NICK '+NICK+'\n')
410                 s.send('JOIN #electrum\n')
411                 sf = s.makefile('r', 0)
412                 t = 0
413                 while not self.processor.shared.stopped():
414                     line = sf.readline()
415                     line = line.rstrip('\r\n')
416                     line = line.split()
417                     if line[0]=='PING': 
418                         s.send('PONG '+line[1]+'\n')
419                     elif '353' in line: # answer to /names
420                         k = line.index('353')
421                         for item in line[k+1:]:
422                             if item[0:2] == 'E_':
423                                 s.send('WHO %s\n'%item)
424                     elif '352' in line: # answer to /who
425                         # warning: this is a horrible hack which apparently works
426                         k = line.index('352')
427                         ip = line[k+4]
428                         ip = socket.gethostbyname(ip)
429                         name = line[k+6]
430                         host = line[k+9]
431                         self.peers[name] = (ip,host)
432                     if time.time() - t > 5*60:
433                         self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
434                         s.send('NAMES #electrum\n')
435                         t = time.time()
436                         self.peers = {}
437             except:
438                 traceback.print_exc(file=sys.stdout)
439             finally:
440                 sf.close()
441                 s.close()
442
443
444 def get_peers_json(_,__):
445     return irc.get_peers()
446
447 def http_server_thread():
448     # see http://code.google.com/p/jsonrpclib/
449     from SocketServer import ThreadingMixIn
450     from StratumJSONRPCServer import StratumJSONRPCServer
451     class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
452     server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
453     server.register_function(get_peers_json, 'server.peers')
454     server.register_function(cmd_stop, 'stop')
455     server.register_function(cmd_load, 'load')
456     server.register_function(get_banner, 'server.banner')
457     server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
458     server.register_function(address_get_history_json, 'address.get_history')
459     server.register_function(add_address_to_session_json, 'address.subscribe')
460     server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
461     server.register_function(client_version_json, 'client.version')
462     server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
463     server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
464     server.serve_forever()
465
466
467 if __name__ == '__main__':
468
469     if len(sys.argv)>1:
470         import jsonrpclib
471         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
472         cmd = sys.argv[1]
473         if cmd == 'load':
474             out = server.load(password)
475         elif cmd == 'peers':
476             out = server.server.peers()
477         elif cmd == 'stop':
478             out = server.stop(password)
479         elif cmd == 'clear_cache':
480             out = server.clear_cache(password)
481         elif cmd == 'get_cache':
482             out = server.get_cache(password,sys.argv[2])
483         elif cmd == 'h':
484             out = server.address.get_history(sys.argv[2])
485         elif cmd == 'tx':
486             out = server.transaction.broadcast(sys.argv[2])
487         elif cmd == 'b':
488             out = server.numblocks.subscribe()
489         else:
490             out = "Unknown command: '%s'" % cmd
491         print out
492         sys.exit(0)
493
494     # backend
495     store = abe_backend.AbeStore(config)
496
497     # supported protocols
498     thread.start_new_thread(native_server_thread, ())
499     thread.start_new_thread(clean_session_thread, ())
500
501     #thread.start_new_thread(http_server_thread, ())
502
503
504     processor = AbeProcessor()
505     shared = stratum.Shared()
506     # Bind shared to processor since constructor is user defined
507     processor.shared = shared
508     processor.start()
509
510     # Create various transports we need
511
512     #tcp stratum
513     tcpserver = stratum.TcpServer(shared, processor, "ecdsa.org",50001)
514     tcpserver.start()
515
516     #http stratum
517     from stratum_http import HttpServer
518     server = HttpServer(shared, processor, "ecdsa.org",8081)
519     server.start()
520
521
522     if (config.get('server','irc') == 'yes' ):
523         irc = Irc(processor)
524         irc.start()
525
526
527     print "starting Electrum server"
528     store.run(processor)
529     print "server stopped"
530