major reorganisation, http works now
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27 import abe_backend
28
29
30
31
32 import time, json, socket, operator, thread, ast, sys, re, traceback
33 import ConfigParser
34 from json import dumps, loads
35 import urllib
36 import threading
37
38 config = ConfigParser.ConfigParser()
39 # set some defaults, which will be overwritten by the config file
40 config.add_section('server')
41 config.set('server','banner', 'Welcome to Electrum!')
42 config.set('server', 'host', 'localhost')
43 config.set('server', 'port', '50000')
44 config.set('server', 'password', '')
45 config.set('server', 'irc', 'yes')
46 config.set('server', 'ircname', 'Electrum server')
47 config.add_section('database')
48 config.set('database', 'type', 'psycopg2')
49 config.set('database', 'database', 'abe')
50
51 try:
52     f = open('/etc/electrum.conf','r')
53     config.readfp(f)
54     f.close()
55 except:
56     print "Could not read electrum.conf. I will use the default values."
57
58 try:
59     f = open('/etc/electrum.banner','r')
60     config.set('server','banner', f.read())
61     f.close()
62 except:
63     pass
64
65
66 password = config.get('server','password')
67 stopping = False
68 sessions = {}
69
70
71
def random_string(N):
    """Return a random string of N uppercase ASCII letters and digits.

    The result is used as a session identifier (see new_session), so the
    characters are drawn from the operating system's entropy source via
    random.SystemRandom instead of the predictable default generator.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
75
76
77
def modified_addresses(a_session):
    """Find the watched addresses whose status changed since the last check.

    The work is done on a deep copy of a_session, so the session passed in
    is left untouched.

    Returns a pair (changed, addresses):
      changed   -- dict mapping each address whose status reported by the
                   store differs from the recorded one to its new status
      addresses -- the copied address table, with the changed entries
                   updated to (msg_id, new_status)
    """
    import copy
    tracked = copy.deepcopy(a_session)['addresses']
    changed = {}
    for addr in tracked:
        current = store.get_status(addr)
        msg_id, previous = tracked[addr]
        if previous != current:
            # existing key is overwritten, so the dict size does not change
            tracked[addr] = (msg_id, current)
            changed[addr] = current
    return changed, tracked
96
97
98 def poll_session(session_id): 
99     # native
100     session = sessions.get(session_id)
101     if session is None:
102         print time.asctime(), "session not found", session_id
103         return -1, {}
104     else:
105         sessions[session_id]['last_time'] = time.time()
106         ret, addresses = modified_addresses(session)
107         if ret: sessions[session_id]['addresses'] = addresses
108         return repr( (store.block_number,ret))
109
110
def add_address_to_session(session_id, address):
    """Start watching `address` in an existing session.

    Records the address with its current status (and an empty message id),
    refreshes the session's last_time stamp and returns that status.
    Raises KeyError when session_id is unknown.
    """
    current = store.get_status(address)
    session = sessions[session_id]
    session['addresses'][address] = ("", current)
    session['last_time'] = time.time()
    return current
116
117
def new_session(version, addresses):
    """Register a new native-protocol session.

    Each address starts with an empty (msg_id, status) pair.  Returns the
    repr of (session_id, banner); literal backslash-n sequences in the
    configured banner are turned into real newlines.
    """
    session_id = random_string(10)
    session = {'addresses': {}, 'version': version}
    # register the session before filling it in
    sessions[session_id] = session
    watched = session['addresses']
    for a in addresses:
        watched[a] = ('', '')
    banner = config.get('server', 'banner').replace('\\n', '\n')
    session['last_time'] = time.time()
    return repr((session_id, banner))
126
127
def update_session(session_id, addresses):
    """deprecated in 0.42, was replaced by add_address_to_session

    Replace the set of addresses watched by a session and refresh its
    last_time stamp.  Returns 'ok'; raises KeyError when session_id is
    unknown.

    Every address is stored as an empty (msg_id, status) pair, the same
    shape new_session uses, because modified_addresses unpacks each entry
    as a 2-tuple.
    """
    session = sessions[session_id]
    watched = {}
    for a in addresses:
        watched[a] = ('', '')
    # swap the finished table in, so a concurrent poll never sees it half built
    session['addresses'] = watched
    session['last_time'] = time.time()
    return 'ok'
135
136
def native_server_thread():
    """Accept native-protocol connections until `stopping` is set.

    Listens on the configured server host/port and hands every accepted
    connection to native_client_thread in a thread of its own.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    host = config.get('server', 'host')
    port = config.getint('server', 'port')
    listener.bind((host, port))
    listener.listen(1)
    while not stopping:
        client, peer = listener.accept()
        try:
            thread.start_new_thread(native_client_thread, (peer, client))
        except:
            # starting a thread fails when the process is out of memory
            traceback.print_exc(file=sys.stdout)
148             traceback.print_exc(file=sys.stdout)
149
150
151 def native_client_thread(ipaddr,conn):
152     #print "client thread", ipaddr
153     try:
154         ipaddr = ipaddr[0]
155         msg = ''
156         while 1:
157             d = conn.recv(1024)
158             msg += d
159             if not d: 
160                 break
161             if '#' in msg:
162                 msg = msg.split('#', 1)[0]
163                 break
164         try:
165             cmd, data = ast.literal_eval(msg)
166         except:
167             print "syntax error", repr(msg), ipaddr
168             conn.close()
169             return
170
171         out = do_command(cmd, data, ipaddr)
172         if out:
173             #print ipaddr, cmd, len(out)
174             try:
175                 conn.send(out)
176             except:
177                 print "error, could not send"
178
179     finally:
180         conn.close()
181
182
def timestr():
    """Return the current local time as "[DD/MM/YYYY-HH:MM:SS]", for log lines."""
    now = time.localtime()
    return time.strftime("[%d/%m/%Y-%H:%M:%S]", now)
185
186 # used by the native handler
187 def do_command(cmd, data, ipaddr):
188
189     if cmd=='b':
190         out = "%d"%block_number
191
192     elif cmd in ['session','new_session']:
193         try:
194             if cmd == 'session':
195                 addresses = ast.literal_eval(data)
196                 version = "old"
197             else:
198                 version, addresses = ast.literal_eval(data)
199                 if version[0]=="0": version = "v" + version
200         except:
201             print "error", data
202             return None
203         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
204         out = new_session(version, addresses)
205
206     elif cmd=='address.subscribe':
207         try:
208             session_id, addr = ast.literal_eval(data)
209         except:
210             traceback.print_exc(file=sys.stdout)
211             print data
212             return None
213         out = add_address_to_session(session_id,addr)
214
215     elif cmd=='update_session':
216         try:
217             session_id, addresses = ast.literal_eval(data)
218         except:
219             traceback.print_exc(file=sys.stdout)
220             return None
221         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
222         out = update_session(session_id,addresses)
223             
224     elif cmd=='poll': 
225         out = poll_session(data)
226
227     elif cmd == 'h': 
228         address = data
229         out = repr( store.get_history( address ) )
230
231     elif cmd =='tx':
232         out = store.send_tx(data)
233         print timestr(), "sent tx:", ipaddr, out
234
235     elif cmd == 'peers':
236         out = repr(irc.get_peers())
237
238     else:
239         out = None
240
241     return out
242
243
244 def clean_session_thread():
245     while not stopping:
246         time.sleep(30)
247         t = time.time()
248         for k,s in sessions.items():
249             if s.get('type') == 'persistent': continue
250             t0 = s['last_time']
251             if t - t0 > 5*60:
252                 sessions.pop(k)
253                 print "lost session", k
254             
255
256 ####################################################################
257
258
259 from processor import Shared, Processor, Dispatcher
260 from stratum_http import HttpServer
261 from stratum import TcpServer
262
263 class AbeProcessor(Processor):
264     def process(self,request):
265         message_id = request['id']
266         method = request['method']
267         params = request.get('params',[])
268         #print request
269
270         result = ''
271         if method == 'numblocks.subscribe':
272             result = store.block_number
273         elif method == 'address.subscribe':
274             address = params[0]
275             store.watch_address(address)
276             status = store.get_status(address)
277             result = status
278         elif method == 'client.version':
279             #session.version = params[0]
280             pass
281         elif method == 'server.banner':
282             result = config.get('server','banner').replace('\\n','\n')
283         elif method == 'server.peers':
284             result = irc.get_peers()
285         elif method == 'address.get_history':
286             address = params[0]
287             result = store.get_history( address ) 
288         elif method == 'transaction.broadcast':
289             txo = store.send_tx(params[0])
290             print "sent tx:", txo
291             result = txo 
292         else:
293             print "unknown method", request
294
295         if result!='':
296             response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
297             self.push_response(response)
298
299     def get_status(self,addr):
300         return store.get_status(addr)
301
302
303
304 ####################################################################
305
306
307
class Irc(threading.Thread):
    """Daemon thread that discovers other servers on the #electrum IRC channel.

    The peers found are kept in self.peers (nick -> (ip, host)) and are
    pushed to the processor as a 'server.peers' response every five minutes.
    """

    # Seconds to wait before reconnecting once the connection is lost or
    # could not be established, so that a dead network is not hammered.
    RECONNECT_DELAY = 10

    def __init__(self, processor):
        self.processor = processor
        threading.Thread.__init__(self)
        self.daemon = True
        self.peers = {}

    def get_peers(self):
        """Return the (ip, host) pairs of the currently known peers."""
        return self.peers.values()

    def run(self):
        """Stay connected to IRC until the shared state reports stopped()."""
        NICK = 'E_'+random_string(10)
        while not self.processor.shared.stopped():
            # Bound before the try block, so that the cleanup below is valid
            # even when socket() or connect() itself fails.
            s = None
            sf = None
            try:
                s = socket.socket()
                s.connect(('irc.freenode.net', 6667))
                s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
                s.send('NICK '+NICK+'\n')
                s.send('JOIN #electrum\n')
                sf = s.makefile('r', 0)
                t = 0
                while not self.processor.shared.stopped():
                    line = sf.readline()
                    if not line:
                        # EOF: the server closed the connection; reconnect
                        break
                    line = line.rstrip('\r\n')
                    line = line.split()
                    if not line:
                        # blank line, nothing to parse
                        continue
                    if line[0]=='PING':
                        s.send('PONG '+line[1]+'\n')
                    elif '353' in line: # answer to /names
                        k = line.index('353')
                        for item in line[k+1:]:
                            # nicks starting with E_ are looked up with WHO
                            if item[0:2] == 'E_':
                                s.send('WHO %s\n'%item)
                    elif '352' in line: # answer to /who
                        # warning: this is a horrible hack which apparently works
                        k = line.index('352')
                        ip = line[k+4]
                        ip = socket.gethostbyname(ip)
                        name = line[k+6]
                        host = line[k+9]
                        self.peers[name] = (ip,host)
                    if time.time() - t > 5*60:
                        # publish what was collected, then start a new round
                        self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
                        s.send('NAMES #electrum\n')
                        t = time.time()
                        self.peers = {}
            except Exception:
                traceback.print_exc(file=sys.stdout)
            finally:
                if sf is not None:
                    sf.close()
                if s is not None:
                    s.close()
            if not self.processor.shared.stopped():
                time.sleep(self.RECONNECT_DELAY)
359
360
361
362
363 if __name__ == '__main__':
364
365     if len(sys.argv)>1:
366         import jsonrpclib
367         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
368         cmd = sys.argv[1]
369         if cmd == 'load':
370             out = server.load(password)
371         elif cmd == 'peers':
372             out = server.server.peers()
373         elif cmd == 'stop':
374             out = server.stop(password)
375         elif cmd == 'clear_cache':
376             out = server.clear_cache(password)
377         elif cmd == 'get_cache':
378             out = server.get_cache(password,sys.argv[2])
379         elif cmd == 'h':
380             out = server.address.get_history(sys.argv[2])
381         elif cmd == 'tx':
382             out = server.transaction.broadcast(sys.argv[2])
383         elif cmd == 'b':
384             out = server.numblocks.subscribe()
385         else:
386             out = "Unknown command: '%s'" % cmd
387         print out
388         sys.exit(0)
389
390     # backend
391     store = abe_backend.AbeStore(config)
392
393     # old protocol
394     thread.start_new_thread(native_server_thread, ())
395     thread.start_new_thread(clean_session_thread, ())
396
397     processor = AbeProcessor()
398     shared = Shared()
399     # Bind shared to processor since constructor is user defined
400     processor.shared = shared
401     processor.start()
402     # dispatcher
403     dispatcher = Dispatcher(shared, processor)
404     dispatcher.start()
405     # Create various transports we need
406     transports = [ TcpServer(shared, processor, "ecdsa.org",50001),
407                    HttpServer(shared, processor, "ecdsa.org",8081)
408                    ]
409     for server in transports:
410         server.start()
411
412
413     if (config.get('server','irc') == 'yes' ):
414         irc = Irc(processor)
415         irc.start()
416
417
418     print "starting Electrum server"
419     store.run(processor)
420     print "server stopped"
421