added broadcast_transaction hook into libbitcoin
[electrum-server.git] / stratum.py
index 74e5866..ba3ab22 100644 (file)
@@ -32,6 +32,10 @@ class Processor(threading.Thread):
                     # to our internal register
                     self.add_session(session)
                     self.process(session)
+        self.stop()
+
+    def stop(self):
+        pass
 
     def process(self, session):
         request = session.pop_request()
@@ -49,6 +53,8 @@ class Session:
 
         self.request_queue = queue.Queue()
         self.response_queue = queue.Queue()
+        self.numblocks_sub = None
+        self.addresses_sub = {}
 
     def stop(self):
         self._connection.close()
@@ -72,12 +78,21 @@ class Session:
     def pop_request(self):
         return self.request_queue.get()
 
-    def push_response(self):
+    def push_response(self, item):
         self.response_queue.put(item)
 
     def pop_response(self):
         return self.response_queue.get()
 
+    def subscribe_to_numblocks(self,message_id):
+        with self.lock:
+            self.numblocks_sub = message_id
+    
+    def subscribe_to_address(self,address,message_id,status):
+        with self.lock:
+            self.addresses_sub[address] = message_id,status
+
+
 class TcpClientResponder(threading.Thread):
 
     def __init__(self, shared, session):
@@ -131,47 +146,52 @@ class TcpClientRequestor(threading.Thread):
             return None
 
     def parse(self):
-        while True:
-            raw_buffer = self.message.find('\n')
-            if raw_buffer == -1:
-                return True
+        raw_buffer = self.message.find('\n')
+        if raw_buffer == -1:
+            return True
 
-            command = self.message[0:raw_buffer].strip()
-            self.message = self.message[raw_buffer + 1:]
-            if command == 'quit': 
-                return False
+        raw_command = self.message[0:raw_buffer].strip()
+        self.message = self.message[raw_buffer + 1:]
+        if raw_command == 'quit': 
+            return False
 
-            try:
-                command = json.loads(command)
-            except:
-                print "json error", repr(command)
-                continue
+        try:
+            command = json.loads(raw_command)
+        except:
+            self.session.push_response(
+                {"error": "bad JSON", "request": raw_command})
+            return True
 
-            try:
-                message_id = command.get('id')
-                method = command.get('method')
-                params = command.get('params')
-            except:
-                print "syntax error", repr(command), self.session.address[0]
-                continue
+        try:
+            # Try to load vital fields, and return an error if
+            # unsuccessful.
+            message_id = command['id']
+            method = command['method']
+        except KeyError:
+            # Return an error JSON in response.
+            self.session.push_response(
+                {"error": "syntax error", "request": raw_command})
+        else:
+            self.session.push_request(command)
 
-            self.session.push_request((message_id, method, params))
-            print message_id, method, params
+        return True
 
 class TcpServer(threading.Thread):
 
-    def __init__(self, shared, processor):
+    def __init__(self, shared, processor, host, port):
         self.shared = shared
         self.processor = processor
         self.clients = []
         threading.Thread.__init__(self)
         self.daemon = True
+        self.host = host
+        self.port = port
 
     def run(self):
         print "TCP server started."
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        sock.bind(("localhost", 50001))
+        sock.bind((self.host, self.port))
         sock.listen(1)
         while not self.shared.stopped():
             session = Session(*sock.accept())
@@ -204,7 +224,7 @@ class Stratum:
         processor.shared = shared
         processor.start()
         # Create various transports we need
-        transports = TcpServer(shared, processor),
+        transports = TcpServer(shared, processor, "176.31.24.241",50001),
         for server in transports:
             server.start()
         while not shared.stopped():