# to our internal register
self.add_session(session)
self.process(session)
+ self.stop()
+
+ def stop(self):
+ pass
def process(self, session):
request = session.pop_request()
self.request_queue = queue.Queue()
self.response_queue = queue.Queue()
+ self.numblocks_sub = None
+ self.addresses_sub = {}
def stop(self):
self._connection.close()
def pop_request(self):
return self.request_queue.get()
- def push_response(self):
+ def push_response(self, item):
self.response_queue.put(item)
def pop_response(self):
return self.response_queue.get()
+ def subscribe_to_numblocks(self,message_id):
+ with self.lock:
+ self.numblocks_sub = message_id
+
+ def subscribe_to_address(self,address,message_id,status):
+ with self.lock:
+ self.addresses_sub[address] = message_id,status
+
+
class TcpClientResponder(threading.Thread):
def __init__(self, shared, session):
return None
def parse(self):
- while True:
- raw_buffer = self.message.find('\n')
- if raw_buffer == -1:
- return True
+ raw_buffer = self.message.find('\n')
+ if raw_buffer == -1:
+ return True
- command = self.message[0:raw_buffer].strip()
- self.message = self.message[raw_buffer + 1:]
- if command == 'quit':
- return False
+ raw_command = self.message[0:raw_buffer].strip()
+ self.message = self.message[raw_buffer + 1:]
+ if raw_command == 'quit':
+ return False
- try:
- command = json.loads(command)
- except:
- print "json error", repr(command)
- continue
+ try:
+ command = json.loads(raw_command)
+ except:
+ self.session.push_response(
+ {"error": "bad JSON", "request": raw_command})
+ return True
- try:
- message_id = command.get('id')
- method = command.get('method')
- params = command.get('params')
- except:
- print "syntax error", repr(command), self.session.address[0]
- continue
+ try:
+ # Try to load vital fields, and return an error if
+ # unsuccessful.
+ message_id = command['id']
+ method = command['method']
+ except KeyError:
+ # Return an error JSON in response.
+ self.session.push_response(
+ {"error": "syntax error", "request": raw_command})
+ else:
+ self.session.push_request(command)
- self.session.push_request((message_id, method, params))
- print message_id, method, params
+ return True
class TcpServer(threading.Thread):
- def __init__(self, shared, processor):
+ def __init__(self, shared, processor, host, port):
self.shared = shared
self.processor = processor
self.clients = []
threading.Thread.__init__(self)
self.daemon = True
+ self.host = host
+ self.port = port
def run(self):
print "TCP server started."
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(("localhost", 50001))
+ sock.bind((self.host, self.port))
sock.listen(1)
while not self.shared.stopped():
session = Session(*sock.accept())
processor.shared = shared
processor.start()
# Create various transports we need
- transports = TcpServer(shared, processor),
+ transports = TcpServer(shared, processor, "176.31.24.241",50001),
for server in transports:
server.start()
while not shared.stopped():