import thread, traceback, sys, urllib, operator
from json import dumps, loads
from Queue import Queue
-import time
+import time, threading
class AbeStore(Datastore_class):
self.store = AbeStore(config)
self.block_number = -1
self.watched_addresses = []
+ threading.Timer(10, self.run_store_iteration).start()
def process(self, request):
+ #print "abe process", request
+
message_id = request['id']
method = request['method']
params = request.get('params',[])
self.watched_addresses.append(addr)
- def run(self):
+ def run_store_iteration(self):
+ if self.shared.stopped():
+ print "exit timer"
+ return
- old_block_number = None
- while not self.shared.stopped():
- self.block_number = self.store.main_iteration()
-
- if self.block_number != old_block_number:
- old_block_number = self.block_number
- print "block number:", self.block_number
- self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
-
- while True:
- try:
- addr = self.store.address_queue.get(False)
- except:
- break
- if addr in self.watched_addresses:
- status = self.store.get_status( addr )
- self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
-
- time.sleep(10)
+ block_number = self.store.main_iteration()
+ if self.block_number != block_number:
+ self.block_number = block_number
+ print "block number:", self.block_number
+ self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
+
+ while True:
+ try:
+ addr = self.store.address_queue.get(False)
+ except:
+ break
+ if addr in self.watched_addresses:
+ status = self.store.get_status( addr )
+ self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
+ threading.Timer(10, self.run_store_iteration).start()
from processor import Processor
-class ServerProcessor(Processor):
- def __init__(self, config):
- Processor.__init__(self)
- self.daemon = True
- self.peers = {}
- self.banner = config.get('server','banner')
- self.host = config.get('server','host')
- self.password = config.get('server','password')
+class IrcThread(threading.Thread):
+ def __init__(self, processor, config):
+ threading.Thread.__init__(self)
+ self.processor = processor
+ self.daemon = True
self.stratum_tcp_port = config.get('server','stratum_tcp_port')
self.stratum_http_port = config.get('server','stratum_http_port')
-
- self.irc = config.get('server', 'irc') == 'yes'
- self.nick = config.get('server', 'irc_nick')
+ self.peers = {}
+ self.host = config.get('server','host')
+ self.nick = config.get('server', 'irc_nick')
if not self.nick: self.nick = random_string(10)
-
def get_peers(self):
return self.peers.values()
def run(self):
- if not self.irc:
- return
-
ircname = self.getname()
- while not self.shared.stopped():
+ while not self.processor.shared.stopped():
try:
s = socket.socket()
s.connect(('irc.freenode.net', 6667))
s.send('JOIN #electrum\n')
sf = s.makefile('r', 0)
t = 0
- while not self.shared.stopped():
+ while not self.processor.shared.stopped():
line = sf.readline()
line = line.rstrip('\r\n')
line = line.split()
ports = line[k+10:]
self.peers[name] = (ip, host, ports)
if time.time() - t > 5*60:
- self.push_response({'method':'server.peers', 'params':[self.get_peers()]})
+ self.processor.push_response({'method':'server.peers', 'params':[self.get_peers()]})
s.send('NAMES #electrum\n')
t = time.time()
self.peers = {}
sf.close()
s.close()
+ print "quitting IRC"
+
+
+class ServerProcessor(Processor):
+
+ def __init__(self, config):
+ Processor.__init__(self)
+ self.daemon = True
+ self.banner = config.get('server','banner')
+ self.password = config.get('server','password')
+
+ if config.get('server', 'irc') == 'yes':
+ self.irc = IrcThread(self, config)
+ else:
+ self.irc = None
+
+
+ def get_peers(self):
+ if self.irc:
+ return self.irc.get_peers()
+ else:
+ return []
+
+
+ def run(self):
+ if self.irc:
+ self.irc.start()
+ Processor.run(self)
def process(self, request):
method = request['method']
threading.Thread.__init__(self)
self.daemon = True
self.dispatcher = None
+ self.queue = queue.Queue()
def process(self, request):
pass
#print "response", response
self.dispatcher.request_dispatcher.push_response(response)
+ def run(self):
+ while not self.shared.stopped():
+ request = self.queue.get(10000000000)
+ try:
+ self.process(request)
+ except:
+ traceback.print_exc(file=sys.stdout)
+
+ print "processor terminating"
+
class Dispatcher:
raise TypeError("self.shared not set in Processor")
while not self.shared.stopped():
session, request = self.pop_request()
- self.process(session, request)
+ self.do_dispatch(session, request)
self.stop()
def stop(self):
pass
- def process(self, session, request):
+ def do_dispatch(self, session, request):
+ """ dispatch request to the relevant processor """
+
method = request['method']
params = request.get('params',[])
-
suffix = method.split('.')[-1]
if suffix == 'subscribe':
session.subscribe_to_service(method, params)
# store session and id locally
request['id'] = self.store_session_id(session, request['id'])
- # dispatch request to the relevant module..
prefix = request['method'].split('.')[0]
try:
p = self.processors[prefix]
except:
print "error: no processor for", prefix
return
- try:
- p.process(request)
- except:
- traceback.print_exc(file=sys.stdout)
+
+ p.queue.put(request)
if method in ['server.version']:
session.version = params[0]