From dfa50459cec061cb7c0bbc4f6391cdbede5b2ec8 Mon Sep 17 00:00:00 2001 From: Forrest Voight Date: Sat, 14 Apr 2012 12:46:24 -0400 Subject: [PATCH] display per-miner current payout in graphs --- p2pool/main.py | 11 +++++-- p2pool/test/util/test_graph.py | 11 +++++++ p2pool/util/graph.py | 62 +++++++++++++++++++++++++++------------- p2pool/util/math.py | 15 ++++++---- p2pool/web.py | 45 +++++++++++++---------------- web-static/graphs.html | 29 +++++++++++------- 6 files changed, 108 insertions(+), 65 deletions(-) create mode 100644 p2pool/test/util/test_graph.py diff --git a/p2pool/main.py b/p2pool/main.py index 88ce1ba..3fc8858 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -447,7 +447,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): self.new_work_event = current_work.changed self.recent_shares_ts_work = [] - def preprocess_request(self, request): + def get_user_details(self, request): user = request.getUser() if request.getUser() is not None else '' desired_pseudoshare_target = None @@ -474,6 +474,10 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): except: # XXX blah pubkey_hash = my_pubkey_hash + return user, pubkey_hash, desired_share_target, desired_pseudoshare_target + + def preprocess_request(self, request): + user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request) return pubkey_hash, desired_share_target, desired_pseudoshare_target def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target): @@ -558,6 +562,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): received_header_hashes = set() def got_response(header, request): + user, _, _, _ = self.get_user_details(request) assert header['merkle_root'] == merkle_root header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)) @@ -644,11 +649,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): else: received_header_hashes.add(header_hash) - pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser()) + pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user) self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target))) while len(self.recent_shares_ts_work) > 50: self.recent_shares_ts_work.pop(0) - local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser())) + local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user)) return on_time diff --git a/p2pool/test/util/test_graph.py b/p2pool/test/util/test_graph.py new file mode 100644 index 0000000..595c2cd --- /dev/null +++ b/p2pool/test/util/test_graph.py @@ -0,0 +1,11 @@ +import unittest + +from p2pool.util import graph + +class Test(unittest.TestCase): + def test_combine_and_keep_largest(self): + f = graph.combine_and_keep_largest(3, 'squashed') + a, b = dict(a=1, b=2, c=3, d=4, e=5), dict(a=1, b=3, c=5, d=7, e=9) + res = f(a, b) + assert res == {'squashed': 15, 'e': 14, 'd': 11} + assert f(res, dict(squashed=100)) == {'squashed': 115, 'e': 14, 'd': 11} diff --git a/p2pool/util/graph.py b/p2pool/util/graph.py index 66c1f99..8dea8a1 100644 --- a/p2pool/util/graph.py +++ b/p2pool/util/graph.py @@ -16,6 +16,19 @@ def _shift(x, shift, pad_item): right_pad = math2.clip(-shift, (0, len(x))) return [pad_item]*left_pad + x[right_pad:-left_pad if left_pad else None] + [pad_item]*right_pad +combine_bins = math2.add_dicts_ext(lambda (a1, b1), (a2, b2): (a1+a2, b1+b2), (0, 0)) + +nothing = object() +def keep_largest(n, squash_key=nothing, key=lambda x: x, add_func=lambda a, b: a+b): + def _(d): + items = sorted(d.iteritems(), key=lambda (k, v): (k != squash_key, key(v)), reverse=True) + while len(items) > n: + k, v = items.pop() + if squash_key is not nothing: + items[-1] = squash_key, add_func(items[-1][1], v) + return dict(items) + return _ + class DataView(object): def __init__(self, desc, ds_desc, last_bin_end, bins): assert len(bins) == desc.bin_count @@ -26,40 +39,42 @@ class DataView(object): self.bins = bins def _add_datum(self, t, value): + if not self.ds_desc.multivalues: + value = {None: value} shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width))) - self.bins = _shift(self.bins, shift, (self.ds_desc.zero_element, 0)) + self.bins = _shift(self.bins, shift, {}) self.last_bin_end += shift*self.desc.bin_width bin = int(math.ceil((self.last_bin_end - self.desc.bin_width - t)/self.desc.bin_width)) - - if bin >= self.desc.bin_count: - return - - prev_total, prev_count = self.bins[bin] - self.bins[bin] = self.ds_desc.add_operator(prev_total, value), prev_count + 1 + if bin < self.desc.bin_count: + self.bins[bin] = self.ds_desc.keep_largest_func(combine_bins(self.bins[bin], dict((k, (v, 1)) for k, v in value.iteritems()))) def get_data(self, t): shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width))) - bins = _shift(self.bins, shift, (self.ds_desc.zero_element, 0)) + bins = _shift(self.bins, shift, {}) last_bin_end = self.last_bin_end + shift*self.desc.bin_width assert last_bin_end - self.desc.bin_width <= t <= last_bin_end - return [( - (min(t, last_bin_end - self.desc.bin_width*i) + (last_bin_end - self.desc.bin_width*(i + 1)))/2, # center time - (self.ds_desc.mult_operator(1/count, total) if count else None) if self.ds_desc.source_is_cumulative - else self.ds_desc.mult_operator(1/(min(t, last_bin_end - self.desc.bin_width*i) - (last_bin_end - self.desc.bin_width*(i + 1))), total), # value - min(t, last_bin_end - self.desc.bin_width*i) - (last_bin_end - self.desc.bin_width*(i + 1)), # width - ) for i, (total, count) in enumerate(bins)] + def _((i, bin)): + left, right = last_bin_end - self.desc.bin_width*(i + 1), min(t, last_bin_end - self.desc.bin_width*i) + center, width = (left+right)/2, right-left + if self.ds_desc.source_is_cumulative: + val = dict((k, total/count) for k, (total, count) in bin.iteritems()) + else: + val = dict((k, total/width) for k, (total, count) in bin.iteritems()) + if not self.ds_desc.multivalues: + val = val.get(None, None if self.ds_desc.source_is_cumulative else 0) + return center, val, width + return map(_, enumerate(bins)) class DataStreamDescription(object): - def __init__(self, source_is_cumulative, dataview_descriptions, zero_element=0, add_operator=lambda x, y: x+y, mult_operator=lambda a, x: a*x): + def __init__(self, source_is_cumulative, dataview_descriptions, multivalues=False): self.source_is_cumulative = source_is_cumulative self.dataview_descriptions = dataview_descriptions - self.zero_element = zero_element - self.add_operator = add_operator - self.mult_operator = mult_operator + self.multivalues = multivalues + self.keep_largest_func = keep_largest(20, None, key=lambda (t, c): t/c if self.source_is_cumulative else t, add_func=lambda (a1, b1), (a2, b2): (a1+a2, b1+b2)) class DataStream(object): def __init__(self, desc, dataviews): @@ -74,14 +89,21 @@ class DataStream(object): class HistoryDatabase(object): @classmethod def from_obj(cls, datastream_descriptions, obj={}): + def convert_bin(bin): + if isinstance(bin, dict): + return bin + total, count = bin + if not isinstance(total, dict): + total = {None: total} + return dict((k, (v, count)) for k, v in total.iteritems()) if count else {} def get_dataview(ds_name, ds_desc, dv_name, dv_desc): if ds_name in obj: ds_data = obj[ds_name] if dv_name in ds_data: dv_data = ds_data[dv_name] if dv_data['bin_width'] == dv_desc.bin_width and len(dv_data['bins']) == dv_desc.bin_count: - return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], dv_data['bins']) - return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[(ds_desc.zero_element, 0)]) + return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], map(convert_bin, dv_data['bins'])) + return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[{}]) return cls(dict( (ds_name, DataStream(ds_desc, dict( (dv_name, get_dataview(ds_name, ds_desc, dv_name, dv_desc)) diff --git a/p2pool/util/math.py b/p2pool/util/math.py index 1d71bb0..eb109c1 100644 --- a/p2pool/util/math.py +++ b/p2pool/util/math.py @@ -60,12 +60,15 @@ def geometric(p): return 1 return int(math.log1p(-random.random()) / math.log1p(-p)) + 1 -def add_dicts(*dicts): - res = {} - for d in dicts: - for k, v in d.iteritems(): - res[k] = res.get(k, 0) + v - return dict((k, v) for k, v in res.iteritems() if v) +def add_dicts_ext(add_func=lambda a, b: a+b, zero=0): + def add_dicts(*dicts): + res = {} + for d in dicts: + for k, v in d.iteritems(): + res[k] = add_func(res.get(k, zero), v) + return dict((k, v) for k, v in res.iteritems() if v != zero) + return add_dicts +add_dicts = add_dicts_ext() mult_dict = lambda c, x: dict((k, c*v) for k, v in x.iteritems()) diff --git a/p2pool/web.py b/p2pool/web.py index 0a58901..6a469f5 100644 --- a/p2pool/web.py +++ b/p2pool/web.py @@ -88,6 +88,16 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad if bitcoin_data.script2_to_address(script, net.PARENT) is not None ) + def get_local_rates(): + miner_hash_rates = {} + miner_dead_hash_rates = {} + datums, dt = local_rate_monitor.get_datums_in_last() + for datum in datums: + miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt + if datum['dead']: + miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt + return miner_hash_rates, miner_dead_hash_rates + def get_global_stats(): # averaged over last hour lookbehind = 3600//net.SHARE_PERIOD @@ -124,14 +134,7 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp) share_att_s = my_work / actual_time - miner_hash_rates = {} - miner_dead_hash_rates = {} - datums, dt = local_rate_monitor.get_datums_in_last() - for datum in datums: - miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt - if datum['dead']: - miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt - + miner_hash_rates, miner_dead_hash_rates = get_local_rates() (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts() return dict( @@ -206,14 +209,7 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind) (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts() - - miner_hash_rates = {} - miner_dead_hash_rates = {} - datums, dt = local_rate_monitor.get_datums_in_last() - for datum in datums: - miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt - if datum['dead']: - miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt + miner_hash_rates, miner_dead_hash_rates = get_local_rates() stat_log.append(dict( time=time.time(), @@ -308,12 +304,6 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad 'last_month': graph.DataViewDescription(300, 60*60*24*30), 'last_year': graph.DataViewDescription(300, 60*60*24*365.25), } - def combine_and_keep_largest(*dicts): - res = {} - for d in dicts: - for k, v in d.iteritems(): - res[k] = res.get(k, 0) + v - return dict((k, v) for k, v in sorted(res.iteritems(), key=lambda (k, v): v)[-30:] if v) hd = graph.HistoryDatabase.from_obj({ 'local_hash_rate': graph.DataStreamDescription(False, dataview_descriptions), 'local_dead_hash_rate': graph.DataStreamDescription(False, dataview_descriptions), @@ -322,10 +312,11 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad 'pool_rate': graph.DataStreamDescription(True, dataview_descriptions), 'pool_stale_rate': graph.DataStreamDescription(True, dataview_descriptions), 'current_payout': graph.DataStreamDescription(True, dataview_descriptions), + 'current_payouts': graph.DataStreamDescription(True, dataview_descriptions, multivalues=True), 'incoming_peers': graph.DataStreamDescription(True, dataview_descriptions), 'outgoing_peers': graph.DataStreamDescription(True, dataview_descriptions), - 'miner_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, {}, combine_and_keep_largest, math.mult_dict), - 'miner_dead_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, {}, combine_and_keep_largest, math.mult_dict), + 'miner_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, multivalues=True), + 'miner_dead_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, multivalues=True), }, hd_obj) task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))).start(100) @pseudoshare_received.watch @@ -352,7 +343,11 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad t = time.time() hd.datastreams['pool_rate'].add_datum(t, poolrate) hd.datastreams['pool_stale_rate'].add_datum(t, poolrate - nonstalerate) - hd.datastreams['current_payout'].add_datum(t, get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8) + current_txouts = get_current_txouts() + hd.datastreams['current_payout'].add_datum(t, current_txouts.get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8) + miner_hash_rates, miner_dead_hash_rates = get_local_rates() + current_txouts_by_address = dict((bitcoin_data.script2_to_address(script, net.PARENT), amount) for script, amount in current_txouts.iteritems()) + hd.datastreams['current_payouts'].add_datum(t, dict((user, current_txouts_by_address[user]*1e-8) for user in miner_hash_rates if user in current_txouts_by_address)) hd.datastreams['incoming_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming)) hd.datastreams['outgoing_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming)) task.LoopingCall(add_point).start(5) diff --git a/web-static/graphs.html b/web-static/graphs.html index c9f4437..1c95ea4 100644 --- a/web-static/graphs.html +++ b/web-static/graphs.html @@ -217,17 +217,24 @@ d3.json("/web/graph_data/miner_hash_rates/last_" + lowerperiod, function(data) { d3.json("/web/graph_data/miner_dead_hash_rates/last_" + lowerperiod, function(dead_data) { - var users = {}; for(var i = 0; i < data.length; ++i) for(var u in data[i][1]) users[u] = null; for(var i = 0; i < dead_data.length; ++i) for(var u in dead_data[i][1]) users[u] = null; - var userlist = []; for(var u in users) userlist.push(u); - userlist.sort(); - d3.select("#miners").selectAll("*").remove(); - var div = d3.select("#miners").selectAll().data(userlist).enter().append("div"); - div.append("h3").text(identity); - div.append("svg:svg").each(function(u) { - plot(d3.select(this), "H/s", "H", [ - {"data": data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#0000FF", "label": "Total"}, - {"data": dead_data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#FF0000", "label": "Dead"} - ]); + d3.json("/web/graph_data/current_payouts/last_" + lowerperiod, function(current_payouts) { + var users = {}; for(var i = 0; i < data.length; ++i) for(var u in data[i][1]) users[u] = null; for(var i = 0; i < dead_data.length; ++i) for(var u in dead_data[i][1]) users[u] = null; + var userlist = []; for(var u in users) userlist.push(u); + userlist.sort(); + d3.select("#miners").selectAll("*").remove(); + var div = d3.select("#miners").selectAll().data(userlist).enter().append("div"); + div.append("h3").text(identity); + div.append("svg:svg").each(function(u) { + plot(d3.select(this), "H/s", "H", [ + {"data": data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#0000FF", "label": "Total"}, + {"data": dead_data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#FF0000", "label": "Dead"} + ]); + }); + div.append("svg:svg").each(function(u) { + plot(d3.select(this), "BTC", null, [ + {"data": current_payouts, "value_getter": function(d) { return u in d ? d[u] : null; }, "color": "#0000FF"} + ]); + }); }); }); }); -- 1.7.1