2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
20 * server should check and return bitcoind status..
21 * improve txpoint sorting
22 * command to check cache
24 mempool transactions do not need to be added to the database; it slows it down
32 import time, json, socket, operator, thread, ast, sys, re, traceback
34 from json import dumps, loads
38 config = ConfigParser.ConfigParser()
39 # set some defaults, which will be overwritten by the config file
40 config.add_section('server')
41 config.set('server','banner', 'Welcome to Electrum!')
42 config.set('server', 'host', 'localhost')
43 config.set('server', 'port', '50000')
44 config.set('server', 'password', '')
45 config.set('server', 'irc', 'yes')
46 config.set('server', 'ircname', 'Electrum server')
47 config.add_section('database')
48 config.set('database', 'type', 'psycopg2')
49 config.set('database', 'database', 'abe')
52 f = open('/etc/electrum.conf','r')
56 print "Could not read electrum.conf. I will use the default values."
59 f = open('/etc/electrum.banner','r')
60 config.set('server','banner', f.read())
66 password = config.get('server','password')
74 return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
78 def modified_addresses(a_session):
81 session = copy.deepcopy(a_session)
82 addresses = session['addresses']
83 session['last_time'] = time.time()
86 for addr in addresses:
87 status = store.get_status( addr )
88 msg_id, last_status = addresses.get( addr )
89 if last_status != status:
90 addresses[addr] = msg_id, status
93 #t2 = time.time() - t1
94 #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
98 def poll_session(session_id):
100 session = sessions.get(session_id)
102 print time.asctime(), "session not found", session_id
105 sessions[session_id]['last_time'] = time.time()
106 ret, addresses = modified_addresses(session)
107 if ret: sessions[session_id]['addresses'] = addresses
108 return repr( (store.block_number,ret))
111 def add_address_to_session(session_id, address):
112 status = store.get_status(address)
113 sessions[session_id]['addresses'][address] = ("", status)
114 sessions[session_id]['last_time'] = time.time()
118 def new_session(version, addresses):
119 session_id = random_string(10)
120 sessions[session_id] = { 'addresses':{}, 'version':version }
122 sessions[session_id]['addresses'][a] = ('','')
123 out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
124 sessions[session_id]['last_time'] = time.time()
128 def update_session(session_id,addresses):
129 """deprecated in 0.42, wad replaced by add_address_to_session"""
130 sessions[session_id]['addresses'] = {}
132 sessions[session_id]['addresses'][a] = ''
133 sessions[session_id]['last_time'] = time.time()
137 def native_server_thread():
138 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
139 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
140 s.bind((config.get('server','host'), config.getint('server','port')))
143 conn, addr = s.accept()
145 thread.start_new_thread(native_client_thread, (addr, conn,))
147 # can't start new thread if there is no memory..
148 traceback.print_exc(file=sys.stdout)
151 def native_client_thread(ipaddr,conn):
152 #print "client thread", ipaddr
162 msg = msg.split('#', 1)[0]
165 cmd, data = ast.literal_eval(msg)
167 print "syntax error", repr(msg), ipaddr
171 out = do_command(cmd, data, ipaddr)
173 #print ipaddr, cmd, len(out)
177 print "error, could not send"
184 return time.strftime("[%d/%m/%Y-%H:%M:%S]")
186 # used by the native handler
187 def do_command(cmd, data, ipaddr):
190 out = "%d"%block_number
192 elif cmd in ['session','new_session']:
195 addresses = ast.literal_eval(data)
198 version, addresses = ast.literal_eval(data)
199 if version[0]=="0": version = "v" + version
203 print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
204 out = new_session(version, addresses)
206 elif cmd=='address.subscribe':
208 session_id, addr = ast.literal_eval(data)
210 traceback.print_exc(file=sys.stdout)
213 out = add_address_to_session(session_id,addr)
215 elif cmd=='update_session':
217 session_id, addresses = ast.literal_eval(data)
219 traceback.print_exc(file=sys.stdout)
221 print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
222 out = update_session(session_id,addresses)
225 out = poll_session(data)
229 out = repr( store.get_history( address ) )
232 out = store.send_tx(data)
233 print timestr(), "sent tx:", ipaddr, out
236 out = repr(irc.get_peers())
244 def clean_session_thread():
248 for k,s in sessions.items():
249 if s.get('type') == 'persistent': continue
253 print "lost session", k
256 ####################################################################
259 from processor import Shared, Processor, Dispatcher
260 from stratum_http import HttpServer
261 from stratum import TcpServer
263 class AbeProcessor(Processor):
264 def process(self,request):
265 message_id = request['id']
266 method = request['method']
267 params = request.get('params',[])
271 if method == 'numblocks.subscribe':
272 result = store.block_number
273 elif method == 'address.subscribe':
275 store.watch_address(address)
276 status = store.get_status(address)
278 elif method == 'client.version':
279 #session.version = params[0]
281 elif method == 'server.banner':
282 result = config.get('server','banner').replace('\\n','\n')
283 elif method == 'server.peers':
284 result = irc.get_peers()
285 elif method == 'address.get_history':
287 result = store.get_history( address )
288 elif method == 'transaction.broadcast':
289 txo = store.send_tx(params[0])
290 print "sent tx:", txo
293 print "unknown method", request
296 response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
297 self.push_response(response)
299 def get_status(self,addr):
300 return store.get_status(addr)
304 ####################################################################
308 class Irc(threading.Thread):
310 def __init__(self, processor):
311 self.processor = processor
312 threading.Thread.__init__(self)
317 return self.peers.values()
320 NICK = 'E_'+random_string(10)
321 while not self.processor.shared.stopped():
324 s.connect(('irc.freenode.net', 6667))
325 s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
326 s.send('NICK '+NICK+'\n')
327 s.send('JOIN #electrum\n')
328 sf = s.makefile('r', 0)
330 while not self.processor.shared.stopped():
332 line = line.rstrip('\r\n')
335 s.send('PONG '+line[1]+'\n')
336 elif '353' in line: # answer to /names
337 k = line.index('353')
338 for item in line[k+1:]:
339 if item[0:2] == 'E_':
340 s.send('WHO %s\n'%item)
341 elif '352' in line: # answer to /who
342 # warning: this is a horrible hack which apparently works
343 k = line.index('352')
345 ip = socket.gethostbyname(ip)
348 self.peers[name] = (ip,host)
349 if time.time() - t > 5*60:
350 self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
351 s.send('NAMES #electrum\n')
355 traceback.print_exc(file=sys.stdout)
363 if __name__ == '__main__':
367 server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
370 out = server.load(password)
372 out = server.server.peers()
374 out = server.stop(password)
375 elif cmd == 'clear_cache':
376 out = server.clear_cache(password)
377 elif cmd == 'get_cache':
378 out = server.get_cache(password,sys.argv[2])
380 out = server.address.get_history(sys.argv[2])
382 out = server.transaction.broadcast(sys.argv[2])
384 out = server.numblocks.subscribe()
386 out = "Unknown command: '%s'" % cmd
391 store = abe_backend.AbeStore(config)
394 thread.start_new_thread(native_server_thread, ())
395 thread.start_new_thread(clean_session_thread, ())
397 processor = AbeProcessor()
399 # Bind shared to processor since constructor is user defined
400 processor.shared = shared
403 dispatcher = Dispatcher(shared, processor)
405 # Create various transports we need
406 transports = [ TcpServer(shared, processor, "ecdsa.org",50001),
407 HttpServer(shared, processor, "ecdsa.org",8081)
409 for server in transports:
413 if (config.get('server','irc') == 'yes' ):
418 print "starting Electrum server"
420 print "server stopped"