mv client/* . && rm -fr client
[electrum-nvc.git] / interface.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
22
23 DEFAULT_TIMEOUT = 5
24 DEFAULT_SERVERS = [ 'ecdsa.org:50001:t', 'electrum.novit.ro:50001:t', 'electrum.bitcoins.sk:50001:t']  # list of default servers
25
26
27 def old_to_new(s):
28     s = s.replace("'blk_hash'", "'block_hash'")
29     s = s.replace("'pos'", "'index'")
30     s = s.replace("'nTime'", "'timestamp'")
31     s = s.replace("'is_in'", "'is_input'")
32     s = s.replace("'raw_scriptPubKey'","'raw_output_script'")
33     return s
34
35
36 class Interface(threading.Thread):
37     def __init__(self, host, port):
38         threading.Thread.__init__(self)
39         self.daemon = True
40         self.host = host
41         self.port = port
42
43         self.servers = [] # actual list from IRC
44         self.rtime = 0
45
46         self.is_connected = True
47         self.poll_interval = 1
48
49         #json
50         self.message_id = 0
51         self.responses = Queue.Queue()
52         self.methods = {}
53
54     def poke(self):
55         # push a fake response so that the getting thread exits its loop
56         self.responses.put(None)
57
58     def queue_json_response(self, c):
59
60         #print "<--",c
61         msg_id = c.get('id')
62         error = c.get('error')
63         
64         if error:
65             print "received error:", c
66             return
67
68         if msg_id is not None:
69             method, params = self.methods.pop(msg_id)
70             result = c.get('result')
71         else:
72             # notification
73             method = c.get('method')
74             params = c.get('params')
75
76             if method == 'blockchain.numblocks.subscribe':
77                 result = params[0]
78                 params = []
79
80             elif method == 'blockchain.address.subscribe':
81                 addr = params[0]
82                 result = params[1]
83                 params = [addr]
84
85         self.responses.put({'method':method, 'params':params, 'result':result})
86
87
88
89     def subscribe(self, addresses):
90         messages = []
91         for addr in addresses:
92             messages.append(('blockchain.address.subscribe', [addr]))
93         self.send(messages)
94
95
96     def get_servers(self, wallet):
97         # loop over default servers
98         # requesting servers could be an independent process
99         addresses = wallet.all_addresses()
100         version = wallet.electrum_version
101
102         for server in DEFAULT_SERVERS:
103             print "connecting to", server
104             try:
105                 self.host = server
106                 self.start_session(addresses, version)
107                 wallet.host = self.host
108                 break
109             except socket.timeout:
110                 continue
111             except socket.error:
112                 continue
113             except:
114                 traceback.print_exc(file=sys.stdout)
115
116
117     def start_session(self, addresses, version):
118         #print "Starting new session: %s:%d"%(self.host,self.port)
119         self.send([('server.version', [version]), ('server.banner',[]), ('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])])
120         self.subscribe(addresses)
121
122
123 class PollingInterface(Interface):
124     """ non-persistent connection. synchronous calls"""
125
126     def __init__(self, host, port):
127         Interface.__init__(self, host, port)
128         self.session_id = None
129
130     def get_history(self, address):
131         self.send([('blockchain.address.get_history', [address] )])
132
133     def poll(self):
134         pass
135         #if is_new or wallet.remote_url:
136         #    self.was_updated = True
137         #    is_new = wallet.synchronize()
138         #    wallet.update_tx_history()
139         #    wallet.save()
140         #    return is_new
141         #else:
142         #    return False
143
144     def run(self):
145         self.is_connected = True
146         while self.is_connected:
147             try:
148                 if self.session_id:
149                     self.poll()
150                 time.sleep(self.poll_interval)
151             except socket.gaierror:
152                 break
153             except socket.error:
154                 break
155             except:
156                 traceback.print_exc(file=sys.stdout)
157                 break
158             
159         self.is_connected = False
160         self.poke()
161
162                 
163
164
165
166
167
168
169
170 class HttpStratumInterface(PollingInterface):
171
172     def poll(self):
173         self.send([])
174
175     def send(self, messages):
176         import urllib2, json, time, cookielib
177
178         cj = cookielib.CookieJar()
179         opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
180         urllib2.install_opener(opener)
181
182         t1 = time.time()
183
184         data = []
185         for m in messages:
186             method, params = m
187             if type(params) != type([]): params = [params]
188             data.append( { 'method':method, 'id':self.message_id, 'params':params } )
189             self.methods[self.message_id] = method, params
190             self.message_id += 1
191
192         if data:
193             data_json = json.dumps(data)
194         else:
195             # poll with GET
196             data_json = None 
197
198         host = 'http://%s:%d'%( self.host, self.port )
199         headers = {'content-type': 'application/json'}
200         if self.session_id:
201             headers['cookie'] = 'SESSION=%s'%self.session_id
202
203         req = urllib2.Request(host, data_json, headers)
204         response_stream = urllib2.urlopen(req)
205
206         for index, cookie in enumerate(cj):
207             if cookie.name=='SESSION':
208                 self.session_id = cookie.value
209
210         response = response_stream.read()
211         if response: 
212             response = json.loads( response )
213             if type(response) is not type([]):
214                 self.queue_json_response(response)
215             else:
216                 for item in response:
217                     self.queue_json_response(item)
218
219         if response: 
220             self.poll_interval = 1
221         else:
222             if self.poll_interval < 15: 
223                 self.poll_interval += 1
224         #print self.poll_interval, response
225
226         self.rtime = time.time() - t1
227         self.is_connected = True
228
229
230
231
232 class TcpStratumInterface(Interface):
233     """json-rpc over persistent TCP connection, asynchronous"""
234
235     def __init__(self, host, port):
236         Interface.__init__(self, host, port)
237         self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
238         self.s.settimeout(5)
239         self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
240         try:
241             self.s.connect(( self.host, self.port))
242             self.is_connected = True
243         except:
244             self.is_connected = False
245             print "not connected"
246
247     def run(self):
248         try:
249             out = ''
250             while self.is_connected:
251                 try: msg = self.s.recv(1024)
252                 except socket.timeout: 
253                     continue
254                 out += msg
255                 if msg == '': 
256                     self.is_connected = False
257                     print "disconnected."
258
259                 while True:
260                     s = out.find('\n')
261                     if s==-1: break
262                     c = out[0:s]
263                     out = out[s+1:]
264                     c = json.loads(c)
265                     self.queue_json_response(c)
266
267         except:
268             traceback.print_exc(file=sys.stdout)
269
270         self.is_connected = False
271         print "poking"
272         self.poke()
273
274     def send(self, messages):
275         out = ''
276         for m in messages:
277             method, params = m 
278             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
279             self.methods[self.message_id] = method, params
280             #print "-->",request
281             self.message_id += 1
282             out += request + '\n'
283         self.s.send( out )
284
285     def get_history(self, addr):
286         self.send([('blockchain.address.get_history', [addr])])
287
288
289
290
291
292 class WalletSynchronizer(threading.Thread):
293
294     def __init__(self, wallet, loop=False):
295         threading.Thread.__init__(self)
296         self.daemon = True
297         self.wallet = wallet
298         self.loop = loop
299         self.start_interface()
300
301
302     def handle_response(self, r):
303         if r is None:
304             return
305
306         method = r['method']
307         params = r['params']
308         result = r['result']
309
310         if method == 'server.banner':
311             self.wallet.banner = result
312             self.wallet.was_updated = True
313
314         elif method == 'server.peers.subscribe':
315             servers = []
316             for item in result:
317                 s = []
318                 host = item[1]
319                 ports = []
320                 if len(item)>2:
321                     for v in item[2]:
322                         if re.match("[th]\d+",v):
323                             ports.append((v[0],v[1:]))
324                 if ports:
325                     servers.append( (host, ports) )
326             self.interface.servers = servers
327
328         elif method == 'blockchain.address.subscribe':
329             addr = params[0]
330             self.wallet.receive_status_callback(addr, result)
331                             
332         elif method == 'blockchain.address.get_history':
333             addr = params[0]
334             self.wallet.receive_history_callback(addr, result)
335             self.wallet.was_updated = True
336
337         elif method == 'blockchain.transaction.broadcast':
338             self.wallet.tx_result = result
339             self.wallet.tx_event.set()
340
341         elif method == 'blockchain.numblocks.subscribe':
342             self.wallet.blocks = result
343
344         elif method == 'server.version':
345             pass
346
347         else:
348             print "unknown message:", method, params, result
349
350
351     def start_interface(self):
352         try:
353             host, port, protocol = self.wallet.server.split(':')
354             port = int(port)
355         except:
356             self.wallet.pick_random_server()
357             host, port, protocol = self.wallet.server.split(':')
358             port = int(port)
359
360         #print protocol, host, port
361         if protocol == 't':
362             InterfaceClass = TcpStratumInterface
363         elif protocol == 'h':
364             InterfaceClass = HttpStratumInterface
365         else:
366             print "unknown protocol"
367             InterfaceClass = TcpStratumInterface
368
369         self.interface = InterfaceClass(host, port)
370         self.wallet.interface = self.interface
371
372         with self.wallet.lock:
373             self.wallet.addresses_waiting_for_status = []
374             self.wallet.addresses_waiting_for_history = []
375             addresses = self.wallet.all_addresses()
376             version = self.wallet.electrum_version
377             for addr in addresses:
378                 self.wallet.addresses_waiting_for_status.append(addr)
379
380         try:
381             self.interface.start()
382             self.interface.start_session(addresses,version)
383         except:
384             self.interface.is_connected = False
385
386
387     def run(self):
388         import socket, time
389         while True:
390             while self.interface.is_connected:
391                 new_addresses = self.wallet.synchronize()
392                 if new_addresses:
393                     self.interface.subscribe(new_addresses)
394                     for addr in new_addresses:
395                         with self.wallet.lock:
396                             self.wallet.addresses_waiting_for_status.append(addr)
397
398                 if self.wallet.is_up_to_date():
399                     if not self.wallet.up_to_date:
400                         self.wallet.up_to_date = True
401                         self.wallet.was_updated = True
402                         self.wallet.up_to_date_event.set()
403                 else:
404                     if self.wallet.up_to_date:
405                         self.wallet.up_to_date = False
406                         self.wallet.was_updated = True
407
408                 if self.wallet.was_updated:
409                     self.wallet.gui_callback()
410                     self.wallet.was_updated = False
411
412                 response = self.interface.responses.get()
413                 self.handle_response(response)
414
415             print "disconnected, gui callback"
416             self.wallet.gui_callback()
417             if self.loop:
418                 time.sleep(5)
419                 self.start_interface()
420                 continue
421             else:
422                 break
423
424
425