# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
-import select
import time
import sys
import os
import json
import Queue
from network import Network
+from util import print_msg, print_stderr
+from simple_config import SimpleConfig
-
+DAEMON_PORT=8001
class NetworkProxy(threading.Thread):
# connects to daemon
# sends requests, runs callbacks
- def __init__(self, config):
+ def __init__(self, config = {}):
threading.Thread.__init__(self)
self.daemon = True
- self.config = config
+ self.config = SimpleConfig(config) if type(config) == type({}) else config
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.socket.connect(('', 8000))
+ self.daemon_port = config.get('daemon_port', DAEMON_PORT)
self.message_id = 0
self.unanswered_requests = {}
self.subscriptions = {}
- self.debug = True
+ self.debug = False
self.lock = threading.Lock()
-
+ self.pending_transactions_for_notifications = []
+
+
+ def start(self, start_daemon=False):
+ daemon_started = False
+ while True:
+ try:
+ self.socket.connect(('', self.daemon_port))
+ threading.Thread.start(self)
+ return True
+
+ except socket.error:
+ if not start_daemon:
+ return False
+
+ elif not daemon_started:
+ print_stderr( "Starting daemon [%s]"%self.config.get('server'))
+ daemon_started = True
+ pid = os.fork()
+ if (pid == 0): # The first child.
+ os.chdir("/")
+ os.setsid()
+ os.umask(0)
+ pid2 = os.fork()
+ if (pid2 == 0): # Second child
+ server = NetworkServer(self.config)
+ try:
+ server.main_loop()
+ except KeyboardInterrupt:
+ print "Ctrl C - Stopping server"
+ sys.exit(1)
+ sys.exit(0)
+ else:
+ time.sleep(0.1)
+
+
def parse_json(self, message):
s = message.find('\n')
def process(self, response):
# runs callbacks
- #print "<--", response
+ if self.debug: print "<--", response
msg_id = response.get('id')
with self.lock:
method, params, callback = self.unanswered_requests.pop(msg_id)
result = response.get('result')
- callback({'method':method, 'params':params, 'result':result, 'id':msg_id})
+ callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
- def send(self, messages, callback):
+ def subscribe(self, messages, callback):
# detect if it is a subscription
with self.lock:
if self.subscriptions.get(callback) is None:
if message not in self.subscriptions[callback]:
self.subscriptions[callback].append(message)
- self.do_send( messages, callback )
+ self.send( messages, callback )
- def do_send(self, messages, callback):
+ def send(self, messages, callback):
"""return the ids of the requests that we sent"""
out = ''
ids = []
request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
self.unanswered_requests[self.message_id] = method, params, callback
ids.append(self.message_id)
- # print "-->", request
+ if self.debug: print "-->", request
self.message_id += 1
out += request + '\n'
while out:
def synchronous_get(self, requests, timeout=100000000):
queue = Queue.Queue()
- ids = self.do_send(requests, queue.put)
+ ids = self.send(requests, lambda i,x: queue.put(x))
id2 = ids[:]
res = {}
while ids:
def get_servers(self):
- return self.synchronous_get([('network.getservers',[])])[0]
+ return self.synchronous_get([('network.get_servers',[])])[0]
+
+ def get_header(self, height):
+ return self.synchronous_get([('network.get_header',[height])])[0]
+
+ def get_local_height(self):
+ return self.synchronous_get([('network.get_local_height',[])])[0]
+
+ def is_connected(self):
+ return self.synchronous_get([('network.is_connected',[])])[0]
+
+ def is_up_to_date(self):
+ return self.synchronous_get([('network.is_up_to_date',[])])[0]
+
+ def main_server(self):
+ return self.synchronous_get([('network.main_server',[])])[0]
def stop(self):
- return self.synchronous_get([('network.shutdown',[])])[0]
+ return self.synchronous_get([('daemon.shutdown',[])])[0]
+
+
+ def trigger_callback(self, cb):
+ pass
self.network = network
self.queue = Queue.Queue()
self.unanswered_requests = {}
+ self.debug = False
def run(self):
def process(self, request):
- #print "<--", request
+ if self.debug: print "<--", request
method = request['method']
params = request['params']
_id = request['id']
if method.startswith('network.'):
- if method == 'network.shutdown':
- self.server.running = False
- r = {'id':_id, 'result':True}
- elif method == 'network.getservers':
- servers = self.network.get_servers()
- r = {'id':_id, 'result':servers}
- else:
- r = {'id':_id, 'error':'unknown method'}
- self.queue.put(r)
+ out = {'id':_id}
+ try:
+ f = getattr(self.network, method[8:])
+ except AttributeError:
+ out['error'] = "unknown method"
+ try:
+ out['result'] = f(*params)
+ except BaseException as e:
+ out['error'] =str(e)
+ self.queue.put(out)
+ return
+
+ if method == 'daemon.shutdown':
+ self.server.running = False
+ self.queue.put({'id':_id, 'result':True})
return
def cb(i,r):
while out:
n = self.s.send(out)
out = out[n:]
- #print "-->", r
+ if self.debug: print "-->", r
-#Server:
-# start network() object
-# accept connections, forward requests
class NetworkServer:
self.network = network
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.server.bind(('', 8000))
+ self.daemon_port = config.get('daemon_port', DAEMON_PORT)
+ self.server.bind(('', self.daemon_port))
self.server.listen(5)
self.server.settimeout(1)
self.running = False
- self.timeout = 60
+ self.timeout = config.get('daemon_timeout', 60)
def main_loop(self):
t = time.time()
client = ClientThread(self, self.network, connection)
client.start()
- #print "Done."
-
-
-
-
-def start_daemon(config):
- pid = os.fork()
- if (pid == 0): # The first child.
- os.chdir("/")
- os.setsid()
- os.umask(0)
- pid2 = os.fork()
- if (pid2 == 0): # Second child
- server = NetworkServer(config)
- try:
- server.main_loop()
- except KeyboardInterrupt:
- print "Ctrl C - Stopping server"
- sys.exit(1)
-
- sys.exit(0)
-
- # should use a signal
- time.sleep(2)