generic processor; register backends
[electrum-server.git] / abe_backend.py
index 8f6edf3..f66ddf7 100644 (file)
@@ -7,6 +7,7 @@ import psycopg2, binascii
 import thread, traceback, sys, urllib, operator
 from json import dumps, loads
 from Queue import Queue
+import time
 
 class AbeStore(Datastore_class):
 
@@ -30,6 +31,8 @@ class AbeStore(Datastore_class):
         self.address_queue = Queue()
 
         self.dblock = thread.allocate_lock()
+        self.block_number = -1
+        self.watched_addresses = []
 
 
 
@@ -365,3 +368,66 @@ class AbeStore(Datastore_class):
             store.dblock.release()
 
         return block_number
+
+    def watch_address(self, addr):
+        if addr not in self.watched_addresses:
+            self.watched_addresses.append(addr)
+
+    def run(self, processor):
+        
+        old_block_number = None
+        while not processor.shared.stopped():
+            self.block_number = self.main_iteration()
+
+            if self.block_number != old_block_number:
+                old_block_number = self.block_number
+                processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+
+            while True:
+                try:
+                    addr = self.address_queue.get(False)
+                except:
+                    break
+                if addr in self.watched_addresses:
+                    status = self.get_status( addr )
+                    processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
+
+            time.sleep(10)
+
+
+
+class AbeBackend:
+
+    def __init__(self,config, processor):
+        self.store = AbeStore(config)
+        self.store.processor = processor
+        thread.start_new_thread(self.store.run,(processor,))
+
+    def process(self, request, queue):
+        message_id = request['id']
+        method = request['method']
+        params = request.get('params',[])
+        result = ''
+        if method == 'blockchain.numblocks.subscribe':
+            result = self.store.block_number
+        elif method == 'blockchain.address.subscribe':
+            address = params[0]
+            self.store.watch_address(address)
+            status = self.store.get_status(address)
+            result = status
+        elif method == 'blockchain.address.get_history':
+            address = params[0]
+            result = self.store.get_history( address ) 
+        elif method == 'blockchain.transaction.broadcast':
+            txo = self.store.send_tx(params[0])
+            print "sent tx:", txo
+            result = txo 
+        else:
+            print "unknown method", request
+
+        if result != '':
+            response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
+            queue.put(response)
+
+
+