reworking
authorforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Mon, 4 Jul 2011 09:22:27 +0000 (09:22 +0000)
committerforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Mon, 4 Jul 2011 09:22:27 +0000 (09:22 +0000)
git-svn-id: svn://forre.st/p2pool@1368 470744a7-cac9-478e-843e-5ec1b25c69e8

19 files changed:
p2pool/bitcoin/base58.py [new file with mode: 0644]
p2pool/bitcoin/data.py
p2pool/bitcoin/p2p.py
p2pool/data.py
p2pool/main.py
p2pool/p2p.py
p2pool/util.py [deleted file]
p2pool/util/__init__.py [new file with mode: 0644]
p2pool/util/bases.py [new file with mode: 0644]
p2pool/util/datachunker.py [new file with mode: 0644]
p2pool/util/db.py [moved from p2pool/db.py with 100% similarity]
p2pool/util/deferral.py [new file with mode: 0644]
p2pool/util/deferred_resource.py [new file with mode: 0644]
p2pool/util/dicts.py [new file with mode: 0644]
p2pool/util/expiring_dict.py [moved from p2pool/expiring_dict.py with 100% similarity]
p2pool/util/jsonrpc.py [moved from p2pool/jsonrpc.py with 93% similarity]
p2pool/util/math.py [new file with mode: 0644]
p2pool/util/variable.py [new file with mode: 0644]
p2pool/worker_interface.py

diff --git a/p2pool/bitcoin/base58.py b/p2pool/bitcoin/base58.py
new file mode 100644 (file)
index 0000000..c798e39
--- /dev/null
@@ -0,0 +1,10 @@
+from p2pool.util import bases
+
+base58_alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
+
+def base58_encode(data):
+    return base58_alphabet[0]*(len(data) - len(data.lstrip(chr(0)))) + bases.natural_to_string(bases.string_to_natural(data), base58_alphabet)
+
+def base58_decode(data):
+    return chr(0)*(len(data) - len(data.lstrip(base58_alphabet[0]))) + bases.natural_to_string(bases.string_to_natural(data, base58_alphabet))
+
index 2e37da2..59a7f8d 100644 (file)
@@ -1,7 +1,12 @@
+from __future__ import division
+
 import struct
 import StringIO
 import hashlib
 
+from . import base58
+from p2pool.util import bases
+
 class EarlyEnd(Exception):
     pass
 
@@ -39,6 +44,20 @@ class Type(object):
         data = self._pack(obj)
         assert self._unpack(data) == obj
         return data
+    
+    
+    def pack_base58(self, obj):
+        return base58.base58_encode(self.pack(obj))
+    
+    def unpack_base58(self, base58_data):
+        return self.unpack(base58.base58_decode(base58_data))
+        
+    
+    def hash160(self, obj):
+        return ripemdsha(self.pack(obj))
+    
+    def hash256(self, obj):
+        return doublesha(self.pack(obj))
 
 class VarIntType(Type):
     def read(self, file):
@@ -120,6 +139,8 @@ class HashType(Type):
         return int(data[::-1].encode('hex'), 16)
     
     def write(self, file, item):
+        if item >= 2**256:
+            raise ValueError("invalid hash value")
         file.write(('%064x' % (item,)).decode('hex')[::-1])
 
 class ShortHashType(Type):
@@ -130,7 +151,9 @@ class ShortHashType(Type):
         return int(data[::-1].encode('hex'), 16)
     
     def write(self, file, item):
-        file.write(('%020x' % (item,)).decode('hex')[::-1])
+        if item >= 2**160:
+            raise ValueError("invalid hash value")
+        file.write(('%040x' % (item,)).decode('hex')[::-1])
 
 class ListType(Type):
     def __init__(self, type):
@@ -199,263 +222,23 @@ class ComposedType(Type):
         for key, type_ in self.fields:
             type_.write(file, item[key])
 
-address_type = ComposedType([
-    ('services', StructType('<Q')),
-    ('address', IPV6AddressType()),
-    ('port', StructType('>H')),
-])
-
-tx_type = ComposedType([
-    ('version', StructType('<I')),
-    ('tx_ins', ListType(ComposedType([
-        ('previous_output', ComposedType([
-            ('hash', HashType()),
-            ('index', StructType('<I')),
-        ])),
-        ('script', VarStrType()),
-        ('sequence', StructType('<I')),
-    ]))),
-    ('tx_outs', ListType(ComposedType([
-        ('value', StructType('<Q')),
-        ('script', VarStrType()),
-    ]))),
-    ('lock_time', StructType('<I')),
-])
-
-block_header_type = ComposedType([
-    ('version', StructType('<I')),
-    ('previous_block', HashType()),
-    ('merkle_root', HashType()),
-    ('timestamp', StructType('<I')),
-    ('bits', StructType('<I')),
-    ('nonce', StructType('<I')),
-])
-
-block_type = ComposedType([
-    ('header', block_header_type),
-    ('txs', ListType(tx_type)),
-])
-
-def doublesha(data):
-    return HashType().unpack(hashlib.sha256(hashlib.sha256(data).digest()).digest())
-
-def ripemdsha(data):
-    return ShortHashType().unpack(hashlib.new('ripemd160', hashlib.sha256(data).digest()).digest())
-
-merkle_record_type = ComposedType([
-    ('left', HashType()),
-    ('right', HashType()),
-])
-
-def merkle_hash(tx_list):
-    hash_list = [doublesha(tx_type.pack(tx)) for tx in tx_list]
-    while len(hash_list) > 1:
-        hash_list = [doublesha(merkle_record_type.pack(dict(left=left, right=left if right is None else right)))
-            for left, right in zip(hash_list[::2], hash_list[1::2] + [None])]
-    return hash_list[0]
-
-def tx_hash(tx):
-    return doublesha(tx_type.pack(tx))
-
-def block_hash(header):
-    return doublesha(block_header_type.pack(header))
-
-class EarlyEnd(Exception):
-    pass
-
-class LateEnd(Exception):
-    pass
-
-class Type(object):
-    # the same data can have only one unpacked representation, but multiple packed binary representations
+class ChecksummedType(Type):
+    def __init__(self, inner):
+        self.inner = inner
     
-    def _unpack(self, data):
-        f = StringIO.StringIO(data)
-        
-        obj = self.read(f)
+    def read(self, file):
+        obj = self.inner.read(file)
+        data = self.inner.pack(obj)
         
-        if f.tell() != len(data):
-            raise LateEnd('underread ' + repr((self, data)))
+        if file.read(4) != hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]:
+            raise ValueError("invalid checksum")
         
         return obj
     
-    def unpack(self, data):
-        obj = self._unpack(data)
-        assert self._unpack(self._pack(obj)) == obj
-        return obj
-    
-    def _pack(self, obj):
-        f = StringIO.StringIO()
-        
-        self.write(f, obj)
-        
-        data = f.getvalue()
-        
-        return data
-    
-    def pack(self, obj):
-        data = self._pack(obj)
-        assert self._unpack(data) == obj
-        return data
-
-class VarIntType(Type):
-    def read(self, file):
-        data = file.read(1)
-        if len(data) != 1:
-            raise EarlyEnd()
-        first, = struct.unpack('<B', data)
-        if first == 0xff: desc = '<Q'
-        elif first == 0xfe: desc = '<I'
-        elif first == 0xfd: desc = '<H'
-        else: return first
-        length = struct.calcsize(desc)
-        data = file.read(length)
-        if len(data) != length:
-            raise EarlyEnd()
-        return struct.unpack(desc, data)[0]
-    
-    def write(self, file, item):
-        if item < 0xfd:
-            file.write(struct.pack('<B', item))
-        elif item <= 0xffff:
-            file.write(struct.pack('<BH', 0xfd, item))
-        elif item <= 0xffffffff:
-            file.write(struct.pack('<BI', 0xfe, item))
-        elif item <= 0xffffffffffffffff:
-            file.write(struct.pack('<BQ', 0xff, item))
-        else:
-            raise ValueError('int too large for varint')
-
-class VarStrType(Type):
-    def read(self, file):
-        length = VarIntType().read(file)
-        res = file.read(length)
-        if len(res) != length:
-            raise EarlyEnd('var str not long enough %r' % ((length, len(res), res),))
-        return res
-    
-    def write(self, file, item):
-        VarIntType().write(file, len(item))
-        file.write(item)
-
-class FixedStrType(Type):
-    def __init__(self, length):
-        self.length = length
-    
-    def read(self, file):
-        res = file.read(self.length)
-        if len(res) != self.length:
-            raise EarlyEnd('early EOF!')
-        return res
-    
     def write(self, file, item):
-        if len(item) != self.length:
-            raise ValueError('incorrect length!')
-        file.write(item)
-
-class EnumType(Type):
-    def __init__(self, inner, values):
-        self.inner = inner
-        self.values = values
-        
-        self.keys = {}
-        for k, v in values.iteritems():
-            if v in self.keys:
-                raise ValueError('duplicate value in values')
-            self.keys[v] = k
-    
-    def read(self, file):
-        return self.keys[self.inner.read(file)]
-    
-    def write(self, file, item):
-        self.inner.write(file, self.values[item])
-
-class HashType(Type):
-    def read(self, file):
-        data = file.read(256//8)
-        if len(data) != 256//8:
-            raise EarlyEnd('incorrect length!')
-        return int(data[::-1].encode('hex'), 16)
-    
-    def write(self, file, item):
-        file.write(('%064x' % (item,)).decode('hex')[::-1])
-
-class ShortHashType(Type):
-    def read(self, file):
-        data = file.read(160//8)
-        if len(data) != 160//8:
-            raise EarlyEnd('incorrect length!')
-        return int(data[::-1].encode('hex'), 16)
-    
-    def write(self, file, item):
-        file.write(('%020x' % (item,)).decode('hex')[::-1])
-
-class ListType(Type):
-    def __init__(self, type):
-        self.type = type
-    
-    def read(self, file):
-        length = VarIntType().read(file)
-        return [self.type.read(file) for i in xrange(length)]
-    
-    def write(self, file, item):
-        VarIntType().write(file, len(item))
-        for subitem in item:
-            self.type.write(file, subitem)
-
-class StructType(Type):
-    def __init__(self, desc):
-        self.desc = desc
-        self.length = struct.calcsize(self.desc)
-    
-    def read(self, file):
-        data = file.read(self.length)
-        if len(data) != self.length:
-            raise EarlyEnd()
-        res, = struct.unpack(self.desc, data)
-        return res
-    
-    def write(self, file, item):
-        data = struct.pack(self.desc, item)
-        if struct.unpack(self.desc, data)[0] != item:
-            # special test because struct doesn't error on some overflows
-            raise ValueError("item didn't survive pack cycle (%r)" % (item,))
-        file.write(data)
-
-class IPV6AddressType(Type):
-    def read(self, file):
-        data = file.read(16)
-        if len(data) != 16:
-            raise EarlyEnd()
-        if data[:12] != '00000000000000000000ffff'.decode('hex'):
-            raise ValueError("ipv6 addresses not supported yet")
-        return '::ffff:' + '.'.join(str(ord(x)) for x in data[12:])
-    
-    def write(self, file, item):
-        prefix = '::ffff:'
-        if not item.startswith(prefix):
-            raise ValueError("ipv6 addresses not supported yet")
-        item = item[len(prefix):]
-        bits = map(int, item.split('.'))
-        if len(bits) != 4:
-            raise ValueError("invalid address: %r" % (bits,))
-        data = '00000000000000000000ffff'.decode('hex') + ''.join(chr(x) for x in bits)
-        assert len(data) == 16, len(data)
+        data = self.inner.pack(item)
         file.write(data)
-
-class ComposedType(Type):
-    def __init__(self, fields):
-        self.fields = fields
-    
-    def read(self, file):
-        item = {}
-        for key, type_ in self.fields:
-            item[key] = type_.read(file)
-        return item
-    
-    def write(self, file, item):
-        for key, type_ in self.fields:
-            type_.write(file, item[key])
+        file.write(hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4])
 
 address_type = ComposedType([
     ('services', StructType('<Q')),
@@ -485,7 +268,7 @@ block_header_type = ComposedType([
     ('previous_block', HashType()),
     ('merkle_root', HashType()),
     ('timestamp', StructType('<I')),
-    ('bits', StructType('<I')),
+    ('bits', FixedStrType(4)),
     ('nonce', StructType('<I')),
 ])
 
@@ -506,7 +289,7 @@ merkle_record_type = ComposedType([
 ])
 
 def merkle_hash(tx_list):
-    hash_list = [doublesha(tx_type.pack(tx)) for tx in tx_list]
+    hash_list = map(tx_hash, tx_list)
     while len(hash_list) > 1:
         hash_list = [doublesha(merkle_record_type.pack(dict(left=left, right=left if right is None else right)))
             for left, right in zip(hash_list[::2], hash_list[1::2] + [None])]
@@ -518,8 +301,67 @@ def tx_hash(tx):
 def block_hash(header):
     return doublesha(block_header_type.pack(header))
 
+def shift_left(n, m):
+    # python: :(
+    if m < 0:
+        return n >> -m
+    return n << m
+
 def bits_to_target(bits):
-    return (bits & 0x00ffffff) * 2 ** (8 * ((bits >> 24) - 3))
+    bits = bits[::-1]
+    length = ord(bits[0])
+    return bases.string_to_natural((bits[1:] + "\0"*length)[:length])
+
+def old_bits_to_target(bits):
+    return shift_left(bits & 0x00ffffff, 8 * ((bits >> 24) - 3))
+
+def about_equal(a, b):
+    if a == b: return True
+    return abs(a-b)/((abs(a)+abs(b))/2) < .01
+
+def compress_target_to_bits(target): # loses precision
+    print
+    print "t", target
+    n = bases.natural_to_string(target)
+    print "n", n.encode('hex')
+    bits = chr(len(n)) + n[:3].ljust(3, '\0')
+    bits = bits[::-1]
+    print "bits", bits.encode('hex')
+    print "new", bits_to_target(bits)
+    print "old", old_bits_to_target(struct.unpack("<I", bits)[0])
+    assert about_equal(bits_to_target(bits), target), (bits_to_target(bits), target)
+    assert about_equal(old_bits_to_target(struct.unpack("<I", bits)[0]), target), (old_bits_to_target(struct.unpack("<I", bits)[0]), target)
+    return bits
 
 def target_to_average_attempts(target):
     return 2**256//(target + 1)
+
+human_address_type = ChecksummedType(ComposedType([
+    ('version', StructType("<B")),
+    ('pubkey_hash', ShortHashType()),
+]))
+
+pubkey_type = FixedStrType(65)
+
+def pubkey_hash_to_address(pubkey_hash, net):
+    return human_address_type.pack_base58(dict(version=net.BITCOIN_ADDRESS_VERSION, pubkey_hash=pubkey_hash))
+
+def pubkey_to_address(pubkey, net):
+    return pubkey_hash_to_address(pubkey_type.hash160(pubkey), net)
+
+def address_to_pubkey_hash(address, net):
+    x = human_address_type.unpack_base58(address)
+    if x['version'] != net.BITCOIN_ADDRESS_VERSION:
+        raise ValueError('address not for this net!')
+    return x['pubkey_hash']
+
+class Mainnet(object):
+    BITCOIN_P2P_PREFIX = 'f9beb4d9'.decode('hex')
+    BITCOIN_P2P_PORT = 8333
+    BITCOIN_ADDRESS_VERSION = 0
+
+class Testnet(object):
+    BITCOIN_P2P_PREFIX = 'fabfb5da'.decode('hex')
+    BITCOIN_P2P_PORT = 18333
+    BITCOIN_ADDRESS_VERSION = 111
+
index ece3e2f..f68c984 100644 (file)
@@ -13,11 +13,11 @@ import traceback
 from twisted.internet import protocol, reactor
 
 from . import data as bitcoin_data
-import p2pool.util
+from p2pool.util import variable, datachunker, deferral
 
 class BaseProtocol(protocol.Protocol):
     def connectionMade(self):
-        self.dataReceived = p2pool.util.DataChunker(self.dataReceiver())
+        self.dataReceived = datachunker.DataChunker(self.dataReceiver())
     
     def dataReceiver(self):
         while True:
@@ -93,11 +93,8 @@ class BaseProtocol(protocol.Protocol):
         raise AttributeError(attr)
 
 class Protocol(BaseProtocol):
-    def __init__(self, testnet=False):
-        if testnet:
-            self._prefix = 'fabfb5da'.decode('hex')
-        else:
-            self._prefix = 'f9beb4d9'.decode('hex')
+    def __init__(self, net):
+        self._prefix = net.BITCOIN_P2P_PREFIX
     
     version = 0
     
@@ -206,10 +203,10 @@ class Protocol(BaseProtocol):
         self.version = self.version_after
         
         # connection ready
-        self.check_order = p2pool.util.GenericDeferrer(2**256, lambda id, order: self.send_checkorder(id=id, order=order))
-        self.submit_order = p2pool.util.GenericDeferrer(2**256, lambda id, order: self.send_submitorder(id=id, order=order))
-        self.get_block = p2pool.util.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
-        self.get_block_header = p2pool.util.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
+        self.check_order = deferral.GenericDeferrer(2**256, lambda id, order: self.send_checkorder(id=id, order=order))
+        self.submit_order = deferral.GenericDeferrer(2**256, lambda id, order: self.send_submitorder(id=id, order=order))
+        self.get_block = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
+        self.get_block_header = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
         
         if hasattr(self.factory, 'resetDelay'):
             self.factory.resetDelay()
@@ -249,15 +246,15 @@ class ClientFactory(protocol.ReconnectingClientFactory):
     
     maxDelay = 15
     
-    def __init__(self, testnet=False):
-        self.testnet = testnet
-        self.conn = p2pool.util.Variable(None)
+    def __init__(self, net):
+        self.net = net
+        self.conn = variable.Variable(None)
         
-        self.new_block = p2pool.util.Event()
-        self.new_tx = p2pool.util.Event()
+        self.new_block = variable.Event()
+        self.new_tx = variable.Event()
     
     def buildProtocol(self, addr):
-        p = self.protocol(self.testnet)
+        p = self.protocol(self.net)
         p.factory = self
         return p
     
index 3f2d18c..238a88d 100644 (file)
@@ -2,14 +2,9 @@ from __future__ import division
 
 from bitcoin import data as bitcoin_data
 
-chain_id_type = bitcoin_data.ComposedType([
-    ('last_p2pool_block_hash', bitcoin_data.HashType()),
-    ('bits', bitcoin_data.StructType('<I')),
-])
-
 share_data_type = bitcoin_data.ComposedType([
-    ('last_p2pool_block_hash', bitcoin_data.HashType()),
     ('previous_p2pool_share_hash', bitcoin_data.HashType()),
+    ('bits2', bitcoin_data.FixedStrType(4)),
     ('nonce', bitcoin_data.VarStrType()),
 ])
 
@@ -77,7 +72,6 @@ def txs_to_gentx_info(txs):
 
 def share_info_to_gentx_and_shares(share_info, chain, net):
     return generate_transaction(
-        last_p2pool_block_hash=share_info['share_data']['last_p2pool_block_hash'],
         previous_share2=chain.share2s[share_info['share_data']['previous_p2pool_share_hash']],
         nonce=share_info['share_data']['nonce'],
         new_script=share_info['new_script'],
@@ -161,6 +155,9 @@ def generate_transaction(last_p2pool_block_hash, previous_share2, new_script, su
     dests = sorted(amounts.iterkeys(), key=lambda script: (script == new_script, script))
     assert dests[-1] == new_script
     
+    pre_target = sum(bitcoin_data.target_to_average_attempts(share(x ago).target) for x in xrange(1000))/(share(1000 ago).timestamp - share(1 ago).timestamp)
+    bits2 = bitcoin_data.compress_target_to_bits(pre_target)
+    
     return dict(
         version=1,
         tx_ins=[dict(
@@ -172,6 +169,7 @@ def generate_transaction(last_p2pool_block_hash, previous_share2, new_script, su
                     last_p2pool_block_hash=last_p2pool_block_hash,
                     previous_p2pool_share_hash=previous_share2.share.hash if previous_share2 is not None else 2**256 - 1,
                     nonce=nonce,
+                    bits2=bits2,
                 ),
             )),
         )],
@@ -200,6 +198,25 @@ class Tracker(object):
             self.heads.add(share.hash)
             if share.previous_hash in self.heads:
                 self.heads.remove(share.previous_hash)
+    
+    def get_chain(self, start):
+        share_hash_to_get = start
+        while share_hash_to_get in self.shares:
+            share = self.shares[share_hash_to_get]
+            yield share
+            share_hash_to_get = share.previous_hash
+    
+    def best(self):
+        return max(self.heads, key=self.score_chain)
+    
+    def score_chain(self, start):
+        length = len(self.get_chain(start))
+        
+        score = 0
+        for share in itertools.islice(self.get_chain(start), 1000):
+            score += a
+        
+        return (min(length, 1000), score)
 
 if __name__ == '__main__':
     class FakeShare(object):
@@ -218,18 +235,20 @@ if __name__ == '__main__':
 
 # TARGET_MULTIPLIER needs to be less than the current difficulty to prevent miner clients from missing shares
 
-class Testnet(object):
+class Mainnet(bitcoin_data.Mainnet):
+    TARGET_MULTIPLIER = SPREAD = 600
+    ROOT_BLOCK = 0x6c9cb0589a44808d9a9361266a4ffb9fea2e2cf4d70bb2118b5
+    SCRIPT = '4104ffd03de44a6e11b9917f3a29f9443283d9871c9d743ef30d5eddcd37094b64d1b3d8090496b53256786bf5c82932ec23c3b74d9f05a6f95a8b5529352656664bac'.decode('hex')
+    IDENTIFIER = 0x7452839666e1f8f8
+    PREFIX = '2d4224bf18c87b87'.decode('hex')
+    ADDRS_TABLE = 'addrs'
+    P2P_PORT = 9333
+
+class Testnet(bitcoin_data.Testnet):
     TARGET_MULTIPLIER = SPREAD = 30
     ROOT_BLOCK = 0xd5070cd4f2987ad2191af71393731a2b143f094f7b84c9e6aa9a6a
     SCRIPT = '410403ad3dee8ab3d8a9ce5dd2abfbe7364ccd9413df1d279bf1a207849310465b0956e5904b1155ecd17574778f9949589ebfd4fb33ce837c241474a225cf08d85dac'.decode('hex')
     IDENTIFIER = 0x1ae3479e4eb6700a
     PREFIX = 'd19778c812754854'.decode('hex')
     ADDRS_TABLE = 'addrs_testnet'
-
-class Main(object):
-    TARGET_MULTIPLIER = SPREAD = 600
-    ROOT_BLOCK = 0x11a22c6e314b1a3f44cbbf50246187a37756ea8af4d41c43a8d6
-    SCRIPT = '4104ffd03de44a6e11b9917f3a29f9443283d9871c9d743ef30d5eddcd37094b64d1b3d8090496b53256786bf5c82932ec23c3b74d9f05a6f95a8b5529352656664bac'.decode('hex')
-    IDENTIFIER = 0x7452839666e1f8f8
-    PREFIX = '2d4224bf18c87b87'.decode('hex')
-    ADDRS_TABLE = 'addrs'
+    P2P_PORT = 19333
index 6eaf294..1800624 100644 (file)
@@ -7,6 +7,7 @@ import itertools
 import os
 import random
 import sqlite3
+import struct
 import subprocess
 import sys
 import traceback
@@ -15,12 +16,9 @@ from twisted.internet import defer, reactor
 from twisted.web import server
 
 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
-import db
-import expiring_dict
-import jsonrpc
-import p2p
+from util import db, expiring_dict, jsonrpc, variable, deferral
+import p2pool.p2p as p2p
 import p2pool.data as p2pool
-import util
 import worker_interface
 
 try:
@@ -37,7 +35,7 @@ class Chain(object):
         self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
         
         self.share2s = {} # hash -> share2
-        self.highest = util.Variable(None) # hash
+        self.highest = variable.Variable(None) # hash
         
         self.requesting = set()
         self.request_map = {}
@@ -122,36 +120,23 @@ def get_last_p2pool_block_hash(current_block_hash, get_block, net):
                 print
         block_hash = block['header']['previous_block']
 
+@deferral.retry('Error getting work from bitcoind:', 1)
 @defer.inlineCallbacks
 def getwork(bitcoind):
-    while True:
-        try:
-            # a block could arrive in between these two queries
-            getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
-            try:
-                getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
-            finally:
-                # get rid of residual errors
-                getwork_df.addErrback(lambda fail: None)
-                height_df.addErrback(lambda fail: None)
-        except:
-            print
-            print 'Error getting work from bitcoind:'
-            traceback.print_exc()
-            print
-            
-            
-            yield util.sleep(1)
-            
-            continue
-        defer.returnValue((getwork, height))
+    # a block could arrive in between these two queries
+    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
+    try:
+        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
+    finally:
+        # get rid of residual errors
+        getwork_df.addErrback(lambda fail: None)
+        height_df.addErrback(lambda fail: None)
+    defer.returnValue((getwork, height))
 
 
 @defer.inlineCallbacks
 def main(args):
     try:
-        net = p2pool.Testnet if args.testnet else p2pool.Main
-        
         print 'p2pool (version %s)' % (__version__,)
         print
         
@@ -168,7 +153,7 @@ def main(args):
         
         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
         print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
-        factory = bitcoin.p2p.ClientFactory(args.testnet)
+        factory = bitcoin.p2p.ClientFactory(args.net)
         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
         
         while True:
@@ -188,7 +173,7 @@ def main(args):
                 print
             else:
                 break
-            yield util.sleep(1)
+            yield deferral.sleep(1)
         
         print '    ...success!'
         print '    Payout script:', my_script.encode('hex')
@@ -199,24 +184,24 @@ def main(args):
             block = yield (yield factory.getProtocol()).get_block(block_hash)
             print 'Got block %x' % (block_hash,)
             defer.returnValue(block)
-        get_block = util.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
+        get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
         
-        get_raw_transaction = util.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
+        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
         
         chains = expiring_dict.ExpiringDict(300)
         def get_chain(chain_id_data):
             return chains.setdefault(chain_id_data, Chain(chain_id_data))
         # information affecting work that should trigger a long-polling update
-        current_work = util.Variable(None)
+        current_work = variable.Variable(None)
         # information affecting work that should not trigger a long-polling update
-        current_work2 = util.Variable(None)
+        current_work2 = variable.Variable(None)
         
         share_dbs = [db.SQLiteDict(sqlite3.connect(filename, isolation_level=None), 'shares') for filename in args.store_shares]
         
         @defer.inlineCallbacks
         def set_real_work():
             work, height = yield getwork(bitcoind)
-            last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, net))
+            last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, args.net))
             chain = get_chain(p2pool.chain_id_type.pack(dict(last_p2pool_block_hash=last_p2pool_block_hash, bits=work.bits)))
             current_work.set(dict(
                 version=work.version,
@@ -247,7 +232,7 @@ def main(args):
             share2.flag_shared()
         
         def p2p_share(share, peer=None):
-            if share.hash <= conv.bits_to_target(share.header['bits']):
+            if share.hash <= bitcoin.data.bits_to_target(share.header['bits']):
                 print
                 print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
                 #print share.__dict__
@@ -258,7 +243,7 @@ def main(args):
                     print 'No bitcoind connection! Erp!'
             
             chain = get_chain(share.chain_id_data)
-            res = chain.accept(share, net)
+            res = chain.accept(share, args.net)
             if res == 'good':
                 share2 = chain.share2s[share.hash]
                 
@@ -335,22 +320,19 @@ def main(args):
                 ip, port = x.split(':')
                 return ip, int(port)
             else:
-                return x, {False: 9333, True: 19333}[args.testnet]
+                return x, args.net.P2P_PORT
         
-        if args.testnet:
-            nodes = [('72.14.191.28', 19333)]
-        else:
-            nodes = [('72.14.191.28', 9333)]
+        nodes = [('72.14.191.28', args.net.P2P_PORT)]
         try:
-            nodes.append(((yield reactor.resolve('p2pool.forre.st')), {False: 9333, True: 19333}[args.testnet]))
+            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
         except:
             traceback.print_exc()
         
         p2p_node = p2p.Node(
             current_work=current_work,
             port=args.p2pool_port,
-            net=net,
-            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), net.ADDRS_TABLE),
+            net=args.net,
+            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
             mode=0 if args.low_bandwidth else 1,
             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
         )
@@ -395,18 +377,18 @@ def main(args):
                 new_script=my_script,
                 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                 nonce=struct.pack("<Q", random.randrange(2**64)),
-                net=net,
+                net=args.net,
             )
             print 'Generating, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
-            merkle_root = bitcoin.p2p.merkle_hash(transactions)
+            merkle_root = bitcoin.data.merkle_hash(transactions)
             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
-            ba = conv.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['bits'])
-            return ba.getwork(net.TARGET_MULTIPLIER)
+            ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['bits'])
+            return ba.getwork(args.net.TARGET_MULTIPLIER)
         
         def got_response(data):
             # match up with transactions
-            header = conv.decode_data(data)
+            header = bitcoin.getwork.decode_data(data)
             transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
             if transactions is None:
                 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
@@ -435,7 +417,7 @@ def main(args):
             while True:
                 try:
                     block = get_block.call_now(start_hash)
-                except util.NotNowError:
+                except deferral.NotNowError:
                     break
                 yield start_hash, block
                 start_hash = block['header']['previous_block']
@@ -511,7 +493,7 @@ def run():
     parser.add_argument('--version', action='version', version=__version__)
     parser.add_argument('--testnet',
         help='use the testnet; make sure you change the ports too',
-        action='store_true', default=False, dest='testnet')
+        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
     parser.add_argument('--store-shares', metavar='FILENAME',
         help='write shares to a database (not needed for normal usage)',
         type=str, action='append', default=[], dest='store_shares')
@@ -553,10 +535,10 @@ def run():
     args = parser.parse_args()
     
     if args.bitcoind_p2p_port is None:
-        args.bitcoind_p2p_port = {False: 8333, True: 18333}[args.testnet]
+        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
     
     if args.p2pool_port is None:
-        args.p2pool_port = {False: 9333, True: 19333}[args.testnet]
+        args.p2pool_port = args.net.P2P_PORT
     
     reactor.callWhenRunning(main, args)
     reactor.run()
index 87ceef1..f09097b 100644 (file)
@@ -9,7 +9,7 @@ from twisted.internet import defer, protocol, reactor
 from bitcoin import p2p as bitcoin_p2p
 from bitcoin import data as bitcoin_data
 from p2pool import data as p2pool_data
-import util
+from util import deferral, variable, dicts
 
 # mode
 #     0: send hash first (high latency, low bandwidth)
@@ -135,19 +135,19 @@ class Protocol(bitcoin_p2p.BaseProtocol):
     def _think(self):
         while self.connected2:
             self.send_ping()
-            yield util.sleep(random.expovariate(1/100))
+            yield deferral.sleep(random.expovariate(1/100))
     
     @defer.inlineCallbacks
     def _think2(self):
         while self.connected2:
             self.send_addrme(port=self.node.port)
             #print 'sending addrme'
-            yield util.sleep(random.expovariate(1/100))
+            yield deferral.sleep(random.expovariate(1/100))
     
     def handle_version(self, version, services, addr_to, addr_from, nonce, sub_version, mode, state):
         self.other_version = version
         self.other_services = services
-        self.other_mode_var = util.Variable(mode)
+        self.other_mode_var = variable.Variable(mode)
         
         if nonce == self.node.nonce:
             #print 'Detected connection to self, disconnecting from %s:%i' % (self.transport.getPeer().host, self.transport.getPeer().port)
@@ -304,7 +304,7 @@ addrdb_value = bitcoin_data.ComposedType([
     ('last_seen', bitcoin_data.StructType('<Q')),
 ])
 
-class AddrStore(util.DictWrapper):
+class AddrStore(dicts.DictWrapper):
     def encode_key(self, (address, port)):
         return addrdb_key.pack(dict(address=address, port=port))
     def decode_key(self, encoded_key):
@@ -326,7 +326,7 @@ class Node(object):
         self.net = net
         self.addr_store = AddrStore(addr_store)
         self.preferred_addrs = preferred_addrs
-        self.mode_var = util.Variable(mode)
+        self.mode_var = variable.Variable(mode)
         self.desired_peers = desired_peers
         self.max_attempts = max_attempts
         self.current_work = current_work
@@ -367,7 +367,7 @@ class Node(object):
             except:
                 traceback.print_exc()
             
-            yield util.sleep(random.expovariate(1/5))
+            yield deferral.sleep(random.expovariate(1/5))
     
     @defer.inlineCallbacks
     def _think2(self):
@@ -378,7 +378,7 @@ class Node(object):
             except:
                 traceback.print_exc()
             
-            yield util.sleep(random.expovariate(1/20))
+            yield deferral.sleep(random.expovariate(1/20))
     
     def stop(self):
         if not self.running:
diff --git a/p2pool/util.py b/p2pool/util.py
deleted file mode 100644 (file)
index 27be3d7..0000000
+++ /dev/null
@@ -1,336 +0,0 @@
-from __future__ import division
-
-import collections
-import hashlib
-import itertools
-import random
-
-from twisted.internet import defer, reactor
-from twisted.python import failure
-from twisted.web import resource, server
-
-class DeferredResource(resource.Resource):
-    def render(self, request):
-        def finish(x):
-            if request._disconnected:
-                return
-            if x is not None:
-                request.write(x)
-            request.finish()
-        
-        def finish_error(fail):
-            if request._disconnected:
-                return
-            request.setResponseCode(500) # won't do anything if already written to
-            request.write('---ERROR---')
-            request.finish()
-            fail.printTraceback()
-        
-        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
-        return server.NOT_DONE_YET
-
-class Event(object):
-    def __init__(self):
-        self.observers = {}
-        self.one_time_observers = {}
-        self.id_generator = itertools.count()
-    
-    def watch(self, func):
-        id = self.id_generator.next()
-        self.observers[id] = func
-        return id
-    def unwatch(self, id):
-        self.observers.pop(id)
-    
-    def watch_one_time(self, func):
-        id = self.id_generator.next()
-        self.one_time_observers[id] = func
-        return id
-    def unwatch_one_time(self, id):
-        self.one_time_observers.pop(id)
-    
-    def happened(self, event=None):
-        for func in self.observers.itervalues():
-            func(event)
-        
-        one_time_observers = self.one_time_observers
-        self.one_time_observers = {}
-        for func in one_time_observers.itervalues():
-            func(event)
-    
-    def get_deferred(self):
-        df = defer.Deferred()
-        self.watch_one_time(df.callback)
-        return df
-
-class Variable(object):
-    def __init__(self, value):
-        self.value = value
-        self.changed = Event()
-    
-    def set(self, value):
-        if value == self.value:
-            return
-        
-        self.value = value
-        self.changed.happened(value)
-    
-    def get_not_none(self):
-        if self.value is not None:
-            return defer.succeed(self.value)
-        else:
-            df = defer.Deferred()
-            self.changed.watch_one_time(df.callback)
-            return df
-
-def sleep(t):
-    d = defer.Deferred()
-    reactor.callLater(t, d.callback, None)
-    return d
-
-def median(x):
-    # don't really need a complex algorithm here
-    y = sorted(x)
-    left = (len(y) - 1)//2
-    right = len(y)//2
-    return (y[left] + y[right])/2
-
-class StringBuffer(object):
-    'Buffer manager with great worst-case behavior'
-    
-    def __init__(self, data=''):
-        self.buf = collections.deque([data])
-        self.buf_len = len(data)
-        self.pos = 0
-    
-    def __len__(self):
-        return self.buf_len - self.pos
-    
-    def add(self, data):
-        self.buf.append(data)
-        self.buf_len += len(data)
-    
-    def get(self, wants):
-        if self.buf_len - self.pos < wants:
-            raise IndexError('not enough data')
-        data = []
-        while wants:
-            seg = self.buf[0][self.pos:self.pos+wants]
-            self.pos += len(seg)
-            while self.buf and self.pos >= len(self.buf[0]):
-                x = self.buf.popleft()
-                self.buf_len -= len(x)
-                self.pos -= len(x)
-            
-            data.append(seg)
-            wants -= len(seg)
-        return ''.join(data)
-
-def _DataChunker(receiver):
-    wants = receiver.next()
-    buf = StringBuffer()
-    
-    while True:
-        if len(buf) >= wants:
-            wants = receiver.send(buf.get(wants))
-        else:
-            buf.add((yield))
-def DataChunker(receiver):
-    '''
-    Produces a function that accepts data that is input into a generator
-    (receiver) in response to the receiver yielding the size of data to wait on
-    '''
-    x = _DataChunker(receiver)
-    x.next()
-    return x.send
-
-class ReplyMatcher(object):
-    def __init__(self, func, timeout=5):
-        self.func = func
-        self.timeout = timeout
-        self.map = {}
-    
-    def __call__(self, id):
-        try:
-            self.func(id)
-            uniq = random.randrange(2**256)
-            df = defer.Deferred()
-            def timeout():
-                df, timer = self.map[id].pop(uniq)
-                df.errback(failure.Failure(defer.TimeoutError()))
-                if not self.map[id]:
-                    del self.map[id]
-            self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
-            return df
-        except:
-            import traceback
-            traceback.print_exc()
-    
-    def got_response(self, id, resp):
-        if id not in self.map:
-            return
-        for df, timer in self.map.pop(id).itervalues():
-            timer.cancel()
-            df.callback(resp)
-
-class GenericDeferrer(object):
-    def __init__(self, max_id, func, timeout=5):
-        self.max_id = max_id
-        self.func = func
-        self.timeout = timeout
-        self.map = {}
-    
-    def __call__(self, *args, **kwargs):
-        while True:
-            id = random.randrange(self.max_id)
-            if id not in self.map:
-                break
-        df = defer.Deferred()
-        def timeout():
-            self.map.pop(id)
-            df.errback(failure.Failure(defer.TimeoutError()))
-        timer = reactor.callLater(self.timeout, timeout)
-        self.func(id, *args, **kwargs)
-        self.map[id] = df, timer
-        return df
-    
-    def got_response(self, id, resp):
-        if id not in self.map:
-            return
-        df, timer = self.map.pop(id)
-        timer.cancel()
-        df.callback(resp)
-
-class NotNowError(Exception):
-    pass
-
-class DeferredCacher(object):
-    def __init__(self, func, backing=None):
-        if backing is None:
-            backing = {}
-        
-        self.func = func
-        self.backing = backing
-        self.waiting = {}
-    
-    @defer.inlineCallbacks
-    def __call__(self, key):
-        if key in self.waiting:
-            yield self.waiting[key]
-        
-        if key in self.backing:
-            defer.returnValue(self.backing[key])
-        else:
-            self.waiting[key] = defer.Deferred()
-            try:
-                value = yield self.func(key)
-            finally:
-                self.waiting.pop(key).callback(None)
-        
-        self.backing[key] = value
-        defer.returnValue(value)
-    
-    def call_now(self, key):
-        if key in self.waiting:
-            raise NotNowError()
-        
-        if key in self.backing:
-            return self.backing[key]
-        else:
-            self.waiting[key] = defer.Deferred()
-            def cb(value):
-                self.backing[key] = value
-                self.waiting.pop(key).callback(None)
-            def eb(fail):
-                self.waiting.pop(key).callback(None)
-                fail.printTraceback()
-            self.func(key).addCallback(cb).addErrback(eb)
-            raise NotNowError()
-
-def pubkey_to_address(pubkey, testnet):
-    if len(pubkey) != 65:
-        raise ValueError('invalid pubkey')
-    version = 111 if testnet else 0
-    key_hash = chr(version) + hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
-    checksum = hashlib.sha256(hashlib.sha256(key_hash).digest()).digest()[:4]
-    return base58_encode(key_hash + checksum)
-
-def base58_encode(data):
-    return '1'*(len(data) - len(data.lstrip(chr(0)))) + natural_to_string(string_to_natural(data), '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz')
-
-def natural_to_string(n, alphabet=None, min_width=1):
-    if alphabet is None:
-        s = '%x' % (n,)
-        if len(s) % 2:
-            s = '\0' + x
-        return s.decode('hex').rjust(min_width, '\x00')
-    res = []
-    while n:
-        n, x = divmod(n, len(alphabet))
-        res.append(alphabet[x])
-    res.reverse()
-    return ''.join(res).rjust(min_width, '\x00')
-
-def string_to_natural(s, alphabet=None):
-    if alphabet is None:
-        s = s.encode('hex')
-        return int(s, 16)
-    if not s or (s != alphabet[0] and s.startswith(alphabet[0])):
-        raise ValueError()
-    return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s)))
-
-
-class DictWrapper(object):
-    def encode_key(self, key):
-        return key
-    def decode_key(self, encoded_key):
-        return encoded_key
-    def encode_value(self, value):
-        return value
-    def decode_value(self, encoded_value):
-        return encoded_value
-    
-    def __init__(self, inner):
-        self.inner = inner
-    
-    def __len__(self):
-        return len(self.inner)
-    
-    def __contains__(self, key):
-        return self.encode_key(key) in self.inner
-    
-    def __getitem__(self, key):
-        return self.decode_value(self.inner[self.encode_key(key)])
-    def __setitem__(self, key, value):
-        self.inner[self.encode_key(key)] = self.encode_value(value)
-    def __delitem__(self, key):
-        del self.inner[self.encode_key(key)]
-    
-    def __iter__(self):
-        for encoded_key in self.inner:
-            yield self.decode_key(encoded_key)
-    def iterkeys(self):
-        return iter(self)
-    def keys(self):
-        return list(self.iterkeys())
-    
-    def itervalue(self):
-        for encoded_value in self.inner.itervalues():
-            yield self.decode_value(encoded_value)
-    def values(self):
-        return list(self.itervalue())
-    
-    def iteritems(self):
-        for key, value in self.inner.iteritems():
-            yield self.decode_key(key), self.decode_value(value)
-    def items(self):
-        return list(self.iteritems())
-
-def update_dict(d, **replace):
-    d = d.copy()
-    for k, v in replace.iteritems():
-        if v is None:
-            del d[k]
-        else:
-            d[k] = v
-    return d
diff --git a/p2pool/util/__init__.py b/p2pool/util/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/p2pool/util/bases.py b/p2pool/util/bases.py
new file mode 100644 (file)
index 0000000..9c45589
--- /dev/null
@@ -0,0 +1,42 @@
+def natural_to_string(n, alphabet=None, min_width=0):
+    if alphabet is None:
+        s = '%x' % (n,)
+        if len(s) % 2:
+            s = '0' + s
+        return s.decode('hex').lstrip('\x00').rjust(min_width, '\x00')
+    assert len(set(alphabet)) == len(alphabet)
+    res = []
+    while n:
+        n, x = divmod(n, len(alphabet))
+        res.append(alphabet[x])
+    res.reverse()
+    return ''.join(res).rjust(min_width, '\x00')
+
+def string_to_natural(s, alphabet=None):
+    if alphabet is None:
+        s = s.encode('hex')
+        return int("0"+s, 16)
+    assert len(set(alphabet)) == len(alphabet)
+    if not s or (s != alphabet[0] and s.startswith(alphabet[0])):
+        raise ValueError()
+    return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s)))
+
+import random
+
+def generate_alphabet():
+    if random.randrange(2):
+        return None
+    else:
+        a = map(chr, xrange(256))
+        random.shuffle(a)
+        return a[:random.randrange(2, len(a))]
+
+if __name__ == '__main__':
+    while True:
+        alphabet = generate_alphabet()
+        for i in xrange(1000):
+            n = random.randrange(100000000000000000000000000000)
+            s = natural_to_string(n, alphabet)
+            n2 = string_to_natural(s, alphabet)
+            print n, s.encode('hex'), n2
+            assert n == n2
diff --git a/p2pool/util/datachunker.py b/p2pool/util/datachunker.py
new file mode 100644 (file)
index 0000000..c627a20
--- /dev/null
@@ -0,0 +1,50 @@
+import collections
+
+class StringBuffer(object):
+    'Buffer manager with great worst-case behavior'
+    
+    def __init__(self, data=''):
+        self.buf = collections.deque([data])
+        self.buf_len = len(data)
+        self.pos = 0
+    
+    def __len__(self):
+        return self.buf_len - self.pos
+    
+    def add(self, data):
+        self.buf.append(data)
+        self.buf_len += len(data)
+    
+    def get(self, wants):
+        if self.buf_len - self.pos < wants:
+            raise IndexError('not enough data')
+        data = []
+        while wants:
+            seg = self.buf[0][self.pos:self.pos+wants]
+            self.pos += len(seg)
+            while self.buf and self.pos >= len(self.buf[0]):
+                x = self.buf.popleft()
+                self.buf_len -= len(x)
+                self.pos -= len(x)
+            
+            data.append(seg)
+            wants -= len(seg)
+        return ''.join(data)
+
+def _DataChunker(receiver):
+    wants = receiver.next()
+    buf = StringBuffer()
+    
+    while True:
+        if len(buf) >= wants:
+            wants = receiver.send(buf.get(wants))
+        else:
+            buf.add((yield))
+def DataChunker(receiver):
+    '''
+    Produces a function that accepts data that is input into a generator
+    (receiver) in response to the receiver yielding the size of data to wait on
+    '''
+    x = _DataChunker(receiver)
+    x.next()
+    return x.send
similarity index 100%
rename from p2pool/db.py
rename to p2pool/util/db.py
diff --git a/p2pool/util/deferral.py b/p2pool/util/deferral.py
new file mode 100644 (file)
index 0000000..a5c1c67
--- /dev/null
@@ -0,0 +1,163 @@
+from __future__ import division
+
+import random
+import traceback
+
+from twisted.internet import defer, reactor
+from twisted.python import failure
+
+def sleep(t):
+    d = defer.Deferred()
+    reactor.callLater(t, d.callback, None)
+    return d
+
+def retry(message, delay):
+    '''
+    @retry('Error getting block:', 1)
+    @defer.inlineCallbacks
+    def get_block(hash):
+        ...
+    '''
+    
+    def retry2(func):
+        @defer.inlineCallbacks
+        def f(*args, **kwargs):
+            while True:
+                try:
+                    result = yield func(*args, **kwargs)
+                except:
+                    print
+                    print message
+                    traceback.print_exc()
+                    print
+                    
+                    yield sleep(delay)
+                else:
+                    defer.returnValue(result)
+        return f
+    return retry2
+
+class ReplyMatcher(object):
+    '''
+    Converts request/got response interface to deferred interface
+    '''
+    
+    def __init__(self, func, timeout=5):
+        self.func = func
+        self.timeout = timeout
+        self.map = {}
+    
+    def __call__(self, id):
+        self.func(id)
+        uniq = random.randrange(2**256)
+        df = defer.Deferred()
+        def timeout():
+            df, timer = self.map[id].pop(uniq)
+            df.errback(failure.Failure(defer.TimeoutError()))
+            if not self.map[id]:
+                del self.map[id]
+        self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
+        return df
+    
+    def got_response(self, id, resp):
+        if id not in self.map:
+            return
+        for df, timer in self.map.pop(id).itervalues():
+            timer.cancel()
+            df.callback(resp)
+
+class GenericDeferrer(object):
+    '''
+    Converts query with identifier/got response interface to deferred interface
+    '''
+    
+    def __init__(self, max_id, func, timeout=5):
+        self.max_id = max_id
+        self.func = func
+        self.timeout = timeout
+        self.map = {}
+    
+    def __call__(self, *args, **kwargs):
+        while True:
+            id = random.randrange(self.max_id)
+            if id not in self.map:
+                break
+        df = defer.Deferred()
+        def timeout():
+            self.map.pop(id)
+            df.errback(failure.Failure(defer.TimeoutError()))
+        timer = reactor.callLater(self.timeout, timeout)
+        self.func(id, *args, **kwargs)
+        self.map[id] = df, timer
+        return df
+    
+    def got_response(self, id, resp):
+        if id not in self.map:
+            return
+        df, timer = self.map.pop(id)
+        timer.cancel()
+        df.callback(resp)
+
+class NotNowError(Exception):
+    pass
+
+class DeferredCacher(object):
+    '''
+    like memoize, but for functions that return Deferreds
+    
+    @DeferredCacher
+    def f(x):
+        ...
+        return df
+    
+    @DeferredCacher.with_backing(bsddb.hashopen(...))
+    def f(x):
+        ...
+        return df
+    '''
+    
+    @classmethod
+    def with_backing(cls, backing):
+        return lambda func: cls(func, backing)
+    
+    def __init__(self, func, backing=None):
+        if backing is None:
+            backing = {}
+        
+        self.func = func
+        self.backing = backing
+        self.waiting = {}
+    
+    @defer.inlineCallbacks
+    def __call__(self, key):
+        if key in self.waiting:
+            yield self.waiting[key]
+        
+        if key in self.backing:
+            defer.returnValue(self.backing[key])
+        else:
+            self.waiting[key] = defer.Deferred()
+            try:
+                value = yield self.func(key)
+            finally:
+                self.waiting.pop(key).callback(None)
+        
+        self.backing[key] = value
+        defer.returnValue(value)
+    
+    def call_now(self, key):
+        if key in self.waiting:
+            raise NotNowError()
+        
+        if key in self.backing:
+            return self.backing[key]
+        else:
+            self.waiting[key] = defer.Deferred()
+            def cb(value):
+                self.backing[key] = value
+                self.waiting.pop(key).callback(None)
+            def eb(fail):
+                self.waiting.pop(key).callback(None)
+                fail.printTraceback()
+            self.func(key).addCallback(cb).addErrback(eb)
+            raise NotNowError()
diff --git a/p2pool/util/deferred_resource.py b/p2pool/util/deferred_resource.py
new file mode 100644 (file)
index 0000000..dbc844e
--- /dev/null
@@ -0,0 +1,24 @@
+from __future__ import division
+
+from twisted.internet import defer
+from twisted.web import resource, server
+
+class DeferredResource(resource.Resource):
+    def render(self, request):
+        def finish(x):
+            if request._disconnected:
+                return
+            if x is not None:
+                request.write(x)
+            request.finish()
+        
+        def finish_error(fail):
+            if request._disconnected:
+                return
+            request.setResponseCode(500) # won't do anything if already written to
+            request.write('---ERROR---')
+            request.finish()
+            fail.printTraceback()
+        
+        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
+        return server.NOT_DONE_YET
diff --git a/p2pool/util/dicts.py b/p2pool/util/dicts.py
new file mode 100644 (file)
index 0000000..b193f9d
--- /dev/null
@@ -0,0 +1,54 @@
+class DictWrapper(object):
+    def encode_key(self, key):
+        return key
+    def decode_key(self, encoded_key):
+        return encoded_key
+    def encode_value(self, value):
+        return value
+    def decode_value(self, encoded_value):
+        return encoded_value
+    
+    def __init__(self, inner):
+        self.inner = inner
+    
+    def __len__(self):
+        return len(self.inner)
+    
+    def __contains__(self, key):
+        return self.encode_key(key) in self.inner
+    
+    def __getitem__(self, key):
+        return self.decode_value(self.inner[self.encode_key(key)])
+    def __setitem__(self, key, value):
+        self.inner[self.encode_key(key)] = self.encode_value(value)
+    def __delitem__(self, key):
+        del self.inner[self.encode_key(key)]
+    
+    def __iter__(self):
+        for encoded_key in self.inner:
+            yield self.decode_key(encoded_key)
+    def iterkeys(self):
+        return iter(self)
+    def keys(self):
+        return list(self.iterkeys())
+    
+    def itervalue(self):
+        for encoded_value in self.inner.itervalues():
+            yield self.decode_value(encoded_value)
+    def values(self):
+        return list(self.itervalue())
+    
+    def iteritems(self):
+        for key, value in self.inner.iteritems():
+            yield self.decode_key(key), self.decode_value(value)
+    def items(self):
+        return list(self.iteritems())
+
+def update_dict(d, **replace):
+    d = d.copy()
+    for k, v in replace.iteritems():
+        if v is None:
+            del d[k]
+        else:
+            d[k] = v
+    return d
similarity index 93%
rename from p2pool/jsonrpc.py
rename to p2pool/util/jsonrpc.py
index 1eb33d0..04f2ebd 100644 (file)
@@ -7,7 +7,7 @@ import traceback
 from twisted.internet import defer
 from twisted.web import client
 
-import util
+import deferred_resource
 
 class Error(Exception):
     def __init__(self, code, message, data=None):
@@ -62,7 +62,7 @@ class Proxy(object):
             return lambda *params: self.callRemote(attr[len('rpc_'):], *params)
         raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr))
 
-class Server(util.DeferredResource):
+class Server(deferred_resource.DeferredResource):
     extra_headers = None
     
     @defer.inlineCallbacks
@@ -111,11 +111,13 @@ class Server(util.DeferredResource):
             if id_ is None:
                 return
             
+            #print (df.result.type, df.result.value, df.result.tb)
+            #print df.result.__dict__
             try:
                 result = yield df
-            except Error, e:
-                raise e
-            except Exception, e:
+            #except Error, e:
+            #w    raise e
+            except Exception:
                 print 'Squelched JSON method error:'
                 traceback.print_exc()
                 raise Error(-32099, u'Unknown error')
diff --git a/p2pool/util/math.py b/p2pool/util/math.py
new file mode 100644 (file)
index 0000000..25f155c
--- /dev/null
@@ -0,0 +1,17 @@
+from __future__ import division
+
+def median(x, use_float=True):
+    # there exist better algorithms...
+    y = sorted(x)
+    left = (len(y) - 1)//2
+    right = len(y)//2
+    sum = y[left] + y[right]
+    if use_float:
+        return sum/2
+    else:
+        return sum//2
+
+def shuffled(x):
+    x = list(x)
+    random.shuffle(x)
+    return x
diff --git a/p2pool/util/variable.py b/p2pool/util/variable.py
new file mode 100644 (file)
index 0000000..dfa1c3e
--- /dev/null
@@ -0,0 +1,57 @@
+import itertools
+
+from twisted.internet import defer
+
+class Event(object):
+    def __init__(self):
+        self.observers = {}
+        self.one_time_observers = {}
+        self.id_generator = itertools.count()
+    
+    def watch(self, func):
+        id = self.id_generator.next()
+        self.observers[id] = func
+        return id
+    def unwatch(self, id):
+        self.observers.pop(id)
+    
+    def watch_one_time(self, func):
+        id = self.id_generator.next()
+        self.one_time_observers[id] = func
+        return id
+    def unwatch_one_time(self, id):
+        self.one_time_observers.pop(id)
+    
+    def happened(self, event=None):
+        for func in self.observers.itervalues():
+            func(event)
+        
+        one_time_observers = self.one_time_observers
+        self.one_time_observers = {}
+        for func in one_time_observers.itervalues():
+            func(event)
+    
+    def get_deferred(self):
+        df = defer.Deferred()
+        self.watch_one_time(df.callback)
+        return df
+
+class Variable(object):
+    def __init__(self, value):
+        self.value = value
+        self.changed = Event()
+    
+    def set(self, value):
+        if value == self.value:
+            return
+        
+        self.value = value
+        self.changed.happened(value)
+    
+    def get_not_none(self):
+        if self.value is not None:
+            return defer.succeed(self.value)
+        else:
+            df = defer.Deferred()
+            self.changed.watch_one_time(df.callback)
+            return df
index d781e15..3d849dc 100644 (file)
@@ -1,12 +1,13 @@
 from __future__ import division
 
+import json
+import traceback
+
 from twisted.internet import defer
 
-import json
-import jsonrpc
-import util
+from util import jsonrpc, deferred_resource
 
-class LongPollingWorkerInterface(util.DeferredResource):
+class LongPollingWorkerInterface(deferred_resource.DeferredResource):
     def __init__(self, work, compute):
         self.work = work
         self.compute = compute
@@ -42,7 +43,10 @@ class WorkerInterface(jsonrpc.Server):
         self.putChild('', self)
     
     def rpc_getwork(self, data=None):
+      try:
         if data is not None:
             return self.response_callback(data)
         
         return self.compute(self.work.value)
+      except ValueError:
+        traceback.print_exc()