import ConfigParser
from json import dumps, loads
import urllib
-
+import threading
config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
m_sessions = [{}] # served by http
-peer_list = {}
from Queue import Queue
input_queue = Queue()
out = cmd_stop(data)
elif cmd == 'peers':
- out = repr(peer_list.values())
+ out = repr(irc.get_peers())
else:
out = None
elif method == 'server.banner':
result = config.get('server','banner').replace('\\n','\n')
elif method == 'server.peers':
- result = peer_list.values()
+ result = irc.get_peers()
elif method == 'address.get_history':
address = params[0]
result = store.get_history( address )
+class Irc(threading.Thread):
+ def __init__(self, processor):
+ self.processor = processor
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.peers = {}
-def irc_thread():
- global peer_list
- NICK = 'E_'+random_string(10)
- while not stopping:
- try:
- s = socket.socket()
- s.connect(('irc.freenode.net', 6667))
- s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
- s.send('NICK '+NICK+'\n')
- s.send('JOIN #electrum\n')
- sf = s.makefile('r', 0)
- t = 0
- while not stopping:
- line = sf.readline()
- line = line.rstrip('\r\n')
- line = line.split()
- if line[0]=='PING':
- s.send('PONG '+line[1]+'\n')
- elif '353' in line: # answer to /names
- k = line.index('353')
- for item in line[k+1:]:
- if item[0:2] == 'E_':
- s.send('WHO %s\n'%item)
- elif '352' in line: # answer to /who
- # warning: this is a horrible hack which apparently works
- k = line.index('352')
- ip = line[k+4]
- ip = socket.gethostbyname(ip)
- name = line[k+6]
- host = line[k+9]
- peer_list[name] = (ip,host)
- if time.time() - t > 5*60:
- s.send('NAMES #electrum\n')
- t = time.time()
- peer_list = {}
- except:
- traceback.print_exc(file=sys.stdout)
- finally:
- sf.close()
- s.close()
+ def get_peers(self):
+ return self.peers.values()
+
+ def run(self):
+ NICK = 'E_'+random_string(10)
+ while not self.processor.shared.stopped():
+ try:
+ s = socket.socket()
+ s.connect(('irc.freenode.net', 6667))
+ s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
+ s.send('NICK '+NICK+'\n')
+ s.send('JOIN #electrum\n')
+ sf = s.makefile('r', 0)
+ t = 0
+ while not self.processor.shared.stopped():
+ line = sf.readline()
+ line = line.rstrip('\r\n')
+ line = line.split()
+ if line[0]=='PING':
+ s.send('PONG '+line[1]+'\n')
+ elif '353' in line: # answer to /names
+ k = line.index('353')
+ for item in line[k+1:]:
+ if item[0:2] == 'E_':
+ s.send('WHO %s\n'%item)
+ elif '352' in line: # answer to /who
+ # warning: this is a horrible hack which apparently works
+ k = line.index('352')
+ ip = line[k+4]
+ ip = socket.gethostbyname(ip)
+ name = line[k+6]
+ host = line[k+9]
+ self.peers[name] = (ip,host)
+ if time.time() - t > 5*60:
+ self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
+ s.send('NAMES #electrum\n')
+ t = time.time()
+ self.peers = {}
+ except:
+ traceback.print_exc(file=sys.stdout)
+ finally:
+ sf.close()
+ s.close()
def get_peers_json(_,__):
- return peer_list.values()
+ return irc.get_peers()
def http_server_thread():
# see http://code.google.com/p/jsonrpclib/
tcpserver.start()
#http stratum
- from StratumJSONRPCServer import HttpServer
+ from stratum_http import HttpServer
server = HttpServer(shared, processor, "ecdsa.org",8081)
server.start()
if (config.get('server','irc') == 'yes' ):
- thread.start_new_thread(irc_thread, ())
+ irc = Irc(processor)
+ irc.start()
+
print "starting Electrum server"
store.run(processor)
self.daemon = True
self.request_queue = queue.Queue()
self.response_queue = queue.Queue()
- self.id_session = {}
+ self.internal_ids = {}
+ self.internal_id = 0
+ self.lock = threading.Lock()
def push_response(self, item):
self.response_queue.put(item)
def pop_request(self):
return self.request_queue.get()
+ def get_session_id(self, internal_id):
+ with self.lock:
+ return session_ids.pop(internal_id)
+
+ def store_session_id(self, session, msgid):
+ with self.lock:
+ self.internal_ids[self.internal_id] = session, msgid
+ self.internal_id += 1
+
def run(self):
if self.shared is None:
raise TypeError("self.shared not set in Processor")
method = request['method']
params = request.get('params',[])
- if method == 'numblocks.subscribe':
- session.subscribe_to_numblocks()
-
- elif method == 'address.subscribe':
- address = params[0]
- session.subscribe_to_address(address)
-
- elif method == 'server.peers':
- session.subscribe_to_peers()
+ if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']:
+ session.subscribe_to_service(method, params)
- message_id = request['id']
- self.id_session[message_id] = session
+ # store session and id locally
+ request['id'] = self.store_session_id(session, request['id'])
self.process(request)
self.stop()
self.address = address
self._stopped = False
self.lock = threading.Lock()
- self.numblocks_sub = None
- self.addresses_sub = {}
+ self.subscriptions = []
print "new session", address
def stop(self):
else:
return self._connection
- def subscribe_to_numblocks(self):
+ def subscribe_to_service(self, method, params):
with self.lock:
- self.numblocks_sub = True
+ self.subscriptions.append((method, params))
- def subscribe_to_peers(self):
- pass
-
- def subscribe_to_address(self,address):
- with self.lock:
- self.addresses_sub[address] = 'unknown'
class TcpResponder(threading.Thread):
def run(self):
while not self.shared.stopped():
response = self.processor.pop_response()
- # if it is a subscription, find the list of sessions that suuscribed
-
- # if there is an id, there should be a session
- # note: I must add and remove the session id to the message id..
- message_id = response.get('id')
+ internal_id = response.get('id')
+ params = response.get('params',[])
try:
method = response['method']
except:
print "no method", response
continue
- if message_id:
- session = self.processor.id_session.pop(message_id)
+ if internal_id:
+ session, message_id = self.processor.get_session_id(internal_id)
+ if message_id:
+ response['id'] = message_id
self.send_response(response, session)
- elif method == 'numblocks.subscribe':
+ else:
for session in self.server.sessions:
if not session.stopped():
- if session.numblocks_sub:
+ if (method,params) in session.subscriptions:
self.send_response(response, session)
- elif method == 'address.subscribe':
- for session in self.server.sessions:
- if not session.stopped():
- addr = response['params'][0]
- last_status = session.addresses_sub.get(addr)
- if last_status:
- new_status = response.get('result')
- if new_status != last_status:
- session.addresses_sub[addr] = new_status
- self.send_response(response, session)
- else:
- print "error", response
def send_response(self, response, session):