3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import random, socket, ast, re
21 import threading, traceback, sys, time, json, Queue
23 from version import ELECTRUM_VERSION
24 from util import print_error
# Fallback servers used when none is configured. Each entry is written as
# "host:port:protocol", the form that is split on ':' elsewhere in this file.
DEFAULT_SERVERS = [
    'electrum.novit.ro:50001:t',
    'electrum.bytesized-hosting.com:50001:t',
]

# Recognised proxy mode names. The position of a name in this list is what
# proxy_modes.index(...) hands to the socks module.
proxy_modes = ['socks4', 'socks5', 'http']
def pick_random_server():
    """Return one entry of DEFAULT_SERVERS chosen at random.

    A notice is written to stdout first so the fallback is visible to the user.
    """
    # A single parenthesised string prints the same text under the Python 2
    # print statement that the rest of this file uses.
    print("using random server")
    chosen = random.choice(DEFAULT_SERVERS)
    return chosen
# Common base of the HTTP and TCP stratum interfaces defined later in this
# file. It is a threading.Thread that carries the state both transports use:
# a queue of decoded responses, the table of requests still awaiting a reply,
# and connection bookkeeping (is_connected, poll_interval, bytes_received).
42 class InterfaceAncestor(threading.Thread):
# Initialise the Thread base class and the per-connection state.
# NOTE(review): host, port and proxy are read elsewhere as self.host,
# self.port and self.proxy; the statements that store them are not among the
# lines shown here -- confirm they are assigned in this constructor.
44 def __init__(self, host, port, proxy=None):
45 threading.Thread.__init__(self)
51 self.servers = [] # actual list from IRC
# Running byte count; the transports add len(<data just read>) to it.
53 self.bytes_received = 0
# The transports' receive/poll loops keep running while this flag is true.
55 self.is_connected = True
# Handed to time.sleep() by the HTTP polling loop.
56 self.poll_interval = 1
# Decoded responses are put on this queue; WalletSynchronizer takes them off
# with a blocking get().
60 self.responses = Queue.Queue()
# message id -> (method, params) for every request sent but not yet answered.
61 self.unanswered_requests = {}
# Connection set-up hook; TcpStratumInterface provides its own version.
# NOTE(review): the body of this base version is not among the lines shown.
63 def init_socket(self):
# NOTE(review): the lines between the def above and the statement below are
# not shown, so this statement may belong to a different method -- confirm.
# Whoever reads self.responses has to be prepared to receive this None.
67 # push a fake response so that the getting thread exits its loop
68 self.responses.put(None)
# Convert one decoded JSON-RPC message `c` (a dict) into an entry on
# self.responses of the form {'method': ..., 'params': ..., 'result': ...}.
70 def queue_json_response(self, c):
# Messages carrying an 'error' member are reported on stdout.
# NOTE(review): the test guarding the print, and whether processing stops
# after it, are not shown.
76 error = c.get('error')
79 print "received error:", c
# A message with an id answers a request sent earlier: the method and params
# are recovered from (and removed from) the unanswered_requests table.
# pop() raises KeyError if the id is unknown.
82 if msg_id is not None:
83 method, params = self.unanswered_requests.pop(msg_id)
84 result = c.get('result')
# Otherwise (presumably the else branch -- its header is not shown) the
# message describes itself through its own 'method' and 'params' members.
87 method = c.get('method')
88 params = c.get('params')
# The two subscription methods get special treatment here.
# NOTE(review): the bodies of both branches are not shown.
90 if method == 'blockchain.numblocks.subscribe':
94 elif method == 'blockchain.address.subscribe':
99 self.responses.put({'method':method, 'params':params, 'result':result})
# Build one ('blockchain.address.subscribe', [addr]) message per address.
# NOTE(review): the creation of `messages` and what is done with it after the
# loop (presumably self.send(messages)) are not shown.
103 def subscribe(self, addresses):
105 for addr in addresses:
106 messages.append(('blockchain.address.subscribe', [addr]))
# Stratum transport over HTTP: every send() is one POST whose reply is read
# and queued before send() returns, and a polling loop sleeps poll_interval
# seconds between rounds.
113 class HttpStratumInterface(InterfaceAncestor):
114 """ non-persistent connection. synchronous calls"""
# Set up the shared base state, then start without a server session id; the
# id is filled in by send() once the server returns a SESSION cookie.
116 def __init__(self, host, port, proxy=None):
117 InterfaceAncestor.__init__(self, host, port, proxy)
118 self.session_id = None
# Request the history of one address; the reply is delivered through
# self.responses, not as a return value.
120 def get_history(self, address):
121 self.send([('blockchain.address.get_history', [address] )])
# Polling loop.
# NOTE(review): the def line that owns this loop, the `try:` header and the
# call made on each round are not shown; only the skeleton below is.
124 self.is_connected = True
125 while self.is_connected:
# Pause between polls; send() adjusts poll_interval.
129 time.sleep(self.poll_interval)
# Name-resolution failure while contacting the server.
# NOTE(review): the handler body is not shown.
130 except socket.gaierror:
# NOTE(review): presumably inside a broader except clause whose header is not
# shown -- the traceback goes to stdout and the loop is ended by clearing the
# flag.
135 traceback.print_exc(file=sys.stdout)
138 self.is_connected = False
# POST a batch of (method, params) messages and queue every reply.
# `messages` is iterated as (method, params) pairs (the loop header itself is
# not shown).
145 def send(self, messages):
# Function-local imports; json and time are also imported at module level.
146 import urllib2, json, time, cookielib
# Proxy set-up (presumably guarded by a check that self.proxy is set -- not
# shown). proxy_modes.index() yields 0, 1 or 2 for socks4/socks5/http.
# NOTE(review): confirm these values match the numeric proxy-type constants
# the socks module expects; an off-by-one here would select the wrong proxy
# protocol.
150 socks.setdefaultproxy(proxy_modes.index(self.proxy["mode"]), self.proxy["host"], int(self.proxy["port"]) )
151 socks.wrapmodule(urllib2)
# A fresh cookie jar per call captures the SESSION cookie. install_opener()
# replaces urllib2's process-wide default opener, so this affects every
# urllib2.urlopen() caller in the process, not only this method.
153 cj = cookielib.CookieJar()
154 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
155 urllib2.install_opener(opener)
# A bare (non-list) params value is wrapped so the request always carries a list.
162 if type(params) != type([]): params = [params]
163 data.append( { 'method':method, 'id':self.message_id, 'params':params } )
# Remember the request so queue_json_response() can match the reply by id.
164 self.unanswered_requests[self.message_id] = method, params
168 data_json = json.dumps(data)
# %d requires self.port to be a number.
173 host = 'http://%s:%d'%( self.host, self.port )
174 headers = {'content-type': 'application/json'}
# Continue the existing server session (presumably only when session_id is
# set -- the guard is not shown).
176 headers['cookie'] = 'SESSION=%s'%self.session_id
178 req = urllib2.Request(host, data_json, headers)
# NOTE(review): no close() of response_stream appears among the lines shown.
179 response_stream = urllib2.urlopen(req)
# Pick the session id out of the cookies the server set; `index` is unused.
181 for index, cookie in enumerate(cj):
182 if cookie.name=='SESSION':
183 self.session_id = cookie.value
185 response = response_stream.read()
186 self.bytes_received += len(response)
# The reply is either a single JSON object or a list of them.
188 response = json.loads( response )
189 if type(response) is not type([]):
190 self.queue_json_response(response)
# Presumably the else branch (header not shown): queue each item of the list.
192 for item in response:
193 self.queue_json_response(item)
# Polling back-off: the interval is reset to 1 on one path and grown by one
# per call, up to 15, on the other.
# NOTE(review): the condition choosing between the two paths is not shown.
196 self.poll_interval = 1
198 if self.poll_interval < 15:
199 self.poll_interval += 1
200 #print self.poll_interval, response
# Elapsed time since t1.
# NOTE(review): the assignment of t1 is not shown.
202 self.rtime = time.time() - t1
203 self.is_connected = True
# Stratum transport over one long-lived TCP socket: requests are written as
# newline-terminated JSON objects and replies are read by a receive loop.
208 class TcpStratumInterface(InterfaceAncestor):
209 """json-rpc over persistent TCP connection, asynchronous"""
# Only the shared base state is needed; the socket is opened by init_socket().
211 def __init__(self, host, port, proxy=None):
212 InterfaceAncestor.__init__(self, host, port, proxy)
# Open the socket (directly or through the configured proxy), connect, and
# announce the client version. Success or failure is recorded in is_connected.
214 def init_socket(self):
# Text for the status messages below; %d requires self.port to be a number.
216 connection_msg = "%s:%d"%(self.host,self.port)
217 if self.proxy is None:
218 self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
# Proxy case (presumably the else branch; its header and the import of the
# socks module are not shown).
220 connection_msg += " using proxy %s:%s:%s"%(self.proxy.get('mode'), self.proxy.get('host'), self.proxy.get('port'))
222 self.s = socks.socksocket()
# proxy_modes.index() yields 0, 1 or 2 for socks4/socks5/http.
# NOTE(review): confirm these values match the numeric proxy-type constants
# the socks module expects.
223 self.s.setproxy(proxy_modes.index(self.proxy["mode"]), self.proxy["host"], int(self.proxy["port"]) )
# The 60 s timeout is what makes recv() raise socket.timeout in the receive
# loop below, which is used there to ping the server.
224 self.s.settimeout(60)
225 self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# NOTE(review): the `try:` protecting the connect is not shown.
227 self.s.connect(( self.host.encode('ascii'), int(self.port)))
228 self.is_connected = True
229 self.send([('server.version', [ELECTRUM_VERSION])])
230 print "Connected to " + connection_msg
# Failure path (presumably an except clause; its header is not shown).
# NOTE(review): the literal has no trailing space, so the logged text reads
# "Failed to connect<host>:<port>".
232 self.is_connected = False
233 print_error("Failed to connect" + connection_msg)
# Receive loop.
# NOTE(review): the def line that owns this loop is not shown.
238 while self.is_connected:
# Read at most 1024 bytes per call.
239 try: msg = self.s.recv(1024)
240 except socket.timeout:
241 # ping the server with server.version, as a real ping does not exist yet
242 self.send([('server.version', [ELECTRUM_VERSION])])
245 self.bytes_received += len(msg)
# Disconnect handling.
# NOTE(review): the condition is not shown; presumably an empty read, which is
# how recv() reports that the peer closed the connection.
247 self.is_connected = False
248 print "Disconnected."
# Hand each decoded message `c` to the shared response queue.
# NOTE(review): the code that splits the byte stream into messages and decodes
# them into `c` is not shown.
256 self.queue_json_response(c)
# NOTE(review): presumably inside an except clause whose header is not shown --
# the traceback goes to stdout and the connection is flagged as closed.
259 traceback.print_exc(file=sys.stdout)
261 self.is_connected = False
# Serialise each (method, params) message as one JSON object per line and
# write the whole batch to the socket.
265 def send(self, messages):
269 request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
# Remember the request so queue_json_response() can match the reply by id.
270 self.unanswered_requests[self.message_id] = method, params
273 # print "-->",request
276 out += request + '\n'
# socket.send() may transmit fewer bytes than it was given and returns the
# number actually sent.
# NOTE(review): whether the remainder of `out` is re-sent using `sent` is not
# shown -- confirm.
279 sent = self.s.send( out )
# Request the history of one address; the reply is delivered through
# self.responses, not as a return value.
282 def get_history(self, addr):
283 self.send([('blockchain.address.get_history', [addr])])
# Front-end interface that inherits from both transports and picks one at run
# time based on self.protocol ('t' for TCP, 'h' for HTTP).
287 class Interface(TcpStratumInterface, HttpStratumInterface):
# Read the server ("host:port:protocol") and proxy settings from `config` and
# initialise the matching transport.
289 def __init__(self, config):
# Configured server; unpacking raises ValueError unless the string has
# exactly three ':'-separated fields.
292 s = config.get('server')
293 host, port, protocol = s.split(':')
# Fallback to a random default server.
# NOTE(review): the condition or except clause leading here is not shown.
296 s = pick_random_server()
297 host, port, protocol = s.split(':')
300 self.protocol = protocol
301 proxy = self.parse_proxy_options(config.get('proxy'))
# %d requires an integer, while split() yields strings.
# NOTE(review): the conversion of port to int is not shown -- confirm it
# happens before this line.
302 self.server = host + ':%d:%s'%(port, protocol)
304 #print protocol, host, port
# NOTE(review): the `if` header for this first branch is not shown; presumably
# it tests protocol == 't'.
306 TcpStratumInterface.__init__(self, host, port, proxy)
307 elif protocol == 'h':
308 HttpStratumInterface.__init__(self, host, port, proxy)
# Unknown protocol: log the problem and initialise as TCP.
# NOTE(review): self.protocol keeps the unrecognised value; if the non-'t'
# branches of run() and send() below are plain `else`, they would then take
# the HTTP code path on a TCP-initialised object -- confirm.
310 print_error("Error: Unknown protocol")
311 TcpStratumInterface.__init__(self, host, port, proxy)
# Thread body: delegate to the run loop of the selected transport.
# NOTE(review): the def line and the `else:` header are not shown.
315 if self.protocol == 't':
316 TcpStratumInterface.run(self)
318 HttpStratumInterface.run(self)
# Delegate to the send() of the selected transport (the `else:` header is not
# shown).
320 def send(self, messages):
321 if self.protocol == 't':
322 TcpStratumInterface.send(self, messages)
324 HttpStratumInterface.send(self, messages)
# Normalise the proxy setting to a dict with "mode", "host" and "port" keys,
# or None when no proxy is configured.
327 def parse_proxy_options(self, s):
# An already-parsed dict is passed through unchanged.
328 if type(s) == type({}): return s # fixme: type should be fixed
# Anything that is not exactly a str yields None. Under Python 2 that includes
# unicode strings.
329 if type(s) != type(""): return None
330 if s.lower() == 'none': return None
# Defaults, overridden by whatever the option string supplies.
331 proxy = { "mode":"socks5", "host":"localhost" }
# NOTE(review): the construction of `args` and the handling of the index `n`
# are not shown. The first field is taken as the mode only if it is one of
# proxy_modes.
334 if proxy_modes.count(args[n]) == 1:
335 proxy["mode"] = args[n]
338 proxy["host"] = args[n]
341 proxy["port"] = args[n]
# Default port (presumably the else branch when none was given): 8080 for an
# http proxy, 1080 otherwise. Ports are kept as strings.
343 proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
# Switch to another server and/or proxy. `server` must be "host:port:protocol".
347 def set_server(self, server, proxy=None):
348 # raise an error if the format isnt correct
# Unpacking raises ValueError unless there are exactly three fields.
349 a,b,c = server.split(':')
# NOTE(review): assert statements are removed when Python runs with -O, so
# this protocol check disappears in optimised mode.
351 assert c in ['t', 'h']
# Act only when something actually changed.
353 if server != self.server or proxy != self.proxy:
354 print "changing server:", server, proxy
# NOTE(review): the statements that store the new server and proxy are not
# shown.
357 self.is_connected = False # this exits the polling loop
# Thread that keeps a wallet in sync with the server: it owns an Interface,
# subscribes to new addresses, and dispatches every queued server response to
# the wallet.
365 class WalletSynchronizer(threading.Thread):
# NOTE(review): the statements storing wallet, config and loop on self are not
# shown; self.wallet and self.config are read by the methods below.
367 def __init__(self, wallet, config, loop=False, servers_loaded_callback=None):
368 threading.Thread.__init__(self)
373 self.init_interface()
# Called with no arguments each time a server list has been received; may be
# None.
374 self.servers_loaded_callback = servers_loaded_callback
# Create a fresh Interface from the current config and hand it to the wallet.
376 def init_interface(self):
377 self.interface = Interface(self.config)
378 self.wallet.interface = self.interface
# Dispatch one entry taken from interface.responses to the wallet, keyed on
# the entry's method.
# NOTE(review): the unpacking of method/params/result from `r` is not shown,
# nor is any handling of the None sentinel that can be placed on the queue.
380 def handle_response(self, r):
388 if method == 'server.banner':
389 self.wallet.banner = result
390 self.wallet.was_updated = True
# Server list: collect (host, ports) for every advertised peer.
# NOTE(review): the loops over the result and the initialisation of servers,
# ports and version are not shown.
392 elif method == 'server.peers.subscribe':
# A feature string such as "t50001" is a protocol letter followed by a port
# number. "\d" in a non-raw literal relies on Python leaving unknown escapes
# untouched.
401 if re.match("[th]\d+", v):
402 ports.append((v[0], v[1:]))
# A feature string starting with "v" (presumably the server version; the
# branch body is not shown).
403 if re.match("v(.?)+", v):
# Keep only peers that advertised both a port and a version.
405 if ports and version:
406 servers.append((host, ports))
407 self.interface.servers = servers
408 # servers_loaded_callback is None for commands, but should
409 # NEVER be None when using the GUI.
410 if self.servers_loaded_callback is not None:
411 self.servers_loaded_callback()
# Address status notification.
# NOTE(review): the extraction of addr (line not shown) presumably comes from
# params.
413 elif method == 'blockchain.address.subscribe':
415 self.wallet.receive_status_callback(addr, result)
417 elif method == 'blockchain.address.get_history':
419 self.wallet.receive_history_callback(addr, result)
420 self.wallet.was_updated = True
# Hand the broadcast result to whoever is waiting on tx_event.
422 elif method == 'blockchain.transaction.broadcast':
423 self.wallet.tx_result = result
424 self.wallet.tx_event.set()
426 elif method == 'blockchain.numblocks.subscribe':
427 self.wallet.blocks = result
428 self.wallet.was_updated = True
# Reply to the version message / ping.
# NOTE(review): the branch body is not shown.
430 elif method == 'server.version':
# Fallback for unrecognised methods (presumably the else branch).
# NOTE(review): params is a list for requests recorded by send(), and result
# need not be a string, so this `+` concatenation can raise TypeError instead
# of logging -- confirm and format the values explicitly if so.
434 print_error("Error: Unknown message:" + method + ", " + params + ", " + result)
# Open the connection, start the interface thread and, if the connection
# succeeded, let the wallet begin its session on it.
437 def start_interface(self):
438 self.interface.init_socket()
439 self.interface.start()
440 if self.interface.is_connected:
441 self.wallet.start_session(self.interface)
# Thread body.
# NOTE(review): the def line and any outer loop are not shown, and the method
# may continue past the last line shown here.
447 self.start_interface()
# Inner loop: runs for as long as the current interface stays connected.
449 while self.interface.is_connected:
450 new_addresses = self.wallet.synchronize()
# Subscribe to addresses the wallet has just generated (presumably only when
# the list is non-empty; the guard is not shown).
452 self.interface.subscribe(new_addresses)
# Track transitions of the wallet's up_to_date flag; every transition marks
# the wallet as updated, and becoming up to date also sets up_to_date_event.
454 if self.wallet.is_up_to_date():
455 if not self.wallet.up_to_date:
456 self.wallet.up_to_date = True
457 self.wallet.was_updated = True
458 self.wallet.up_to_date_event.set()
# Presumably the else branch (header not shown): the wallet fell behind again.
460 if self.wallet.up_to_date:
461 self.wallet.up_to_date = False
462 self.wallet.was_updated = True
# Notify listeners once per loop pass if anything changed, then clear the flag.
464 if self.wallet.was_updated:
465 self.wallet.trigger_callbacks()
466 self.wallet.was_updated = False
# Blocks until the interface queues a response (or the None sentinel).
468 response = self.interface.responses.get()
469 self.handle_response(response)
# Reached after the inner loop ends, i.e. once is_connected has become false.
471 self.wallet.trigger_callbacks()
474 # Server has been changed. Copy callback for new interface.
# Keep the old interface's proxy, then build and start a replacement interface.
475 self.proxy = self.interface.proxy
476 self.init_interface()
477 self.start_interface()