self.new_work_event = current_work.changed
self.recent_shares_ts_work = []
- def preprocess_request(self, request):
+ def get_user_details(self, request):
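+ # resolve the request's username to a payout pubkey hash and the desired
+ # share/pseudoshare targets; an unparseable username falls back to the
+ # node's own pubkey hash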
user = request.getUser() if request.getUser() is not None else ''
desired_pseudoshare_target = None
except: # XXX blah
pubkey_hash = my_pubkey_hash
+ return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
+
+ def preprocess_request(self, request):
+ user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
return pubkey_hash, desired_share_target, desired_pseudoshare_target
def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
received_header_hashes = set()
def got_response(header, request):
+ user, _, _, _ = self.get_user_details(request)
assert header['merkle_root'] == merkle_root
header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
else:
received_header_hashes.add(header_hash)
- pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser())
+ pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
while len(self.recent_shares_ts_work) > 50:
self.recent_shares_ts_work.pop(0)
- local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
+ local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
return on_time
--- /dev/null
+import unittest
+
+from p2pool.util import graph, math
+
+class Test(unittest.TestCase):
+ def test_keep_largest(self):
+ # graph.keep_largest composed with math.add_dicts reproduces the
+ # combine_and_keep_largest helper this change removes from work.py
+ f = lambda *dicts: graph.keep_largest(3, 'squashed')(math.add_dicts(*dicts))
+ a, b = dict(a=1, b=2, c=3, d=4, e=5), dict(a=1, b=3, c=5, d=7, e=9)
+ res = f(a, b)
+ assert res == {'squashed': 15, 'e': 14, 'd': 11}
+ assert f(res, dict(squashed=100)) == {'squashed': 115, 'e': 14, 'd': 11}
right_pad = math2.clip(-shift, (0, len(x)))
return [pad_item]*left_pad + x[right_pad:-left_pad if left_pad else None] + [pad_item]*right_pad
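+# a bin is a dict mapping key -> (total, count); combine_bins merges bins by
+# summing both components per key, e.g.
+# combine_bins({'x': (1, 1)}, {'x': (2, 1), 'y': (3, 1)}) == {'x': (3, 2), 'y': (3, 1)}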
+combine_bins = math2.add_dicts_ext(lambda (a1, b1), (a2, b2): (a1+a2, b1+b2), (0, 0))
+
+nothing = object()
+def keep_largest(n, squash_key=nothing, key=lambda x: x, add_func=lambda a, b: a+b):
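+ # build a function that trims a dict to its n largest values (ranked by
+ # key(value)); with squash_key, evicted entries are folded into that key
+ # via add_func instead of being dropped, so totals are preserved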
+ def _(d):
+ items = sorted(d.iteritems(), key=lambda (k, v): (k != squash_key, key(v)), reverse=True)
+ while len(items) > n:
+ k, v = items.pop()
+ if squash_key is not nothing:
+ items[-1] = squash_key, add_func(items[-1][1], v)
+ return dict(items)
+ return _
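+# example: keep_largest(2, 'other')({'a': 5, 'b': 3, 'c': 1}) == {'a': 5, 'other': 4}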
+
class DataView(object):
def __init__(self, desc, ds_desc, last_bin_end, bins):
assert len(bins) == desc.bin_count
self.bins = bins
def _add_datum(self, t, value):
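+ # single-valued streams are stored in the same form as multivalue ones,
+ # under the key None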
+ if not self.ds_desc.multivalues:
+ value = {None: value}
shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
- self.bins = _shift(self.bins, shift, (self.ds_desc.zero_element, 0))
+ self.bins = _shift(self.bins, shift, {})
self.last_bin_end += shift*self.desc.bin_width
bin = int(math.ceil((self.last_bin_end - self.desc.bin_width - t)/self.desc.bin_width))
-
- if bin >= self.desc.bin_count:
- return
-
- prev_total, prev_count = self.bins[bin]
- self.bins[bin] = self.ds_desc.add_operator(prev_total, value), prev_count + 1
+ if bin < self.desc.bin_count:
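+ # merge the datum into the bin as {key: (total, 1)} pairs, then trim the
+ # bin back to its largest entries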
+ self.bins[bin] = self.ds_desc.keep_largest_func(combine_bins(self.bins[bin], dict((k, (v, 1)) for k, v in value.iteritems())))
def get_data(self, t):
shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
- bins = _shift(self.bins, shift, (self.ds_desc.zero_element, 0))
+ bins = _shift(self.bins, shift, {})
last_bin_end = self.last_bin_end + shift*self.desc.bin_width
assert last_bin_end - self.desc.bin_width <= t <= last_bin_end
- return [(
- (min(t, last_bin_end - self.desc.bin_width*i) + (last_bin_end - self.desc.bin_width*(i + 1)))/2, # center time
- (self.ds_desc.mult_operator(1/count, total) if count else None) if self.ds_desc.source_is_cumulative
- else self.ds_desc.mult_operator(1/(min(t, last_bin_end - self.desc.bin_width*i) - (last_bin_end - self.desc.bin_width*(i + 1))), total), # value
- min(t, last_bin_end - self.desc.bin_width*i) - (last_bin_end - self.desc.bin_width*(i + 1)), # width
- ) for i, (total, count) in enumerate(bins)]
+ def _((i, bin)):
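+ # bin i covers [last_bin_end - bin_width*(i+1), last_bin_end - bin_width*i],
+ # clipped on the right to the current time t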
+ left, right = last_bin_end - self.desc.bin_width*(i + 1), min(t, last_bin_end - self.desc.bin_width*i)
+ center, width = (left+right)/2, right-left
+ if self.ds_desc.source_is_cumulative:
+ val = dict((k, total/count) for k, (total, count) in bin.iteritems())
+ else:
+ val = dict((k, total/width) for k, (total, count) in bin.iteritems())
+ if not self.ds_desc.multivalues:
+ val = val.get(None, None if self.ds_desc.source_is_cumulative else 0)
+ return center, val, width
+ return map(_, enumerate(bins))
class DataStreamDescription(object):
- def __init__(self, source_is_cumulative, dataview_descriptions, zero_element=0, add_operator=lambda x, y: x+y, mult_operator=lambda a, x: a*x):
+ def __init__(self, source_is_cumulative, dataview_descriptions, multivalues=False):
self.source_is_cumulative = source_is_cumulative
self.dataview_descriptions = dataview_descriptions
- self.zero_element = zero_element
- self.add_operator = add_operator
- self.mult_operator = mult_operator
+ self.multivalues = multivalues
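+ # cap each bin at 20 distinct keys, squashing overflow into the None key;
+ # rank by mean value for cumulative sources and by total otherwise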
+ self.keep_largest_func = keep_largest(20, None, key=lambda (t, c): t/c if self.source_is_cumulative else t, add_func=lambda (a1, b1), (a2, b2): (a1+a2, b1+b2))
class DataStream(object):
def __init__(self, desc, dataviews):
class HistoryDatabase(object):
@classmethod
def from_obj(cls, datastream_descriptions, obj={}):
+ def convert_bin(bin):
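+ # accept bins saved in the old format, (total, count) with total either a
+ # number or a dict, and upgrade them to the key -> (total, count) form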
+ if isinstance(bin, dict):
+ return bin
+ total, count = bin
+ if not isinstance(total, dict):
+ total = {None: total}
+ return dict((k, (v, count)) for k, v in total.iteritems()) if count else {}
def get_dataview(ds_name, ds_desc, dv_name, dv_desc):
if ds_name in obj:
ds_data = obj[ds_name]
if dv_name in ds_data:
dv_data = ds_data[dv_name]
if dv_data['bin_width'] == dv_desc.bin_width and len(dv_data['bins']) == dv_desc.bin_count:
- return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], dv_data['bins'])
- return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[(ds_desc.zero_element, 0)])
+ return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], map(convert_bin, dv_data['bins']))
+ return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[{}])
return cls(dict(
(ds_name, DataStream(ds_desc, dict(
(dv_name, get_dataview(ds_name, ds_desc, dv_name, dv_desc))
return 1
return int(math.log1p(-random.random()) / math.log1p(-p)) + 1
-def add_dicts(*dicts):
- res = {}
- for d in dicts:
- for k, v in d.iteritems():
- res[k] = res.get(k, 0) + v
- return dict((k, v) for k, v in res.iteritems() if v)
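+# generalize add_dicts over any associative add_func with identity `zero`;
+# entries that combine to zero are dropped from the result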
+def add_dicts_ext(add_func=lambda a, b: a+b, zero=0):
+ def add_dicts(*dicts):
+ res = {}
+ for d in dicts:
+ for k, v in d.iteritems():
+ res[k] = add_func(res.get(k, zero), v)
+ return dict((k, v) for k, v in res.iteritems() if v != zero)
+ return add_dicts
+add_dicts = add_dicts_ext()
mult_dict = lambda c, x: dict((k, c*v) for k, v in x.iteritems())
if bitcoin_data.script2_to_address(script, net.PARENT) is not None
)
+ def get_local_rates():
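+ # estimate each miner's recent hash rate, and the dead-on-arrival part of
+ # it, from the work datums recorded by local_rate_monitor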
+ miner_hash_rates = {}
+ miner_dead_hash_rates = {}
+ datums, dt = local_rate_monitor.get_datums_in_last()
+ for datum in datums:
+ miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
+ if datum['dead']:
+ miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
+ return miner_hash_rates, miner_dead_hash_rates
+
def get_global_stats():
# averaged over last hour
lookbehind = 3600//net.SHARE_PERIOD
tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
share_att_s = my_work / actual_time
- miner_hash_rates = {}
- miner_dead_hash_rates = {}
- datums, dt = local_rate_monitor.get_datums_in_last()
- for datum in datums:
- miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
- if datum['dead']:
- miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
-
+ miner_hash_rates, miner_dead_hash_rates = get_local_rates()
(stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
return dict(
global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
(stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
-
- miner_hash_rates = {}
- miner_dead_hash_rates = {}
- datums, dt = local_rate_monitor.get_datums_in_last()
- for datum in datums:
- miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
- if datum['dead']:
- miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
+ miner_hash_rates, miner_dead_hash_rates = get_local_rates()
stat_log.append(dict(
time=time.time(),
'last_month': graph.DataViewDescription(300, 60*60*24*30),
'last_year': graph.DataViewDescription(300, 60*60*24*365.25),
}
- def combine_and_keep_largest(*dicts):
- res = {}
- for d in dicts:
- for k, v in d.iteritems():
- res[k] = res.get(k, 0) + v
- return dict((k, v) for k, v in sorted(res.iteritems(), key=lambda (k, v): v)[-30:] if v)
hd = graph.HistoryDatabase.from_obj({
'local_hash_rate': graph.DataStreamDescription(False, dataview_descriptions),
'local_dead_hash_rate': graph.DataStreamDescription(False, dataview_descriptions),
'pool_rate': graph.DataStreamDescription(True, dataview_descriptions),
'pool_stale_rate': graph.DataStreamDescription(True, dataview_descriptions),
'current_payout': graph.DataStreamDescription(True, dataview_descriptions),
+ 'current_payouts': graph.DataStreamDescription(True, dataview_descriptions, multivalues=True),
'incoming_peers': graph.DataStreamDescription(True, dataview_descriptions),
'outgoing_peers': graph.DataStreamDescription(True, dataview_descriptions),
- 'miner_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, {}, combine_and_keep_largest, math.mult_dict),
- 'miner_dead_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, {}, combine_and_keep_largest, math.mult_dict),
+ 'miner_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, multivalues=True),
+ 'miner_dead_hash_rates': graph.DataStreamDescription(False, dataview_descriptions, multivalues=True),
}, hd_obj)
task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))).start(100)
@pseudoshare_received.watch
t = time.time()
hd.datastreams['pool_rate'].add_datum(t, poolrate)
hd.datastreams['pool_stale_rate'].add_datum(t, poolrate - nonstalerate)
- hd.datastreams['current_payout'].add_datum(t, get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8)
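+ # compute the txout map once and reuse it for both the node's own payout
+ # and the new per-address payout series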
+ current_txouts = get_current_txouts()
+ hd.datastreams['current_payout'].add_datum(t, current_txouts.get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8)
+ miner_hash_rates, miner_dead_hash_rates = get_local_rates()
+ current_txouts_by_address = dict((bitcoin_data.script2_to_address(script, net.PARENT), amount) for script, amount in current_txouts.iteritems())
+ hd.datastreams['current_payouts'].add_datum(t, dict((user, current_txouts_by_address[user]*1e-8) for user in miner_hash_rates if user in current_txouts_by_address))
hd.datastreams['incoming_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming))
hd.datastreams['outgoing_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming))
task.LoopingCall(add_point).start(5)
d3.json("/web/graph_data/miner_hash_rates/last_" + lowerperiod, function(data) {
d3.json("/web/graph_data/miner_dead_hash_rates/last_" + lowerperiod, function(dead_data) {
- var users = {}; for(var i = 0; i < data.length; ++i) for(var u in data[i][1]) users[u] = null; for(var i = 0; i < dead_data.length; ++i) for(var u in dead_data[i][1]) users[u] = null;
- var userlist = []; for(var u in users) userlist.push(u);
- userlist.sort();
- d3.select("#miners").selectAll("*").remove();
- var div = d3.select("#miners").selectAll().data(userlist).enter().append("div");
- div.append("h3").text(identity);
- div.append("svg:svg").each(function(u) {
- plot(d3.select(this), "H/s", "H", [
- {"data": data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#0000FF", "label": "Total"},
- {"data": dead_data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#FF0000", "label": "Dead"}
- ]);
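+ // fetch payout history alongside the hash-rate series so each miner's
+ // section can show both charts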
+ d3.json("/web/graph_data/current_payouts/last_" + lowerperiod, function(current_payouts) {
+ var users = {}; for(var i = 0; i < data.length; ++i) for(var u in data[i][1]) users[u] = null; for(var i = 0; i < dead_data.length; ++i) for(var u in dead_data[i][1]) users[u] = null;
+ var userlist = []; for(var u in users) userlist.push(u);
+ userlist.sort();
+ d3.select("#miners").selectAll("*").remove();
+ var div = d3.select("#miners").selectAll().data(userlist).enter().append("div");
+ div.append("h3").text(identity);
+ div.append("svg:svg").each(function(u) {
+ plot(d3.select(this), "H/s", "H", [
+ {"data": data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#0000FF", "label": "Total"},
+ {"data": dead_data, "value_getter": function(d) { return u in d ? d[u] : 0; }, "color": "#FF0000", "label": "Dead"}
+ ]);
+ });
+ div.append("svg:svg").each(function(u) {
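+ // second per-miner chart: the pending payout to this address in BTC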
+ plot(d3.select(this), "BTC", null, [
+ {"data": current_payouts, "value_getter": function(d) { return u in d ? d[u] : null; }, "color": "#0000FF"}
+ ]);
+ });
});
});
});