068dbf3b45ff56a18b9081b8945201ad8ce0f16e
[p2pool.git] / p2pool / util / graph.py
1 from __future__ import absolute_import
2 from __future__ import division
3
4 import math
5
6 from p2pool.util import math as math2
7
8
9 class DataViewDescription(object):
10     def __init__(self, bin_count, total_width):
11         self.bin_count = bin_count
12         self.bin_width = total_width/bin_count
13
14 def _shift(x, shift, pad_item):
15     left_pad = math2.clip(shift, (0, len(x)))
16     right_pad = math2.clip(-shift, (0, len(x)))
17     return [pad_item]*left_pad + x[right_pad:-left_pad if left_pad else None] + [pad_item]*right_pad
18
19 combine_bins = math2.add_dicts_ext(lambda (a1, b1), (a2, b2): (a1+a2, b1+b2), (0, 0))
20
21 nothing = object()
22 def keep_largest(n, squash_key=nothing, key=lambda x: x, add_func=lambda a, b: a+b):
23     def _(d):
24         items = sorted(d.iteritems(), key=lambda (k, v): (k != squash_key, key(v)), reverse=True)
25         while len(items) > n:
26             k, v = items.pop()
27             if squash_key is not nothing:
28                 items[-1] = squash_key, add_func(items[-1][1], v)
29         return dict(items)
30     return _
31
32 class DataView(object):
33     def __init__(self, desc, ds_desc, last_bin_end, bins):
34         assert len(bins) == desc.bin_count
35         
36         self.desc = desc
37         self.ds_desc = ds_desc
38         self.last_bin_end = last_bin_end
39         self.bins = bins
40     
41     def _add_datum(self, t, value):
42         if not self.ds_desc.multivalues:
43             value = {'null': value}
44         elif self.ds_desc.multivalue_undefined_means_0 and 'null' not in value:
45             value = dict(value, null=0) # use null to hold sample counter
46         shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
47         self.bins = _shift(self.bins, shift, {})
48         self.last_bin_end += shift*self.desc.bin_width
49         
50         bin = int(math.ceil((self.last_bin_end - self.desc.bin_width - t)/self.desc.bin_width))
51         if bin < self.desc.bin_count:
52             self.bins[bin] = self.ds_desc.keep_largest_func(combine_bins(self.bins[bin], dict((k, (v, 1)) for k, v in value.iteritems())))
53     
54     def get_data(self, t):
55         shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
56         bins = _shift(self.bins, shift, {})
57         last_bin_end = self.last_bin_end + shift*self.desc.bin_width
58         
59         assert last_bin_end - self.desc.bin_width <= t <= last_bin_end
60         
61         def _((i, bin)):
62             left, right = last_bin_end - self.desc.bin_width*(i + 1), min(t, last_bin_end - self.desc.bin_width*i)
63             center, width = (left+right)/2, right-left
64             if self.ds_desc.is_gauge and self.ds_desc.multivalue_undefined_means_0:
65                 real_count = max([0] + [count for total, count in bin.itervalues()])
66                 if real_count == 0:
67                     val = None
68                 else:
69                     val = dict((k, total/real_count) for k, (total, count) in bin.iteritems())
70                 default = 0
71             elif self.ds_desc.is_gauge and not self.ds_desc.multivalue_undefined_means_0:
72                 val = dict((k, total/count) for k, (total, count) in bin.iteritems())
73                 default = None
74             else:
75                 val = dict((k, total/width) for k, (total, count) in bin.iteritems())
76                 default = 0
77             if not self.ds_desc.multivalues:
78                 val = None if val is None else val.get('null', default)
79             return center, val, width, default
80         return map(_, enumerate(bins))
81
82
83 class DataStreamDescription(object):
84     def __init__(self, dataview_descriptions, is_gauge=True, multivalues=False, multivalues_keep=20, multivalues_squash_key=None, multivalue_undefined_means_0=False, default_func=None):
85         self.dataview_descriptions = dataview_descriptions
86         self.is_gauge = is_gauge
87         self.multivalues = multivalues
88         self.keep_largest_func = keep_largest(multivalues_keep, multivalues_squash_key, key=lambda (t, c): t/c if self.is_gauge else t, add_func=lambda (a1, b1), (a2, b2): (a1+a2, b1+b2))
89         self.multivalue_undefined_means_0 = multivalue_undefined_means_0
90         self.default_func = default_func
91
92 class DataStream(object):
93     def __init__(self, desc, dataviews):
94         self.desc = desc
95         self.dataviews = dataviews
96     
97     def add_datum(self, t, value=1):
98         for dv_name, dv in self.dataviews.iteritems():
99             dv._add_datum(t, value)
100
101
102 class HistoryDatabase(object):
103     @classmethod
104     def from_obj(cls, datastream_descriptions, obj={}):
105         def convert_bin(bin):
106             if isinstance(bin, dict):
107                 return bin
108             total, count = bin
109             if not isinstance(total, dict):
110                 total = {'null': total}
111             return dict((k, (v, count)) for k, v in total.iteritems()) if count else {}
112         def get_dataview(ds_name, ds_desc, dv_name, dv_desc):
113             if ds_name in obj:
114                 ds_data = obj[ds_name]
115                 if dv_name in ds_data:
116                     dv_data = ds_data[dv_name]
117                     if dv_data['bin_width'] == dv_desc.bin_width and len(dv_data['bins']) == dv_desc.bin_count:
118                         return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], map(convert_bin, dv_data['bins']))
119             elif ds_desc.default_func is None:
120                 return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[{}])
121             else:
122                 return ds_desc.default_func(ds_name, ds_desc, dv_name, dv_desc, obj)
123         return cls(dict(
124             (ds_name, DataStream(ds_desc, dict(
125                 (dv_name, get_dataview(ds_name, ds_desc, dv_name, dv_desc))
126                 for dv_name, dv_desc in ds_desc.dataview_descriptions.iteritems()
127             )))
128             for ds_name, ds_desc in datastream_descriptions.iteritems()
129         ))
130     
131     def __init__(self, datastreams):
132         self.datastreams = datastreams
133     
134     def to_obj(self):
135         return dict((ds_name, dict((dv_name, dict(last_bin_end=dv.last_bin_end, bin_width=dv.desc.bin_width, bins=dv.bins))
136             for dv_name, dv in ds.dataviews.iteritems())) for ds_name, ds in self.datastreams.iteritems())