setup package in lib subdirectory
[electrum-nvc.git] / lib / interface.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
22
23 from version import ELECTRUM_VERSION
24
25 DEFAULT_TIMEOUT = 5
26 DEFAULT_SERVERS = [ 'ecdsa.org:50001:t', 'electrum.novit.ro:50001:t', 'electrum.bitcoins.sk:50001:t']  # list of default servers
27
28
29 def old_to_new(s):
30     s = s.replace("'blk_hash'", "'block_hash'")
31     s = s.replace("'pos'", "'index'")
32     s = s.replace("'nTime'", "'timestamp'")
33     s = s.replace("'is_in'", "'is_input'")
34     s = s.replace("'raw_scriptPubKey'","'raw_output_script'")
35     return s
36
37
38 class Interface(threading.Thread):
39     def __init__(self, host, port):
40         threading.Thread.__init__(self)
41         self.daemon = True
42         self.host = host
43         self.port = port
44
45         self.servers = [] # actual list from IRC
46         self.rtime = 0
47
48         self.is_connected = True
49         self.poll_interval = 1
50
51         #json
52         self.message_id = 0
53         self.responses = Queue.Queue()
54         self.unanswered_requests = {}
55
56     def poke(self):
57         # push a fake response so that the getting thread exits its loop
58         self.responses.put(None)
59
60     def queue_json_response(self, c):
61
62         #print "<--",c
63         msg_id = c.get('id')
64         error = c.get('error')
65         
66         if error:
67             print "received error:", c
68             return
69
70         if msg_id is not None:
71             method, params = self.unanswered_requests.pop(msg_id)
72             result = c.get('result')
73         else:
74             # notification
75             method = c.get('method')
76             params = c.get('params')
77
78             if method == 'blockchain.numblocks.subscribe':
79                 result = params[0]
80                 params = []
81
82             elif method == 'blockchain.address.subscribe':
83                 addr = params[0]
84                 result = params[1]
85                 params = [addr]
86
87         self.responses.put({'method':method, 'params':params, 'result':result})
88
89
90
91     def subscribe(self, addresses):
92         messages = []
93         for addr in addresses:
94             messages.append(('blockchain.address.subscribe', [addr]))
95         self.send(messages)
96
97
98
99
100 class PollingInterface(Interface):
101     """ non-persistent connection. synchronous calls"""
102
103     def __init__(self, host, port):
104         Interface.__init__(self, host, port)
105         self.session_id = None
106
107     def get_history(self, address):
108         self.send([('blockchain.address.get_history', [address] )])
109
110     def poll(self):
111         pass
112         #if is_new or wallet.remote_url:
113         #    self.was_updated = True
114         #    is_new = wallet.synchronize()
115         #    wallet.update_tx_history()
116         #    wallet.save()
117         #    return is_new
118         #else:
119         #    return False
120
121     def run(self):
122         self.is_connected = True
123         while self.is_connected:
124             try:
125                 if self.session_id:
126                     self.poll()
127                 time.sleep(self.poll_interval)
128             except socket.gaierror:
129                 break
130             except socket.error:
131                 break
132             except:
133                 traceback.print_exc(file=sys.stdout)
134                 break
135             
136         self.is_connected = False
137         self.poke()
138
139                 
140
141
142
143
144
145
146
147 class HttpStratumInterface(PollingInterface):
148
149     def poll(self):
150         self.send([])
151
152     def send(self, messages):
153         import urllib2, json, time, cookielib
154
155         cj = cookielib.CookieJar()
156         opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
157         urllib2.install_opener(opener)
158
159         t1 = time.time()
160
161         data = []
162         for m in messages:
163             method, params = m
164             if type(params) != type([]): params = [params]
165             data.append( { 'method':method, 'id':self.message_id, 'params':params } )
166             self.unanswered_requests[self.message_id] = method, params
167             self.message_id += 1
168
169         if data:
170             data_json = json.dumps(data)
171         else:
172             # poll with GET
173             data_json = None 
174
175         host = 'http://%s:%d'%( self.host, self.port )
176         headers = {'content-type': 'application/json'}
177         if self.session_id:
178             headers['cookie'] = 'SESSION=%s'%self.session_id
179
180         req = urllib2.Request(host, data_json, headers)
181         response_stream = urllib2.urlopen(req)
182
183         for index, cookie in enumerate(cj):
184             if cookie.name=='SESSION':
185                 self.session_id = cookie.value
186
187         response = response_stream.read()
188         if response: 
189             response = json.loads( response )
190             if type(response) is not type([]):
191                 self.queue_json_response(response)
192             else:
193                 for item in response:
194                     self.queue_json_response(item)
195
196         if response: 
197             self.poll_interval = 1
198         else:
199             if self.poll_interval < 15: 
200                 self.poll_interval += 1
201         #print self.poll_interval, response
202
203         self.rtime = time.time() - t1
204         self.is_connected = True
205
206
207
208
209 class TcpStratumInterface(Interface):
210     """json-rpc over persistent TCP connection, asynchronous"""
211
212     def __init__(self, host, port):
213         Interface.__init__(self, host, port)
214         self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
215         self.s.settimeout(5*60)
216         self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
217         try:
218             self.s.connect(( self.host, self.port))
219             self.is_connected = True
220             self.send([('server.version', [ELECTRUM_VERSION])])
221         except:
222             self.is_connected = False
223             print "not connected"
224
225     def run(self):
226         try:
227             out = ''
228             while self.is_connected:
229                 try: msg = self.s.recv(1024)
230                 except socket.timeout:
231                     # ping the server with server.version, as a real ping does not exist yet
232                     self.send([('server.version', [ELECTRUM_VERSION])])
233                     continue
234                 out += msg
235                 if msg == '': 
236                     self.is_connected = False
237                     print "disconnected."
238
239                 while True:
240                     s = out.find('\n')
241                     if s==-1: break
242                     c = out[0:s]
243                     out = out[s+1:]
244                     c = json.loads(c)
245                     self.queue_json_response(c)
246
247         except:
248             traceback.print_exc(file=sys.stdout)
249
250         self.is_connected = False
251         print "poking"
252         self.poke()
253
254     def send(self, messages):
255         out = ''
256         for m in messages:
257             method, params = m 
258             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
259             self.unanswered_requests[self.message_id] = method, params
260             #print "-->",request
261             self.message_id += 1
262             out += request + '\n'
263         self.s.send( out )
264
265     def get_history(self, addr):
266         self.send([('blockchain.address.get_history', [addr])])
267
268
269
270
271
272 class WalletSynchronizer(threading.Thread):
273
274     def __init__(self, wallet, loop=False):
275         threading.Thread.__init__(self)
276         self.daemon = True
277         self.wallet = wallet
278         self.loop = loop
279         self.start_interface()
280
281
282     def handle_response(self, r):
283         if r is None:
284             return
285
286         method = r['method']
287         params = r['params']
288         result = r['result']
289
290         if method == 'server.banner':
291             self.wallet.banner = result
292             self.wallet.was_updated = True
293
294         elif method == 'server.peers.subscribe':
295             servers = []
296             for item in result:
297                 s = []
298                 host = item[1]
299                 ports = []
300                 if len(item)>2:
301                     for v in item[2]:
302                         if re.match("[th]\d+",v):
303                             ports.append((v[0],v[1:]))
304                 if ports:
305                     servers.append( (host, ports) )
306             self.interface.servers = servers
307
308         elif method == 'blockchain.address.subscribe':
309             addr = params[0]
310             self.wallet.receive_status_callback(addr, result)
311                             
312         elif method == 'blockchain.address.get_history':
313             addr = params[0]
314             self.wallet.receive_history_callback(addr, result)
315             self.wallet.was_updated = True
316
317         elif method == 'blockchain.transaction.broadcast':
318             self.wallet.tx_result = result
319             self.wallet.tx_event.set()
320
321         elif method == 'blockchain.numblocks.subscribe':
322             self.wallet.blocks = result
323
324         elif method == 'server.version':
325             pass
326
327         else:
328             print "unknown message:", method, params, result
329
330
331     def start_interface(self):
332         try:
333             host, port, protocol = self.wallet.server.split(':')
334             port = int(port)
335         except:
336             self.wallet.pick_random_server()
337             host, port, protocol = self.wallet.server.split(':')
338             port = int(port)
339
340         #print protocol, host, port
341         if protocol == 't':
342             InterfaceClass = TcpStratumInterface
343         elif protocol == 'h':
344             InterfaceClass = HttpStratumInterface
345         else:
346             print "unknown protocol"
347             InterfaceClass = TcpStratumInterface
348
349         self.interface = InterfaceClass(host, port)
350         self.interface.start()
351         self.wallet.interface = self.interface
352
353         if self.interface.is_connected:
354             self.wallet.start_session(self.interface)
355
356
357
358     def run(self):
359         import socket, time
360         while True:
361             while self.interface.is_connected:
362                 new_addresses = self.wallet.synchronize()
363                 if new_addresses:
364                     self.interface.subscribe(new_addresses)
365
366                 if self.wallet.is_up_to_date():
367                     if not self.wallet.up_to_date:
368                         self.wallet.up_to_date = True
369                         self.wallet.was_updated = True
370                         self.wallet.up_to_date_event.set()
371                 else:
372                     if self.wallet.up_to_date:
373                         self.wallet.up_to_date = False
374                         self.wallet.was_updated = True
375
376                 if self.wallet.was_updated:
377                     self.wallet.gui_callback()
378                     self.wallet.was_updated = False
379
380                 response = self.interface.responses.get()
381                 self.handle_response(response)
382
383             print "disconnected, gui callback"
384             self.wallet.gui_callback()
385             if self.loop:
386                 time.sleep(5)
387                 self.start_interface()
388                 continue
389             else:
390                 break
391
392
393