from __future__ import absolute_import
from __future__ import division
-import json
import math
-import os
-from twisted.python import log
-
-
-def _atomic_write(filename, data):
- open(filename + '.new', 'w').write(data)
- os.rename(filename + '.new', filename)
+from p2pool.util import math as math2
class DataViewDescription(object):
self.bin_count = bin_count
self.bin_width = total_width/bin_count
+def _shift(x, shift, pad_item):
+ left_pad = math2.clip(shift, (0, len(x)))
+ right_pad = math2.clip(-shift, (0, len(x)))
+ return [pad_item]*left_pad + x[right_pad:-left_pad if left_pad else None] + [pad_item]*right_pad
+
+combine_bins = math2.add_dicts_ext(lambda (a1, b1), (a2, b2): (a1+a2, b1+b2), (0, 0))
+
+nothing = object()
+def keep_largest(n, squash_key=nothing, key=lambda x: x, add_func=lambda a, b: a+b):
+ def _(d):
+ items = sorted(d.iteritems(), key=lambda (k, v): (k != squash_key, key(v)), reverse=True)
+ while len(items) > n:
+ k, v = items.pop()
+ if squash_key is not nothing:
+ items[-1] = squash_key, add_func(items[-1][1], v)
+ return dict(items)
+ return _
+
class DataView(object):
def __init__(self, desc, ds_desc, last_bin_end, bins):
assert len(bins) == desc.bin_count
self.bins = bins
def _add_datum(self, t, value):
+ if not self.ds_desc.multivalues:
+ value = {'null': value}
+ elif self.ds_desc.multivalue_undefined_means_0 and 'null' not in value:
+ value = dict(value, null=0) # use null to hold sample counter
shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
- self.bins = [(0, 0)]*min(shift, self.desc.bin_count) + self.bins[:max(0, len(self.bins) - shift)]
+ self.bins = _shift(self.bins, shift, {})
self.last_bin_end += shift*self.desc.bin_width
bin = int(math.ceil((self.last_bin_end - self.desc.bin_width - t)/self.desc.bin_width))
+ if bin < self.desc.bin_count:
+ self.bins[bin] = self.ds_desc.keep_largest_func(combine_bins(self.bins[bin], dict((k, (v, 1)) for k, v in value.iteritems())))
+
+ def get_data(self, t):
+ shift = max(0, int(math.ceil((t - self.last_bin_end)/self.desc.bin_width)))
+ bins = _shift(self.bins, shift, {})
+ last_bin_end = self.last_bin_end + shift*self.desc.bin_width
- if bin >= self.desc.bin_count:
- return
+ assert last_bin_end - self.desc.bin_width <= t <= last_bin_end
- prev_total, prev_count = self.bins[bin]
- self.bins[bin] = prev_total + value, prev_count + 1
-
- def get_data(self):
- return [(self.last_bin_end - self.desc.bin_width*(i + 1/2), (total/count if count else None) if self.ds_desc.source_is_cumulative else total/self.desc.bin_width) for i, (total, count) in enumerate(self.bins)]
+ def _((i, bin)):
+ left, right = last_bin_end - self.desc.bin_width*(i + 1), min(t, last_bin_end - self.desc.bin_width*i)
+ center, width = (left+right)/2, right-left
+ if self.ds_desc.is_gauge and self.ds_desc.multivalue_undefined_means_0:
+ real_count = max([0] + [count for total, count in bin.itervalues()])
+ if real_count == 0:
+ val = None
+ else:
+ val = dict((k, total/real_count) for k, (total, count) in bin.iteritems())
+ default = 0
+ elif self.ds_desc.is_gauge and not self.ds_desc.multivalue_undefined_means_0:
+ val = dict((k, total/count) for k, (total, count) in bin.iteritems())
+ default = None
+ else:
+ val = dict((k, total/width) for k, (total, count) in bin.iteritems())
+ default = 0
+ if not self.ds_desc.multivalues:
+ val = None if val is None else val.get('null', default)
+ return center, val, width, default
+ return map(_, enumerate(bins))
class DataStreamDescription(object):
- def __init__(self, source_is_cumulative, dataview_descriptions):
- self.source_is_cumulative = source_is_cumulative
+ def __init__(self, dataview_descriptions, is_gauge=True, multivalues=False, multivalues_keep=20, multivalues_squash_key=None, multivalue_undefined_means_0=False, default_func=None):
self.dataview_descriptions = dataview_descriptions
+ self.is_gauge = is_gauge
+ self.multivalues = multivalues
+ self.keep_largest_func = keep_largest(multivalues_keep, multivalues_squash_key, key=lambda (t, c): t/c if self.is_gauge else t, add_func=lambda (a1, b1), (a2, b2): (a1+a2, b1+b2))
+ self.multivalue_undefined_means_0 = multivalue_undefined_means_0
+ self.default_func = default_func
class DataStream(object):
def __init__(self, desc, dataviews):
class HistoryDatabase(object):
@classmethod
- def from_nothing(cls, datastream_descriptions):
- return cls(dict(
- (ds_name, DataStream(ds_desc, dict(
- (dv_name, DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[(0, 0)]))
- for dv_name, dv_desc in ds_desc.dataview_descriptions.iteritems()
- )))
- for ds_name, ds_desc in datastream_descriptions.iteritems()
- ))
-
- @classmethod
- def from_file(cls, datastream_descriptions, filename):
- try:
- data = json.loads(open(filename, 'rb').read())
- except Exception: # XXX
- log.err()
- data = {}
+ def from_obj(cls, datastream_descriptions, obj={}):
+ def convert_bin(bin):
+ if isinstance(bin, dict):
+ return bin
+ total, count = bin
+ if not isinstance(total, dict):
+ total = {'null': total}
+ return dict((k, (v, count)) for k, v in total.iteritems()) if count else {}
def get_dataview(ds_name, ds_desc, dv_name, dv_desc):
- if ds_name in data:
- ds_data = data[ds_name]
+ if ds_name in obj:
+ ds_data = obj[ds_name]
if dv_name in ds_data:
dv_data = ds_data[dv_name]
if dv_data['bin_width'] == dv_desc.bin_width and len(dv_data['bins']) == dv_desc.bin_count:
- return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], dv_data['bins'])
- return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[(0, 0)])
+ return DataView(dv_desc, ds_desc, dv_data['last_bin_end'], map(convert_bin, dv_data['bins']))
+ elif ds_desc.default_func is None:
+ return DataView(dv_desc, ds_desc, 0, dv_desc.bin_count*[{}])
+ else:
+ return ds_desc.default_func(ds_name, ds_desc, dv_name, dv_desc, obj)
return cls(dict(
(ds_name, DataStream(ds_desc, dict(
(dv_name, get_dataview(ds_name, ds_desc, dv_name, dv_desc))
def __init__(self, datastreams):
self.datastreams = datastreams
- def write(self, filename):
- _atomic_write(filename, json.dumps(
- dict((ds_name, dict((dv_name, dict(last_bin_end=dv.last_bin_end, bin_width=dv.desc.bin_width, bins=dv.bins))
- for dv_name, dv in ds.dataviews.iteritems())) for ds_name, ds in self.datastreams.iteritems())
- ))
+ def to_obj(self):
+ return dict((ds_name, dict((dv_name, dict(last_bin_end=dv.last_bin_end, bin_width=dv.desc.bin_width, bins=dv.bins))
+ for dv_name, dv in ds.dataviews.iteritems())) for ds_name, ds in self.datastreams.iteritems())