use stratum
[electrum-server.git] / server.py
index 322993d..666d5d4 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -374,129 +374,57 @@ def do_command(cmd, data, ipaddr):
     return out
 
 
-
-####################################################################
-
-def tcp_server_thread():
-    thread.start_new_thread(process_input_queue, ())
-    thread.start_new_thread(process_output_queue, ())
-
-    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-    s.bind((config.get('server','host'), 50001))
-    s.listen(1)
+def clean_session_thread():
     while not stopping:
-        conn, addr = s.accept()
-        try:
-            thread.start_new_thread(tcp_client_thread, (addr, conn,))
-        except:
-            # can't start new thread if there is no memory..
-            traceback.print_exc(file=sys.stdout)
-
-
-def close_session(session_id):
-    #print "lost connection", session_id
-    sessions.pop(session_id)
-    if session_id in sessions_sub_numblocks:
-        sessions_sub_numblocks.pop(session_id)
-
-
-# one thread per client. put requests in a queue.
-def tcp_client_thread(ipaddr,conn):
-    """ use a persistent connection. put commands in a queue."""
+        time.sleep(30)
+        t = time.time()
+        for k,s in sessions.items():
+            if s.get('type') == 'persistent': continue
+            t0 = s['last_time']
+            if t - t0 > 5*60:
+                sessions.pop(k)
+                print "lost session", k
+            
 
-    print timestr(), "TCP session", ipaddr
-    global sessions
+####################################################################
 
-    session_id = random_string(10)
-    sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
 
-    ipaddr = ipaddr[0]
-    msg = ''
+import stratum
 
-    while not stopping:
-        try:
-            d = conn.recv(1024)
-        except socket.error:
-            d = ''
-        if not d:
-            close_session(session_id)
-            break
+class AbeProcessor(stratum.Processor):
+    def process(self,session,request):
+        message_id = request['id']
+        method = request['method']
+        params = request.get('params',[])
 
-        msg += d
-        while True:
-            s = msg.find('\n')
-            if s ==-1:
-                break
-            else:
-                c = msg[0:s].strip()
-                msg = msg[s+1:]
-                if c == 'quit': 
-                    conn.close()
-                    close_session(session_id)
-                    return
-                try:
-                    c = json.loads(c)
-                except:
-                    print "json error", repr(c)
-                    continue
-                try:
-                    message_id = c.get('id')
-                    method = c.get('method')
-                    params = c.get('params')
-                except:
-                    print "syntax error", repr(c), ipaddr
-                    continue
-
-                # add to queue
-                input_queue.put((session_id, message_id, method, params))
-
-
-
-# read commands from the input queue. perform requests, etc. this should be called from the main thread.
-def process_input_queue():
-    while not stopping:
-        session_id, message_id, method, data = input_queue.get()
-        if session_id not in sessions.keys():
-            continue
-        out = None
-        if method == 'address.subscribe':
-            address = data[0]
-            subscribe_to_address(session_id,message_id,address)
-        elif method == 'numblocks.subscribe':
-            subscribe_to_numblocks(session_id,message_id)
+        result = ''
+        if method == 'numblocks.subscribe':
+            session.subscribe_to_numblocks(message_id)
+            result = block_number
+        elif method == 'address.subscribe':
+            address = params[0]
+            status = store.get_status(address)
+            session.subscribe_to_address(address,message_id,status)
+            result = status
         elif method == 'client.version':
-            sessions[session_id]['version'] = data[0]
+            session.version = params[0]
         elif method == 'server.banner':
-            out = { 'result':config.get('server','banner').replace('\\n','\n') } 
+            result = config.get('server','banner').replace('\\n','\n')
         elif method == 'server.peers':
-            out = { 'result':peer_list.values() } 
+            result = peer_list.values()
         elif method == 'address.get_history':
-            address = data[0]
-            out = { 'result':store.get_history( address ) } 
+            address = params[0]
+            result = store.get_history( address ) 
         elif method == 'transaction.broadcast':
-            txo = store.send_tx(data[0])
+            txo = store.send_tx(params[0])
             print "sent tx:", txo
-            out = {'result':txo }
+            result = txo 
         else:
-            print "unknown command", method
-        if out:
-            out['id'] = message_id
-            out = json.dumps( out )
-            output_queue.put((session_id, out))
+            print "unknown method", request
 
-# this is a separate thread
-def process_output_queue():
-    while not stopping:
-        session_id, out = output_queue.get()
-        session = sessions.get(session_id)
-        if session: 
-            try:
-                conn = session.get('conn')
-                conn.send(out+'\n')
-            except:
-                close_session(session_id)
-                
+        if result!='':
+            response = { 'id':message_id, 'result':result }
+            self.push_response(session,response)
 
 
 
@@ -505,17 +433,6 @@ def process_output_queue():
 
 
 
-def clean_session_thread():
-    while not stopping:
-        time.sleep(30)
-        t = time.time()
-        for k,s in sessions.items():
-            if s.get('type') == 'persistent': continue
-            t0 = s['last_time']
-            if t - t0 > 5*60:
-                sessions.pop(k)
-                print "lost session", k
-            
 
 def irc_thread():
     global peer_list
@@ -614,10 +531,20 @@ if __name__ == '__main__':
 
     # supported protocols
     thread.start_new_thread(native_server_thread, ())
-    thread.start_new_thread(tcp_server_thread, ())
+
     thread.start_new_thread(http_server_thread, ())
     thread.start_new_thread(clean_session_thread, ())
 
+    #tcp stratum
+    stratum_processor = AbeProcessor()
+    shared = stratum.Shared()
+    # Bind shared to processor since constructor is user defined
+    stratum_processor.shared = shared
+    stratum_processor.start()
+    # Create various transports we need
+    server = stratum.TcpServer(shared, stratum_processor, "ecdsa.org",50001)
+    server.start()
+
     if (config.get('server','irc') == 'yes' ):
        thread.start_new_thread(irc_thread, ())
 
@@ -631,6 +558,12 @@ if __name__ == '__main__':
             old_block_number = block_number
             for session_id in sessions_sub_numblocks.keys():
                 send_numblocks(session_id)
+
+            for session in stratum_processor.sessions:
+                if session.numblocks_sub is not None:
+                    response = { 'id':session.numblocks_sub, 'result':block_number }
+                    stratum_processor.push_response(session,response)
+
         while True:
             try:
                 addr = store.address_queue.get(False)
@@ -638,6 +571,16 @@ if __name__ == '__main__':
                 break
             do_update_address(addr)
 
+            for session in stratum_processor.sessions:
+                m = session.addresses_sub.get(addr)
+                if m:
+                    status = store.get_status( addr )
+                    message_id, last_status = m
+                    if status != last_status:
+                        session.subscribe_to_address(message_id, status)
+                        response = { 'id':message_id, 'result':status }
+                        stratum_processor.push_response(session,response)
+
         time.sleep(10)
     print "server stopped"