more modularisation
[electrum-server.git] / server.py
index 6ab0512..675ef5f 100755 (executable)
--- a/server.py
+++ b/server.py
 # License along with this program.  If not, see
 # <http://www.gnu.org/licenses/agpl.html>.
 
-"""
-Todo:
-   * server should check and return bitcoind status..
-   * improve txpoint sorting
-   * command to check cache
-
- mempool transactions do not need to be added to the database; it slows it down
-"""
-
-import abe_backend
-
-
-
-
 import time, json, socket, operator, thread, ast, sys, re, traceback
 import ConfigParser
 from json import dumps, loads
@@ -64,201 +50,14 @@ except:
 
 
 password = config.get('server','password')
-stopping = False
-sessions = {}
-
-
-
-def random_string(N):
-    import random, string
-    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
-
-
-
-def modified_addresses(a_session):
-    #t1 = time.time()
-    import copy
-    session = copy.deepcopy(a_session)
-    addresses = session['addresses']
-    session['last_time'] = time.time()
-    ret = {}
-    k = 0
-    for addr in addresses:
-        status = store.get_status( addr )
-        msg_id, last_status = addresses.get( addr )
-        if last_status != status:
-            addresses[addr] = msg_id, status
-            ret[addr] = status
-
-    #t2 = time.time() - t1 
-    #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
-    return ret, addresses
-
-
-def poll_session(session_id): 
-    # native
-    session = sessions.get(session_id)
-    if session is None:
-        print time.asctime(), "session not found", session_id
-        return -1, {}
-    else:
-        sessions[session_id]['last_time'] = time.time()
-        ret, addresses = modified_addresses(session)
-        if ret: sessions[session_id]['addresses'] = addresses
-        return repr( (store.block_number,ret))
-
-
-def add_address_to_session(session_id, address):
-    status = store.get_status(address)
-    sessions[session_id]['addresses'][address] = ("", status)
-    sessions[session_id]['last_time'] = time.time()
-    return status
-
-
-def new_session(version, addresses):
-    session_id = random_string(10)
-    sessions[session_id] = { 'addresses':{}, 'version':version }
-    for a in addresses:
-        sessions[session_id]['addresses'][a] = ('','')
-    out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
-    sessions[session_id]['last_time'] = time.time()
-    return out
-
-
-def update_session(session_id,addresses):
-    """deprecated in 0.42, wad replaced by add_address_to_session"""
-    sessions[session_id]['addresses'] = {}
-    for a in addresses:
-        sessions[session_id]['addresses'][a] = ''
-    sessions[session_id]['last_time'] = time.time()
-    return 'ok'
-
-
-def native_server_thread():
-    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-    s.bind((config.get('server','host'), config.getint('server','port')))
-    s.listen(1)
-    while not stopping:
-        conn, addr = s.accept()
-        try:
-            thread.start_new_thread(native_client_thread, (addr, conn,))
-        except:
-            # can't start new thread if there is no memory..
-            traceback.print_exc(file=sys.stdout)
-
-
-def native_client_thread(ipaddr,conn):
-    #print "client thread", ipaddr
-    try:
-        ipaddr = ipaddr[0]
-        msg = ''
-        while 1:
-            d = conn.recv(1024)
-            msg += d
-            if not d: 
-                break
-            if '#' in msg:
-                msg = msg.split('#', 1)[0]
-                break
-        try:
-            cmd, data = ast.literal_eval(msg)
-        except:
-            print "syntax error", repr(msg), ipaddr
-            conn.close()
-            return
-
-        out = do_command(cmd, data, ipaddr)
-        if out:
-            #print ipaddr, cmd, len(out)
-            try:
-                conn.send(out)
-            except:
-                print "error, could not send"
-
-    finally:
-        conn.close()
-
-
-def timestr():
-    return time.strftime("[%d/%m/%Y-%H:%M:%S]")
-
-# used by the native handler
-def do_command(cmd, data, ipaddr):
-
-    if cmd=='b':
-        out = "%d"%block_number
-
-    elif cmd in ['session','new_session']:
-        try:
-            if cmd == 'session':
-                addresses = ast.literal_eval(data)
-                version = "old"
-            else:
-                version, addresses = ast.literal_eval(data)
-                if version[0]=="0": version = "v" + version
-        except:
-            print "error", data
-            return None
-        print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
-        out = new_session(version, addresses)
-
-    elif cmd=='address.subscribe':
-        try:
-            session_id, addr = ast.literal_eval(data)
-        except:
-            traceback.print_exc(file=sys.stdout)
-            print data
-            return None
-        out = add_address_to_session(session_id,addr)
-
-    elif cmd=='update_session':
-        try:
-            session_id, addresses = ast.literal_eval(data)
-        except:
-            traceback.print_exc(file=sys.stdout)
-            return None
-        print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
-        out = update_session(session_id,addresses)
-            
-    elif cmd=='poll': 
-        out = poll_session(data)
-
-    elif cmd == 'h': 
-        address = data
-        out = repr( store.get_history( address ) )
-
-    elif cmd =='tx':
-        out = store.send_tx(data)
-        print timestr(), "sent tx:", ipaddr, out
-
-    elif cmd == 'peers':
-        out = repr(irc.get_peers())
-
-    else:
-        out = None
-
-    return out
-
-
-def clean_session_thread():
-    while not stopping:
-        time.sleep(30)
-        t = time.time()
-        for k,s in sessions.items():
-            if s.get('type') == 'persistent': continue
-            t0 = s['last_time']
-            if t - t0 > 5*60:
-                sessions.pop(k)
-                print "lost session", k
-            
-
-####################################################################
 
 
 from processor import Shared, Processor, Dispatcher
 from stratum_http import HttpServer
 from stratum import TcpServer
+from native import NativeServer
+from irc import Irc
+from abe_backend import AbeStore
 
 class AbeProcessor(Processor):
     def process(self,request):
@@ -301,62 +100,6 @@ class AbeProcessor(Processor):
 
 
 
-####################################################################
-
-
-
-class Irc(threading.Thread):
-
-    def __init__(self, processor):
-        self.processor = processor
-        threading.Thread.__init__(self)
-        self.daemon = True
-        self.peers = {}
-
-    def get_peers(self):
-        return self.peers.values()
-
-    def run(self):
-        NICK = 'E_'+random_string(10)
-        while not self.processor.shared.stopped():
-            try:
-                s = socket.socket()
-                s.connect(('irc.freenode.net', 6667))
-                s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
-                s.send('NICK '+NICK+'\n')
-                s.send('JOIN #electrum\n')
-                sf = s.makefile('r', 0)
-                t = 0
-                while not self.processor.shared.stopped():
-                    line = sf.readline()
-                    line = line.rstrip('\r\n')
-                    line = line.split()
-                    if line[0]=='PING': 
-                        s.send('PONG '+line[1]+'\n')
-                    elif '353' in line: # answer to /names
-                        k = line.index('353')
-                        for item in line[k+1:]:
-                            if item[0:2] == 'E_':
-                                s.send('WHO %s\n'%item)
-                    elif '352' in line: # answer to /who
-                        # warning: this is a horrible hack which apparently works
-                        k = line.index('352')
-                        ip = line[k+4]
-                        ip = socket.gethostbyname(ip)
-                        name = line[k+6]
-                        host = line[k+9]
-                        self.peers[name] = (ip,host)
-                    if time.time() - t > 5*60:
-                        self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
-                        s.send('NAMES #electrum\n')
-                        t = time.time()
-                        self.peers = {}
-            except:
-                traceback.print_exc(file=sys.stdout)
-            finally:
-                sf.close()
-                s.close()
-
 
 
 
@@ -366,56 +109,39 @@ if __name__ == '__main__':
         import jsonrpclib
         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
         cmd = sys.argv[1]
-        if cmd == 'load':
-            out = server.load(password)
-        elif cmd == 'peers':
-            out = server.server.peers()
-        elif cmd == 'stop':
+        if cmd == 'stop':
             out = server.stop(password)
-        elif cmd == 'clear_cache':
-            out = server.clear_cache(password)
-        elif cmd == 'get_cache':
-            out = server.get_cache(password,sys.argv[2])
-        elif cmd == 'h':
-            out = server.address.get_history(sys.argv[2])
-        elif cmd == 'tx':
-            out = server.transaction.broadcast(sys.argv[2])
-        elif cmd == 'b':
-            out = server.numblocks.subscribe()
         else:
             out = "Unknown command: '%s'" % cmd
         print out
         sys.exit(0)
 
-    # backend
-    store = abe_backend.AbeStore(config)
-
-    # old protocol
-    thread.start_new_thread(native_server_thread, ())
-    thread.start_new_thread(clean_session_thread, ())
-
     processor = AbeProcessor()
     shared = Shared()
     # Bind shared to processor since constructor is user defined
     processor.shared = shared
     processor.start()
+
+    irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
+    if (config.get('server','irc') == 'yes' ): irc.start()
+
+    # backend
+    store = AbeStore(config)
+
     # dispatcher
     dispatcher = Dispatcher(shared, processor)
     dispatcher.start()
+
+    host = config.get('server','host')
     # Create various transports we need
-    transports = [ TcpServer(shared, processor, "ecdsa.org",50001),
-                   HttpServer(shared, processor, "ecdsa.org",8081)
+    transports = [ NativeServer(shared, store, irc, config.get('server','banner'), host, 50000),
+                   TcpServer(shared, processor, host, 50001),
+                   HttpServer(shared, processor, host, 8081),
                    ]
     for server in transports:
         server.start()
 
-
-    if (config.get('server','irc') == 'yes' ):
-       irc = Irc(processor)
-        irc.start()
-
-
-    print "starting Electrum server"
+    print "starting Electrum server on", host
     store.run(processor)
     print "server stopped"