generic processor; register backends
authorThomasV <thomasv@gitorious>
Wed, 28 Mar 2012 20:34:24 +0000 (00:34 +0400)
committerThomasV <thomasv@gitorious>
Wed, 28 Mar 2012 20:34:24 +0000 (00:34 +0400)
abe_backend.py
irc.py
modules/python_bitcoin/__init__.py
processor.py
server.py
stratum.py

index ae43dfd..f66ddf7 100644 (file)
@@ -381,7 +381,7 @@ class AbeStore(Datastore_class):
 
             if self.block_number != old_block_number:
                 old_block_number = self.block_number
-                processor.push_response({ 'method':'numblocks.subscribe', 'result':self.block_number })
+                processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
 
             while True:
                 try:
@@ -390,6 +390,44 @@ class AbeStore(Datastore_class):
                     break
                 if addr in self.watched_addresses:
                     status = self.get_status( addr )
-                    processor.push_response({ 'method':'address.subscribe', 'params':[addr], 'result':status })
+                    processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
 
             time.sleep(10)
+
+
+
+class AbeBackend:
+
+    def __init__(self,config, processor):
+        self.store = AbeStore(config)
+        self.store.processor = processor
+        thread.start_new_thread(self.store.run,(processor,))
+
+    def process(self, request, queue):
+        message_id = request['id']
+        method = request['method']
+        params = request.get('params',[])
+        result = ''
+        if method == 'blockchain.numblocks.subscribe':
+            result = self.store.block_number
+        elif method == 'blockchain.address.subscribe':
+            address = params[0]
+            self.store.watch_address(address)
+            status = self.store.get_status(address)
+            result = status
+        elif method == 'blockchain.address.get_history':
+            address = params[0]
+            result = self.store.get_history( address ) 
+        elif method == 'blockchain.transaction.broadcast':
+            txo = self.store.send_tx(params[0])
+            print "sent tx:", txo
+            result = txo 
+        else:
+            print "unknown method", request
+
+        if result != '':
+            response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
+            queue.put(response)
+
+
+
diff --git a/irc.py b/irc.py
index 8143593..bae1f80 100644 (file)
--- a/irc.py
+++ b/irc.py
@@ -60,3 +60,31 @@ class Irc(threading.Thread):
             finally:
                 sf.close()
                 s.close()
+
+
+class ServerBackend:
+
+    def __init__(self, config, processor):
+        self.banner = config.get('server','banner')
+        self.irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
+        self.irc.processor = processor
+        if (config.get('server','irc') == 'yes' ): 
+            self.irc.start()
+
+    def process(self, request, queue):
+        method = request['method']
+
+        result = ''
+        if method == 'server.banner':
+            result = self.banner.replace('\\n','\n')
+        elif method == 'server.peers.subscribe':
+            result = self.irc.get_peers()
+        else:
+            print "unknown method", request
+
+        if result!='':
+            response = { 'id':request['id'], 'method':method, 'params':request['params'], 'result':result }
+            queue.put(response)
+
+
+
index cd9a4db..361b7dc 100644 (file)
@@ -1,5 +1,5 @@
 import bitcoin
-import stratum
+from processor import Processor
 import threading
 import time
 
@@ -102,13 +102,6 @@ class NumblocksSubscribe:
         self.backend.blockchain.subscribe_reorganize(self.reorganize)
         self.backend.blockchain.fetch_last_depth(self.set_last_depth)
         self.latest = GhostValue()
-        self.subscribed = []
-
-    def subscribe(self, session, request):
-        last = self.latest.get()
-        self.push_response(session,{"id": request["id"], "result": last})
-        with self.lock:
-            self.subscribed.append((session, request))
 
     def set_last_depth(self, ec, last_depth):
         if ec:
@@ -119,74 +112,72 @@ class NumblocksSubscribe:
     def reorganize(self, ec, fork_point, arrivals, replaced):
         latest = fork_point + len(arrivals)
         self.latest.set(latest)
-        subscribed = self.spring_clean()
-        for session, request in subscribed:
-            self.push_response(session,{"id": request["id"], "result": latest})
+        self.push_response({"method":"numblocks.subscribe", "result": latest})
         self.backend.blockchain.subscribe_reorganize(self.reorganize)
 
-    def spring_clean(self):
-        with self.lock:
-            self.subscribed = [sub for sub in self.subscribed
-                               if not sub[0].stopped()]
-            return self.subscribed[:]
 
 class AddressGetHistory:
 
     def __init__(self, backend):
         self.backend = backend
 
-    def get(self, session, request):
+    def get(self, request):
         address = str(request["params"])
         composed.payment_history(self.backend.blockchain, address,
-            bitcoin.bind(self.respond, session, request, bitcoin._1))
+            bitcoin.bind(self.respond, request, bitcoin._1))
 
-    def respond(self, session, request, result):
-        self.push_response(session,{"id": request["id"], "result": result})
+    def respond(self, request, result):
+        self.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result})
 
-class LibbitcoinProcessor(stratum.Processor):
+class LibbitcoinProcessor(Processor):
 
     def __init__(self):
         self.backend = Backend()
         self.numblocks_subscribe = NumblocksSubscribe(self.backend)
         self.address_get_history = AddressGetHistory(self.backend)
-        stratum.Processor.__init__(self)
+        Processor.__init__(self)
 
     def stop(self):
         self.backend.stop()
 
-    def process(self, session, request):
+    def process(self, request):
 
         print "New request (lib)", request
         if request["method"] == "numblocks.subscribe":
             self.numblocks_subscribe.subscribe(session, request)
         elif request["method"] == "address.get_history":
-            self.address_get_history.get(session, request)
+            self.address_get_history.get(request)
         elif request["method"] == "server.banner":
-            self.push_response(session, {"id": request["id"],
+            self.push_response({"id": request["id"], "method": request["method"], "params":request["params"],
                 "result": "libbitcoin using python-bitcoin bindings"})
         elif request["method"] == "transaction.broadcast":
-            self.broadcast_transaction(session, request)
+            self.broadcast_transaction(request)
         # Execute and when ready, you call
-        # self.push_response(session,response)
+        # self.push_response(response)
 
-    def broadcast_transaction(self, session, request):
+    def broadcast_transaction(self, request):
         raw_tx = bitcoin.data_chunk(str(request["params"]))
         exporter = bitcoin.satoshi_exporter()
         try:
             tx = exporter.load_transaction(raw_tx)
         except RuntimeError:
-            response = {"id": request["id"], "result": None,
+            response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": None,
                 "error": {"message": 
                     "Exception while parsing the transaction data.",
                     "code": -4}}
         else:
             self.backend.protocol.broadcast_transaction(tx)
             tx_hash = str(bitcoin.hash_transaction(tx))
-            response = {"id": request["id"], "result": tx_hash}
-        self.push_response(session,response)
+            response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": tx_hash}
+        self.push_response(response)
+
+
 
-def run(stratum):
+def run(processor):
+    #processor = LibbitcoinProcessor()
     print "Warning: pre-alpha prototype. Full of bugs."
-    processor = LibbitcoinProcessor()
-    stratum.start(processor)
+    while not processor.shared.stopped():
+        if raw_input() == "quit":
+            shared.stop()
+        time.sleep(1)
 
index 28df39e..1fd7909 100644 (file)
@@ -2,6 +2,7 @@ import json
 import socket
 import threading
 import time
+import traceback, sys
 import Queue as queue
 
 class Shared:
@@ -32,6 +33,7 @@ class Processor(threading.Thread):
         self.internal_id = 1
         self.lock = threading.Lock()
         self.sessions = []
+        self.processors = {}
 
     def push_response(self, item):
         self.response_queue.put(item)
@@ -56,6 +58,9 @@ class Processor(threading.Thread):
             self.internal_id += 1
             return r
 
+    def register(self, prefix, function):
+        self.processors[prefix] = function
+
     def run(self):
         if self.shared is None:
             raise TypeError("self.shared not set in Processor")
@@ -65,12 +70,26 @@ class Processor(threading.Thread):
             method = request['method']
             params = request.get('params',[])
 
-            if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']:
+            suffix = method.split('.')[-1]
+            if suffix == 'subscribe':
                 session.subscribe_to_service(method, params)
 
             # store session and id locally
             request['id'] = self.store_session_id(session, request['id'])
-            self.process(request)
+
+            # dispatch request to the relevant module..
+            prefix = method.split('.')[0]
+            try:
+                func = self.processors[prefix]
+            except:
+                print "error: no processor for", prefix
+                continue
+
+            try:
+                func(request,self.response_queue)
+            except:
+                traceback.print_exc(file=sys.stdout)
+                continue
 
         self.stop()
 
index 675ef5f..97a9773 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -56,47 +56,11 @@ from processor import Shared, Processor, Dispatcher
 from stratum_http import HttpServer
 from stratum import TcpServer
 from native import NativeServer
-from irc import Irc
-from abe_backend import AbeStore
-
-class AbeProcessor(Processor):
-    def process(self,request):
-        message_id = request['id']
-        method = request['method']
-        params = request.get('params',[])
-        #print request
-
-        result = ''
-        if method == 'numblocks.subscribe':
-            result = store.block_number
-        elif method == 'address.subscribe':
-            address = params[0]
-            store.watch_address(address)
-            status = store.get_status(address)
-            result = status
-        elif method == 'client.version':
-            #session.version = params[0]
-            pass
-        elif method == 'server.banner':
-            result = config.get('server','banner').replace('\\n','\n')
-        elif method == 'server.peers':
-            result = irc.get_peers()
-        elif method == 'address.get_history':
-            address = params[0]
-            result = store.get_history( address ) 
-        elif method == 'transaction.broadcast':
-            txo = store.send_tx(params[0])
-            print "sent tx:", txo
-            result = txo 
-        else:
-            print "unknown method", request
 
-        if result!='':
-            response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
-            self.push_response(response)
 
-    def get_status(self,addr):
-        return store.get_status(addr)
+import irc 
+import abe_backend 
+from processor import Processor
 
 
 
@@ -116,25 +80,25 @@ if __name__ == '__main__':
         print out
         sys.exit(0)
 
-    processor = AbeProcessor()
+    processor = Processor()
     shared = Shared()
     # Bind shared to processor since constructor is user defined
     processor.shared = shared
     processor.start()
 
-    irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
-    if (config.get('server','irc') == 'yes' ): irc.start()
+    abe = abe_backend.AbeBackend(config, processor)
+    processor.register('blockchain', abe.process)
 
-    # backend
-    store = AbeStore(config)
+    sb = irc.ServerBackend(config, processor)
+    processor.register('server', sb.process)
 
     # dispatcher
     dispatcher = Dispatcher(shared, processor)
     dispatcher.start()
 
-    host = config.get('server','host')
     # Create various transports we need
-    transports = [ NativeServer(shared, store, irc, config.get('server','banner'), host, 50000),
+    host = config.get('server','host')
+    transports = [ NativeServer(shared, abe.store, sb.irc, config.get('server','banner'), host, 50000),
                    TcpServer(shared, processor, host, 50001),
                    HttpServer(shared, processor, host, 8081),
                    ]
@@ -142,6 +106,7 @@ if __name__ == '__main__':
         server.start()
 
     print "starting Electrum server on", host
-    store.run(processor)
+    while not shared.stopped():
+        time.sleep(1)
     print "server stopped"
 
index f364d93..ce39aca 100644 (file)
@@ -12,6 +12,7 @@ class TcpSession(Session):
         self._connection = connection
         self.address = address
         Session.__init__(self)
+        print "New session", address
 
     def connection(self):
         if self.stopped():
@@ -128,24 +129,3 @@ class TcpServer(threading.Thread):
 
 
 
-class Stratum:
-
-    def start(self, processor):
-        shared = Shared()
-        # Bind shared to processor since constructor is user defined
-        processor.shared = shared
-        processor.start()
-        # Create various transports we need
-        transports = TcpServer(shared, processor, "176.31.24.241", 50001),
-        for server in transports:
-            server.start()
-        while not shared.stopped():
-            if raw_input() == "quit":
-                shared.stop()
-            time.sleep(1)
-
-if __name__ == "__main__":
-    processor = Processor()
-    app = Stratum()
-    app.start(processor)
-