bugfix: poke to reconnect
[electrum-nvc.git] / lib / interface.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
22
23 from version import ELECTRUM_VERSION
24
# NOTE(review): presumably a timeout in seconds; confirm where it is consumed.
DEFAULT_TIMEOUT = 5

# list of default servers, each written as "host:port:protocol"
DEFAULT_SERVERS = [
    'ecdsa.org:50001:t',
    'electrum.novit.ro:50001:t',
    'electrum.bitcoins.sk:50001:t',
]
27
28
def old_to_new(s):
    """Return *s* with legacy single-quoted field names renamed.

    Only the exact single-quoted spellings listed below are rewritten; the
    substitutions are applied one after another in the order shown.
    """
    renames = (
        ("'blk_hash'", "'block_hash'"),
        ("'pos'", "'index'"),
        ("'nTime'", "'timestamp'"),
        ("'is_in'", "'is_input'"),
        ("'raw_scriptPubKey'", "'raw_output_script'"),
    )
    for legacy, current in renames:
        s = s.replace(legacy, current)
    return s
36
37
class Interface(threading.Thread):
    """Common state and response plumbing shared by the server interfaces.

    The thread is created as a daemon thread.  Decoded server messages are
    pushed onto ``self.responses`` as ``{'method', 'params', 'result'}``
    dicts, and requests still awaiting a reply are tracked in
    ``self.unanswered_requests`` keyed by message id.  ``subscribe`` relies
    on a ``send(messages)`` method that this class does not define itself.
    """

    def __init__(self, host, port):
        """Store the server address and initialise the bookkeeping state."""
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port

        self.servers = []  # actual list from IRC
        self.rtime = 0

        self.is_connected = True
        self.poll_interval = 1

        # json
        self.message_id = 0
        self.responses = Queue.Queue()
        self.unanswered_requests = {}

    def poke(self):
        """Push a fake ``None`` response so a blocked reader exits its loop."""
        self.responses.put(None)

    def queue_json_response(self, c):
        """Translate one decoded JSON message into an entry on ``responses``.

        ``c`` is a dict.  A message carrying an ``id`` is matched against
        ``unanswered_requests``; a message without one is treated as a
        notification.  Error replies are printed and dropped.
        """
        msg_id = c.get('id')
        error = c.get('error')

        if error:
            print("received error: %s" % (c,))
            # The request has been answered (with an error); stop tracking it
            # so it does not stay in unanswered_requests forever.
            self.unanswered_requests.pop(msg_id, None)
            return

        if msg_id is not None:
            pending = self.unanswered_requests.pop(msg_id, None)
            if pending is None:
                # Reply to a request we are not tracking: report it instead
                # of letting a KeyError propagate into the reader thread.
                print("unexpected response: %s" % (c,))
                return
            method, params = pending
            result = c.get('result')
        else:
            # notification
            method = c.get('method')
            params = c.get('params')
            # Default for notifications that carry no recognised payload, so
            # that 'result' is always bound when the response is queued.
            result = None

            if method == 'blockchain.numblocks.subscribe':
                result = params[0]
                params = []

            elif method == 'blockchain.address.subscribe':
                addr = params[0]
                result = params[1]
                params = [addr]

        self.responses.put({'method': method, 'params': params, 'result': result})

    def subscribe(self, addresses):
        """Send one 'blockchain.address.subscribe' request per address."""
        messages = [('blockchain.address.subscribe', [addr]) for addr in addresses]
        self.send(messages)
96
97
98
99
class PollingInterface(Interface):
    """ non-persistent connection. synchronous calls"""

    def __init__(self, host, port):
        """Initialise the base interface with no session id yet."""
        Interface.__init__(self, host, port)
        self.session_id = None

    def get_history(self, address):
        """Request the history of *address* from the server."""
        self.send([('blockchain.address.get_history', [address])])

    def poll(self):
        """Hook called once per loop iteration when a session id is set.

        Does nothing here.
        """
        pass

    def run(self):
        """Poll every ``poll_interval`` seconds until disconnected or failed.

        Socket errors end the loop silently; any other exception is printed
        to stdout first.  On exit the interface is marked disconnected and a
        fake response is queued so that a blocked reader wakes up.
        """
        self.is_connected = True
        try:
            while self.is_connected:
                try:
                    if self.session_id:
                        self.poll()
                    time.sleep(self.poll_interval)
                except socket.error:
                    # socket.gaierror is a subclass of socket.error, so this
                    # clause covers both cases.
                    break
                except Exception:
                    traceback.print_exc(file=sys.stdout)
                    break
        finally:
            # Always run, even for exceptions not handled above, so the
            # consumer of self.responses is never left waiting.
            self.is_connected = False
            self.poke()
138
139                 
140
141
142
143
144
145
146
class HttpStratumInterface(PollingInterface):
    """Polling interface that exchanges JSON messages with the server over HTTP."""

    def poll(self):
        """Send an empty batch, which is issued as a GET, to fetch responses."""
        self.send([])

    def send(self, messages):
        """Send *messages* and queue whatever the server returns.

        *messages* is an iterable of ``(method, params)`` pairs; a ``params``
        value that is not a list is wrapped in one.  Each message is given
        the next message id and recorded in ``unanswered_requests``.  The
        ``SESSION`` cookie returned by the server is remembered in
        ``self.session_id`` and sent back on later calls.  ``poll_interval``
        is reset to 1 when the server returned data and otherwise grows by
        one per call, up to 15.
        """
        import urllib2, json, time, cookielib

        cj = cookielib.CookieJar()
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
        urllib2.install_opener(opener)

        t1 = time.time()

        data = []
        for method, params in messages:
            if not isinstance(params, list):
                params = [params]
            data.append({'method': method, 'id': self.message_id, 'params': params})
            self.unanswered_requests[self.message_id] = method, params
            self.message_id += 1

        # With no payload the request body is None, i.e. poll with GET.
        data_json = json.dumps(data) if data else None

        host = 'http://%s:%d' % (self.host, self.port)
        headers = {'content-type': 'application/json'}
        if self.session_id:
            headers['cookie'] = 'SESSION=%s' % self.session_id

        req = urllib2.Request(host, data_json, headers)
        response_stream = urllib2.urlopen(req)
        try:
            for cookie in cj:
                if cookie.name == 'SESSION':
                    self.session_id = cookie.value

            raw = response_stream.read()
        finally:
            # Release the HTTP connection even if reading fails.
            response_stream.close()

        payload = None
        if raw:
            payload = json.loads(raw)
            if isinstance(payload, list):
                for item in payload:
                    self.queue_json_response(item)
            else:
                self.queue_json_response(payload)

        if payload:
            self.poll_interval = 1
        elif self.poll_interval < 15:
            # Nothing came back: back off gradually.
            self.poll_interval += 1

        self.rtime = time.time() - t1
        self.is_connected = True
205
206
207
208
class TcpStratumInterface(Interface):
    """json-rpc over persistent TCP connection, asynchronous"""

    def __init__(self, host, port):
        """Open the TCP connection and send an initial 'server.version'.

        On any failure ``is_connected`` is set to False instead of raising.
        """
        Interface.__init__(self, host, port)
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.settimeout(5 * 60)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        try:
            self.s.connect((self.host, self.port))
            self.is_connected = True
            self.send([('server.version', [ELECTRUM_VERSION])])
            print("Connected to %s:%d" % (self.host, self.port))
        except Exception:
            self.is_connected = False
            print("Not connected")

    def run(self):
        """Read newline-delimited JSON messages until the connection ends.

        Every complete line is decoded and passed to
        ``queue_json_response``.  On exit the interface is marked
        disconnected and a fake response is queued to wake the consumer.
        """
        try:
            out = ''
            while self.is_connected:
                try:
                    msg = self.s.recv(1024)
                except socket.timeout:
                    # ping the server with server.version, as a real ping does not exist yet
                    self.send([('server.version', [ELECTRUM_VERSION])])
                    continue
                out += msg
                if msg == '':
                    # An empty read means the peer closed the connection.
                    self.is_connected = False
                    print("disconnected.")

                # Process every complete line; a trailing partial line stays
                # buffered until more data arrives.
                while '\n' in out:
                    line, out = out.split('\n', 1)
                    self.queue_json_response(json.loads(line))

        except Exception:
            traceback.print_exc(file=sys.stdout)
        finally:
            # Always notify the consumer, even for exceptions not handled above.
            self.is_connected = False
            print("poking")
            self.poke()

    def send(self, messages):
        """Serialise *messages* as one JSON object per line and send them.

        *messages* is an iterable of ``(method, params)`` pairs.  Each one is
        given the next message id and recorded in ``unanswered_requests``.
        """
        lines = []
        for method, params in messages:
            request = json.dumps({'id': self.message_id, 'method': method, 'params': params})
            self.unanswered_requests[self.message_id] = method, params
            self.message_id += 1
            lines.append(request + '\n')
        # sendall keeps writing until the whole buffer is sent; plain send()
        # may write only part of it and truncate a request.
        self.s.sendall(''.join(lines))

    def get_history(self, addr):
        """Request the history of *addr* from the server."""
        self.send([('blockchain.address.get_history', [addr])])
266     def get_history(self, addr):
267         self.send([('blockchain.address.get_history', [addr])])
268
269
270
271
272
class WalletSynchronizer(threading.Thread):
    """Daemon thread that feeds server responses into a wallet object.

    It creates an interface for ``wallet.server``, dispatches every response
    taken from ``interface.responses`` and, when *loop* is true, starts a
    new interface five seconds after a disconnection.
    """

    def __init__(self, wallet, loop=False):
        """Remember *wallet* and the *loop* flag, then start an interface."""
        threading.Thread.__init__(self)
        self.daemon = True
        self.wallet = wallet
        self.loop = loop
        self.start_interface()

    def handle_response(self, r):
        """Apply one queued response to the wallet or the interface.

        *r* is either ``None`` (the fake response queued by ``poke``, which
        is ignored) or a dict with ``'method'``, ``'params'`` and
        ``'result'`` keys.
        """
        if r is None:
            return

        method = r['method']
        params = r['params']
        result = r['result']

        if method == 'server.banner':
            self.wallet.banner = result
            self.wallet.was_updated = True

        elif method == 'server.peers.subscribe':
            servers = []
            for item in result:
                host = item[1]
                ports = []
                if len(item) > 2:
                    for v in item[2]:
                        # A protocol letter followed only by digits; anchored
                        # so entries with trailing junk are skipped.
                        if re.match(r"[th]\d+\Z", v):
                            ports.append((v[0], v[1:]))
                if ports:
                    servers.append((host, ports))
            self.interface.servers = servers

        elif method == 'blockchain.address.subscribe':
            addr = params[0]
            self.wallet.receive_status_callback(addr, result)

        elif method == 'blockchain.address.get_history':
            addr = params[0]
            self.wallet.receive_history_callback(addr, result)
            self.wallet.was_updated = True

        elif method == 'blockchain.transaction.broadcast':
            self.wallet.tx_result = result
            self.wallet.tx_event.set()

        elif method == 'blockchain.numblocks.subscribe':
            self.wallet.blocks = result

        elif method == 'server.version':
            pass

        else:
            print("unknown message: %s %s %s" % (method, params, result))

    def start_interface(self):
        """Create and start the interface described by ``wallet.server``.

        ``wallet.server`` is expected to look like ``host:port:protocol``;
        if it cannot be parsed the wallet is asked to pick a random server
        and parsing is retried once.  Protocol ``'t'`` selects the TCP
        interface and ``'h'`` the HTTP one; anything else falls back to TCP.
        """
        try:
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)
        except Exception:
            self.wallet.pick_random_server()
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)

        if protocol == 't':
            InterfaceClass = TcpStratumInterface
        elif protocol == 'h':
            InterfaceClass = HttpStratumInterface
        else:
            print("unknown protocol")
            InterfaceClass = TcpStratumInterface

        self.interface = InterfaceClass(host, port)
        self.interface.start()
        self.wallet.interface = self.interface

        if self.interface.is_connected:
            self.wallet.start_session(self.interface)

    def run(self):
        """Synchronise the wallet while connected; optionally reconnect.

        Each iteration subscribes to newly generated addresses, refreshes
        the wallet's ``up_to_date`` flag, notifies the GUI if anything
        changed, then blocks on the next response from the interface.
        """
        while True:
            while self.interface.is_connected:
                new_addresses = self.wallet.synchronize()
                if new_addresses:
                    self.interface.subscribe(new_addresses)

                if self.wallet.is_up_to_date():
                    if not self.wallet.up_to_date:
                        self.wallet.up_to_date = True
                        self.wallet.was_updated = True
                        self.wallet.up_to_date_event.set()
                elif self.wallet.up_to_date:
                    self.wallet.up_to_date = False
                    self.wallet.was_updated = True

                if self.wallet.was_updated:
                    self.wallet.gui_callback()
                    self.wallet.was_updated = False

                # Blocks until a response arrives; the interface queues a
                # fake None response on disconnect to unblock this call.
                response = self.interface.responses.get()
                self.handle_response(response)

            self.wallet.gui_callback()
            if not self.loop:
                break
            time.sleep(5)
            self.start_interface()
391
392
393