Fixed merge conflict and added folder creation on first load
[electrum-nvc.git] / lib / interface.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
22
23 from version import ELECTRUM_VERSION
24 from util import print_error
25
DEFAULT_TIMEOUT = 5  # presumably seconds -- TODO confirm against callers

# List of default servers, written as 'host:port:protocol'.
DEFAULT_SERVERS = [
    'ecdsa.org:50001:t',
    'electrum.novit.ro:50001:t',
    'electrum.bytesized-hosting.com:50001:t',
]

# Order matters: the position of a mode in this list is what gets passed to
# the socks module as the proxy type (via proxy_modes.index(...)).
proxy_modes = ['none', 'socks4', 'socks5', 'http']
32
def replace_keys(obj, old_key, new_key):
    """Recursively rename ``old_key`` to ``new_key`` in nested dicts and lists.

    ``obj`` is modified in place and nothing is returned. Dicts have the key
    renamed and their values visited; lists have their elements visited; any
    other value is left untouched. If ``new_key`` already exists in a dict
    that also holds ``old_key``, it is overwritten with the old key's value.
    """
    if isinstance(obj, dict):
        if old_key in obj:
            # pop-then-assign keeps the entry when old_key == new_key;
            # assigning first and deleting afterwards would drop it.
            obj[new_key] = obj.pop(old_key)
        # values() exists on both Python 2 and Python 3 dicts.
        for elem in obj.values():
            replace_keys(elem, old_key, new_key)
    elif isinstance(obj, list):
        for elem in obj:
            replace_keys(elem, old_key, new_key)
43
def old_to_new(d):
    """Rename the old-style keys found anywhere in ``d`` to their new names.

    ``d`` is updated in place through replace_keys(); nothing is returned.
    """
    renames = (
        ('blk_hash', 'block_hash'),
        ('pos', 'index'),
        ('nTime', 'timestamp'),
        ('is_in', 'is_input'),
        ('raw_scriptPubKey', 'raw_output_script'),
    )
    for old_name, new_name in renames:
        replace_keys(d, old_name, new_name)
50
def parse_proxy_options(s):
    """Parse a ``[mode:][host[:port]]`` string into a proxy settings dict.

    The first colon-separated field is taken as the mode only when it is one
    of ``proxy_modes``; otherwise the mode stays "socks5". The next field, if
    any, is the host (default "localhost") and the one after it the port.
    A missing port defaults to "8080" for http and "1080" for everything
    else. All values in the returned dict are strings.
    """
    parts = s.split(':')
    mode = "socks5"
    host = "localhost"

    # Consume the fields from the front as they are recognised.
    if parts[0] in proxy_modes:
        mode = parts.pop(0)
    if parts:
        host = parts.pop(0)
    if parts:
        port = parts[0]
    elif mode == "http":
        port = "8080"
    else:
        port = "1080"

    return {"mode": mode, "host": host, "port": port}
66
class Interface(threading.Thread):
    """Base class for a server connection running in a daemon thread.

    Decoded server messages are normalised by queue_json_response() into
    ``{'method': ..., 'params': ..., 'result': ...}`` dicts and pushed onto
    ``self.responses``. This class does not define ``send(messages)``;
    subscribe() calls it, so subclasses have to provide it.
    """

    def __init__(self, host, port, debug_server, proxy):
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.proxy = proxy

        self.servers = []  # actual list from IRC
        self.rtime = 0
        self.bytes_received = 0

        self.is_connected = True
        self.poll_interval = 1

        # json-rpc bookkeeping
        self.message_id = 0
        self.responses = Queue.Queue()
        # message id -> (method, params) of requests still awaiting a reply
        self.unanswered_requests = {}

        self.debug_server = debug_server

    def init_socket(self):
        """Hook for subclasses that need to open a connection; no-op here."""
        pass

    def poke(self):
        """Wake up whoever is blocked reading ``self.responses``."""
        # push a fake response so that the getting thread exits its loop
        self.responses.put(None)

    def queue_json_response(self, c):
        """Normalise one decoded JSON message and queue it on ``responses``.

        ``c`` is a dict. A message carrying an ``id`` is matched against
        ``unanswered_requests`` to recover the method and params it answers.
        A message without an ``id`` is treated as a notification; only
        'blockchain.numblocks.subscribe' and 'blockchain.address.subscribe'
        are understood. Error replies, replies with an unknown id and
        unrecognised notifications are printed and dropped.
        """
        if self.debug_server:
            print("<-- %s" % (c,))

        msg_id = c.get('id')

        if c.get('error'):
            print("received error: %s" % (c,))
            # The request has been answered (with an error); forget it so it
            # does not stay in unanswered_requests forever.
            if msg_id is not None:
                self.unanswered_requests.pop(msg_id, None)
            return

        if msg_id is not None:
            request = self.unanswered_requests.pop(msg_id, None)
            if request is None:
                # Nothing pending under that id: ignore instead of raising
                # KeyError in the reader thread.
                print("received response with unknown id: %s" % (c,))
                return
            method, params = request
            result = c.get('result')
        else:
            # notification
            method = c.get('method')
            params = c.get('params')

            if method == 'blockchain.numblocks.subscribe':
                result = params[0]
                params = []
            elif method == 'blockchain.address.subscribe':
                addr = params[0]
                result = params[1]
                params = [addr]
            else:
                # No result can be derived for other notifications; drop them
                # rather than failing on an unbound 'result' below.
                print("received unknown notification: %s" % (c,))
                return

        self.responses.put({'method': method, 'params': params, 'result': result})

    def subscribe(self, addresses):
        """Send one 'blockchain.address.subscribe' request per address."""
        self.send([('blockchain.address.subscribe', [addr]) for addr in addresses])
134
135
136
137
class PollingInterface(Interface):
    """Non-persistent connection with synchronous calls.

    The thread body repeatedly calls poll() (once a session id is known)
    and sleeps ``poll_interval`` seconds between rounds.
    """

    def __init__(self, host, port, debug_server, proxy):
        Interface.__init__(self, host, port, debug_server, proxy)
        self.session_id = None
        self.debug_server = debug_server

    def get_history(self, address):
        """Request the history of ``address`` from the server."""
        request = ('blockchain.address.get_history', [address])
        self.send([request])

    def poll(self):
        """Do one polling round; nothing to do in this base class."""
        pass

    def run(self):
        """Poll until an error occurs or ``is_connected`` is cleared."""
        self.is_connected = True
        while self.is_connected:
            try:
                # No polling before a session has been established.
                if self.session_id:
                    self.poll()
                time.sleep(self.poll_interval)
            except (socket.gaierror, socket.error):
                # Network trouble: leave the loop quietly.
                break
            except:
                traceback.print_exc(file=sys.stdout)
                break

        self.is_connected = False
        self.poke()
177
178                 
179
180
181
182
183
184
185
class HttpStratumInterface(PollingInterface):
    """Polling transport that exchanges JSON messages with the server over HTTP."""

    def poll(self):
        """Poll the server: an empty batch makes send() issue a GET."""
        self.send([])

    def send(self, messages):
        """POST ``messages`` as one JSON batch (or GET when empty) and queue replies.

        ``messages`` is a sequence of ``(method, params)`` pairs; a ``params``
        value that is not a list is wrapped in one. Every request is recorded
        in ``unanswered_requests`` under a fresh message id. The SESSION
        cookie returned by the server is remembered in ``session_id`` and sent
        back on later calls. ``poll_interval`` is reset to 1 when the server
        returned something and grows by one (up to 15) otherwise.

        Errors raised by urllib2 or by JSON decoding propagate to the caller.
        """
        import urllib2, cookielib

        if self.proxy["mode"] != "none":
            import socks
            socks.setdefaultproxy(proxy_modes.index(self.proxy["mode"]), self.proxy["host"], int(self.proxy["port"]))
            socks.wrapmodule(urllib2)
        cj = cookielib.CookieJar()
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
        urllib2.install_opener(opener)

        t1 = time.time()

        data = []
        for method, params in messages:
            if not isinstance(params, list):
                params = [params]
            data.append({'method': method, 'id': self.message_id, 'params': params})
            self.unanswered_requests[self.message_id] = method, params
            self.message_id += 1

        # With no body (None) urllib2 performs a GET, which is how we poll.
        data_json = json.dumps(data) if data else None
        if data_json is not None and self.debug_server:
            print("--> %s" % data_json)

        url = 'http://%s:%d' % (self.host, self.port)
        headers = {'content-type': 'application/json'}
        if self.session_id:
            headers['cookie'] = 'SESSION=%s' % self.session_id

        req = urllib2.Request(url, data_json, headers)
        response_stream = urllib2.urlopen(req)
        try:
            for cookie in cj:
                if cookie.name == 'SESSION':
                    self.session_id = cookie.value
            raw = response_stream.read()
        finally:
            # Always release the HTTP connection, even if reading fails.
            response_stream.close()

        self.bytes_received += len(raw)

        parsed = None
        if raw:
            parsed = json.loads(raw)
            # The server may answer with a single object or with a batch.
            items = parsed if isinstance(parsed, list) else [parsed]
            for item in items:
                self.queue_json_response(item)

        # Back off while the server has nothing for us; an empty batch
        # counts as "nothing".
        if parsed:
            self.poll_interval = 1
        elif self.poll_interval < 15:
            self.poll_interval += 1

        self.rtime = time.time() - t1
        self.is_connected = True
249
250
251
252
class TcpStratumInterface(Interface):
    """json-rpc over persistent TCP connection, asynchronous.

    Requests are written as newline-terminated JSON objects; the thread body
    reads the socket, splits the stream on newlines and hands every decoded
    object to queue_json_response().
    """

    def __init__(self, host, port, debug_server, proxy):
        Interface.__init__(self, host, port, debug_server, proxy)
        self.debug_server = debug_server

    def init_socket(self):
        """Create the socket (optionally through a proxy) and connect.

        Sets ``is_connected`` according to the outcome. A failed connection
        attempt is reported through print_error() and does not raise.
        """
        if self.proxy["mode"] == "none":
            self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            import socks
            self.s = socks.socksocket()
            print("Using Proxy %s" % (self.proxy,))
            self.s.setproxy(proxy_modes.index(self.proxy["mode"]), self.proxy["host"], int(self.proxy["port"]))
        self.s.settimeout(60)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        try:
            self.s.connect((self.host.encode('ascii'), int(self.port)))
            self.is_connected = True
            self.send([('server.version', [ELECTRUM_VERSION])])
            # int() so that a port given as a string cannot turn a successful
            # connection into a reported failure.
            print("Connected to %s:%d" % (self.host, int(self.port)))
        except Exception:
            self.is_connected = False
            # Do not leak the half-open socket.
            self._close_socket()
            print_error("Not connected")

    def _close_socket(self):
        """Close the socket if one was created; errors while closing are ignored."""
        sock = getattr(self, 's', None)
        if sock is None:
            return
        try:
            sock.close()
        except socket.error:
            pass

    def run(self):
        """Read from the socket until it closes or an error occurs.

        On exit the socket is closed, ``is_connected`` is cleared and a
        fake response is queued (see poke()) to wake up the consumer.
        """
        try:
            out = ''
            while self.is_connected:
                try:
                    msg = self.s.recv(1024)
                except socket.timeout:
                    # ping the server with server.version, as a real ping does not exist yet
                    self.send([('server.version', [ELECTRUM_VERSION])])
                    continue
                out += msg
                self.bytes_received += len(msg)
                if msg == '':
                    # An empty read means the peer closed the connection.
                    self.is_connected = False
                    print("Disconnected.")

                # Everything before the last newline is complete; keep the
                # trailing partial line buffered for the next read.
                lines = out.split('\n')
                out = lines.pop()
                for line in lines:
                    self.queue_json_response(json.loads(line))

        except Exception:
            traceback.print_exc(file=sys.stdout)

        self.is_connected = False
        self._close_socket()
        print("Poking")
        self.poke()

    def send(self, messages):
        """Send ``messages`` (``(method, params)`` pairs) to the server.

        Every request gets the next message id and is recorded in
        ``unanswered_requests`` before the whole batch is written out.
        """
        requests = []
        for method, params in messages:
            request = json.dumps({'id': self.message_id, 'method': method, 'params': params})
            self.unanswered_requests[self.message_id] = method, params

            if self.debug_server:
                print("--> %s" % request)

            self.message_id += 1
            requests.append(request + '\n')

        out = ''.join(requests)
        if out:
            # sendall() loops until everything is written or an error occurs.
            self.s.sendall(out)

    def get_history(self, addr):
        """Request the history of ``addr`` from the server."""
        self.send([('blockchain.address.get_history', [addr])])
329
330
331
332
333
class WalletSynchronizer(threading.Thread):
    """Daemon thread that feeds server responses into the wallet.

    It builds the interface matching ``wallet.server``, starts it, and then
    loops: synchronise the wallet, subscribe to new addresses, update the
    wallet's up-to-date flags and dispatch the next queued response.
    """

    def __init__(self, wallet, loop=False, servers_loaded_callback=None, proxy=None):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wallet = wallet
        self.loop = loop
        self.proxy = proxy
        self.servers_loaded_callback = servers_loaded_callback
        self.init_interface()

    def _parse_server(self):
        """Split ``wallet.server`` ('host:port:protocol') into (host, int port, protocol)."""
        host, port, protocol = self.wallet.server.split(':')
        return host, int(port), protocol

    def init_interface(self):
        """Create the interface for ``wallet.server`` and attach it to the wallet.

        If the configured server string cannot be parsed, the wallet is asked
        to pick a random server and parsing is retried once. An unknown
        protocol letter is reported and TCP is used instead.
        """
        try:
            host, port, protocol = self._parse_server()
        except Exception:
            self.wallet.pick_random_server()
            host, port, protocol = self._parse_server()

        if protocol == 't':
            InterfaceClass = TcpStratumInterface
        elif protocol == 'h':
            InterfaceClass = HttpStratumInterface
        else:
            print_error("Error: Unknown protocol")
            InterfaceClass = TcpStratumInterface

        self.interface = InterfaceClass(host, port, self.wallet.debug_server, self.proxy)
        self.wallet.interface = self.interface

    def handle_response(self, r):
        """Apply one queued response (a method/params/result dict) to the wallet.

        ``None`` (the fake response queued by Interface.poke) is ignored.
        Unknown methods are reported through print_error().
        """
        if r is None:
            return

        method = r['method']
        params = r['params']
        result = r['result']

        if method == 'server.banner':
            self.wallet.banner = result
            self.wallet.was_updated = True

        elif method == 'server.peers.subscribe':
            servers = []
            for item in result:
                host = item[1]
                ports = []
                version = None
                if len(item) > 2:
                    for v in item[2]:
                        # 't50001' / 'h8081' -> (protocol letter, port string)
                        if re.match(r"[th]\d+", v):
                            ports.append((v[0], v[1:]))
                        # 'v0.6' -> version string
                        if v.startswith('v'):
                            version = v[1:]
                # Only servers advertising both a port and a version are kept.
                if ports and version:
                    servers.append((host, ports))
            self.interface.servers = servers
            # servers_loaded_callback is None for commands, but should
            # NEVER be None when using the GUI.
            if self.servers_loaded_callback is not None:
                self.servers_loaded_callback()

        elif method == 'blockchain.address.subscribe':
            addr = params[0]
            self.wallet.receive_status_callback(addr, result)

        elif method == 'blockchain.address.get_history':
            addr = params[0]
            self.wallet.receive_history_callback(addr, result)
            self.wallet.was_updated = True

        elif method == 'blockchain.transaction.broadcast':
            self.wallet.tx_result = result
            self.wallet.tx_event.set()

        elif method == 'blockchain.numblocks.subscribe':
            self.wallet.blocks = result
            self.wallet.was_updated = True

        elif method == 'server.version':
            pass

        else:
            # params is a list and result can be anything, so format instead
            # of concatenating (which raised TypeError).
            print_error("Error: Unknown message:%s, %s, %s" % (method, params, result))

    def start_interface(self):
        """Connect and start the interface thread; open a wallet session if connected."""
        self.interface.init_socket()
        self.interface.start()
        if self.interface.is_connected:
            self.wallet.start_session(self.interface)

    def run(self):
        """Main loop; with ``loop`` set, reconnect 5 seconds after a disconnect."""
        self.start_interface()
        while True:
            while self.interface.is_connected:
                new_addresses = self.wallet.synchronize()
                if new_addresses:
                    self.interface.subscribe(new_addresses)

                # Flag a change only when the up-to-date state actually flips.
                if self.wallet.is_up_to_date():
                    if not self.wallet.up_to_date:
                        self.wallet.up_to_date = True
                        self.wallet.was_updated = True
                        self.wallet.up_to_date_event.set()
                else:
                    if self.wallet.up_to_date:
                        self.wallet.up_to_date = False
                        self.wallet.was_updated = True

                if self.wallet.was_updated:
                    self.wallet.trigger_callbacks()
                    self.wallet.was_updated = False

                # Blocks until a response (or the None queued by poke()) arrives.
                response = self.interface.responses.get()
                self.handle_response(response)

            self.wallet.trigger_callbacks()
            if not self.loop:
                break

            time.sleep(5)
            # Carry the current interface's proxy settings over to the
            # interface that is about to be created.
            self.proxy = self.interface.proxy
            self.init_interface()
            self.start_interface()
467
468
469