daemon; initial commit
authorThomasV <thomasv@gitorious>
Mon, 10 Mar 2014 15:16:27 +0000 (16:16 +0100)
committerThomasV <thomasv@gitorious>
Mon, 10 Mar 2014 15:16:27 +0000 (16:16 +0100)
electrum
lib/__init__.py
lib/daemon.py [new file with mode: 0644]
lib/network.py
setup.py

index a449de3..74b0077 100755 (executable)
--- a/electrum
+++ b/electrum
@@ -114,15 +114,16 @@ def run_command(cmd, password=None, args=[]):
     cmd_runner.password = password
 
     if cmd.requires_network and not options.offline:
-        cmd_runner.network = xmlrpclib.ServerProxy('http://localhost:8000')
 
         while True:
             try:
-                if cmd_runner.network.ping() == 'pong':
-                    break
+                cmd_runner.network = NetworkProxy(config)
+                cmd_runner.network.start()
+                break
             except socket.error:
                 if cmd.name != 'daemon':
-                    start_daemon()
+                    print "starting daemon"
+                    start_daemon(config)
                 else:
                     print "Daemon not running"
                     sys.exit(1)
@@ -133,6 +134,7 @@ def run_command(cmd, password=None, args=[]):
     else:
         cmd_runner.network = None
 
+
     try:
         result = func(*args[1:])
     except Exception:
@@ -153,50 +155,7 @@ def run_command(cmd, password=None, args=[]):
 
 
 
-def start_server():
-    network = Network(config)
-    if not network.start(wait=True):
-        print_msg("Not connected, aborting.")
-        sys.exit(1)
-    print_msg("Network daemon connected to " + network.interface.connection_msg)
-    from SimpleXMLRPCServer import SimpleXMLRPCServer
-    server = SimpleXMLRPCServer(('localhost',8000), allow_none=True, logRequests=False)
-    server.network = network
-    server.register_function(lambda: 'pong', 'ping')
-    server.register_function(network.synchronous_get, 'synchronous_get')
-    server.register_function(network.get_servers, 'get_servers')
-    server.register_function(network.main_server, 'main_server')
-    server.register_function(network.send, 'send')
-    server.register_function(network.subscribe, 'subscribe')
-    server.register_function(network.is_connected, 'is_connected')
-    server.register_function(network.is_up_to_date, 'is_up_to_date')
-    server.register_function(lambda: setattr(server,'running', False), 'stop')
-    return server
-
-def start_daemon():
-    pid = os.fork()
-    if (pid == 0): # The first child.
-        os.chdir("/")
-        os.setsid()
-        os.umask(0)
-        pid2 = os.fork()
-        if (pid2 == 0):  # Second child
-            server = start_server()
-            server.running = True
-            timeout = 60
-            t0 = time.time()
-            server.socket.settimeout(timeout)
-            while server.running:
-                server.handle_request()
-                t = time.time()
-                if t - t0 > 0.9*timeout:
-                    break
-                if not server.network.is_connected():
-                    break
-                t0 = t
-        sys.exit(0)
 
-    time.sleep(2)
 
 
 if __name__ == '__main__':
index ddf27d5..d01b645 100644 (file)
@@ -14,3 +14,4 @@ from plugins import BasePlugin
 from mnemonic import mn_encode as mnemonic_encode
 from mnemonic import mn_decode as mnemonic_decode
 from commands import Commands, known_commands
+from daemon import start_daemon, NetworkProxy
diff --git a/lib/daemon.py b/lib/daemon.py
new file mode 100644 (file)
index 0000000..b26ef88
--- /dev/null
@@ -0,0 +1,307 @@
+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2014 Thomas Voegtlin
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import socket
+import select
+import time
+import sys
+import os
+import threading
+import traceback
+import json
+import Queue
+from network import Network
+
+
+
+class NetworkProxy(threading.Thread):
+    # connects to daemon
+    # sends requests, runs callbacks
+
+    def __init__(self, config):
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.config = config
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.socket.connect(('', 8000))
+        self.message_id = 0
+        self.unanswered_requests = {}
+        self.subscriptions = {}
+        self.debug = True
+        self.lock = threading.Lock()
+        
+
+    def parse_json(self, message):
+        s = message.find('\n')
+        if s==-1: 
+            return None, message
+        j = json.loads( message[0:s] )
+        return j, message[s+1:]
+
+
+    def run(self):
+        # read responses and trigger callbacks
+        message = ''
+        while True:
+            try:
+                data = self.socket.recv(1024)
+            except:
+                data = ''
+            if not data:
+                break
+
+            message += data
+            while True:
+                response, message = self.parse_json(message)
+                if response is not None: 
+                    self.process(response)
+                else:
+                    break
+
+        print "NetworkProxy: exiting"
+
+
+    def process(self, response):
+        # runs callbacks
+        #print "<--", response
+
+        msg_id = response.get('id')
+        with self.lock: 
+            method, params, callback = self.unanswered_requests.pop(msg_id)
+
+        result = response.get('result')
+        callback({'method':method, 'params':params, 'result':result, 'id':msg_id})
+
+
+    def send(self, messages, callback):
+        # detect if it is a subscription
+        with self.lock:
+            if self.subscriptions.get(callback) is None: 
+                self.subscriptions[callback] = []
+            for message in messages:
+                if message not in self.subscriptions[callback]:
+                    self.subscriptions[callback].append(message)
+
+        self.do_send( messages, callback )
+
+
+    def do_send(self, messages, callback):
+        """return the ids of the requests that we sent"""
+        out = ''
+        ids = []
+        for m in messages:
+            method, params = m 
+            request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
+            self.unanswered_requests[self.message_id] = method, params, callback
+            ids.append(self.message_id)
+            # print "-->", request
+            self.message_id += 1
+            out += request + '\n'
+        while out:
+            sent = self.socket.send( out )
+            out = out[sent:]
+        return ids
+
+
+    def synchronous_get(self, requests, timeout=100000000):
+        queue = Queue.Queue()
+        ids = self.do_send(requests, queue.put)
+        id2 = ids[:]
+        res = {}
+        while ids:
+            r = queue.get(True, timeout)
+            _id = r.get('id')
+            if _id in ids:
+                ids.remove(_id)
+                res[_id] = r.get('result')
+        out = []
+        for _id in id2:
+            out.append(res[_id])
+        return out
+
+
+    def get_servers(self):
+        return self.synchronous_get([('network.getservers',[])])[0]
+
+    def stop(self):
+        return self.synchronous_get([('network.shutdown',[])])[0]
+
+
+
+
+
+
+class ClientThread(threading.Thread):
+    # read messages from client (socket), and sends them to Network
+    # responses are sent back on the same socket
+
+    def __init__(self, server, network, socket):
+        threading.Thread.__init__(self)
+        self.server = server
+        self.daemon = True
+        self.s = socket
+        self.s.settimeout(0.1)
+        self.network = network
+        self.queue = Queue.Queue()
+        self.unanswered_requests = {}
+
+
+    def run(self):
+        message = ''
+        while True:
+            self.send_responses()
+            try:
+                data = self.s.recv(1024)
+            except socket.timeout:
+                continue
+
+            if not data:
+                break
+            message += data
+
+            while True:
+                cmd, message = self.parse_json(message)
+                if not cmd:
+                    break
+                self.process(cmd)
+
+        #print "client thread terminating"
+
+
+    def parse_json(self, message):
+        n = message.find('\n')
+        if n==-1: 
+            return None, message
+        j = json.loads( message[0:n] )
+        return j, message[n+1:]
+
+
+    def process(self, request):
+        #print "<--", request
+        method = request['method']
+        params = request['params']
+        _id = request['id']
+
+        if method.startswith('network.'):
+            if method == 'network.shutdown':
+                self.server.running = False
+                r = {'id':_id, 'result':True}
+            elif method == 'network.getservers':
+                servers = self.network.get_servers()
+                r = {'id':_id, 'result':servers}
+            else:
+                r = {'id':_id, 'error':'unknown method'}
+            self.queue.put(r) 
+            return
+
+        def cb(i,r):
+            _id = r.get('id')
+            if _id is not None:
+                my_id = self.unanswered_requests.pop(_id)
+                r['id'] = my_id
+            self.queue.put(r)
+
+        new_id = self.network.interface.send([(method, params)], cb) [0]
+        self.unanswered_requests[new_id] = _id
+
+
+    def send_responses(self):
+        while True:
+            try:
+                r = self.queue.get_nowait()
+            except Queue.Empty:
+                break
+            out = json.dumps(r) + '\n'
+            while out:
+                n = self.s.send(out)
+                out = out[n:]
+            #print "-->", r
+        
+
+#Server:
+#   start network() object
+#   accept connections, forward requests 
+
+
+class NetworkServer:
+
+    def __init__(self, config):
+        network = Network(config)
+        if not network.start(wait=True):
+            print_msg("Not connected, aborting.")
+            sys.exit(1)
+        self.network = network
+        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.server.bind(('', 8000))
+        self.server.listen(5)
+        self.server.settimeout(1)
+        self.running = False
+        self.timeout = 60
+
+
+    def main_loop(self):
+        self.running = True
+        t = time.time()
+        while self.running:
+            try:
+                connection, address = self.server.accept()
+            except socket.timeout:
+                if time.time() - t > self.timeout:
+                    break
+                continue
+            t = time.time()
+            client = ClientThread(self, self.network, connection)
+            client.start()
+        #print "Done."
+
+
+
+
+def start_daemon(config):
+    pid = os.fork()
+    if (pid == 0): # The first child.
+        os.chdir("/")
+        os.setsid()
+        os.umask(0)
+        pid2 = os.fork()
+        if (pid2 == 0):  # Second child
+            server = NetworkServer(config)
+            try:
+                server.main_loop()
+            except KeyboardInterrupt:
+                print "Ctrl C - Stopping server"
+            sys.exit(1)
+
+        sys.exit(0)
+
+    # should use a signal
+    time.sleep(2)
+
+
+
+if __name__ == '__main__':
+    import simple_config
+    config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
+    server = NetworkServer(config)
+    try:
+        server.main_loop()
+    except KeyboardInterrupt:
+        print "Ctrl C - Stopping server"
+        sys.exit(1)
index aab98ac..6ac7ec5 100644 (file)
@@ -413,24 +413,14 @@ class Network(threading.Thread):
 
 
 
-class NetworkProxy:
-    # interface to the network object. 
-    # handle subscriptions and callbacks
-    # the network object can be jsonrpc server 
-    def __init__(self, network):
-        self.network = network
-
-
-
-
 if __name__ == "__main__":
-    import simple_config
-    config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'})
-    network = Network(config)
+    network = NetworkProxy({})
     network.start()
+    print network.get_servers()
 
-    while 1:
-        time.sleep(1)
-
-
+    q = Queue.Queue()
+    network.send([('blockchain.headers.subscribe',[])], q.put)
+    while True:
+        r = q.get(timeout=10000)
+        print r
 
index b79f659..f10e064 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -71,6 +71,7 @@ setup(
         'electrum.blockchain',
         'electrum.bmp',
         'electrum.commands',
+        'electrum.daemon',
         'electrum.i18n',
         'electrum.interface',
         'electrum.mnemonic',