make proxy an optional parameter
[electrum-nvc.git] / lib / interface.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
22
23 from version import ELECTRUM_VERSION
24 from util import print_error
25
# Default timeout value (presumably seconds; it is not referenced in the
# code visible in this module -- verify against callers).
DEFAULT_TIMEOUT = 5

# Default server list.  Each entry is a 'host:port:protocol' string, the same
# three-field form that WalletSynchronizer.init_interface() splits on ':'.
DEFAULT_SERVERS = [
    'ecdsa.org:50001:t',
    'electrum.novit.ro:50001:t',
    'electrum.bytesized-hosting.com:50001:t',
]

# Proxy mode names accepted by parse_proxy_options().  The order matters:
# the interfaces derive the proxy type handed to the socks module from a
# mode's position in this list.
proxy_modes = ['socks4', 'socks5', 'http']
32
def replace_keys(obj, old_key, new_key):
    """Rename ``old_key`` to ``new_key`` throughout a nested structure, in place.

    ``obj`` may be a dict, a list, or anything else.  Dicts have the key
    renamed and their values visited recursively; lists have each element
    visited recursively; every other type is left untouched.

    Returns None; the structure is modified in place.
    """
    if isinstance(obj, dict):
        if old_key in obj:
            # pop-then-assign (rather than assign-then-delete) keeps the entry
            # intact when old_key and new_key happen to be the same key.
            obj[new_key] = obj.pop(old_key)
        # The rename is finished before iteration starts, so the dict is not
        # resized while its values are being walked.
        for elem in obj.values():
            replace_keys(elem, old_key, new_key)
    elif isinstance(obj, list):
        for elem in obj:
            replace_keys(elem, old_key, new_key)
43
def old_to_new(d):
    """Rename legacy field names inside ``d`` to their current names, in place.

    Each (legacy, current) pair below is applied to the whole nested
    structure through replace_keys(), in the order listed.
    """
    renames = (
        ('blk_hash', 'block_hash'),
        ('pos', 'index'),
        ('nTime', 'timestamp'),
        ('is_in', 'is_input'),
        ('raw_scriptPubKey', 'raw_output_script'),
    )
    for legacy_name, current_name in renames:
        replace_keys(d, legacy_name, current_name)
50
def parse_proxy_options(s):
    """Parse a '[mode:]host[:port]' string into a proxy settings dict.

    The returned dict has the keys "mode", "host" and "port" (the port is
    kept as a string).  A leading field is treated as the mode only when it
    is listed in ``proxy_modes``; otherwise the mode stays "socks5".  When no
    port is given it defaults to "8080" for http and "1080" for the others.
    Fields beyond the port are ignored.
    """
    fields = s.split(':')
    settings = {"mode": "socks5", "host": "localhost"}

    # Consume the fields from the left: optional mode, then host, then port.
    if proxy_modes.count(fields[0]) == 1:
        settings["mode"] = fields.pop(0)
    if fields:
        settings["host"] = fields.pop(0)

    if fields:
        settings["port"] = fields[0]
    elif settings["mode"] == "http":
        settings["port"] = "8080"
    else:
        settings["port"] = "1080"
    return settings
66
class Interface(threading.Thread):
    """Base class for a server connection running in a daemon thread.

    Decoded server messages are normalised by queue_json_response() into
    ``{'method': ..., 'params': ..., 'result': ...}`` dicts and placed on
    ``self.responses``.  Subclasses are expected to provide ``send(messages)``
    (used by subscribe()) and, where needed, ``init_socket()`` and ``run()``.
    """

    def __init__(self, host, port, proxy=None):
        """Store the connection settings and initialise the bookkeeping state.

        ``proxy`` is either None or a settings dict with "mode", "host" and
        "port" keys (the shape read by the subclasses in this module).
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.proxy = proxy

        self.servers = [] # actual list from IRC
        self.rtime = 0
        self.bytes_received = 0

        self.is_connected = True
        self.poll_interval = 1

        #json
        self.message_id = 0
        self.responses = Queue.Queue()
        # message id -> (method, params) of requests still awaiting a reply
        self.unanswered_requests = {}

    def init_socket(self):
        """Hook for subclasses that need to open a connection; no-op here."""
        pass

    def poke(self):
        """Wake up a consumer blocked on ``self.responses``."""
        # push a fake response so that the getting thread exits its loop
        self.responses.put(None)

    def queue_json_response(self, c):
        """Translate one decoded JSON message ``c`` and queue it.

        * Messages carrying a truthy 'error' are printed and dropped; the
          matching pending request, if any, is forgotten.
        * Messages with an 'id' are matched with the pending request of that
          id; a message whose id is unknown is printed and dropped.
        * Messages without an 'id' are notifications.  Only
          'blockchain.numblocks.subscribe' and 'blockchain.address.subscribe'
          are understood; any other notification is printed and dropped.
        """
        # uncomment to debug
        # print("<-- %s" % (c,))

        msg_id = c.get('id')
        error = c.get('error')

        if error:
            print("received error: %s" % (c,))
            # The request has been answered (with an error): stop tracking it
            # so that unanswered_requests does not grow without bound.
            self.unanswered_requests.pop(msg_id, None)
            return

        if msg_id is not None:
            request = self.unanswered_requests.pop(msg_id, None)
            if request is None:
                # Previously a KeyError; a stray reply must not kill the
                # thread that is reading from the server.
                print("received response with unknown id: %s" % (c,))
                return
            method, params = request
            result = c.get('result')
        else:
            # notification
            method = c.get('method')
            params = c.get('params')

            if method == 'blockchain.numblocks.subscribe':
                result = params[0]
                params = []

            elif method == 'blockchain.address.subscribe':
                addr = params[0]
                result = params[1]
                params = [addr]

            else:
                # Previously 'result' was left unbound here, so queueing the
                # message raised UnboundLocalError.
                print("ignoring unsupported notification: %s" % (c,))
                return

        self.responses.put({'method':method, 'params':params, 'result':result})

    def subscribe(self, addresses):
        """Send one 'blockchain.address.subscribe' request per address.

        Relies on ``self.send``, which this base class does not define.
        """
        messages = [('blockchain.address.subscribe', [addr]) for addr in addresses]
        self.send(messages)
132
133
134
135
class PollingInterface(Interface):
    """Non-persistent connection driven by periodic, synchronous polls.

    The thread body calls poll() once per ``poll_interval`` seconds as soon
    as a session id is known.  ``send`` must be supplied by a subclass.
    """

    def __init__(self, host, port, proxy=None):
        Interface.__init__(self, host, port, proxy)
        # Stays None until a subclass records a session id.
        self.session_id = None

    def get_history(self, address):
        """Request the history of ``address`` from the server."""
        request = ('blockchain.address.get_history', [address])
        self.send([request])

    def poll(self):
        """Perform one poll; a no-op here, overridden by subclasses."""
        pass

    def run(self):
        """Poll until disconnected or an error occurs, then wake the consumer."""
        self.is_connected = True
        try:
            while self.is_connected:
                if self.session_id:
                    self.poll()
                time.sleep(self.poll_interval)
        except socket.error:
            # Network failures (socket.gaierror is a subclass of
            # socket.error) end the loop silently.
            pass
        except:
            # Anything else also ends the loop, but leaves a traceback.
            traceback.print_exc(file=sys.stdout)

        self.is_connected = False
        self.poke()
174
175                 
176
177
178
179
180
181
182
class HttpStratumInterface(PollingInterface):
    """JSON requests over HTTP, with the session tracked by a SESSION cookie."""

    def poll(self):
        """Poll the server with an empty request list (sent as a GET)."""
        self.send([])

    def send(self, messages):
        """POST ``messages`` to the server (or GET when empty) and queue replies.

        ``messages`` is a list of ``(method, params)`` pairs; ``params`` that
        is not a list is wrapped in one.  Each request is recorded in
        ``self.unanswered_requests`` under its message id.  Replies are handed
        to queue_json_response().  The poll interval is reset to 1 after a
        non-empty reply and otherwise grows by one per call, up to 15.

        Errors raised by urllib2 or the JSON decoder propagate to the caller.
        """
        import urllib2, json, time, cookielib

        if self.proxy:
            import socks
            # SocksiPy numbers its proxy types from 1 (SOCKS4=1, SOCKS5=2,
            # HTTP=3) whereas list positions start at 0, hence the +1.
            # Without it every mode selected the wrong proxy protocol.
            proxy_type = proxy_modes.index(self.proxy["mode"]) + 1
            socks.setdefaultproxy(proxy_type, self.proxy["host"], int(self.proxy["port"]))
            socks.wrapmodule(urllib2)
        cj = cookielib.CookieJar()
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
        urllib2.install_opener(opener)

        t1 = time.time()

        data = []
        for method, params in messages:
            if not isinstance(params, list):
                params = [params]
            data.append({'method': method, 'id': self.message_id, 'params': params})
            self.unanswered_requests[self.message_id] = method, params
            self.message_id += 1

        # No payload makes urllib2 issue a GET, which is how polling is done.
        data_json = json.dumps(data) if data else None

        host = 'http://%s:%d'%( self.host, self.port )
        headers = {'content-type': 'application/json'}
        if self.session_id:
            headers['cookie'] = 'SESSION=%s'%self.session_id

        req = urllib2.Request(host, data_json, headers)
        response_stream = urllib2.urlopen(req)
        try:
            for cookie in cj:
                if cookie.name == 'SESSION':
                    self.session_id = cookie.value

            response = response_stream.read()
        finally:
            # Release the HTTP connection even if reading fails.
            response_stream.close()

        self.bytes_received += len(response)
        if response:
            response = json.loads(response)
            if isinstance(response, list):
                for item in response:
                    self.queue_json_response(item)
            else:
                self.queue_json_response(response)

        # 'response' is now the decoded value, so an empty JSON list counts
        # as "nothing received" and slows polling down as well.
        if response:
            self.poll_interval = 1
        elif self.poll_interval < 15:
            self.poll_interval += 1

        self.rtime = time.time() - t1
        self.is_connected = True
246
247
248
249
class TcpStratumInterface(Interface):
    """json-rpc over persistent TCP connection, asynchronous"""

    def __init__(self, host, port, proxy=None):
        Interface.__init__(self, host, port, proxy)

    def init_socket(self):
        """Create the socket (optionally through a proxy) and connect.

        Sets ``self.is_connected`` according to the outcome; a failure is
        reported through print_error() rather than raised.
        """
        if self.proxy is None:
            self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            import socks
            self.s = socks.socksocket()
            print("Using Proxy %s" % (self.proxy,))
            # SocksiPy numbers its proxy types from 1 (SOCKS4=1, SOCKS5=2,
            # HTTP=3) whereas list positions start at 0, hence the +1.
            # Without it every mode selected the wrong proxy protocol.
            proxy_type = proxy_modes.index(self.proxy["mode"]) + 1
            self.s.setproxy(proxy_type, self.proxy["host"], int(self.proxy["port"]))
        # The receive timeout doubles as the keep-alive period: run() pings
        # the server whenever recv() times out.
        self.s.settimeout(60)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        try:
            self.s.connect((self.host.encode('ascii'), int(self.port)))
            self.is_connected = True
            self.send([('server.version', [ELECTRUM_VERSION])])
            # int() so that a port passed as a string cannot make the
            # formatting raise and wrongly mark a live connection as failed.
            print("Connected to %s:%d" % (self.host, int(self.port)))
        except Exception:
            self.is_connected = False
            print_error("Not connected")

    def run(self):
        """Read newline-delimited JSON messages until the connection ends.

        Each complete line is decoded and passed to queue_json_response().
        On exit, for whatever reason, the interface is marked disconnected
        and the consumer of ``self.responses`` is woken up.
        """
        try:
            out = ''
            while self.is_connected:
                try:
                    msg = self.s.recv(1024)
                except socket.timeout:
                    # ping the server with server.version, as a real ping does not exist yet
                    self.send([('server.version', [ELECTRUM_VERSION])])
                    continue
                out += msg
                self.bytes_received += len(msg)
                if msg == '':
                    # An empty read means the peer closed the connection.
                    self.is_connected = False
                    print("Disconnected.")

                # Process every complete line; a trailing partial line stays
                # buffered in 'out' until the rest of it arrives.
                while True:
                    s = out.find('\n')
                    if s == -1:
                        break
                    c = out[0:s]
                    out = out[s+1:]
                    c = json.loads(c)
                    self.queue_json_response(c)

        except Exception:
            traceback.print_exc(file=sys.stdout)

        finally:
            self.is_connected = False
            print("Poking")
            self.poke()

    def send(self, messages):
        """Send ``messages``, a list of ``(method, params)`` pairs.

        Every request gets the next message id and is recorded in
        ``self.unanswered_requests`` so its reply can be matched later.
        Socket errors propagate to the caller.
        """
        lines = []
        for method, params in messages:
            request = json.dumps({'id': self.message_id, 'method': method, 'params': params})
            self.unanswered_requests[self.message_id] = method, params

            # uncomment to debug
            # print("--> %s" % request)

            self.message_id += 1
            lines.append(request + '\n')

        out = ''.join(lines)
        if out:
            # sendall() loops until the whole buffer has been written.
            self.s.sendall(out)

    def get_history(self, addr):
        """Request the history of ``addr`` from the server."""
        self.send([('blockchain.address.get_history', [addr])])
325
326
327
328
329
class WalletSynchronizer(threading.Thread):
    """Daemon thread that feeds server responses into a wallet.

    It creates the interface selected by ``wallet.server``, subscribes newly
    generated addresses, and dispatches every queued response to the wallet.
    With ``loop=True`` it builds a fresh interface and reconnects after the
    current one disconnects.
    """

    def __init__(self, wallet, loop=False, servers_loaded_callback=None, proxy=None):
        """Store the settings and create (but do not start) the interface.

        ``servers_loaded_callback``, when given, is called with no arguments
        after a server list has been received.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.wallet = wallet
        self.loop = loop
        self.proxy = proxy
        self.init_interface()
        self.servers_loaded_callback = servers_loaded_callback

    def init_interface(self):
        """Instantiate the interface described by ``wallet.server``.

        ``wallet.server`` is expected to be 'host:port:protocol'.  If it
        cannot be parsed, the wallet is asked to pick a random server and the
        parse is retried once.  Protocol 't' selects TCP and 'h' HTTP; any
        other value is reported and falls back to TCP.
        """
        try:
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)
        except Exception:
            self.wallet.pick_random_server()
            host, port, protocol = self.wallet.server.split(':')
            port = int(port)

        #print protocol, host, port
        if protocol == 't':
            InterfaceClass = TcpStratumInterface
        elif protocol == 'h':
            InterfaceClass = HttpStratumInterface
        else:
            print_error("Error: Unknown protocol")
            InterfaceClass = TcpStratumInterface

        self.interface = InterfaceClass(host, port, self.proxy)
        self.wallet.interface = self.interface

    def handle_response(self, r):
        """Dispatch one queued response ``r`` to the wallet.

        ``r`` is either None (a wake-up marker, ignored) or a dict with the
        keys 'method', 'params' and 'result'.
        """
        if r is None:
            return

        method = r['method']
        params = r['params']
        result = r['result']

        if method == 'server.banner':
            self.wallet.banner = result
            self.wallet.was_updated = True

        elif method == 'server.peers.subscribe':
            servers = []
            for item in result:
                host = item[1]
                ports = []
                version = None
                if len(item) > 2:
                    # Feature strings: 't<port>' / 'h<port>' announce a
                    # protocol and its port, 'v<version>' the server version.
                    for v in item[2]:
                        if re.match(r"[th]\d+", v):
                            ports.append((v[0], v[1:]))
                        elif v.startswith('v'):
                            version = v[1:]
                # Only servers announcing both a port and a version are kept.
                if ports and version:
                    servers.append((host, ports))
            self.interface.servers = servers
            # servers_loaded_callback is None for commands, but should
            # NEVER be None when using the GUI.
            if self.servers_loaded_callback is not None:
                self.servers_loaded_callback()

        elif method == 'blockchain.address.subscribe':
            addr = params[0]
            self.wallet.receive_status_callback(addr, result)

        elif method == 'blockchain.address.get_history':
            addr = params[0]
            self.wallet.receive_history_callback(addr, result)
            self.wallet.was_updated = True

        elif method == 'blockchain.transaction.broadcast':
            self.wallet.tx_result = result
            self.wallet.tx_event.set()

        elif method == 'blockchain.numblocks.subscribe':
            self.wallet.blocks = result
            self.wallet.was_updated = True

        elif method == 'server.version':
            pass

        else:
            # params is normally a list and result may be anything, so format
            # them instead of concatenating (which raised TypeError).
            print_error("Error: Unknown message:%s, %r, %r" % (method, params, result))

    def start_interface(self):
        """Connect and start the interface; open a wallet session if connected."""
        self.interface.init_socket()
        self.interface.start()
        if self.interface.is_connected:
            self.wallet.start_session(self.interface)

    def run(self):
        """Thread body: keep the wallet synchronised while connected.

        Each pass subscribes newly generated addresses, updates the wallet's
        up-to-date flag, fires callbacks when something changed, and then
        blocks until the next response is queued.  After a disconnect the
        callbacks are fired once more; with ``loop`` set, a new interface is
        created after five seconds, otherwise the thread ends.
        """
        self.start_interface()
        while True:
            while self.interface.is_connected:
                new_addresses = self.wallet.synchronize()
                if new_addresses:
                    self.interface.subscribe(new_addresses)

                if self.wallet.is_up_to_date():
                    if not self.wallet.up_to_date:
                        self.wallet.up_to_date = True
                        self.wallet.was_updated = True
                        self.wallet.up_to_date_event.set()
                elif self.wallet.up_to_date:
                    self.wallet.up_to_date = False
                    self.wallet.was_updated = True

                if self.wallet.was_updated:
                    self.wallet.trigger_callbacks()
                    self.wallet.was_updated = False

                # Blocks until a response (or the None wake-up marker pushed
                # by Interface.poke()) is available.
                response = self.interface.responses.get()
                self.handle_response(response)

            self.wallet.trigger_callbacks()
            if not self.loop:
                break

            time.sleep(5)
            # Carry the proxy settings of the old interface over to the
            # replacement before reconnecting.
            self.proxy = self.interface.proxy
            self.init_interface()
            self.start_interface()
461             else:
462                 break
463
464
465