From: forrest Date: Mon, 4 Jul 2011 09:22:27 +0000 (+0000) Subject: reworking X-Git-Tag: 0.8.2~400 X-Git-Url: https://git.novaco.in/?a=commitdiff_plain;h=176dd2aa7ed7abbb2146d45dcc993177c339dfc0;p=p2pool.git reworking git-svn-id: svn://forre.st/p2pool@1368 470744a7-cac9-478e-843e-5ec1b25c69e8 --- diff --git a/p2pool/bitcoin/base58.py b/p2pool/bitcoin/base58.py new file mode 100644 index 0000000..c798e39 --- /dev/null +++ b/p2pool/bitcoin/base58.py @@ -0,0 +1,10 @@ +from p2pool.util import bases + +base58_alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' + +def base58_encode(data): + return base58_alphabet[0]*(len(data) - len(data.lstrip(chr(0)))) + bases.natural_to_string(bases.string_to_natural(data), base58_alphabet) + +def base58_decode(data): + return chr(0)*(len(data) - len(data.lstrip(base58_alphabet[0]))) + bases.natural_to_string(bases.string_to_natural(data, base58_alphabet)) + diff --git a/p2pool/bitcoin/data.py b/p2pool/bitcoin/data.py index 2e37da2..59a7f8d 100644 --- a/p2pool/bitcoin/data.py +++ b/p2pool/bitcoin/data.py @@ -1,7 +1,12 @@ +from __future__ import division + import struct import StringIO import hashlib +from . import base58 +from p2pool.util import bases + class EarlyEnd(Exception): pass @@ -39,6 +44,20 @@ class Type(object): data = self._pack(obj) assert self._unpack(data) == obj return data + + + def pack_base58(self, obj): + return base58.base58_encode(self.pack(obj)) + + def unpack_base58(self, base58_data): + return self.unpack(base58.base58_decode(base58_data)) + + + def hash160(self, obj): + return ripemdsha(self.pack(obj)) + + def hash256(self, obj): + return doublesha(self.pack(obj)) class VarIntType(Type): def read(self, file): @@ -120,6 +139,8 @@ class HashType(Type): return int(data[::-1].encode('hex'), 16) def write(self, file, item): + if item >= 2**256: + raise ValueError("invalid hash value") file.write(('%064x' % (item,)).decode('hex')[::-1]) class ShortHashType(Type): @@ -130,7 +151,9 @@ class ShortHashType(Type): return int(data[::-1].encode('hex'), 16) def write(self, file, item): - file.write(('%020x' % (item,)).decode('hex')[::-1]) + if item >= 2**160: + raise ValueError("invalid hash value") + file.write(('%040x' % (item,)).decode('hex')[::-1]) class ListType(Type): def __init__(self, type): @@ -199,263 +222,23 @@ class ComposedType(Type): for key, type_ in self.fields: type_.write(file, item[key]) -address_type = ComposedType([ - ('services', StructType('H')), -]) - -tx_type = ComposedType([ - ('version', StructType(' 1: - hash_list = [doublesha(merkle_record_type.pack(dict(left=left, right=left if right is None else right))) - for left, right in zip(hash_list[::2], hash_list[1::2] + [None])] - return hash_list[0] - -def tx_hash(tx): - return doublesha(tx_type.pack(tx)) - -def block_hash(header): - return doublesha(block_header_type.pack(header)) - -class EarlyEnd(Exception): - pass - -class LateEnd(Exception): - pass - -class Type(object): - # the same data can have only one unpacked representation, but multiple packed binary representations +class ChecksummedType(Type): + def __init__(self, inner): + self.inner = inner - def _unpack(self, data): - f = StringIO.StringIO(data) - - obj = self.read(f) + def read(self, file): + obj = self.inner.read(file) + data = self.inner.pack(obj) - if f.tell() != len(data): - raise LateEnd('underread ' + repr((self, data))) + if file.read(4) != hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]: + raise ValueError("invalid checksum") return obj - def unpack(self, data): - obj = self._unpack(data) - assert self._unpack(self._pack(obj)) == obj - return obj - - def _pack(self, obj): - f = StringIO.StringIO() - - self.write(f, obj) - - data = f.getvalue() - - return data - - def pack(self, obj): - data = self._pack(obj) - assert self._unpack(data) == obj - return data - -class VarIntType(Type): - def read(self, file): - data = file.read(1) - if len(data) != 1: - raise EarlyEnd() - first, = struct.unpack(' 1: hash_list = [doublesha(merkle_record_type.pack(dict(left=left, right=left if right is None else right))) for left, right in zip(hash_list[::2], hash_list[1::2] + [None])] @@ -518,8 +301,67 @@ def tx_hash(tx): def block_hash(header): return doublesha(block_header_type.pack(header)) +def shift_left(n, m): + # python: :( + if m < 0: + return n >> -m + return n << m + def bits_to_target(bits): - return (bits & 0x00ffffff) * 2 ** (8 * ((bits >> 24) - 3)) + bits = bits[::-1] + length = ord(bits[0]) + return bases.string_to_natural((bits[1:] + "\0"*length)[:length]) + +def old_bits_to_target(bits): + return shift_left(bits & 0x00ffffff, 8 * ((bits >> 24) - 3)) + +def about_equal(a, b): + if a == b: return True + return abs(a-b)/((abs(a)+abs(b))/2) < .01 + +def compress_target_to_bits(target): # loses precision + print + print "t", target + n = bases.natural_to_string(target) + print "n", n.encode('hex') + bits = chr(len(n)) + n[:3].ljust(3, '\0') + bits = bits[::-1] + print "bits", bits.encode('hex') + print "new", bits_to_target(bits) + print "old", old_bits_to_target(struct.unpack(" share2 - self.highest = util.Variable(None) # hash + self.highest = variable.Variable(None) # hash self.requesting = set() self.request_map = {} @@ -122,36 +120,23 @@ def get_last_p2pool_block_hash(current_block_hash, get_block, net): print block_hash = block['header']['previous_block'] +@deferral.retry('Error getting work from bitcoind:', 1) @defer.inlineCallbacks def getwork(bitcoind): - while True: - try: - # a block could arrive in between these two queries - getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber() - try: - getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df) - finally: - # get rid of residual errors - getwork_df.addErrback(lambda fail: None) - height_df.addErrback(lambda fail: None) - except: - print - print 'Error getting work from bitcoind:' - traceback.print_exc() - print - - - yield util.sleep(1) - - continue - defer.returnValue((getwork, height)) + # a block could arrive in between these two queries + getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber() + try: + getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df) + finally: + # get rid of residual errors + getwork_df.addErrback(lambda fail: None) + height_df.addErrback(lambda fail: None) + defer.returnValue((getwork, height)) @defer.inlineCallbacks def main(args): try: - net = p2pool.Testnet if args.testnet else p2pool.Main - print 'p2pool (version %s)' % (__version__,) print @@ -168,7 +153,7 @@ def main(args): # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port) - factory = bitcoin.p2p.ClientFactory(args.testnet) + factory = bitcoin.p2p.ClientFactory(args.net) reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory) while True: @@ -188,7 +173,7 @@ def main(args): print else: break - yield util.sleep(1) + yield deferral.sleep(1) print ' ...success!' print ' Payout script:', my_script.encode('hex') @@ -199,24 +184,24 @@ def main(args): block = yield (yield factory.getProtocol()).get_block(block_hash) print 'Got block %x' % (block_hash,) defer.returnValue(block) - get_block = util.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600)) + get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600)) - get_raw_transaction = util.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100)) + get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100)) chains = expiring_dict.ExpiringDict(300) def get_chain(chain_id_data): return chains.setdefault(chain_id_data, Chain(chain_id_data)) # information affecting work that should trigger a long-polling update - current_work = util.Variable(None) + current_work = variable.Variable(None) # information affecting work that should not trigger a long-polling update - current_work2 = util.Variable(None) + current_work2 = variable.Variable(None) share_dbs = [db.SQLiteDict(sqlite3.connect(filename, isolation_level=None), 'shares') for filename in args.store_shares] @defer.inlineCallbacks def set_real_work(): work, height = yield getwork(bitcoind) - last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, net)) + last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, args.net)) chain = get_chain(p2pool.chain_id_type.pack(dict(last_p2pool_block_hash=last_p2pool_block_hash, bits=work.bits))) current_work.set(dict( version=work.version, @@ -247,7 +232,7 @@ def main(args): share2.flag_shared() def p2p_share(share, peer=None): - if share.hash <= conv.bits_to_target(share.header['bits']): + if share.hash <= bitcoin.data.bits_to_target(share.header['bits']): print print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,) #print share.__dict__ @@ -258,7 +243,7 @@ def main(args): print 'No bitcoind connection! Erp!' chain = get_chain(share.chain_id_data) - res = chain.accept(share, net) + res = chain.accept(share, args.net) if res == 'good': share2 = chain.share2s[share.hash] @@ -335,22 +320,19 @@ def main(args): ip, port = x.split(':') return ip, int(port) else: - return x, {False: 9333, True: 19333}[args.testnet] + return x, args.net.P2P_PORT - if args.testnet: - nodes = [('72.14.191.28', 19333)] - else: - nodes = [('72.14.191.28', 9333)] + nodes = [('72.14.191.28', args.net.P2P_PORT)] try: - nodes.append(((yield reactor.resolve('p2pool.forre.st')), {False: 9333, True: 19333}[args.testnet])) + nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT)) except: traceback.print_exc() p2p_node = p2p.Node( current_work=current_work, port=args.p2pool_port, - net=net, - addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), net.ADDRS_TABLE), + net=args.net, + addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE), mode=0 if args.low_bandwidth else 1, preferred_addrs=map(parse, args.p2pool_nodes) + nodes, ) @@ -395,18 +377,18 @@ def main(args): new_script=my_script, subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs), nonce=struct.pack("= len(self.buf[0]): - x = self.buf.popleft() - self.buf_len -= len(x) - self.pos -= len(x) - - data.append(seg) - wants -= len(seg) - return ''.join(data) - -def _DataChunker(receiver): - wants = receiver.next() - buf = StringBuffer() - - while True: - if len(buf) >= wants: - wants = receiver.send(buf.get(wants)) - else: - buf.add((yield)) -def DataChunker(receiver): - ''' - Produces a function that accepts data that is input into a generator - (receiver) in response to the receiver yielding the size of data to wait on - ''' - x = _DataChunker(receiver) - x.next() - return x.send - -class ReplyMatcher(object): - def __init__(self, func, timeout=5): - self.func = func - self.timeout = timeout - self.map = {} - - def __call__(self, id): - try: - self.func(id) - uniq = random.randrange(2**256) - df = defer.Deferred() - def timeout(): - df, timer = self.map[id].pop(uniq) - df.errback(failure.Failure(defer.TimeoutError())) - if not self.map[id]: - del self.map[id] - self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout)) - return df - except: - import traceback - traceback.print_exc() - - def got_response(self, id, resp): - if id not in self.map: - return - for df, timer in self.map.pop(id).itervalues(): - timer.cancel() - df.callback(resp) - -class GenericDeferrer(object): - def __init__(self, max_id, func, timeout=5): - self.max_id = max_id - self.func = func - self.timeout = timeout - self.map = {} - - def __call__(self, *args, **kwargs): - while True: - id = random.randrange(self.max_id) - if id not in self.map: - break - df = defer.Deferred() - def timeout(): - self.map.pop(id) - df.errback(failure.Failure(defer.TimeoutError())) - timer = reactor.callLater(self.timeout, timeout) - self.func(id, *args, **kwargs) - self.map[id] = df, timer - return df - - def got_response(self, id, resp): - if id not in self.map: - return - df, timer = self.map.pop(id) - timer.cancel() - df.callback(resp) - -class NotNowError(Exception): - pass - -class DeferredCacher(object): - def __init__(self, func, backing=None): - if backing is None: - backing = {} - - self.func = func - self.backing = backing - self.waiting = {} - - @defer.inlineCallbacks - def __call__(self, key): - if key in self.waiting: - yield self.waiting[key] - - if key in self.backing: - defer.returnValue(self.backing[key]) - else: - self.waiting[key] = defer.Deferred() - try: - value = yield self.func(key) - finally: - self.waiting.pop(key).callback(None) - - self.backing[key] = value - defer.returnValue(value) - - def call_now(self, key): - if key in self.waiting: - raise NotNowError() - - if key in self.backing: - return self.backing[key] - else: - self.waiting[key] = defer.Deferred() - def cb(value): - self.backing[key] = value - self.waiting.pop(key).callback(None) - def eb(fail): - self.waiting.pop(key).callback(None) - fail.printTraceback() - self.func(key).addCallback(cb).addErrback(eb) - raise NotNowError() - -def pubkey_to_address(pubkey, testnet): - if len(pubkey) != 65: - raise ValueError('invalid pubkey') - version = 111 if testnet else 0 - key_hash = chr(version) + hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest() - checksum = hashlib.sha256(hashlib.sha256(key_hash).digest()).digest()[:4] - return base58_encode(key_hash + checksum) - -def base58_encode(data): - return '1'*(len(data) - len(data.lstrip(chr(0)))) + natural_to_string(string_to_natural(data), '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz') - -def natural_to_string(n, alphabet=None, min_width=1): - if alphabet is None: - s = '%x' % (n,) - if len(s) % 2: - s = '\0' + x - return s.decode('hex').rjust(min_width, '\x00') - res = [] - while n: - n, x = divmod(n, len(alphabet)) - res.append(alphabet[x]) - res.reverse() - return ''.join(res).rjust(min_width, '\x00') - -def string_to_natural(s, alphabet=None): - if alphabet is None: - s = s.encode('hex') - return int(s, 16) - if not s or (s != alphabet[0] and s.startswith(alphabet[0])): - raise ValueError() - return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s))) - - -class DictWrapper(object): - def encode_key(self, key): - return key - def decode_key(self, encoded_key): - return encoded_key - def encode_value(self, value): - return value - def decode_value(self, encoded_value): - return encoded_value - - def __init__(self, inner): - self.inner = inner - - def __len__(self): - return len(self.inner) - - def __contains__(self, key): - return self.encode_key(key) in self.inner - - def __getitem__(self, key): - return self.decode_value(self.inner[self.encode_key(key)]) - def __setitem__(self, key, value): - self.inner[self.encode_key(key)] = self.encode_value(value) - def __delitem__(self, key): - del self.inner[self.encode_key(key)] - - def __iter__(self): - for encoded_key in self.inner: - yield self.decode_key(encoded_key) - def iterkeys(self): - return iter(self) - def keys(self): - return list(self.iterkeys()) - - def itervalue(self): - for encoded_value in self.inner.itervalues(): - yield self.decode_value(encoded_value) - def values(self): - return list(self.itervalue()) - - def iteritems(self): - for key, value in self.inner.iteritems(): - yield self.decode_key(key), self.decode_value(value) - def items(self): - return list(self.iteritems()) - -def update_dict(d, **replace): - d = d.copy() - for k, v in replace.iteritems(): - if v is None: - del d[k] - else: - d[k] = v - return d diff --git a/p2pool/util/__init__.py b/p2pool/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/p2pool/util/bases.py b/p2pool/util/bases.py new file mode 100644 index 0000000..9c45589 --- /dev/null +++ b/p2pool/util/bases.py @@ -0,0 +1,42 @@ +def natural_to_string(n, alphabet=None, min_width=0): + if alphabet is None: + s = '%x' % (n,) + if len(s) % 2: + s = '0' + s + return s.decode('hex').lstrip('\x00').rjust(min_width, '\x00') + assert len(set(alphabet)) == len(alphabet) + res = [] + while n: + n, x = divmod(n, len(alphabet)) + res.append(alphabet[x]) + res.reverse() + return ''.join(res).rjust(min_width, '\x00') + +def string_to_natural(s, alphabet=None): + if alphabet is None: + s = s.encode('hex') + return int("0"+s, 16) + assert len(set(alphabet)) == len(alphabet) + if not s or (s != alphabet[0] and s.startswith(alphabet[0])): + raise ValueError() + return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s))) + +import random + +def generate_alphabet(): + if random.randrange(2): + return None + else: + a = map(chr, xrange(256)) + random.shuffle(a) + return a[:random.randrange(2, len(a))] + +if __name__ == '__main__': + while True: + alphabet = generate_alphabet() + for i in xrange(1000): + n = random.randrange(100000000000000000000000000000) + s = natural_to_string(n, alphabet) + n2 = string_to_natural(s, alphabet) + print n, s.encode('hex'), n2 + assert n == n2 diff --git a/p2pool/util/datachunker.py b/p2pool/util/datachunker.py new file mode 100644 index 0000000..c627a20 --- /dev/null +++ b/p2pool/util/datachunker.py @@ -0,0 +1,50 @@ +import collections + +class StringBuffer(object): + 'Buffer manager with great worst-case behavior' + + def __init__(self, data=''): + self.buf = collections.deque([data]) + self.buf_len = len(data) + self.pos = 0 + + def __len__(self): + return self.buf_len - self.pos + + def add(self, data): + self.buf.append(data) + self.buf_len += len(data) + + def get(self, wants): + if self.buf_len - self.pos < wants: + raise IndexError('not enough data') + data = [] + while wants: + seg = self.buf[0][self.pos:self.pos+wants] + self.pos += len(seg) + while self.buf and self.pos >= len(self.buf[0]): + x = self.buf.popleft() + self.buf_len -= len(x) + self.pos -= len(x) + + data.append(seg) + wants -= len(seg) + return ''.join(data) + +def _DataChunker(receiver): + wants = receiver.next() + buf = StringBuffer() + + while True: + if len(buf) >= wants: + wants = receiver.send(buf.get(wants)) + else: + buf.add((yield)) +def DataChunker(receiver): + ''' + Produces a function that accepts data that is input into a generator + (receiver) in response to the receiver yielding the size of data to wait on + ''' + x = _DataChunker(receiver) + x.next() + return x.send diff --git a/p2pool/db.py b/p2pool/util/db.py similarity index 100% rename from p2pool/db.py rename to p2pool/util/db.py diff --git a/p2pool/util/deferral.py b/p2pool/util/deferral.py new file mode 100644 index 0000000..a5c1c67 --- /dev/null +++ b/p2pool/util/deferral.py @@ -0,0 +1,163 @@ +from __future__ import division + +import random +import traceback + +from twisted.internet import defer, reactor +from twisted.python import failure + +def sleep(t): + d = defer.Deferred() + reactor.callLater(t, d.callback, None) + return d + +def retry(message, delay): + ''' + @retry('Error getting block:', 1) + @defer.inlineCallbacks + def get_block(hash): + ... + ''' + + def retry2(func): + @defer.inlineCallbacks + def f(*args, **kwargs): + while True: + try: + result = yield func(*args, **kwargs) + except: + print + print message + traceback.print_exc() + print + + yield sleep(delay) + else: + defer.returnValue(result) + return f + return retry2 + +class ReplyMatcher(object): + ''' + Converts request/got response interface to deferred interface + ''' + + def __init__(self, func, timeout=5): + self.func = func + self.timeout = timeout + self.map = {} + + def __call__(self, id): + self.func(id) + uniq = random.randrange(2**256) + df = defer.Deferred() + def timeout(): + df, timer = self.map[id].pop(uniq) + df.errback(failure.Failure(defer.TimeoutError())) + if not self.map[id]: + del self.map[id] + self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout)) + return df + + def got_response(self, id, resp): + if id not in self.map: + return + for df, timer in self.map.pop(id).itervalues(): + timer.cancel() + df.callback(resp) + +class GenericDeferrer(object): + ''' + Converts query with identifier/got response interface to deferred interface + ''' + + def __init__(self, max_id, func, timeout=5): + self.max_id = max_id + self.func = func + self.timeout = timeout + self.map = {} + + def __call__(self, *args, **kwargs): + while True: + id = random.randrange(self.max_id) + if id not in self.map: + break + df = defer.Deferred() + def timeout(): + self.map.pop(id) + df.errback(failure.Failure(defer.TimeoutError())) + timer = reactor.callLater(self.timeout, timeout) + self.func(id, *args, **kwargs) + self.map[id] = df, timer + return df + + def got_response(self, id, resp): + if id not in self.map: + return + df, timer = self.map.pop(id) + timer.cancel() + df.callback(resp) + +class NotNowError(Exception): + pass + +class DeferredCacher(object): + ''' + like memoize, but for functions that return Deferreds + + @DeferredCacher + def f(x): + ... + return df + + @DeferredCacher.with_backing(bsddb.hashopen(...)) + def f(x): + ... + return df + ''' + + @classmethod + def with_backing(cls, backing): + return lambda func: cls(func, backing) + + def __init__(self, func, backing=None): + if backing is None: + backing = {} + + self.func = func + self.backing = backing + self.waiting = {} + + @defer.inlineCallbacks + def __call__(self, key): + if key in self.waiting: + yield self.waiting[key] + + if key in self.backing: + defer.returnValue(self.backing[key]) + else: + self.waiting[key] = defer.Deferred() + try: + value = yield self.func(key) + finally: + self.waiting.pop(key).callback(None) + + self.backing[key] = value + defer.returnValue(value) + + def call_now(self, key): + if key in self.waiting: + raise NotNowError() + + if key in self.backing: + return self.backing[key] + else: + self.waiting[key] = defer.Deferred() + def cb(value): + self.backing[key] = value + self.waiting.pop(key).callback(None) + def eb(fail): + self.waiting.pop(key).callback(None) + fail.printTraceback() + self.func(key).addCallback(cb).addErrback(eb) + raise NotNowError() diff --git a/p2pool/util/deferred_resource.py b/p2pool/util/deferred_resource.py new file mode 100644 index 0000000..dbc844e --- /dev/null +++ b/p2pool/util/deferred_resource.py @@ -0,0 +1,24 @@ +from __future__ import division + +from twisted.internet import defer +from twisted.web import resource, server + +class DeferredResource(resource.Resource): + def render(self, request): + def finish(x): + if request._disconnected: + return + if x is not None: + request.write(x) + request.finish() + + def finish_error(fail): + if request._disconnected: + return + request.setResponseCode(500) # won't do anything if already written to + request.write('---ERROR---') + request.finish() + fail.printTraceback() + + defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error) + return server.NOT_DONE_YET diff --git a/p2pool/util/dicts.py b/p2pool/util/dicts.py new file mode 100644 index 0000000..b193f9d --- /dev/null +++ b/p2pool/util/dicts.py @@ -0,0 +1,54 @@ +class DictWrapper(object): + def encode_key(self, key): + return key + def decode_key(self, encoded_key): + return encoded_key + def encode_value(self, value): + return value + def decode_value(self, encoded_value): + return encoded_value + + def __init__(self, inner): + self.inner = inner + + def __len__(self): + return len(self.inner) + + def __contains__(self, key): + return self.encode_key(key) in self.inner + + def __getitem__(self, key): + return self.decode_value(self.inner[self.encode_key(key)]) + def __setitem__(self, key, value): + self.inner[self.encode_key(key)] = self.encode_value(value) + def __delitem__(self, key): + del self.inner[self.encode_key(key)] + + def __iter__(self): + for encoded_key in self.inner: + yield self.decode_key(encoded_key) + def iterkeys(self): + return iter(self) + def keys(self): + return list(self.iterkeys()) + + def itervalue(self): + for encoded_value in self.inner.itervalues(): + yield self.decode_value(encoded_value) + def values(self): + return list(self.itervalue()) + + def iteritems(self): + for key, value in self.inner.iteritems(): + yield self.decode_key(key), self.decode_value(value) + def items(self): + return list(self.iteritems()) + +def update_dict(d, **replace): + d = d.copy() + for k, v in replace.iteritems(): + if v is None: + del d[k] + else: + d[k] = v + return d diff --git a/p2pool/expiring_dict.py b/p2pool/util/expiring_dict.py similarity index 100% rename from p2pool/expiring_dict.py rename to p2pool/util/expiring_dict.py diff --git a/p2pool/jsonrpc.py b/p2pool/util/jsonrpc.py similarity index 93% rename from p2pool/jsonrpc.py rename to p2pool/util/jsonrpc.py index 1eb33d0..04f2ebd 100644 --- a/p2pool/jsonrpc.py +++ b/p2pool/util/jsonrpc.py @@ -7,7 +7,7 @@ import traceback from twisted.internet import defer from twisted.web import client -import util +import deferred_resource class Error(Exception): def __init__(self, code, message, data=None): @@ -62,7 +62,7 @@ class Proxy(object): return lambda *params: self.callRemote(attr[len('rpc_'):], *params) raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr)) -class Server(util.DeferredResource): +class Server(deferred_resource.DeferredResource): extra_headers = None @defer.inlineCallbacks @@ -111,11 +111,13 @@ class Server(util.DeferredResource): if id_ is None: return + #print (df.result.type, df.result.value, df.result.tb) + #print df.result.__dict__ try: result = yield df - except Error, e: - raise e - except Exception, e: + #except Error, e: + #w raise e + except Exception: print 'Squelched JSON method error:' traceback.print_exc() raise Error(-32099, u'Unknown error') diff --git a/p2pool/util/math.py b/p2pool/util/math.py new file mode 100644 index 0000000..25f155c --- /dev/null +++ b/p2pool/util/math.py @@ -0,0 +1,17 @@ +from __future__ import division + +def median(x, use_float=True): + # there exist better algorithms... + y = sorted(x) + left = (len(y) - 1)//2 + right = len(y)//2 + sum = y[left] + y[right] + if use_float: + return sum/2 + else: + return sum//2 + +def shuffled(x): + x = list(x) + random.shuffle(x) + return x diff --git a/p2pool/util/variable.py b/p2pool/util/variable.py new file mode 100644 index 0000000..dfa1c3e --- /dev/null +++ b/p2pool/util/variable.py @@ -0,0 +1,57 @@ +import itertools + +from twisted.internet import defer + +class Event(object): + def __init__(self): + self.observers = {} + self.one_time_observers = {} + self.id_generator = itertools.count() + + def watch(self, func): + id = self.id_generator.next() + self.observers[id] = func + return id + def unwatch(self, id): + self.observers.pop(id) + + def watch_one_time(self, func): + id = self.id_generator.next() + self.one_time_observers[id] = func + return id + def unwatch_one_time(self, id): + self.one_time_observers.pop(id) + + def happened(self, event=None): + for func in self.observers.itervalues(): + func(event) + + one_time_observers = self.one_time_observers + self.one_time_observers = {} + for func in one_time_observers.itervalues(): + func(event) + + def get_deferred(self): + df = defer.Deferred() + self.watch_one_time(df.callback) + return df + +class Variable(object): + def __init__(self, value): + self.value = value + self.changed = Event() + + def set(self, value): + if value == self.value: + return + + self.value = value + self.changed.happened(value) + + def get_not_none(self): + if self.value is not None: + return defer.succeed(self.value) + else: + df = defer.Deferred() + self.changed.watch_one_time(df.callback) + return df diff --git a/p2pool/worker_interface.py b/p2pool/worker_interface.py index d781e15..3d849dc 100644 --- a/p2pool/worker_interface.py +++ b/p2pool/worker_interface.py @@ -1,12 +1,13 @@ from __future__ import division +import json +import traceback + from twisted.internet import defer -import json -import jsonrpc -import util +from util import jsonrpc, deferred_resource -class LongPollingWorkerInterface(util.DeferredResource): +class LongPollingWorkerInterface(deferred_resource.DeferredResource): def __init__(self, work, compute): self.work = work self.compute = compute @@ -42,7 +43,10 @@ class WorkerInterface(jsonrpc.Server): self.putChild('', self) def rpc_getwork(self, data=None): + try: if data is not None: return self.response_callback(data) return self.compute(self.work.value) + except ValueError: + traceback.print_exc()