display per-miner current payout in graphs
authorForrest Voight <forrest@forre.st>
Sat, 14 Apr 2012 16:46:24 +0000 (12:46 -0400)
committerForrest Voight <forrest@forre.st>
Sun, 15 Apr 2012 05:20:00 +0000 (01:20 -0400)
p2pool/main.py
p2pool/test/util/test_graph.py [new file with mode: 0644]
p2pool/util/graph.py
p2pool/util/math.py
p2pool/web.py
web-static/graphs.html

index 88ce1ba..3fc8858 100644 (file)
@@ -447,7 +447,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 self.new_work_event = current_work.changed
                 self.recent_shares_ts_work = []
             
-            def preprocess_request(self, request):
+            def get_user_details(self, request):
                 user = request.getUser() if request.getUser() is not None else ''
                 
                 desired_pseudoshare_target = None
@@ -474,6 +474,10 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     except: # XXX blah
                         pubkey_hash = my_pubkey_hash
                 
+                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
+            
+            def preprocess_request(self, request):
+                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
             
             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
@@ -558,6 +562,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 received_header_hashes = set()
                 
                 def got_response(header, request):
+                    user, _, _, _ = self.get_user_details(request)
                     assert header['merkle_root'] == merkle_root
                     
                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
@@ -644,11 +649,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     else:
                         received_header_hashes.add(header_hash)
                         
-                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser())
+                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                         while len(self.recent_shares_ts_work) > 50:
                             self.recent_shares_ts_work.pop(0)
-                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
+                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                     
                     return on_time
                 
diff --git a/p2pool/test/util/test_graph.py b/p2pool/test/util/test_graph.py
new file mode 100644 (file)
index 0000000..595c2cd
--- /dev/null
@@ -0,0 +1,11 @@
+import unittest
+
+from p2pool.util import graph
+
+class Test(unittest.TestCase):
+    def test_combine_and_keep_largest(self):
+        f = graph.combine_and_keep_largest(3, 'squashed')
+        a, b = dict(a=1, b=2, c=3, d=4, e=5), dict(a=1, b=3, c=5, d=7, e=9)
+        res = f(a, b)
+        assert res == {'squashed': 15, 'e': 14, 'd': 11}
+        assert f(res, dict(squashed=100)) == {'squashed': 115, 'e': 14, 'd': 11}
index 66c1f99..8dea8a1 100644 (file)
@@ -16,6 +16,19 @@ def _shift(x, shift, pad_item):
     right_pad = math2.clip(-shift, (0, len(x)))
     return [pad_item]*left_pad + x[right_pad:-left_pad if left_pad else None] + [pad_item]*right_pad
 
+combine_bins = math2.add_dicts_ext(lambda (a1, b1), (a2, b2): (a1+a2, b1+b2), (0, 0))
+
+nothing = object()
+def keep_largest(n, squash_key=nothing, key=lambda x: x, add_func=lambda a, b: a+b):
+    def _(d):
+        items = sorted(d.iteritems(), key=lambda (k, v): (k != squash_key, key(v)), reverse=True)
+        while len(items) > n:
+            k, v = items.pop()
+            if squash_key is not nothing:
+                items[-1] = squash_key, add_func(items[-1][1], v)
+        return dict(items)
+    return _
+
 class DataView(object):
     def __init__(self, desc, ds_desc, last_bin_end, bins):
         assert len(bins) == desc.bin_count
@@ -26,40 +39,42 @@ class DataView(object):
         self.bins = bins
     
     def _add_datum(self, t, value):
+        if not self.ds_desc.multivalues:
+            value = {None: value}
         shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
-        self.bins = _shift(self.bins, shift, (self.ds_desc.zero_element, 0))
+        self.bins = _shift(self.bins, shift, {})
         self.last_bin_end += shift*self.desc.bin_width
         
         bin = int(math.ceil((self.last_bin_end - self.desc.bin_width - t)/self.desc.bin_width))
-        
-        if bin >= self.desc.bin_count:
-            return
-        
-        prev_total, prev_count = self.bins[bin]
-        self.bins[bin] = self.ds_desc.add_operator(prev_total, value), prev_count + 1
+        if bin < self.desc.bin_count:
+            self.bins[bin] = self.ds_desc.keep_largest_func(combine_bins(self.bins[bin], dict((k, (v, 1)) for k, v in value.iteritems())))
     
     def get_data(self, t):
         shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
-        bins = _shift(self.bins, shift, (self.ds_desc.zero_element, 0))
+        bins = _shift(self.bins, shift, {})
         last_bin_end = self.last_bin_end + shift*self.desc.bin_width
         
         assert last_bin_end - self.desc.bin_width <= t <= last_bin_end
         
-        return [(
-            (min(t, last_bin_end - self.desc.bin_width*i) + (last_bin_end - self.desc.bin_width*(i + 1)))/2, # center time
-            (self.ds_desc.mult_operator(1/count, total) if count else None) if self.ds_desc.source_is_cumulative
-                else self.ds_desc.mult_operator(1/(min(t, last_bin_end - self.desc.bin_width*i) - (last_bin_end - self.desc.bin_width*(i + 1))), total), # value
-            min(t, last_bin_end - self.desc.bin_width*i) - (last_bin_end - self.desc.bin_width*(i + 1)), # width
-        ) for i, (total, count) in enumerate(bins)]
+        def _((i, bin)):
+            left, right = last_bin_end - self.desc.bin_width*(i + 1), min(t, last_bin_end - self.desc.bin_width*i)
+            center, width = (left+right)/2, right-left
+            if self.ds_desc.source_is_cumulative:
+                val = dict((k, total/count) for k, (total, count) in bin.iteritems())
+            else:
+                val = dict((k, total/width) for k, (total, count) in bin.iteritems())
+            if not self.ds_desc.multivalues:
+                val = val.get(None, None if self.ds_desc.source_is_cumulative else 0)
+            return center, val, width
+        return map(_, enumerate(bins))
 
 
 class DataStreamDescription(object):
-    def __init__(self, source_is_cumulative, dataview_descriptions, zero_element=0, add_operator=lambda x, y: x+y, mult_operator=lambda a, x: a*x):
+    def __init__(self, source_is_cumulative, dataview_descriptions, multivalues=False):
         self.source_is_cumulative = source_is_cumulative
         self.dataview_descriptions = dataview_descriptions
-        self.zero_element = zero_element
-        self.add_operator = add_operator
-        self.mult_operator = mult_operator
+        self.multivalues = multivalues
+        self.keep_largest_func = keep_largest(20, None, key=lambda (t, c): t/c if self.source_is_cumulative else t, add_func=lambda (a1, b1), (a2, b2): (a1+a2, b1+b2))
 
 class DataStream(object):
     def __init__(self, desc, dataviews):
@@ -74,14 +89,21 @@ class DataStream(object):
 class HistoryDatabase(object):
     @classmethod
     def from_obj(cls, datastream_descriptions, obj={}):
+        def convert_bin(bin):
+            if isinstance(bin, dict):
+                return bin
+            total, count = bin
+            if not isinstance(total, dict):
+                total = {None: total}
+            return dict((k, (v, count)) for k, v in total.iteritems()) if count else {}
         def get_dataview(ds_name, ds_desc, dv_name, dv_desc):
             if ds_name in obj:
                 ds_data = obj[ds_name]
                 if dv_name in ds_data:
                     dv_data = ds_data[dv_name]
                     if dv_data['bin_width'] == dv_desc.bin_width and len(dv_data['bins']) == dv_desc.bin_count:
-                        return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], dv_data['bins'])
-            return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[(ds_desc.zero_element, 0)])
+                        return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], map(convert_bin, dv_data['bins']))
+            return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[{}])
         return cls(dict(
             (ds_name, DataStream(ds_desc, dict(
                 (dv_name, get_dataview(ds_name, ds_desc, dv_name, dv_desc))
index 1d71bb0..eb109c1 100644 (file)
@@ -60,12 +60,15 @@ def geometric(p):
         return 1
     return int(math.log1p(-random.random()) / math.log1p(-p)) + 1
 
-def add_dicts(*dicts):
-    res = {}
-    for d in dicts:
-        for k, v in d.iteritems():
-            res[k] = res.get(k, 0) + v
-    return dict((k, v) for k, v in res.iteritems() if v)
+def add_dicts_ext(add_func=lambda a, b: a+b, zero=0):
+    def add_dicts(*dicts):
+        res = {}
+        for d in dicts:
+            for k, v in d.iteritems():
+                res[k] = add_func(res.get(k, zero), v)
+        return dict((k, v) for k, v in res.iteritems() if v != zero)
+    return add_dicts
+add_dicts = add_dicts_ext()
 
 mult_dict = lambda c, x: dict((k, c*v) for k, v in x.iteritems())
 
index 0a58901..6a469f5 100644 (file)
@@ -88,6 +88,16 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
             if bitcoin_data.script2_to_address(script, net.PARENT) is not None
         )
     
+    def get_local_rates():
+        miner_hash_rates = {}
+        miner_dead_hash_rates = {}
+        datums, dt = local_rate_monitor.get_datums_in_last()
+        for datum in datums:
+            miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
+            if datum['dead']:
+                miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
+        return miner_hash_rates, miner_dead_hash_rates
+    
     def get_global_stats():
         # averaged over last hour
         lookbehind = 3600//net.SHARE_PERIOD
@@ -124,14 +134,7 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
             tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
         share_att_s = my_work / actual_time
         
-        miner_hash_rates = {}
-        miner_dead_hash_rates = {}
-        datums, dt = local_rate_monitor.get_datums_in_last()
-        for datum in datums:
-            miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
-            if datum['dead']:
-                miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
-        
+        miner_hash_rates, miner_dead_hash_rates = get_local_rates()
         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
         
         return dict(
@@ -206,14 +209,7 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
         
         global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
-        
-        miner_hash_rates = {}
-        miner_dead_hash_rates = {}
-        datums, dt = local_rate_monitor.get_datums_in_last()
-        for datum in datums:
-            miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
-            if datum['dead']:
-                miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
+        miner_hash_rates, miner_dead_hash_rates = get_local_rates()
         
         stat_log.append(dict(
             time=time.time(),
@@ -308,12 +304,6 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
         'last_month': graph.DataViewDescription(300, 60*60*24*30),
         'last_year': graph.DataViewDescription(300, 60*60*24*365.25),
     }
-    def combine_and_keep_largest(*dicts):
-        res = {}
-        for d in dicts:
-            for k, v in d.iteritems():
-                res[k] = res.get(k, 0) + v
-        return dict((k, v) for k, v in sorted(res.iteritems(), key=lambda (k, v): v)[-30:] if v)
     hd = graph.HistoryDatabase.from_obj({
         'local_hash_rate': graph.DataStreamDescription(False, dataview_descriptions),
         'local_dead_hash_rate': graph.DataStreamDescription(False, dataview_descriptions),
@@ -322,10 +312,11 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
         'pool_rate': graph.DataStreamDescription(True, dataview_descriptions),
         'pool_stale_rate': graph.DataStreamDescription(True, dataview_descriptions),
         'current_payout': graph.DataStreamDescription(True, dataview_descriptions),
+        'current_payouts': graph.DataStreamDescription(True, dataview_descriptions, multivalues=True),
         'incoming_peers': graph.DataStreamDescription(True, dataview_descriptions),
         'outgoing_peers': graph.DataStreamDescription(True, dataview_descriptions),
-        'miner_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, {}, combine_and_keep_largest, math.mult_dict),
-        'miner_dead_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, {}, combine_and_keep_largest, math.mult_dict),
+        'miner_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, multivalues=True),
+        'miner_dead_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, multivalues=True),
     }, hd_obj)
     task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))).start(100)
     @pseudoshare_received.watch
@@ -352,7 +343,11 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
         t = time.time()
         hd.datastreams['pool_rate'].add_datum(t, poolrate)
         hd.datastreams['pool_stale_rate'].add_datum(t, poolrate - nonstalerate)
-        hd.datastreams['current_payout'].add_datum(t, get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8)
+        current_txouts = get_current_txouts()
+        hd.datastreams['current_payout'].add_datum(t, current_txouts.get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8)
+        miner_hash_rates, miner_dead_hash_rates = get_local_rates()
+        current_txouts_by_address = dict((bitcoin_data.script2_to_address(script, net.PARENT), amount) for script, amount in current_txouts.iteritems())
+        hd.datastreams['current_payouts'].add_datum(t, dict((user, current_txouts_by_address[user]*1e-8) for user in miner_hash_rates if user in current_txouts_by_address))
         hd.datastreams['incoming_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming))
         hd.datastreams['outgoing_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming))
     task.LoopingCall(add_point).start(5)
index c9f4437..1c95ea4 100644 (file)
                 
                 d3.json("/web/graph_data/miner_hash_rates/last_" + lowerperiod, function(data) {
                     d3.json("/web/graph_data/miner_dead_hash_rates/last_" + lowerperiod, function(dead_data) {
-                        var users = {}; for(var i = 0; i < data.length; ++i) for(var u in data[i][1]) users[u] = null; for(var i = 0; i < dead_data.length; ++i) for(var u in dead_data[i][1]) users[u] = null;
-                        var userlist = []; for(var u in users) userlist.push(u);
-                        userlist.sort();
-                        d3.select("#miners").selectAll("*").remove();
-                        var div = d3.select("#miners").selectAll().data(userlist).enter().append("div");
-                        div.append("h3").text(identity);
-                        div.append("svg:svg").each(function(u) {
-                            plot(d3.select(this), "H/s", "H", [
-                                {"data": data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#0000FF", "label": "Total"},
-                                {"data": dead_data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#FF0000", "label": "Dead"}
-                            ]);
+                        d3.json("/web/graph_data/current_payouts/last_" + lowerperiod, function(current_payouts) {
+                            var users = {}; for(var i = 0; i < data.length; ++i) for(var u in data[i][1]) users[u] = null; for(var i = 0; i < dead_data.length; ++i) for(var u in dead_data[i][1]) users[u] = null;
+                            var userlist = []; for(var u in users) userlist.push(u);
+                            userlist.sort();
+                            d3.select("#miners").selectAll("*").remove();
+                            var div = d3.select("#miners").selectAll().data(userlist).enter().append("div");
+                            div.append("h3").text(identity);
+                            div.append("svg:svg").each(function(u) {
+                                plot(d3.select(this), "H/s", "H", [
+                                    {"data": data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#0000FF", "label": "Total"},
+                                    {"data": dead_data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#FF0000", "label": "Dead"}
+                                ]);
+                            });
+                            div.append("svg:svg").each(function(u) {
+                                plot(d3.select(this), "BTC", null, [
+                                    {"data": current_payouts, "value_getter": function(d) { return u in d ? d[u] : null; }, "color": "#0000FF"}
+                                ]);
+                            });
                         });
                     });
                 });