Implementing a better print_error routine
[electrum-nvc.git] / lib / interface.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
22
23 from version import ELECTRUM_VERSION
24 from lib.util import print_error
25
# NOTE(review): not referenced anywhere in this file; presumably a timeout in
# seconds used by other modules -- confirm before changing.
DEFAULT_TIMEOUT = 5

# list of default servers
# Entries look like 'host:port:protocol'; WalletSynchronizer.init_interface
# splits wallet.server the same way ('t' selects TCP, 'h' selects HTTP).
DEFAULT_SERVERS = [
    'ecdsa.org:50001:t',
    'electrum.novit.ro:50001:t',
    'uncle-enzo.info:50001:t',
    'electrum.bytesized-hosting.com:50000:t',
]
31
32
def replace_keys(obj, old_key, new_key):
    """Rename ``old_key`` to ``new_key`` throughout a nested structure, in place.

    ``obj`` may be a dict or a list; dicts and lists nested inside it are
    visited recursively.  Any other type is left untouched.  Nothing is
    returned: the structure is mutated directly.
    """
    if isinstance(obj, dict):
        if old_key in obj:
            # Pop first, then assign: this keeps the value when
            # old_key == new_key (assign-then-delete would drop it).
            obj[new_key] = obj.pop(old_key)
        # The dict itself is not modified while we walk its values; only the
        # nested containers are, so iterating the values directly is safe.
        for elem in obj.values():
            replace_keys(elem, old_key, new_key)
    elif isinstance(obj, list):
        for elem in obj:
            replace_keys(elem, old_key, new_key)
43
def old_to_new(d):
    """Rewrite legacy key names inside ``d`` (in place) to their current names.

    The renames are applied one after another, in the order listed, through
    ``replace_keys``, so nested dicts and lists are covered too.
    """
    renames = (
        ('blk_hash', 'block_hash'),
        ('pos', 'index'),
        ('nTime', 'timestamp'),
        ('is_in', 'is_input'),
        ('raw_scriptPubKey', 'raw_output_script'),
    )
    for legacy_name, current_name in renames:
        replace_keys(d, legacy_name, current_name)
50
51
class Interface(threading.Thread):
    """Base class for a server connection running in a daemon thread.

    Decoded server messages are normalised by ``queue_json_response`` into
    ``{'method': ..., 'params': ..., 'result': ...}`` dicts and placed on
    ``self.responses`` for a consumer thread to read.

    Subclasses are expected to provide ``send(messages)`` (``subscribe``
    calls it) and may override ``init_socket``.
    """

    def __init__(self, host, port, debug_server):
        """Store the connection parameters and reset all bookkeeping.

        host, port   -- address of the server to talk to
        debug_server -- when true, every decoded message is printed
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port

        self.servers = []  # actual list from IRC
        self.rtime = 0
        self.bytes_received = 0

        self.is_connected = True
        self.poll_interval = 1

        # json
        self.message_id = 0                # id given to the next request
        self.responses = Queue.Queue()     # normalised replies for the consumer
        self.unanswered_requests = {}      # message id -> (method, params)

        self.debug_server = debug_server

    def init_socket(self):
        """Hook for subclasses that need to open a connection; no-op here."""
        pass

    def poke(self):
        # push a fake response so that the getting thread exits its loop
        self.responses.put(None)

    def queue_json_response(self, c):
        """Normalise one decoded JSON message ``c`` and queue it.

        * A message with a truthy ``error`` is printed and dropped; the
          request it answers (if any) is forgotten so that it does not stay
          in ``unanswered_requests`` forever.
        * A message with an ``id`` is matched against the pending request
          to recover its method and params.  Replies whose id is not pending
          are printed and dropped instead of raising ``KeyError``.
        * A message without an ``id`` is a notification.  The two known
          subscription notifications are reshaped so they look like replies
          to the corresponding request; any other notification is printed
          and dropped (previously it raised ``UnboundLocalError``).
        """
        if self.debug_server:
            print("<-- %s" % (c,))

        msg_id = c.get('id')

        if c.get('error'):
            self.unanswered_requests.pop(msg_id, None)
            print("received error: %s" % (c,))
            return

        if msg_id is not None:
            request = self.unanswered_requests.pop(msg_id, None)
            if request is None:
                print("unexpected response: %s" % (c,))
                return
            method, params = request
            result = c.get('result')
        else:
            # notification
            method = c.get('method')
            params = c.get('params')

            if method == 'blockchain.numblocks.subscribe':
                result = params[0]
                params = []

            elif method == 'blockchain.address.subscribe':
                addr = params[0]
                result = params[1]
                params = [addr]

            else:
                print("unknown notification: %s" % (c,))
                return

        self.responses.put({'method': method, 'params': params, 'result': result})

    def subscribe(self, addresses):
        """Send one ``blockchain.address.subscribe`` request per address."""
        messages = [('blockchain.address.subscribe', [addr]) for addr in addresses]
        self.send(messages)
118
119
120
121
class PollingInterface(Interface):
    """Interface without a persistent connection.

    The thread wakes up every ``poll_interval`` seconds and, once a session
    id is known, calls ``poll()``.  Subclasses supply ``send`` and a real
    ``poll``.
    """

    def __init__(self, host, port, debug_server):
        Interface.__init__(self, host, port, debug_server)
        self.session_id = None
        self.debug_server = debug_server

    def get_history(self, address):
        """Request the history of ``address`` from the server."""
        request = ('blockchain.address.get_history', [address])
        self.send([request])

    def poll(self):
        """Called periodically by ``run``; does nothing in this base class."""
        pass

    def run(self):
        """Poll until disconnected or until any exception occurs, then poke."""
        self.is_connected = True
        try:
            while self.is_connected:
                if self.session_id:
                    self.poll()
                time.sleep(self.poll_interval)
        except (socket.gaierror, socket.error):
            # network failures end the loop quietly
            pass
        except:
            # anything else also ends the loop, but leaves a traceback
            traceback.print_exc(file=sys.stdout)

        self.is_connected = False
        self.poke()
161
162                 
163
164
165
166
167
168
169
class HttpStratumInterface(PollingInterface):
    """Polling interface that exchanges JSON messages with the server over HTTP.

    The server identifies the client through a ``SESSION`` cookie; its value
    is remembered in ``self.session_id`` and sent back with every request.
    """

    def poll(self):
        # An empty batch makes send() issue a body-less request (GET).
        self.send([])

    def send(self, messages):
        """Send a batch of ``(method, params)`` requests and queue the replies.

        Each request gets the next message id and is recorded in
        ``unanswered_requests``.  With an empty batch no body is sent.  Every
        item of the decoded reply is handed to ``queue_json_response``.

        ``poll_interval`` is reset to 1 when the server returned something
        and otherwise grows by one per call up to 15.  Errors raised by
        urllib2 / json propagate to the caller.
        """
        import urllib2, json, time, cookielib

        cj = cookielib.CookieJar()
        # Use a private opener instead of installing a process-wide one on
        # every call.
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))

        t1 = time.time()

        data = []
        for method, params in messages:
            if not isinstance(params, list):
                params = [params]
            data.append({'method': method, 'id': self.message_id, 'params': params})
            self.unanswered_requests[self.message_id] = method, params
            self.message_id += 1

        # poll with GET (no body) when there is nothing to send
        data_json = json.dumps(data) if data else None

        host = 'http://%s:%d' % (self.host, self.port)
        headers = {'content-type': 'application/json'}
        if self.session_id:
            headers['cookie'] = 'SESSION=%s' % self.session_id

        req = urllib2.Request(host, data_json, headers)
        response_stream = opener.open(req)
        try:
            response = response_stream.read()
        finally:
            # always release the connection, even if reading fails
            response_stream.close()

        for cookie in cj:
            if cookie.name == 'SESSION':
                self.session_id = cookie.value

        self.bytes_received += len(response)
        if response:
            response = json.loads(response)
            items = response if isinstance(response, list) else [response]
            for item in items:
                self.queue_json_response(item)

        # Tested again on purpose: an empty decoded reply (e.g. []) counts as
        # "nothing new" and slows the polling down.
        if response:
            self.poll_interval = 1
        elif self.poll_interval < 15:
            self.poll_interval += 1

        self.rtime = time.time() - t1
        self.is_connected = True
229
230
231
232
class TcpStratumInterface(Interface):
    """json-rpc over persistent TCP connection, asynchronous"""

    def __init__(self, host, port, debug_server):
        Interface.__init__(self, host, port, debug_server)
        self.debug_server = debug_server

    def init_socket(self):
        """Open the TCP connection and announce the client version.

        Sets ``is_connected`` according to the outcome.  On failure the
        socket is closed so that it is not leaked.
        """
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.settimeout(60)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        try:
            self.s.connect((self.host, self.port))
            self.is_connected = True
            self.send([('server.version', [ELECTRUM_VERSION])])
            print("Connected to %s:%d" % (self.host, self.port))
        except Exception:
            self.is_connected = False
            self._close_socket()
            print_error("Not connected")

    def _close_socket(self):
        """Close the socket if there is one, ignoring socket errors."""
        s = getattr(self, 's', None)
        if s is not None:
            try:
                s.close()
            except socket.error:
                pass

    def run(self):
        """Read newline-delimited JSON messages until the connection ends.

        A receive timeout triggers a ``server.version`` request as a
        keep-alive.  Whatever ends the loop, the interface is marked as
        disconnected, the socket is closed and the consumer is poked.
        """
        try:
            out = ''
            while self.is_connected:
                try:
                    msg = self.s.recv(1024)
                except socket.timeout:
                    # ping the server with server.version, as a real ping does not exist yet
                    self.send([('server.version', [ELECTRUM_VERSION])])
                    continue
                out += msg
                self.bytes_received += len(msg)
                if msg == '':
                    # empty read: the peer closed the connection
                    self.is_connected = False
                    print("Disconnected.")

                # hand over every complete line; keep the unfinished tail
                while '\n' in out:
                    line, _, out = out.partition('\n')
                    self.queue_json_response(json.loads(line))

        except Exception:
            traceback.print_exc(file=sys.stdout)

        finally:
            self.is_connected = False
            self._close_socket()
            print("Poking")
            self.poke()

    def send(self, messages):
        """Serialise ``(method, params)`` pairs, one JSON object per line, and send them.

        Each request is recorded in ``unanswered_requests`` under its id.
        """
        lines = []
        for method, params in messages:
            request = json.dumps({'id': self.message_id, 'method': method, 'params': params})
            self.unanswered_requests[self.message_id] = method, params

            if self.debug_server:
                print("--> %s" % (request,))

            self.message_id += 1
            lines.append(request + '\n')

        out = ''.join(lines)
        if out:
            # sendall loops until every byte is written
            self.s.sendall(out)

    def get_history(self, addr):
        """Request the history of ``addr`` from the server."""
        self.send([('blockchain.address.get_history', [addr])])
302
303
304
305
306
class WalletSynchronizer(threading.Thread):
    """Daemon thread that feeds server responses into a wallet.

    It creates the interface matching ``wallet.server``, starts it, and then
    consumes ``interface.responses``, calling the wallet's callbacks.  With
    ``loop=True`` it reconnects (after a 5 second pause) whenever the
    interface disconnects; otherwise it stops after the first disconnect.
    """

    # 't' or 'h' followed by digits at the start of a peer feature string
    _PORT_RE = re.compile(r"[th]\d+")

    def __init__(self, wallet, loop=False):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wallet = wallet
        self.loop = loop
        self.init_interface()

    def init_interface(self):
        """Create ``self.interface`` from ``wallet.server`` ('host:port:protocol').

        If the server string cannot be parsed, the wallet is asked to pick a
        random server and the string is parsed again.  An unknown protocol
        is reported and TCP is used.
        """
        try:
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)
        except Exception:
            self.wallet.pick_random_server()
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)

        if protocol == 't':
            InterfaceClass = TcpStratumInterface
        elif protocol == 'h':
            InterfaceClass = HttpStratumInterface
        else:
            print_error("Error: Unknown protocol")
            InterfaceClass = TcpStratumInterface

        self.interface = InterfaceClass(host, port, self.wallet.debug_server)
        self.wallet.interface = self.interface

    def handle_response(self, r):
        """Apply one queued response ``r`` to the wallet / interface.

        ``r`` is either ``None`` (a wake-up with nothing to do) or a dict
        with the keys ``method``, ``params`` and ``result``.
        """
        if r is None:
            return

        method = r['method']
        params = r['params']
        result = r['result']

        if method == 'server.banner':
            self.wallet.banner = result
            self.wallet.was_updated = True

        elif method == 'server.peers.subscribe':
            servers = []
            for item in result:
                host = item[1]
                ports = []
                version = None
                if len(item) > 2:
                    for v in item[2]:
                        if self._PORT_RE.match(v):
                            ports.append((v[0], v[1:]))
                        if v.startswith('v'):
                            version = v[1:]
                # only keep peers that advertise both a port and a version
                if ports and version:
                    servers.append((host, ports))
            self.interface.servers = servers

        elif method == 'blockchain.address.subscribe':
            addr = params[0]
            self.wallet.receive_status_callback(addr, result)

        elif method == 'blockchain.address.get_history':
            addr = params[0]
            self.wallet.receive_history_callback(addr, result)
            self.wallet.was_updated = True

        elif method == 'blockchain.transaction.broadcast':
            self.wallet.tx_result = result
            self.wallet.tx_event.set()

        elif method == 'blockchain.numblocks.subscribe':
            self.wallet.blocks = result
            self.wallet.was_updated = True

        elif method == 'server.version':
            pass

        else:
            # params/result are usually not strings, so format them rather
            # than concatenating (which raised TypeError).
            print_error("Error: Unknown message:%s, %s, %s" % (method, params, result))

    def start_interface(self):
        """Connect and start the interface thread; open a wallet session if connected."""
        self.interface.init_socket()
        self.interface.start()
        if self.interface.is_connected:
            self.wallet.start_session(self.interface)

    def run(self):
        """Main loop: synchronise the wallet and dispatch server responses."""
        self.start_interface()
        while True:
            while self.interface.is_connected:
                new_addresses = self.wallet.synchronize()
                if new_addresses:
                    self.interface.subscribe(new_addresses)

                if self.wallet.is_up_to_date():
                    if not self.wallet.up_to_date:
                        self.wallet.up_to_date = True
                        self.wallet.was_updated = True
                        self.wallet.up_to_date_event.set()
                else:
                    if self.wallet.up_to_date:
                        self.wallet.up_to_date = False
                        self.wallet.was_updated = True

                if self.wallet.was_updated:
                    self.wallet.trigger_callbacks()
                    self.wallet.was_updated = False

                # blocks until the interface queues something (or pokes us)
                response = self.interface.responses.get()
                self.handle_response(response)

            self.wallet.trigger_callbacks()
            if not self.loop:
                break

            # reconnect with a fresh interface after a short pause
            time.sleep(5)
            self.init_interface()
            self.start_interface()
432                 break
433
434
435