1 from __future__ import absolute_import
2 from __future__ import division
6 from p2pool.util import math as math2
class DataViewDescription(object):
    """Shape of one data view: ``bin_count`` equal-width bins spanning ``total_width``.

    ``total_width`` is expressed in the same units as the ``t`` values that
    the views built from this description are fed.
    """

    def __init__(self, bin_count, total_width):
        # True division (the module enables ``from __future__ import division``),
        # so the per-bin width may be fractional; bin_count == 0 raises
        # ZeroDivisionError.
        per_bin = total_width / bin_count
        self.bin_count = bin_count
        self.bin_width = per_bin
def _shift(x, shift, pad_item):
    """Return a copy of list ``x`` slid by ``shift`` slots, padded with ``pad_item``.

    A positive ``shift`` prepends ``shift`` pad items and drops as many items
    from the end. A negative ``shift`` drops items from the front and appends
    pad items. The amount is bounded via ``math2.clip`` (presumably a clamp
    into ``[0, len(x)]``), which keeps the result the same length as ``x``.
    ``x`` itself is not modified.

    Every pad slot is the same ``pad_item`` object (list repetition does not
    copy it), so callers should replace pad items rather than mutate them.
    """
    bound = (0, len(x))
    front = math2.clip(shift, bound)   # pad slots added at the start (shift > 0)
    back = math2.clip(-shift, bound)   # pad slots added at the end (shift < 0)
    # x[back:-0] would be empty, so "trim nothing from the end" must be None.
    stop = None if not front else -front
    kept = x[back:stop]
    return [pad_item]*front + kept + [pad_item]*back
# Merge two bins. A bin maps key -> (total, count); the pairs of matching keys
# are summed element-wise. (0, 0) is the additive identity passed to
# math2.add_dicts_ext -- presumably used for keys present in only one bin.
# Plain parameters + indexing instead of tuple-parameter unpacking
# (``lambda (a1, b1), (a2, b2): ...``), which Python 3 removed (PEP 3113);
# this form is valid on both Python 2 and 3.
combine_bins = math2.add_dicts_ext(lambda a, b: (a[0] + b[0], a[1] + b[1]), (0, 0))
# keep_largest: build a function that trims a dict down to its largest entries.
#   n          -- presumably the maximum number of entries kept (the trimming
#                 loop is not part of this excerpt; the listing numbers jump).
#   squash_key -- if given, values removed by the trim are folded into this key
#                 with add_func. The default is the sentinel `nothing` (defined
#                 outside this excerpt), so None can itself be a squash key.
#   key        -- maps a value to the magnitude it is ranked by.
#   add_func   -- combines two values when squashing.
# NOTE(review): `d` and `v` below are bound on lines not shown here.
22 def keep_largest(n, squash_key=nothing, key=lambda x: x, add_func=lambda a, b: a+b):
# Order entries largest-first by key(v). Because the first sort component is
# (k != squash_key) and the sort is reversed, the squash_key entry (if any)
# always sorts last -- the position the squash below merges into.
24 items = sorted(d.iteritems(), key=lambda (k, v): (k != squash_key, key(v)), reverse=True)
# Fold value `v` into the last entry and (re)label that entry squash_key.
27 if squash_key is not nothing:
28 items[-1] = squash_key, add_func(items[-1][1], v)
# A view of one data stream as `desc.bin_count` fixed-width bins. Bin 0 is the
# newest and ends at `last_bin_end`; bin i spans
# [last_bin_end - bin_width*(i+1), last_bin_end - bin_width*i].
# Each bin maps key -> (total, count).
# NOTE(review): this excerpt skips several lines of the class (the listing
# numbers jump), e.g. where self.desc / self.bins are set and the def line of
# the per-bin helper `_` used in get_data.
32 class DataView(object):
33 def __init__(self, desc, ds_desc, last_bin_end, bins):
34 assert len(bins) == desc.bin_count
37 self.ds_desc = ds_desc
38 self.last_bin_end = last_bin_end
# Record `value` at time t. Multivalue streams pass a dict of key -> value;
# single values are presumably wrapped into a dict on the line (not shown)
# under the `if` below, since value.iteritems() is used afterwards.
41 def _add_datum(self, t, value):
42 if not self.ds_desc.multivalues:
# If t lies past last_bin_end, advance the window by the number of whole bins
# needed (rounded up). Existing bins move to higher (older) indices, empty {}
# bins are inserted at index 0, and afterwards
# last_bin_end - bin_width < t <= last_bin_end.
44 shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
45 self.bins = _shift(self.bins, shift, {})
46 self.last_bin_end += shift*self.desc.bin_width
# Index of the bin containing t, counted back from bin 0. Data older than the
# window (index >= bin_count) is ignored here.
# NOTE(review): bug at bin boundaries. When t == self.last_bin_end (e.g. t is
# exactly a whole number of bin widths past the old last_bin_end), this yields
# ceil(-1) == -1, and self.bins[-1] is the OLDEST bin, not bin 0.
# int(math.floor((self.last_bin_end - t)/self.desc.bin_width)) puts such a
# datum in bin 0 and matches get_data's closed right edge -- confirm and fix.
48 bin = int(math.ceil((self.last_bin_end - self.desc.bin_width - t)/self.desc.bin_width))
49 if bin < self.desc.bin_count:
# Merge {k: (v, 1)} into the bin via combine_bins, then trim the result with
# the stream's keep_largest_func.
50 self.bins[bin] = self.ds_desc.keep_largest_func(combine_bins(self.bins[bin], dict((k, (v, 1)) for k, v in value.iteritems())))
# Return one (center, value, width) tuple per bin, newest first, as seen at
# time t. It works on a shifted copy of the bins, so self is not modified.
52 def get_data(self, t):
53 shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
54 bins = _shift(self.bins, shift, {})
55 last_bin_end = self.last_bin_end + shift*self.desc.bin_width
# Fails if t is before the start of the newest bin, e.g. when t is more than
# about one bin width older than the newest datum added.
57 assert last_bin_end - self.desc.bin_width <= t <= last_bin_end
# Span of bin i; the newest bin is cut off at t, so it may be partial.
60 left, right = last_bin_end - self.desc.bin_width*(i + 1), min(t, last_bin_end - self.desc.bin_width*i)
61 center, width = (left+right)/2, right-left
# Cumulative sources report each key's mean (total/count); others report the
# total per unit of width (a rate).
# NOTE(review): width is 0 when t == last_bin_end - bin_width, so a non-empty
# bin 0 would raise ZeroDivisionError in the rate branch.
62 if self.ds_desc.source_is_cumulative:
63 val = dict((k, total/count) for k, (total, count) in bin.iteritems())
65 val = dict((k, total/width) for k, (total, count) in bin.iteritems())
# Single-value streams: unwrap the entry presumably stored under key None; an
# empty bin yields None (cumulative) or 0 (rate).
66 if not self.ds_desc.multivalues:
67 val = val.get(None, None if self.ds_desc.source_is_cumulative else 0)
68 return center, val, width
# `_` is the per-bin helper above (its def line is not in this excerpt). On
# Python 2 map returns a list; on Python 3 it would be a lazy iterator.
69 return map(_, enumerate(bins))
class DataStreamDescription(object):
    """Static description of a data stream and the views that summarize it.

    source_is_cumulative -- if true, views report each bin's mean
        (total/count); otherwise the bin's total divided by its width.
    dataview_descriptions -- dict of view name -> DataViewDescription.
    multivalues -- if true, each datum is a dict of key -> value; otherwise a
        single value.

    ``keep_largest_func`` trims a bin (key -> (total, count)) to its largest
    entries (presumably at most 20), folding the rest into the ``None`` key.
    Entries are ranked by mean for cumulative streams and by total otherwise.
    """

    def __init__(self, source_is_cumulative, dataview_descriptions, multivalues=False):
        self.source_is_cumulative = source_is_cumulative
        self.dataview_descriptions = dataview_descriptions
        self.multivalues = multivalues

        def rank(pair):
            # pair is one bin entry (total, count). source_is_cumulative is read
            # at call time, exactly as the original lambda did.
            total, count = pair
            return total/count if self.source_is_cumulative else total

        def merge(a, b):
            # Element-wise sum of two (total, count) pairs.
            return (a[0] + b[0], a[1] + b[1])

        # Named helpers replace tuple-parameter lambdas, which are a syntax
        # error on Python 3 (PEP 3113); behavior is unchanged on Python 2.
        self.keep_largest_func = keep_largest(20, None, key=rank, add_func=merge)
# A data stream: a set of DataViews that all receive every datum added to it.
79 class DataStream(object):
# desc: presumably the stream's DataStreamDescription (the line that stores it,
# if any, is not part of this excerpt). dataviews: dict of view name -> DataView.
80 def __init__(self, desc, dataviews):
82 self.dataviews = dataviews
# Record one datum at time t in every view; value defaults to 1. For
# multivalue streams a dict is expected, because the views call
# value.iteritems() on it directly.
84 def add_datum(self, t, value=1):
# dv_name is unused; only the views themselves are needed.
85 for dv_name, dv in self.dataviews.iteritems():
86 dv._add_datum(t, value)
# Collection of named DataStreams that can be rebuilt from, and serialized to,
# a plain nested dict.
# NOTE(review): this excerpt omits several lines of the class (the listing
# numbers jump), including the def line of the bin-conversion helper and of the
# serializing method at the end.
89 class HistoryDatabase(object):
# Build a database for `datastream_descriptions` (dict of stream name ->
# DataStreamDescription), reusing saved bins from `obj` where they still fit.
# `obj` has the nested shape produced at the end of this class:
#   {ds_name: {dv_name: {'last_bin_end': ..., 'bin_width': ..., 'bins': [...]}}}
# NOTE(review): the `obj={}` default is one shared dict; the visible lines only
# read it, so this is harmless as shown -- keep it read-only.
91 def from_obj(cls, datastream_descriptions, obj={}):
# Presumably the body of the `convert_bin` helper used below: it normalizes one
# saved bin to the in-memory form {key: (total, count)}. The branches for the
# other saved formats are not shown.
93 if isinstance(bin, dict):
96 if not isinstance(total, dict):
# Pair every value with the shared `count`; a falsy count gives an empty bin.
98 return dict((k, (v, count)) for k, v in total.iteritems()) if count else {}
# Return the DataView for one stream/view: restored from `obj` when a saved
# copy exists with the same bin_width and number of bins, otherwise a fresh one.
99 def get_dataview(ds_name, ds_desc, dv_name, dv_desc):
# NOTE(review): presumably guarded by a `ds_name in obj` check on a line not
# shown here; without it a stream missing from `obj` raises KeyError.
101 ds_data = obj[ds_name]
102 if dv_name in ds_data:
103 dv_data = ds_data[dv_name]
# Saved data is reused only if the view's geometry is unchanged (exact float
# comparison of bin_width).
104 if dv_data['bin_width'] == dv_desc.bin_width and len(dv_data['bins']) == dv_desc.bin_count:
# On Python 2 `map` returns a list, which DataView's len(bins) assert needs.
105 return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], map(convert_bin, dv_data['bins']))
# Fresh view: last_bin_end 0 and bin_count empty bins. List repetition makes
# every bin the same dict object. DataView assigns new values into its bins
# (self.bins[bin] = ...), so this is harmless as long as combine_bins builds a
# new dict rather than updating its first argument -- verify.
106 return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[{}])
# One DataStream per stream description, each with one DataView per view
# description.
108 (ds_name, DataStream(ds_desc, dict(
109 (dv_name, get_dataview(ds_name, ds_desc, dv_name, dv_desc))
110 for dv_name, dv_desc in ds_desc.dataview_descriptions.iteritems()
112 for ds_name, ds_desc in datastream_descriptions.iteritems()
# datastreams: dict of stream name -> DataStream.
115 def __init__(self, datastreams):
116 self.datastreams = datastreams
# Serialize to the nested-dict form accepted by from_obj. `bins` is each
# view's live list, not a copy.
119 return dict((ds_name, dict((dv_name, dict(last_bin_end=dv.last_bin_end, bin_width=dv.desc.bin_width, bins=dv.bins))
120 for dv_name, dv in ds.dataviews.iteritems())) for ds_name, ds in self.datastreams.iteritems())