Revert "Revert "Added proxy options to network dialog""
[electrum-nvc.git] / lib / interface.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
22
23 from version import ELECTRUM_VERSION
24 from util import print_error
25
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably a timeout in seconds -- TODO confirm against callers.
DEFAULT_TIMEOUT = 5
# Each entry has the form 'host:port:protocol'. WalletSynchronizer.init_interface
# splits server strings of this shape on ':' and maps protocol 't' to the TCP
# interface and 'h' to the HTTP interface.
DEFAULT_SERVERS = [ 'ecdsa.org:50001:t', 
                    'electrum.novit.ro:50001:t', 
                    'uncle-enzo.info:50001:t', 
                    'electrum.bytesized-hosting.com:50001:t']  # list of default servers

# Order matters: the position of a mode in this list is what gets passed to the
# socks module as the proxy type (see the proxy_modes.index(...) calls below),
# so entries must not be reordered.
proxy_modes = ['none', 'socks4', 'socks5', 'http' ]
33
def replace_keys(obj, old_key, new_key):
    """Rename ``old_key`` to ``new_key`` everywhere inside ``obj``, in place.

    ``obj`` may be any value. Dicts and lists are walked recursively; every
    dict that contains ``old_key`` has that entry moved to ``new_key``
    (overwriting an existing ``new_key``). Anything else is left untouched.
    Returns None.
    """
    if isinstance(obj, dict):
        # Renaming a key to itself must be a no-op: assigning and then
        # deleting the same key would drop the entry altogether.
        if old_key != new_key and old_key in obj:
            obj[new_key] = obj.pop(old_key)
        # Snapshot the values so the walk works the same on Python 2 and 3
        # and is unaffected by edits made further down the recursion.
        for elem in list(obj.values()):
            replace_keys(elem, old_key, new_key)
    elif isinstance(obj, list):
        for elem in obj:
            replace_keys(elem, old_key, new_key)
44
def old_to_new(d):
    """Rewrite old-style key names inside ``d`` to their new-style names.

    ``d`` is modified in place (nested dicts and lists included) by applying
    each rename below, in order, through ``replace_keys``.
    """
    renames = (
        ('blk_hash', 'block_hash'),
        ('pos', 'index'),
        ('nTime', 'timestamp'),
        ('is_in', 'is_input'),
        ('raw_scriptPubKey', 'raw_output_script'),
    )
    for old_name, new_name in renames:
        replace_keys(d, old_name, new_name)
51
def parse_proxy_options(s):
    """Parse a ``[mode:]host[:port]`` string into a proxy settings dict.

    The returned dict has the keys "mode", "host" and "port" (the port is
    kept as a string). Defaults are mode "socks5", host "localhost" and
    port "8080" for http or "1080" for every other mode. The leading field
    is taken as the mode only when it is one of ``proxy_modes``; fields
    after the port are ignored.

    An empty host or port field (for example ``""`` or ``"socks5::9050"``)
    keeps the corresponding default instead of replacing it with ``''``.
    """
    proxy = {"mode": "socks5", "host": "localhost"}
    fields = s.split(':')

    # The mode prefix is optional.
    if fields[0] in proxy_modes:
        proxy["mode"] = fields.pop(0)

    if fields and fields[0]:
        proxy["host"] = fields[0]

    if len(fields) > 1 and fields[1]:
        proxy["port"] = fields[1]
    else:
        proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
    return proxy
67
class Interface(threading.Thread):
    """Common base for server connections; every instance is a daemon thread.

    The base class owns the request bookkeeping (``message_id`` and
    ``unanswered_requests``) and the ``responses`` queue. ``subscribe``
    calls ``self.send``, which is not defined here: the transport
    subclasses supply ``send`` and ``run``.
    """

    def __init__(self, host, port, debug_server, proxy):
        """Store the connection settings and create empty request state.

        host, port   -- server address
        debug_server -- when true, every decoded server message is printed
        proxy        -- proxy settings; only stored by this base class
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.proxy = proxy

        self.servers = [] # actual list from IRC
        self.rtime = 0
        self.bytes_received = 0

        self.is_connected = True
        self.poll_interval = 1

        #json
        self.message_id = 0
        self.responses = Queue.Queue()
        self.unanswered_requests = {}

        self.debug_server = debug_server

    def init_socket(self):
        """Hook for subclasses that need to open a connection; no-op here."""
        pass

    def poke(self):
        # push a fake response so that the getting thread exits its loop
        self.responses.put(None)

    def queue_json_response(self, c):
        """Normalise one decoded server message and queue it on ``responses``.

        ``c`` is the decoded JSON object (a dict). Three cases are handled:

        * error reply: printed and dropped; the matching pending request,
          if any, is forgotten so it does not linger in
          ``unanswered_requests``.
        * reply with an ``id``: matched against ``unanswered_requests`` to
          recover the method and params of the original request. A reply
          whose id is not pending is reported and dropped.
        * notification (no ``id``): only the numblocks and address
          subscriptions are understood; anything else is reported and
          dropped instead of failing on an unset ``result``.

        Queued entries are dicts with the keys 'method', 'params', 'result'.
        """
        if self.debug_server:
            print("<-- %s" % (c,))

        msg_id = c.get('id')
        error = c.get('error')

        if error:
            print("received error: %s" % (c,))
            # The request is finished even though it failed.
            if msg_id is not None:
                self.unanswered_requests.pop(msg_id, None)
            return

        if msg_id is not None:
            pending = self.unanswered_requests.pop(msg_id, None)
            if pending is None:
                print("received response with unknown id: %s" % (c,))
                return
            method, params = pending
            result = c.get('result')
        else:
            # notification
            method = c.get('method')
            params = c.get('params')

            if method == 'blockchain.numblocks.subscribe':
                # params is [number of blocks]
                result = params[0]
                params = []
            elif method == 'blockchain.address.subscribe':
                # params is [address, status]; keep the address as the param
                addr = params[0]
                result = params[1]
                params = [addr]
            else:
                print("ignoring unexpected notification: %s" % (c,))
                return

        self.responses.put({'method':method, 'params':params, 'result':result})

    def subscribe(self, addresses):
        """Send one 'blockchain.address.subscribe' request per address."""
        messages = [('blockchain.address.subscribe', [addr]) for addr in addresses]
        self.send(messages)
135
136
137
138
class PollingInterface(Interface):
    """Interface for non-persistent connections that are polled synchronously."""

    def __init__(self, host, port, debug_server, proxy):
        Interface.__init__(self, host, port, debug_server, proxy)
        # No session yet; run() does not poll until this is set.
        self.session_id = None
        self.debug_server = debug_server

    def get_history(self, address):
        """Request the history of ``address`` from the server."""
        request = ('blockchain.address.get_history', [address])
        self.send([request])

    def poll(self):
        """Polling hook; does nothing here and is overridden by subclasses."""
        pass

    def run(self):
        """Thread body: poll, sleep ``poll_interval`` seconds, repeat.

        The loop ends when ``is_connected`` becomes false or when polling
        raises. Socket errors end it silently; any other exception is
        printed to stdout first. Afterwards the interface is marked as
        disconnected and the response consumer is woken up via ``poke``.
        """
        self.is_connected = True
        try:
            while self.is_connected:
                if self.session_id:
                    self.poll()
                time.sleep(self.poll_interval)
        except (socket.gaierror, socket.error):
            pass
        except:
            traceback.print_exc(file=sys.stdout)

        self.is_connected = False
        self.poke()
178
179                 
180
181
182
183
184
185
186
class HttpStratumInterface(PollingInterface):
    """Polling interface that exchanges JSON messages with the server over HTTP."""

    def poll(self):
        # An empty batch makes send() issue a plain GET.
        self.send([])

    def send(self, messages):
        """Send ``messages`` in one HTTP request and queue whatever comes back.

        ``messages`` is a list of ``(method, params)`` pairs; a ``params``
        that is not a list is wrapped in one. Each message gets the next
        ``message_id`` and is recorded in ``unanswered_requests``. With no
        messages the request has no body (poll with GET).

        The 'SESSION' cookie returned by the server is remembered in
        ``session_id`` and sent back on later requests. ``poll_interval``
        is reset to 1 when the server returned data and otherwise grows by
        one per call up to 15. Errors raised by urllib2 or the JSON decoder
        propagate to the caller.
        """
        import urllib2, json, time, cookielib

        proxy = self.proxy
        # A missing proxy configuration (None) means "no proxy".
        if proxy is not None and proxy["mode"] != "none":
            import socks
            socks.setdefaultproxy(proxy_modes.index(proxy["mode"]), proxy["host"], int(proxy["port"]))
            socks.wrapmodule(urllib2)
        cj = cookielib.CookieJar()
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
        urllib2.install_opener(opener)

        t1 = time.time()

        data = []
        for method, params in messages:
            if not isinstance(params, list):
                params = [params]
            data.append({'method': method, 'id': self.message_id, 'params': params})
            self.unanswered_requests[self.message_id] = method, params
            self.message_id += 1

        # poll with GET when there is nothing to post
        data_json = json.dumps(data) if data else None

        host = 'http://%s:%d' % (self.host, self.port)
        headers = {'content-type': 'application/json'}
        if self.session_id:
            headers['cookie'] = 'SESSION=%s' % self.session_id

        req = urllib2.Request(host, data_json, headers)
        response_stream = urllib2.urlopen(req)
        try:
            for cookie in cj:
                if cookie.name == 'SESSION':
                    self.session_id = cookie.value
            response = response_stream.read()
        finally:
            # Release the connection even if reading fails.
            response_stream.close()

        self.bytes_received += len(response)
        if response:
            response = json.loads(response)
            # The server may answer with a single object or a batch.
            items = response if isinstance(response, list) else [response]
            for item in items:
                self.queue_json_response(item)

        # "response" is now the decoded value, so an empty batch counts as
        # "nothing received" and slows the polling down.
        if response:
            self.poll_interval = 1
        elif self.poll_interval < 15:
            self.poll_interval += 1

        self.rtime = time.time() - t1
        self.is_connected = True
250
251
252
253
class TcpStratumInterface(Interface):
    """json-rpc over persistent TCP connection, asynchronous"""

    def __init__(self, host, port, debug_server, proxy):
        Interface.__init__(self, host, port, debug_server, proxy)
        self.debug_server = debug_server

    def init_socket(self):
        """Create the socket, optionally through a proxy, and try to connect.

        On success ``is_connected`` is set and a 'server.version' request is
        sent. On any failure ``is_connected`` is cleared, the socket is
        closed and "Not connected" is reported; no exception is raised for
        connection problems.
        """
        proxy = self.proxy
        # A missing proxy configuration (None) means a direct connection.
        if proxy is None or proxy["mode"] == "none":
            self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            import socks
            self.s = socks.socksocket()
            print("Using Proxy %s" % (proxy,))
            self.s.setproxy(proxy_modes.index(proxy["mode"]), proxy["host"], int(proxy["port"]))
        # recv() timing out after 60s is what triggers the keep-alive ping in run().
        self.s.settimeout(60)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        try:
            self.s.connect((self.host.encode('ascii'), int(self.port)))
            self.is_connected = True
            self.send([('server.version', [ELECTRUM_VERSION])])
            print("Connected to %s:%d" % (self.host, self.port))
        except Exception:
            self.is_connected = False
            self._close_socket()
            print_error("Not connected")

    def _close_socket(self):
        # Release the socket, ignoring errors from an already dead connection.
        try:
            self.s.close()
        except (AttributeError, socket.error):
            pass

    def run(self):
        """Thread body: read newline-delimited JSON messages until disconnected.

        Received data is buffered; every complete line is decoded and handed
        to ``queue_json_response``. A receive timeout sends a
        'server.version' request as a ping. An empty read means the peer
        closed the connection. Any exception is printed and ends the loop.
        On exit the socket is closed and the response consumer is woken up
        via ``poke``.
        """
        try:
            out = ''
            while self.is_connected:
                try:
                    msg = self.s.recv(1024)
                except socket.timeout:
                    # ping the server with server.version, as a real ping does not exist yet
                    self.send([('server.version', [ELECTRUM_VERSION])])
                    continue
                out += msg
                self.bytes_received += len(msg)
                if msg == '':
                    self.is_connected = False
                    print("Disconnected.")

                # Hand over every complete line; keep the unfinished tail.
                while '\n' in out:
                    line, out = out.split('\n', 1)
                    self.queue_json_response(json.loads(line))

        except Exception:
            traceback.print_exc(file=sys.stdout)

        self.is_connected = False
        self._close_socket()
        print("Poking")
        self.poke()

    def send(self, messages):
        """Serialise ``messages`` and write them to the socket.

        ``messages`` is a list of ``(method, params)`` pairs. Each one gets
        the next ``message_id``, is recorded in ``unanswered_requests`` and
        is written as one JSON object per line. Socket errors propagate to
        the caller.
        """
        lines = []
        for method, params in messages:
            request = json.dumps({'id': self.message_id, 'method': method, 'params': params})
            self.unanswered_requests[self.message_id] = method, params

            if self.debug_server:
                print("--> %s" % request)

            self.message_id += 1
            lines.append(request + '\n')

        out = ''.join(lines)
        if out:
            # sendall() either writes everything or raises.
            self.s.sendall(out)

    def get_history(self, addr):
        """Request the history of ``addr`` from the server."""
        self.send([('blockchain.address.get_history', [addr])])
330
331
332
333
334
class WalletSynchronizer(threading.Thread):
    """Daemon thread that feeds server responses into a wallet.

    It creates the interface named by ``wallet.server``, takes entries from
    ``interface.responses`` and applies them to the wallet. With ``loop``
    set it builds a new interface after a disconnect.
    """

    def __init__(self, wallet, loop=False, servers_loaded_callback=None, proxy=None):
        """Set up the thread and create the interface (nothing is started yet).

        wallet                  -- wallet object to keep in sync
        loop                    -- reconnect after a disconnect instead of exiting
        servers_loaded_callback -- called with no arguments once a server
                                   list has been received
        proxy                   -- proxy settings handed to the interface
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.wallet = wallet
        self.loop = loop
        self.proxy = proxy
        self.servers_loaded_callback = servers_loaded_callback
        self.init_interface()

    def init_interface(self):
        """Create the interface for ``wallet.server`` ('host:port:protocol').

        If the server string cannot be parsed, the wallet is asked to pick
        a random server and that one is used. Protocol 't' selects TCP and
        'h' selects HTTP; anything else is reported and falls back to TCP.
        The new interface is stored on both this object and the wallet.
        """
        try:
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)
        except Exception:
            self.wallet.pick_random_server()
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)

        if protocol == 't':
            InterfaceClass = TcpStratumInterface
        elif protocol == 'h':
            InterfaceClass = HttpStratumInterface
        else:
            print_error("Error: Unknown protocol")
            InterfaceClass = TcpStratumInterface

        self.interface = InterfaceClass(host, port, self.wallet.debug_server, self.proxy)
        self.wallet.interface = self.interface

    def handle_response(self, r):
        """Apply one queued response to the wallet.

        ``r`` is None (the wake-up marker queued by ``Interface.poke``) or
        a dict with the keys 'method', 'params' and 'result'. Unknown
        methods are reported through ``print_error``.
        """
        if r is None:
            return

        method = r['method']
        params = r['params']
        result = r['result']

        if method == 'server.banner':
            self.wallet.banner = result
            self.wallet.was_updated = True

        elif method == 'server.peers.subscribe':
            servers = []
            for item in result:
                # item[1] is the host name; item[2], when present, is a list
                # of feature strings such as 't50001', 'h8081' or 'v0.6'.
                host = item[1]
                ports = []
                version = None
                if len(item) > 2:
                    for v in item[2]:
                        if re.match(r"[th]\d+", v):
                            # protocol letter followed by the port number
                            ports.append((v[0], v[1:]))
                        if v.startswith('v'):
                            version = v[1:]
                # Only keep servers that announce both a port and a version.
                if ports and version:
                    servers.append((host, ports))
            self.interface.servers = servers
            # servers_loaded_callback is None for commands, but should
            # NEVER be None when using the GUI.
            if self.servers_loaded_callback is not None:
                self.servers_loaded_callback()

        elif method == 'blockchain.address.subscribe':
            addr = params[0]
            self.wallet.receive_status_callback(addr, result)

        elif method == 'blockchain.address.get_history':
            addr = params[0]
            self.wallet.receive_history_callback(addr, result)
            self.wallet.was_updated = True

        elif method == 'blockchain.transaction.broadcast':
            self.wallet.tx_result = result
            self.wallet.tx_event.set()

        elif method == 'blockchain.numblocks.subscribe':
            self.wallet.blocks = result
            self.wallet.was_updated = True

        elif method == 'server.version':
            pass

        else:
            # params is normally a list and result can be any JSON value, so
            # format them instead of concatenating them to a str.
            print_error("Error: Unknown message:%s, %s, %s" % (method, params, result))

    def start_interface(self):
        """Connect and start the interface thread; open a wallet session if connected."""
        self.interface.init_socket()
        self.interface.start()
        if self.interface.is_connected:
            self.wallet.start_session(self.interface)

    def run(self):
        """Thread body: synchronise the wallet for as long as the interface is connected.

        Each pass subscribes to newly created addresses, refreshes the
        wallet's ``up_to_date`` flag, fires the wallet callbacks when
        something changed, and then blocks until the next response arrives.
        After a disconnect the callbacks are fired once more; with ``loop``
        set a new interface is created after 5 seconds, otherwise the
        thread ends.
        """
        self.start_interface()
        while True:
            while self.interface.is_connected:
                new_addresses = self.wallet.synchronize()
                if new_addresses:
                    self.interface.subscribe(new_addresses)

                if self.wallet.is_up_to_date():
                    if not self.wallet.up_to_date:
                        self.wallet.up_to_date = True
                        self.wallet.was_updated = True
                        self.wallet.up_to_date_event.set()
                elif self.wallet.up_to_date:
                    self.wallet.up_to_date = False
                    self.wallet.was_updated = True

                if self.wallet.was_updated:
                    self.wallet.trigger_callbacks()
                    self.wallet.was_updated = False

                # Blocks until a response (or the None wake-up marker) arrives.
                response = self.interface.responses.get()
                self.handle_response(response)

            self.wallet.trigger_callbacks()
            if not self.loop:
                break

            time.sleep(5)
            # Reuse the proxy settings of the old interface for the new one.
            self.proxy = self.interface.proxy
            self.init_interface()
            self.start_interface()
469
470
471