added network traffic graph
authorForrest Voight <forrest@forre.st>
Tue, 18 Sep 2012 18:11:36 +0000 (14:11 -0400)
committerForrest Voight <forrest@forre.st>
Tue, 18 Sep 2012 18:25:35 +0000 (14:25 -0400)
p2pool/main.py
p2pool/p2p.py
p2pool/util/p2protocol.py
p2pool/web.py
web-static/graphs.html

index 7c819b7..6f89141 100644 (file)
@@ -68,6 +68,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print 'p2pool (version %s)' % (p2pool.__version__,)
         print
         
+        traffic_happened = variable.Event()
+        
         # connect to bitcoind over JSON-RPC and do initial getmemorypool
         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
@@ -368,6 +370,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             addr_store=addrs,
             connect_addrs=connect_addrs,
             max_incoming_conns=args.p2pool_conns,
+            traffic_happened=traffic_happened,
         )
         p2p_node.start()
         
@@ -454,7 +457,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
         
         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
-        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
+        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
         
         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
index c5a3ef6..0c3f519 100644 (file)
@@ -10,14 +10,14 @@ from twisted.python import failure, log
 import p2pool
 from p2pool import data as p2pool_data
 from p2pool.bitcoin import data as bitcoin_data
-from p2pool.util import deferral, p2protocol, pack
+from p2pool.util import deferral, p2protocol, pack, variable
 
 class PeerMisbehavingError(Exception):
     pass
 
 class Protocol(p2protocol.Protocol):
     def __init__(self, node, incoming):
-        p2protocol.Protocol.__init__(self, node.net.PREFIX, 1000000)
+        p2protocol.Protocol.__init__(self, node.net.PREFIX, 1000000, node.traffic_happened)
         self.node = node
         self.incoming = incoming
         
@@ -417,13 +417,14 @@ class SingleClientFactory(protocol.ReconnectingClientFactory):
         self.node.lost_conn(proto, reason)
 
 class Node(object):
-    def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000):
+    def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event()):
         self.best_share_hash_func = best_share_hash_func
         self.port = port
         self.net = net
         self.addr_store = dict(addr_store)
         self.connect_addrs = connect_addrs
         self.preferred_storage = preferred_storage
+        self.traffic_happened = traffic_happened
         
         self.nonce = random.randrange(2**64)
         self.peers = {}
index ec3a6f9..433d79a 100644 (file)
@@ -15,11 +15,12 @@ class TooLong(Exception):
     pass
 
 class Protocol(protocol.Protocol):
-    def __init__(self, message_prefix, max_payload_length):
+    def __init__(self, message_prefix, max_payload_length, traffic_happened=variable.Event()):
         self._message_prefix = message_prefix
         self._max_payload_length = max_payload_length
-        self.dataReceived = datachunker.DataChunker(self.dataReceiver())
+        self.dataReceived2 = datachunker.DataChunker(self.dataReceiver())
         self.paused_var = variable.Variable(False)
+        self.traffic_happened = traffic_happened
     
     def connectionMade(self):
         self.transport.registerProducer(self, True)
@@ -33,6 +34,10 @@ class Protocol(protocol.Protocol):
     def stopProducing(self):
         pass
     
+    def dataReceived(self, data):
+        self.traffic_happened.happened('p2p/in', len(data))
+        self.dataReceived2(data)
+    
     def dataReceiver(self):
         while True:
             start = ''
@@ -86,7 +91,9 @@ class Protocol(protocol.Protocol):
         payload = type_.pack(payload2)
         if len(payload) > self._max_payload_length:
             raise TooLong('payload too long')
-        self.transport.write(self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload)
+        data = self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload
+        self.traffic_happened.happened('p2p/out', len(data))
+        self.transport.write(data)
         return self.paused_var.get_when_satisfies(lambda paused: not paused)
     
     def __getattr__(self, attr):
index 9a28355..b0c11f8 100644 (file)
@@ -44,7 +44,7 @@ def _atomic_write(filename, data):
         os.remove(filename)
         os.rename(filename + '.new', filename)
 
-def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received, best_share_var, bitcoin_warning_var):
+def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received, best_share_var, bitcoin_warning_var, traffic_happened):
     start_time = time.time()
     
     web_root = resource.Resource()
@@ -380,6 +380,7 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
         'miner_dead_hash_rates': graph.DataStreamDescription(dataview_descriptions, is_gauge=False, multivalues=True),
         'desired_versions': graph.DataStreamDescription(dataview_descriptions, multivalues=True,
             multivalue_undefined_means_0=True),
+        'traffic_rate': graph.DataStreamDescription(dataview_descriptions, is_gauge=False, multivalues=True),
     }, hd_obj)
     task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))).start(100)
     @pseudoshare_received.watch
@@ -398,6 +399,9 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
         hd.datastreams['local_share_hash_rate'].add_datum(t, work)
         if dead:
             hd.datastreams['local_dead_share_hash_rate'].add_datum(t, work)
+    @traffic_happened.watch
+    def _(name, bytes):
+        hd.datastreams['traffic_rate'].add_datum(time.time(), {name: bytes})
     def add_point():
         if tracker.get_height(best_share_var.value) < 720:
             return
index 6577123..ad4f1ca 100644 (file)
@@ -51,6 +51,9 @@
         <h2>Desired versions</h2>
         <svg id="desired_versions"></svg>
         
+        <h2>Traffic rate</h2>
+        <svg id="traffic_rate"></svg>
+        
         <script type="text/javascript">
             function compose() {
                 var funcs = arguments;
                     
                     var text_boxes = [];
                     var total = 0;
+                    var total_area = 0;
                     for(var i = 0; i < lines.length; ++i) {
                         var line = lines[i];
                         var stats = get_area_mean(line.data);
                                 .attr("fill", line.color)
                                 .attr("x", w - margin_h + 10)
                                 .attr("y", y(num/denom))]);
+                            if(total_unit != null)
+                                text_boxes.push([i, g.append("svg:text")
+                                    .text("Area: " + d3.format(".3s")(stats.area) + total_unit)
+                                    .attr("text-anchor", "start")
+                                    .attr("dominant-baseline", "central")
+                                    .attr("fill", line.color)
+                                    .attr("x", w - margin_h + 10)
+                                    .attr("y", y(num/denom) + 12)]);
                             total += stats.mean;
+                            total_area += stats.area;
                         }
                     }
                     text_boxes.push([i, g.append("svg:text")
                         .attr("fill", "black")
                         .attr("x", w - margin_h)
                         .attr("y", y(total))]);
+                    if(total_unit != null)
+                        text_boxes.push([i, g.append("svg:text")
+                            .text("Area: " + d3.format(".3s")(total_area) + total_unit)
+                            .attr("text-anchor", "start")
+                            .attr("dominant-baseline", "central")
+                            .attr("fill", "black")
+                            .attr("x", w - margin_h + 10)
+                            .attr("y", y(total) + 12)]);
                 } else {
                     var y = d3.scale.linear().domain([
                         0,
                 d3.json("/web/graph_data/desired_versions/last_" + lowerperiod, function(data) {
                     plot(d3.select('#desired_versions'), '', null, data_to_lines(data, function(line){ return parseInt(line.label) }), true);
                 });
+                
+                d3.json("/web/graph_data/traffic_rate/last_" + lowerperiod, function(data) {
+                    plot(d3.select('#traffic_rate'), 'B/s', 'B', data_to_lines(data, function(line){ return parseInt(line.label) }), true);
+                });
             }
             
             d3.json('/web/currency_info', function(currency_info) {