more modularisation
authorThomasV <thomasv@gitorious>
Wed, 28 Mar 2012 14:31:27 +0000 (18:31 +0400)
committerThomasV <thomasv@gitorious>
Wed, 28 Mar 2012 14:31:27 +0000 (18:31 +0400)
irc.py [new file with mode: 0644]
native.py [new file with mode: 0644]
processor.py
server.py

diff --git a/irc.py b/irc.py
new file mode 100644 (file)
index 0000000..8143593
--- /dev/null
+++ b/irc.py
@@ -0,0 +1,62 @@
+####################################################################
+
+import threading, socket, traceback, time
+
+def random_string(N):
+    import random, string
+    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
+
+
+class Irc(threading.Thread):
+
+    def __init__(self, processor, host, nick):
+        self.processor = processor
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.peers = {}
+        self.host = host
+        self.nick = nick
+
+    def get_peers(self):
+        return self.peers.values()
+
+    def run(self):
+        NICK = 'E_'+random_string(10)
+        while not self.processor.shared.stopped():
+            try:
+                s = socket.socket()
+                s.connect(('irc.freenode.net', 6667))
+                s.send('USER electrum 0 * :'+self.host+' '+self.nick+'\n')
+                s.send('NICK '+NICK+'\n')
+                s.send('JOIN #electrum\n')
+                sf = s.makefile('r', 0)
+                t = 0
+                while not self.processor.shared.stopped():
+                    line = sf.readline()
+                    line = line.rstrip('\r\n')
+                    line = line.split()
+                    if line[0]=='PING': 
+                        s.send('PONG '+line[1]+'\n')
+                    elif '353' in line: # answer to /names
+                        k = line.index('353')
+                        for item in line[k+1:]:
+                            if item[0:2] == 'E_':
+                                s.send('WHO %s\n'%item)
+                    elif '352' in line: # answer to /who
+                        # warning: this is a horrible hack which apparently works
+                        k = line.index('352')
+                        ip = line[k+4]
+                        ip = socket.gethostbyname(ip)
+                        name = line[k+6]
+                        host = line[k+9]
+                        self.peers[name] = (ip,host)
+                    if time.time() - t > 5*60:
+                        self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
+                        s.send('NAMES #electrum\n')
+                        t = time.time()
+                        self.peers = {}
+            except:
+                traceback.print_exc(file=sys.stdout)
+            finally:
+                sf.close()
+                s.close()
diff --git a/native.py b/native.py
new file mode 100644 (file)
index 0000000..1b6a6ec
--- /dev/null
+++ b/native.py
@@ -0,0 +1,197 @@
+import thread, threading, time, socket, traceback, ast
+
+
+
+def random_string(N):
+    import random, string
+    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
+
+def timestr():
+    return time.strftime("[%d/%m/%Y-%H:%M:%S]")
+
+
+class NativeServer(threading.Thread):
+
+    def __init__(self, shared, store, irc, banner, host, port):
+        threading.Thread.__init__(self)
+        self.banner = banner
+        self.store = store
+        self.irc = irc
+        self.sessions = {}
+        self.host = host
+        self.port = port
+        self.daemon = True
+        self.shared = shared
+
+
+    def modified_addresses(self,a_session):
+        import copy
+        session = copy.deepcopy(a_session)
+        addresses = session['addresses']
+        session['last_time'] = time.time()
+        ret = {}
+        k = 0
+        for addr in addresses:
+            status = self.store.get_status( addr )
+            msg_id, last_status = addresses.get( addr )
+            if last_status != status:
+                addresses[addr] = msg_id, status
+                ret[addr] = status
+
+        return ret, addresses
+
+
+    def poll_session(self, session_id): 
+        session = self.sessions.get(session_id)
+        if session is None:
+            print time.asctime(), "session not found", session_id
+            return -1, {}
+        else:
+            self.sessions[session_id]['last_time'] = time.time()
+            ret, addresses = self.modified_addresses(session)
+            if ret: self.sessions[session_id]['addresses'] = addresses
+            return repr( (self.store.block_number,ret))
+
+
+    def add_address_to_session(self, session_id, address):
+        status = self.store.get_status(address)
+        self.sessions[session_id]['addresses'][address] = ("", status)
+        self.sessions[session_id]['last_time'] = time.time()
+        return status
+
+
+    def new_session(self, version, addresses):
+        session_id = random_string(10)
+        self.sessions[session_id] = { 'addresses':{}, 'version':version }
+        for a in addresses:
+            self.sessions[session_id]['addresses'][a] = ('','')
+        out = repr( (session_id, self.banner.replace('\\n','\n') ) )
+        self.sessions[session_id]['last_time'] = time.time()
+        return out
+
+
+    def update_session(self, session_id,addresses):
+        """deprecated in 0.42, wad replaced by add_address_to_session"""
+        self.sessions[session_id]['addresses'] = {}
+        for a in addresses:
+            self.sessions[session_id]['addresses'][a] = ''
+        self.sessions[session_id]['last_time'] = time.time()
+        return 'ok'
+
+
+
+    def native_client_thread(self, ipaddr,conn):
+        try:
+            ipaddr = ipaddr[0]
+            msg = ''
+            while 1:
+                d = conn.recv(1024)
+                msg += d
+                if not d: 
+                    break
+                if '#' in msg:
+                    msg = msg.split('#', 1)[0]
+                    break
+            try:
+                cmd, data = ast.literal_eval(msg)
+            except:
+                print "syntax error", repr(msg), ipaddr
+                conn.close()
+                return
+
+            out = self.do_command(cmd, data, ipaddr)
+            if out:
+                #print ipaddr, cmd, len(out)
+                try:
+                    conn.send(out)
+                except:
+                    print "error, could not send"
+
+        finally:
+            conn.close()
+
+
+
+    def do_command(self, cmd, data, ipaddr):
+
+        if cmd=='b':
+            out = "%d"%block_number
+
+        elif cmd in ['session','new_session']:
+            try:
+                if cmd == 'session':
+                    addresses = ast.literal_eval(data)
+                    version = "old"
+                else:
+                    version, addresses = ast.literal_eval(data)
+                    if version[0]=="0": version = "v" + version
+            except:
+                print "error", data
+                return None
+            print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
+            out = self.new_session(version, addresses)
+
+        elif cmd=='address.subscribe':
+            try:
+                session_id, addr = ast.literal_eval(data)
+            except:
+                traceback.print_exc(file=sys.stdout)
+                print data
+                return None
+            out = self.add_address_to_session(session_id,addr)
+
+        elif cmd=='update_session':
+            try:
+                session_id, addresses = ast.literal_eval(data)
+            except:
+                traceback.print_exc(file=sys.stdout)
+                return None
+            print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
+            out = self.update_session(session_id,addresses)
+            
+        elif cmd=='poll': 
+            out = self.poll_session(data)
+
+        elif cmd == 'h': 
+            address = data
+            out = repr( self.store.get_history( address ) )
+
+        elif cmd =='tx':
+            out = self.store.send_tx(data)
+            print timestr(), "sent tx:", ipaddr, out
+
+        elif cmd == 'peers':
+            out = repr(self.irc.get_peers())
+
+        else:
+            out = None
+
+        return out
+
+
+    def clean_session_thread(self):
+        while not self.shared.stopped():
+            time.sleep(30)
+            t = time.time()
+            for k,s in self.sessions.items():
+                if s.get('type') == 'persistent': continue
+                t0 = s['last_time']
+                if t - t0 > 5*60:
+                    self.sessions.pop(k)
+                    print "lost session", k
+            
+    def run(self):
+        thread.start_new_thread(self.clean_session_thread, ())
+
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        s.bind((self.host, self.port)) 
+        s.listen(1)
+        while not self.shared.stopped():
+            conn, addr = s.accept()
+            try:
+                thread.start_new_thread(self.native_client_thread, (addr, conn,))
+            except:
+                # can't start new thread if there is no memory..
+                traceback.print_exc(file=sys.stdout)
+
index d0fe91f..28df39e 100644 (file)
@@ -123,6 +123,7 @@ class Dispatcher(threading.Thread):
         self.shared = shared
         self.processor = processor
         threading.Thread.__init__(self)
+        self.daemon = True
 
     def run(self):
         while not self.shared.stopped():
index 6ab0512..675ef5f 100755 (executable)
--- a/server.py
+++ b/server.py
 # License along with this program.  If not, see
 # <http://www.gnu.org/licenses/agpl.html>.
 
-"""
-Todo:
-   * server should check and return bitcoind status..
-   * improve txpoint sorting
-   * command to check cache
-
- mempool transactions do not need to be added to the database; it slows it down
-"""
-
-import abe_backend
-
-
-
-
 import time, json, socket, operator, thread, ast, sys, re, traceback
 import ConfigParser
 from json import dumps, loads
@@ -64,201 +50,14 @@ except:
 
 
 password = config.get('server','password')
-stopping = False
-sessions = {}
-
-
-
-def random_string(N):
-    import random, string
-    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
-
-
-
-def modified_addresses(a_session):
-    #t1 = time.time()
-    import copy
-    session = copy.deepcopy(a_session)
-    addresses = session['addresses']
-    session['last_time'] = time.time()
-    ret = {}
-    k = 0
-    for addr in addresses:
-        status = store.get_status( addr )
-        msg_id, last_status = addresses.get( addr )
-        if last_status != status:
-            addresses[addr] = msg_id, status
-            ret[addr] = status
-
-    #t2 = time.time() - t1 
-    #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
-    return ret, addresses
-
-
-def poll_session(session_id): 
-    # native
-    session = sessions.get(session_id)
-    if session is None:
-        print time.asctime(), "session not found", session_id
-        return -1, {}
-    else:
-        sessions[session_id]['last_time'] = time.time()
-        ret, addresses = modified_addresses(session)
-        if ret: sessions[session_id]['addresses'] = addresses
-        return repr( (store.block_number,ret))
-
-
-def add_address_to_session(session_id, address):
-    status = store.get_status(address)
-    sessions[session_id]['addresses'][address] = ("", status)
-    sessions[session_id]['last_time'] = time.time()
-    return status
-
-
-def new_session(version, addresses):
-    session_id = random_string(10)
-    sessions[session_id] = { 'addresses':{}, 'version':version }
-    for a in addresses:
-        sessions[session_id]['addresses'][a] = ('','')
-    out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
-    sessions[session_id]['last_time'] = time.time()
-    return out
-
-
-def update_session(session_id,addresses):
-    """deprecated in 0.42, wad replaced by add_address_to_session"""
-    sessions[session_id]['addresses'] = {}
-    for a in addresses:
-        sessions[session_id]['addresses'][a] = ''
-    sessions[session_id]['last_time'] = time.time()
-    return 'ok'
-
-
-def native_server_thread():
-    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-    s.bind((config.get('server','host'), config.getint('server','port')))
-    s.listen(1)
-    while not stopping:
-        conn, addr = s.accept()
-        try:
-            thread.start_new_thread(native_client_thread, (addr, conn,))
-        except:
-            # can't start new thread if there is no memory..
-            traceback.print_exc(file=sys.stdout)
-
-
-def native_client_thread(ipaddr,conn):
-    #print "client thread", ipaddr
-    try:
-        ipaddr = ipaddr[0]
-        msg = ''
-        while 1:
-            d = conn.recv(1024)
-            msg += d
-            if not d: 
-                break
-            if '#' in msg:
-                msg = msg.split('#', 1)[0]
-                break
-        try:
-            cmd, data = ast.literal_eval(msg)
-        except:
-            print "syntax error", repr(msg), ipaddr
-            conn.close()
-            return
-
-        out = do_command(cmd, data, ipaddr)
-        if out:
-            #print ipaddr, cmd, len(out)
-            try:
-                conn.send(out)
-            except:
-                print "error, could not send"
-
-    finally:
-        conn.close()
-
-
-def timestr():
-    return time.strftime("[%d/%m/%Y-%H:%M:%S]")
-
-# used by the native handler
-def do_command(cmd, data, ipaddr):
-
-    if cmd=='b':
-        out = "%d"%block_number
-
-    elif cmd in ['session','new_session']:
-        try:
-            if cmd == 'session':
-                addresses = ast.literal_eval(data)
-                version = "old"
-            else:
-                version, addresses = ast.literal_eval(data)
-                if version[0]=="0": version = "v" + version
-        except:
-            print "error", data
-            return None
-        print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
-        out = new_session(version, addresses)
-
-    elif cmd=='address.subscribe':
-        try:
-            session_id, addr = ast.literal_eval(data)
-        except:
-            traceback.print_exc(file=sys.stdout)
-            print data
-            return None
-        out = add_address_to_session(session_id,addr)
-
-    elif cmd=='update_session':
-        try:
-            session_id, addresses = ast.literal_eval(data)
-        except:
-            traceback.print_exc(file=sys.stdout)
-            return None
-        print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
-        out = update_session(session_id,addresses)
-            
-    elif cmd=='poll': 
-        out = poll_session(data)
-
-    elif cmd == 'h': 
-        address = data
-        out = repr( store.get_history( address ) )
-
-    elif cmd =='tx':
-        out = store.send_tx(data)
-        print timestr(), "sent tx:", ipaddr, out
-
-    elif cmd == 'peers':
-        out = repr(irc.get_peers())
-
-    else:
-        out = None
-
-    return out
-
-
-def clean_session_thread():
-    while not stopping:
-        time.sleep(30)
-        t = time.time()
-        for k,s in sessions.items():
-            if s.get('type') == 'persistent': continue
-            t0 = s['last_time']
-            if t - t0 > 5*60:
-                sessions.pop(k)
-                print "lost session", k
-            
-
-####################################################################
 
 
 from processor import Shared, Processor, Dispatcher
 from stratum_http import HttpServer
 from stratum import TcpServer
+from native import NativeServer
+from irc import Irc
+from abe_backend import AbeStore
 
 class AbeProcessor(Processor):
     def process(self,request):
@@ -301,62 +100,6 @@ class AbeProcessor(Processor):
 
 
 
-####################################################################
-
-
-
-class Irc(threading.Thread):
-
-    def __init__(self, processor):
-        self.processor = processor
-        threading.Thread.__init__(self)
-        self.daemon = True
-        self.peers = {}
-
-    def get_peers(self):
-        return self.peers.values()
-
-    def run(self):
-        NICK = 'E_'+random_string(10)
-        while not self.processor.shared.stopped():
-            try:
-                s = socket.socket()
-                s.connect(('irc.freenode.net', 6667))
-                s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
-                s.send('NICK '+NICK+'\n')
-                s.send('JOIN #electrum\n')
-                sf = s.makefile('r', 0)
-                t = 0
-                while not self.processor.shared.stopped():
-                    line = sf.readline()
-                    line = line.rstrip('\r\n')
-                    line = line.split()
-                    if line[0]=='PING': 
-                        s.send('PONG '+line[1]+'\n')
-                    elif '353' in line: # answer to /names
-                        k = line.index('353')
-                        for item in line[k+1:]:
-                            if item[0:2] == 'E_':
-                                s.send('WHO %s\n'%item)
-                    elif '352' in line: # answer to /who
-                        # warning: this is a horrible hack which apparently works
-                        k = line.index('352')
-                        ip = line[k+4]
-                        ip = socket.gethostbyname(ip)
-                        name = line[k+6]
-                        host = line[k+9]
-                        self.peers[name] = (ip,host)
-                    if time.time() - t > 5*60:
-                        self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
-                        s.send('NAMES #electrum\n')
-                        t = time.time()
-                        self.peers = {}
-            except:
-                traceback.print_exc(file=sys.stdout)
-            finally:
-                sf.close()
-                s.close()
-
 
 
 
@@ -366,56 +109,39 @@ if __name__ == '__main__':
         import jsonrpclib
         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
         cmd = sys.argv[1]
-        if cmd == 'load':
-            out = server.load(password)
-        elif cmd == 'peers':
-            out = server.server.peers()
-        elif cmd == 'stop':
+        if cmd == 'stop':
             out = server.stop(password)
-        elif cmd == 'clear_cache':
-            out = server.clear_cache(password)
-        elif cmd == 'get_cache':
-            out = server.get_cache(password,sys.argv[2])
-        elif cmd == 'h':
-            out = server.address.get_history(sys.argv[2])
-        elif cmd == 'tx':
-            out = server.transaction.broadcast(sys.argv[2])
-        elif cmd == 'b':
-            out = server.numblocks.subscribe()
         else:
             out = "Unknown command: '%s'" % cmd
         print out
         sys.exit(0)
 
-    # backend
-    store = abe_backend.AbeStore(config)
-
-    # old protocol
-    thread.start_new_thread(native_server_thread, ())
-    thread.start_new_thread(clean_session_thread, ())
-
     processor = AbeProcessor()
     shared = Shared()
     # Bind shared to processor since constructor is user defined
     processor.shared = shared
     processor.start()
+
+    irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
+    if (config.get('server','irc') == 'yes' ): irc.start()
+
+    # backend
+    store = AbeStore(config)
+
     # dispatcher
     dispatcher = Dispatcher(shared, processor)
     dispatcher.start()
+
+    host = config.get('server','host')
     # Create various transports we need
-    transports = [ TcpServer(shared, processor, "ecdsa.org",50001),
-                   HttpServer(shared, processor, "ecdsa.org",8081)
+    transports = [ NativeServer(shared, store, irc, config.get('server','banner'), host, 50000),
+                   TcpServer(shared, processor, host, 50001),
+                   HttpServer(shared, processor, host, 8081),
                    ]
     for server in transports:
         server.start()
 
-
-    if (config.get('server','irc') == 'yes' ):
-       irc = Irc(processor)
-        irc.start()
-
-
-    print "starting Electrum server"
+    print "starting Electrum server on", host
     store.run(processor)
     print "server stopped"