X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=processor.py;h=baec5d085dd6fa5048a0d1d50cfbbb51f7d83196;hb=a5ee6ee874d534225b59bf2d5efc3c7785c126ab;hp=82124e0c4476ca3550699dd4b0c1b491acb1fff9;hpb=9526ce60749182cea0be7da5838fa04e3d59e0f1;p=electrum-server.git diff --git a/processor.py b/processor.py index 82124e0..baec5d0 100644 --- a/processor.py +++ b/processor.py @@ -1,35 +1,23 @@ import json +import Queue as queue import socket import threading import time -import traceback, sys -import Queue as queue - -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - -def timestr(): - return time.strftime("[%d/%m/%Y-%H:%M:%S]") - - -print_lock = threading.Lock() -def print_log(*args): - args = [str(item) for item in args] - with print_lock: - sys.stderr.write(timestr() + " " + " ".join(args) + "\n") - sys.stderr.flush() +import traceback +import sys +from utils import random_string, timestr, print_log class Shared: - def __init__(self): + def __init__(self, config): self.lock = threading.Lock() self._stopped = False + self.config = config def stop(self): - print_log( "Stopping Stratum" ) + print_log("Stopping Stratum") with self.lock: self._stopped = True @@ -64,14 +52,13 @@ class Processor(threading.Thread): except: traceback.print_exc(file=sys.stdout) - print_log( "processor terminating") - + print_log("processor terminating") class Dispatcher: - def __init__(self): - self.shared = Shared() + def __init__(self, config): + self.shared = Shared(config) self.request_dispatcher = RequestDispatcher(self.shared) self.request_dispatcher.start() self.response_dispatcher = \ @@ -85,7 +72,6 @@ class Dispatcher: self.request_dispatcher.processors[prefix] = processor - class RequestDispatcher(threading.Thread): def __init__(self, shared): @@ -107,7 +93,7 @@ class RequestDispatcher(threading.Thread): return self.response_queue.get() def push_request(self, session, item): - self.request_queue.put((session,item)) + self.request_queue.put((session, item)) def pop_request(self): return self.request_queue.get() @@ -137,7 +123,6 @@ class RequestDispatcher(threading.Thread): self.do_dispatch(session, request) except: traceback.print_exc(file=sys.stdout) - self.stop() @@ -148,7 +133,7 @@ class RequestDispatcher(threading.Thread): """ dispatch request to the relevant processor """ method = request['method'] - params = request.get('params',[]) + params = request.get('params', []) suffix = method.split('.')[-1] if session is not None: @@ -163,7 +148,7 @@ class RequestDispatcher(threading.Thread): try: p = self.processors[prefix] except: - print_log( "error: no processor for", prefix) + print_log("error: no processor for", prefix) return p.add_request(request) @@ -226,7 +211,11 @@ class Session: addr = None if self.subscriptions: - print_log( "%4s"%self.name, "%14s"%self.address, "%35s"%addr, "%3d"%len(self.subscriptions), self.version ) + print_log("%4s" % self.name, + "%15s" % self.address, + "%35s" % addr, + "%3d" % len(self.subscriptions), + self.version) def stopped(self): with self.lock: @@ -256,7 +245,7 @@ class Session: def contains_subscription(self, subdesc): with self.lock: return subdesc in self.subscriptions - + class ResponseDispatcher(threading.Thread): @@ -278,16 +267,21 @@ class ResponseDispatcher(threading.Thread): params = response.get('params') # A notification - if internal_id is None: # and method is not None and params is not None: + if internal_id is None: # and method is not None and params is not None: found = self.notification(method, params, response) if not found and method == 'blockchain.address.subscribe': - self.request_dispatcher.push_request(None,{'method':method.replace('.subscribe', '.unsubscribe'), 'params':params, 'id':None}) - pass + request = { + 'id': None, + 'method': method.replace('.subscribe', '.unsubscribe'), + 'params': [self.shared.config.get('server', 'password')] + params, + } + + self.request_dispatcher.push_request(None, request) # A response - elif internal_id is not None: + elif internal_id is not None: self.send_response(internal_id, response) else: - print_log( "no method", response) + print_log("no method", response) def notification(self, method, params, response): subdesc = Session.build_subdesc(method, params) @@ -298,7 +292,7 @@ class ResponseDispatcher(threading.Thread): if session.contains_subscription(subdesc): session.send_response(response) found = True - if not found: print "no subscriber for", subdesc + # if not found: print_log("no subscriber for", subdesc) return found def send_response(self, internal_id, response): @@ -306,6 +300,5 @@ class ResponseDispatcher(threading.Thread): if session: response['id'] = message_id session.send_response(response) - else: - print_log( "send_response: no session", message_id, internal_id, response ) - + #else: + # print_log("send_response: no session", message_id, internal_id, response )