new workflow
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27 import abe_backend
28
29
30
31
32 import time, json, socket, operator, thread, ast, sys, re, traceback
33 import ConfigParser
34 from json import dumps, loads
35 import urllib
36
37
38 config = ConfigParser.ConfigParser()
39 # set some defaults, which will be overwritten by the config file
40 config.add_section('server')
41 config.set('server','banner', 'Welcome to Electrum!')
42 config.set('server', 'host', 'localhost')
43 config.set('server', 'port', '50000')
44 config.set('server', 'password', '')
45 config.set('server', 'irc', 'yes')
46 config.set('server', 'ircname', 'Electrum server')
47 config.add_section('database')
48 config.set('database', 'type', 'psycopg2')
49 config.set('database', 'database', 'abe')
50
51 try:
52     f = open('/etc/electrum.conf','r')
53     config.readfp(f)
54     f.close()
55 except:
56     print "Could not read electrum.conf. I will use the default values."
57
58 try:
59     f = open('/etc/electrum.banner','r')
60     config.set('server','banner', f.read())
61     f.close()
62 except:
63     pass
64
65
66 password = config.get('server','password')
67
68 stopping = False
69 sessions = {}
70
71 m_sessions = [{}] # served by http
72
73 peer_list = {}
74
75 from Queue import Queue
76 input_queue = Queue()
77 output_queue = Queue()
78
79
80
81
82 def random_string(N):
83     import random, string
84     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
85
86     
87
88 def cmd_stop(_,__,pw):
89     global stopping
90     if password == pw:
91         stopping = True
92         return 'ok'
93     else:
94         return 'wrong password'
95
96 def cmd_load(_,__,pw):
97     if password == pw:
98         return repr( len(sessions) )
99     else:
100         return 'wrong password'
101
102
103
104
105
106 def modified_addresses(a_session):
107     #t1 = time.time()
108     import copy
109     session = copy.deepcopy(a_session)
110     addresses = session['addresses']
111     session['last_time'] = time.time()
112     ret = {}
113     k = 0
114     for addr in addresses:
115         status = store.get_status( addr )
116         msg_id, last_status = addresses.get( addr )
117         if last_status != status:
118             addresses[addr] = msg_id, status
119             ret[addr] = status
120
121     #t2 = time.time() - t1 
122     #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
123     return ret, addresses
124
125
126 def poll_session(session_id): 
127     # native
128     session = sessions.get(session_id)
129     if session is None:
130         print time.asctime(), "session not found", session_id
131         return -1, {}
132     else:
133         sessions[session_id]['last_time'] = time.time()
134         ret, addresses = modified_addresses(session)
135         if ret: sessions[session_id]['addresses'] = addresses
136         return repr( (store.block_number,ret))
137
138
139 def poll_session_json(session_id, message_id):
140     session = m_sessions[0].get(session_id)
141     if session is None:
142         raise BaseException("session not found %s"%session_id)
143     else:
144         m_sessions[0][session_id]['last_time'] = time.time()
145         out = []
146         ret, addresses = modified_addresses(session)
147         if ret: 
148             m_sessions[0][session_id]['addresses'] = addresses
149             for addr in ret:
150                 msg_id, status = addresses[addr]
151                 out.append(  { 'id':msg_id, 'result':status } )
152
153         msg_id, last_nb = session.get('numblocks')
154         if last_nb:
155             if last_nb != block_number:
156                 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
157                 out.append( {'id':msg_id, 'result':block_number} )
158
159         return out
160
161
162
163
164 def address_get_history_json(_,message_id,address):
165     return store.get_history(address)
166
167 def subscribe_to_numblocks_json(session_id, message_id):
168     global m_sessions
169     m_sessions[0][session_id]['numblocks'] = message_id,block_number
170     return block_number
171
172 def add_address_to_session_json(session_id, message_id, address):
173     global m_sessions
174     sessions = m_sessions[0]
175     status = store.get_status(address)
176     sessions[session_id]['addresses'][address] = (message_id, status)
177     sessions[session_id]['last_time'] = time.time()
178     m_sessions[0] = sessions
179     return status
180
181 def add_address_to_session(session_id, address):
182     status = store.get_status(address)
183     sessions[session_id]['addresses'][address] = ("", status)
184     sessions[session_id]['last_time'] = time.time()
185     return status
186
187 def new_session(version, addresses):
188     session_id = random_string(10)
189     sessions[session_id] = { 'addresses':{}, 'version':version }
190     for a in addresses:
191         sessions[session_id]['addresses'][a] = ('','')
192     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
193     sessions[session_id]['last_time'] = time.time()
194     return out
195
196
197 def client_version_json(session_id, _, version):
198     global m_sessions
199     sessions = m_sessions[0]
200     sessions[session_id]['version'] = version
201     m_sessions[0] = sessions
202
203
204
205 def get_banner(_,__):
206     return config.get('server','banner').replace('\\n','\n')
207
208 def update_session(session_id,addresses):
209     """deprecated in 0.42, wad replaced by add_address_to_session"""
210     sessions[session_id]['addresses'] = {}
211     for a in addresses:
212         sessions[session_id]['addresses'][a] = ''
213     sessions[session_id]['last_time'] = time.time()
214     return 'ok'
215
216 def native_server_thread():
217     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
218     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
219     s.bind((config.get('server','host'), config.getint('server','port')))
220     s.listen(1)
221     while not stopping:
222         conn, addr = s.accept()
223         try:
224             thread.start_new_thread(native_client_thread, (addr, conn,))
225         except:
226             # can't start new thread if there is no memory..
227             traceback.print_exc(file=sys.stdout)
228
229
230 def native_client_thread(ipaddr,conn):
231     #print "client thread", ipaddr
232     try:
233         ipaddr = ipaddr[0]
234         msg = ''
235         while 1:
236             d = conn.recv(1024)
237             msg += d
238             if not d: 
239                 break
240             if '#' in msg:
241                 msg = msg.split('#', 1)[0]
242                 break
243         try:
244             cmd, data = ast.literal_eval(msg)
245         except:
246             print "syntax error", repr(msg), ipaddr
247             conn.close()
248             return
249
250         out = do_command(cmd, data, ipaddr)
251         if out:
252             #print ipaddr, cmd, len(out)
253             try:
254                 conn.send(out)
255             except:
256                 print "error, could not send"
257
258     finally:
259         conn.close()
260
261
262 def timestr():
263     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
264
265 # used by the native handler
266 def do_command(cmd, data, ipaddr):
267
268     if cmd=='b':
269         out = "%d"%block_number
270
271     elif cmd in ['session','new_session']:
272         try:
273             if cmd == 'session':
274                 addresses = ast.literal_eval(data)
275                 version = "old"
276             else:
277                 version, addresses = ast.literal_eval(data)
278                 if version[0]=="0": version = "v" + version
279         except:
280             print "error", data
281             return None
282         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
283         out = new_session(version, addresses)
284
285     elif cmd=='address.subscribe':
286         try:
287             session_id, addr = ast.literal_eval(data)
288         except:
289             traceback.print_exc(file=sys.stdout)
290             print data
291             return None
292         out = add_address_to_session(session_id,addr)
293
294     elif cmd=='update_session':
295         try:
296             session_id, addresses = ast.literal_eval(data)
297         except:
298             traceback.print_exc(file=sys.stdout)
299             return None
300         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
301         out = update_session(session_id,addresses)
302             
303     elif cmd=='poll': 
304         out = poll_session(data)
305
306     elif cmd == 'h': 
307         # history
308         address = data
309         out = repr( store.get_history( address ) )
310
311     elif cmd == 'load': 
312         out = cmd_load(None,None,data)
313
314     elif cmd =='tx':
315         out = store.send_tx(data)
316         print timestr(), "sent tx:", ipaddr, out
317
318     elif cmd == 'stop':
319         out = cmd_stop(data)
320
321     elif cmd == 'peers':
322         out = repr(peer_list.values())
323
324     else:
325         out = None
326
327     return out
328
329
330 def clean_session_thread():
331     while not stopping:
332         time.sleep(30)
333         t = time.time()
334         for k,s in sessions.items():
335             if s.get('type') == 'persistent': continue
336             t0 = s['last_time']
337             if t - t0 > 5*60:
338                 sessions.pop(k)
339                 print "lost session", k
340             
341
342 ####################################################################
343
344
345 import stratum
346
347 class AbeProcessor(stratum.Processor):
348     def process(self,request):
349         message_id = request['id']
350         method = request['method']
351         params = request.get('params',[])
352         #print request
353
354         result = ''
355         if method == 'numblocks.subscribe':
356             result = store.block_number
357         elif method == 'address.subscribe':
358             address = params[0]
359             store.watch_address(address)
360             status = store.get_status(address)
361             result = status
362         elif method == 'client.version':
363             #session.version = params[0]
364             pass
365         elif method == 'server.banner':
366             result = config.get('server','banner').replace('\\n','\n')
367         elif method == 'server.peers':
368             result = peer_list.values()
369         elif method == 'address.get_history':
370             address = params[0]
371             result = store.get_history( address ) 
372         elif method == 'transaction.broadcast':
373             txo = store.send_tx(params[0])
374             print "sent tx:", txo
375             result = txo 
376         else:
377             print "unknown method", request
378
379         if result!='':
380             response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
381             self.push_response(response)
382
383     def get_status(self,addr):
384         return store.get_status(addr)
385
386
387
388 ####################################################################
389
390
391
392
393
394 def irc_thread():
395     global peer_list
396     NICK = 'E_'+random_string(10)
397     while not stopping:
398         try:
399             s = socket.socket()
400             s.connect(('irc.freenode.net', 6667))
401             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
402             s.send('NICK '+NICK+'\n')
403             s.send('JOIN #electrum\n')
404             sf = s.makefile('r', 0)
405             t = 0
406             while not stopping:
407                 line = sf.readline()
408                 line = line.rstrip('\r\n')
409                 line = line.split()
410                 if line[0]=='PING': 
411                     s.send('PONG '+line[1]+'\n')
412                 elif '353' in line: # answer to /names
413                     k = line.index('353')
414                     for item in line[k+1:]:
415                         if item[0:2] == 'E_':
416                             s.send('WHO %s\n'%item)
417                 elif '352' in line: # answer to /who
418                     # warning: this is a horrible hack which apparently works
419                     k = line.index('352')
420                     ip = line[k+4]
421                     ip = socket.gethostbyname(ip)
422                     name = line[k+6]
423                     host = line[k+9]
424                     peer_list[name] = (ip,host)
425                 if time.time() - t > 5*60:
426                     s.send('NAMES #electrum\n')
427                     t = time.time()
428                     peer_list = {}
429         except:
430             traceback.print_exc(file=sys.stdout)
431         finally:
432             sf.close()
433             s.close()
434
435
436 def get_peers_json(_,__):
437     return peer_list.values()
438
439 def http_server_thread():
440     # see http://code.google.com/p/jsonrpclib/
441     from SocketServer import ThreadingMixIn
442     from StratumJSONRPCServer import StratumJSONRPCServer
443     class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
444     server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
445     server.register_function(get_peers_json, 'server.peers')
446     server.register_function(cmd_stop, 'stop')
447     server.register_function(cmd_load, 'load')
448     server.register_function(get_banner, 'server.banner')
449     server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
450     server.register_function(address_get_history_json, 'address.get_history')
451     server.register_function(add_address_to_session_json, 'address.subscribe')
452     server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
453     server.register_function(client_version_json, 'client.version')
454     server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
455     server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
456     server.serve_forever()
457
458
459 if __name__ == '__main__':
460
461     if len(sys.argv)>1:
462         import jsonrpclib
463         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
464         cmd = sys.argv[1]
465         if cmd == 'load':
466             out = server.load(password)
467         elif cmd == 'peers':
468             out = server.server.peers()
469         elif cmd == 'stop':
470             out = server.stop(password)
471         elif cmd == 'clear_cache':
472             out = server.clear_cache(password)
473         elif cmd == 'get_cache':
474             out = server.get_cache(password,sys.argv[2])
475         elif cmd == 'h':
476             out = server.address.get_history(sys.argv[2])
477         elif cmd == 'tx':
478             out = server.transaction.broadcast(sys.argv[2])
479         elif cmd == 'b':
480             out = server.numblocks.subscribe()
481         else:
482             out = "Unknown command: '%s'" % cmd
483         print out
484         sys.exit(0)
485
486     # backend
487     store = abe_backend.AbeStore(config)
488
489     # supported protocols
490     thread.start_new_thread(native_server_thread, ())
491     thread.start_new_thread(clean_session_thread, ())
492
493     #thread.start_new_thread(http_server_thread, ())
494
495
496     processor = AbeProcessor()
497     shared = stratum.Shared()
498     # Bind shared to processor since constructor is user defined
499     processor.shared = shared
500     processor.start()
501
502     # Create various transports we need
503
504     #tcp stratum
505     tcpserver = stratum.TcpServer(shared, processor, "ecdsa.org",50001)
506     tcpserver.start()
507
508     #http stratum
509     from StratumJSONRPCServer import HttpServer
510     server = HttpServer(shared, processor, "ecdsa.org",8081)
511     server.start()
512
513
514     if (config.get('server','irc') == 'yes' ):
515         thread.start_new_thread(irc_thread, ())
516
517     print "starting Electrum server"
518     store.run(processor)
519     print "server stopped"
520