--- /dev/null
+from p2pool.util import bases
+
+base58_alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
+
+def base58_encode(data):
+ return base58_alphabet[0]*(len(data) - len(data.lstrip(chr(0)))) + bases.natural_to_string(bases.string_to_natural(data), base58_alphabet)
+
+def base58_decode(data):
+ return chr(0)*(len(data) - len(data.lstrip(base58_alphabet[0]))) + bases.natural_to_string(bases.string_to_natural(data, base58_alphabet))
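+
+# Round-trip sanity check (illustrative comment, not part of the original code):
+# leading zero bytes map to leading '1' characters, and for any byte string s,
+# base58_decode(base58_encode(s)) == s.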
+
+from __future__ import division
+
import struct
import StringIO
import hashlib
+from . import base58
+from p2pool.util import bases
+
class EarlyEnd(Exception):
pass
data = self._pack(obj)
assert self._unpack(data) == obj
return data
+
+
+ def pack_base58(self, obj):
+ return base58.base58_encode(self.pack(obj))
+
+ def unpack_base58(self, base58_data):
+ return self.unpack(base58.base58_decode(base58_data))
+
+
+ def hash160(self, obj):
+ return ripemdsha(self.pack(obj))
+
+ def hash256(self, obj):
+ return doublesha(self.pack(obj))
class VarIntType(Type):
def read(self, file):
return int(data[::-1].encode('hex'), 16)
def write(self, file, item):
+ if item >= 2**256:
+ raise ValueError("invalid hash value")
file.write(('%064x' % (item,)).decode('hex')[::-1])
class ShortHashType(Type):
return int(data[::-1].encode('hex'), 16)
def write(self, file, item):
- file.write(('%020x' % (item,)).decode('hex')[::-1])
+ if item >= 2**160:
+ raise ValueError("invalid hash value")
+ file.write(('%040x' % (item,)).decode('hex')[::-1])
class ListType(Type):
def __init__(self, type):
for key, type_ in self.fields:
type_.write(file, item[key])
-address_type = ComposedType([
- ('services', StructType('<Q')),
- ('address', IPV6AddressType()),
- ('port', StructType('>H')),
-])
-
-tx_type = ComposedType([
- ('version', StructType('<I')),
- ('tx_ins', ListType(ComposedType([
- ('previous_output', ComposedType([
- ('hash', HashType()),
- ('index', StructType('<I')),
- ])),
- ('script', VarStrType()),
- ('sequence', StructType('<I')),
- ]))),
- ('tx_outs', ListType(ComposedType([
- ('value', StructType('<Q')),
- ('script', VarStrType()),
- ]))),
- ('lock_time', StructType('<I')),
-])
-
-block_header_type = ComposedType([
- ('version', StructType('<I')),
- ('previous_block', HashType()),
- ('merkle_root', HashType()),
- ('timestamp', StructType('<I')),
- ('bits', StructType('<I')),
- ('nonce', StructType('<I')),
-])
-
-block_type = ComposedType([
- ('header', block_header_type),
- ('txs', ListType(tx_type)),
-])
-
-def doublesha(data):
- return HashType().unpack(hashlib.sha256(hashlib.sha256(data).digest()).digest())
-
-def ripemdsha(data):
- return ShortHashType().unpack(hashlib.new('ripemd160', hashlib.sha256(data).digest()).digest())
-
-merkle_record_type = ComposedType([
- ('left', HashType()),
- ('right', HashType()),
-])
-
-def merkle_hash(tx_list):
- hash_list = [doublesha(tx_type.pack(tx)) for tx in tx_list]
- while len(hash_list) > 1:
- hash_list = [doublesha(merkle_record_type.pack(dict(left=left, right=left if right is None else right)))
- for left, right in zip(hash_list[::2], hash_list[1::2] + [None])]
- return hash_list[0]
-
-def tx_hash(tx):
- return doublesha(tx_type.pack(tx))
-
-def block_hash(header):
- return doublesha(block_header_type.pack(header))
-
-class EarlyEnd(Exception):
- pass
-
-class LateEnd(Exception):
- pass
-
-class Type(object):
- # the same data can have only one unpacked representation, but multiple packed binary representations
+class ChecksummedType(Type):
+ def __init__(self, inner):
+ self.inner = inner
- def _unpack(self, data):
- f = StringIO.StringIO(data)
-
- obj = self.read(f)
+ def read(self, file):
+ obj = self.inner.read(file)
+ data = self.inner.pack(obj)
- if f.tell() != len(data):
- raise LateEnd('underread ' + repr((self, data)))
+ if file.read(4) != hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]:
+ raise ValueError("invalid checksum")
return obj
- def unpack(self, data):
- obj = self._unpack(data)
- assert self._unpack(self._pack(obj)) == obj
- return obj
-
- def _pack(self, obj):
- f = StringIO.StringIO()
-
- self.write(f, obj)
-
- data = f.getvalue()
-
- return data
-
- def pack(self, obj):
- data = self._pack(obj)
- assert self._unpack(data) == obj
- return data
-
-class VarIntType(Type):
- def read(self, file):
- data = file.read(1)
- if len(data) != 1:
- raise EarlyEnd()
- first, = struct.unpack('<B', data)
- if first == 0xff: desc = '<Q'
- elif first == 0xfe: desc = '<I'
- elif first == 0xfd: desc = '<H'
- else: return first
- length = struct.calcsize(desc)
- data = file.read(length)
- if len(data) != length:
- raise EarlyEnd()
- return struct.unpack(desc, data)[0]
-
- def write(self, file, item):
- if item < 0xfd:
- file.write(struct.pack('<B', item))
- elif item <= 0xffff:
- file.write(struct.pack('<BH', 0xfd, item))
- elif item <= 0xffffffff:
- file.write(struct.pack('<BI', 0xfe, item))
- elif item <= 0xffffffffffffffff:
- file.write(struct.pack('<BQ', 0xff, item))
- else:
- raise ValueError('int too large for varint')
-
-class VarStrType(Type):
- def read(self, file):
- length = VarIntType().read(file)
- res = file.read(length)
- if len(res) != length:
- raise EarlyEnd('var str not long enough %r' % ((length, len(res), res),))
- return res
-
- def write(self, file, item):
- VarIntType().write(file, len(item))
- file.write(item)
-
-class FixedStrType(Type):
- def __init__(self, length):
- self.length = length
-
- def read(self, file):
- res = file.read(self.length)
- if len(res) != self.length:
- raise EarlyEnd('early EOF!')
- return res
-
def write(self, file, item):
- if len(item) != self.length:
- raise ValueError('incorrect length!')
- file.write(item)
-
-class EnumType(Type):
- def __init__(self, inner, values):
- self.inner = inner
- self.values = values
-
- self.keys = {}
- for k, v in values.iteritems():
- if v in self.keys:
- raise ValueError('duplicate value in values')
- self.keys[v] = k
-
- def read(self, file):
- return self.keys[self.inner.read(file)]
-
- def write(self, file, item):
- self.inner.write(file, self.values[item])
-
-class HashType(Type):
- def read(self, file):
- data = file.read(256//8)
- if len(data) != 256//8:
- raise EarlyEnd('incorrect length!')
- return int(data[::-1].encode('hex'), 16)
-
- def write(self, file, item):
- file.write(('%064x' % (item,)).decode('hex')[::-1])
-
-class ShortHashType(Type):
- def read(self, file):
- data = file.read(160//8)
- if len(data) != 160//8:
- raise EarlyEnd('incorrect length!')
- return int(data[::-1].encode('hex'), 16)
-
- def write(self, file, item):
- file.write(('%020x' % (item,)).decode('hex')[::-1])
-
-class ListType(Type):
- def __init__(self, type):
- self.type = type
-
- def read(self, file):
- length = VarIntType().read(file)
- return [self.type.read(file) for i in xrange(length)]
-
- def write(self, file, item):
- VarIntType().write(file, len(item))
- for subitem in item:
- self.type.write(file, subitem)
-
-class StructType(Type):
- def __init__(self, desc):
- self.desc = desc
- self.length = struct.calcsize(self.desc)
-
- def read(self, file):
- data = file.read(self.length)
- if len(data) != self.length:
- raise EarlyEnd()
- res, = struct.unpack(self.desc, data)
- return res
-
- def write(self, file, item):
- data = struct.pack(self.desc, item)
- if struct.unpack(self.desc, data)[0] != item:
- # special test because struct doesn't error on some overflows
- raise ValueError("item didn't survive pack cycle (%r)" % (item,))
- file.write(data)
-
-class IPV6AddressType(Type):
- def read(self, file):
- data = file.read(16)
- if len(data) != 16:
- raise EarlyEnd()
- if data[:12] != '00000000000000000000ffff'.decode('hex'):
- raise ValueError("ipv6 addresses not supported yet")
- return '::ffff:' + '.'.join(str(ord(x)) for x in data[12:])
-
- def write(self, file, item):
- prefix = '::ffff:'
- if not item.startswith(prefix):
- raise ValueError("ipv6 addresses not supported yet")
- item = item[len(prefix):]
- bits = map(int, item.split('.'))
- if len(bits) != 4:
- raise ValueError("invalid address: %r" % (bits,))
- data = '00000000000000000000ffff'.decode('hex') + ''.join(chr(x) for x in bits)
- assert len(data) == 16, len(data)
+ data = self.inner.pack(item)
file.write(data)
-
-class ComposedType(Type):
- def __init__(self, fields):
- self.fields = fields
-
- def read(self, file):
- item = {}
- for key, type_ in self.fields:
- item[key] = type_.read(file)
- return item
-
- def write(self, file, item):
- for key, type_ in self.fields:
- type_.write(file, item[key])
+ file.write(hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4])
address_type = ComposedType([
('services', StructType('<Q')),
('previous_block', HashType()),
('merkle_root', HashType()),
('timestamp', StructType('<I')),
- ('bits', StructType('<I')),
+ ('bits', FixedStrType(4)),
('nonce', StructType('<I')),
])
])
def merkle_hash(tx_list):
- hash_list = [doublesha(tx_type.pack(tx)) for tx in tx_list]
+ hash_list = map(tx_hash, tx_list)
while len(hash_list) > 1:
hash_list = [doublesha(merkle_record_type.pack(dict(left=left, right=left if right is None else right)))
for left, right in zip(hash_list[::2], hash_list[1::2] + [None])]
def block_hash(header):
return doublesha(block_header_type.pack(header))
+def shift_left(n, m):
+ # python: :(
+ if m < 0:
+ return n >> -m
+ return n << m
+
def bits_to_target(bits):
- return (bits & 0x00ffffff) * 2 ** (8 * ((bits >> 24) - 3))
+ bits = bits[::-1]
+ length = ord(bits[0])
+ return bases.string_to_natural((bits[1:] + "\0"*length)[:length])
+
+def old_bits_to_target(bits):
+ return shift_left(bits & 0x00ffffff, 8 * ((bits >> 24) - 3))
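+
+# Worked example with Bitcoin's well-known maximum target (compact bits 0x1d00ffff):
+#   old_bits_to_target(0x1d00ffff) == 0x00ffff * 2**(8*(0x1d - 3)) == 0xffff * 2**208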
+
+def about_equal(a, b):
+ if a == b: return True
+ return abs(a-b)/((abs(a)+abs(b))/2) < .01
+
+def compress_target_to_bits(target): # loses precision
+ print
+ print "t", target
+ n = bases.natural_to_string(target)
+ print "n", n.encode('hex')
+ bits = chr(len(n)) + n[:3].ljust(3, '\0')
+ bits = bits[::-1]
+ print "bits", bits.encode('hex')
+ print "new", bits_to_target(bits)
+ print "old", old_bits_to_target(struct.unpack("<I", bits)[0])
+ assert about_equal(bits_to_target(bits), target), (bits_to_target(bits), target)
+ assert about_equal(old_bits_to_target(struct.unpack("<I", bits)[0]), target), (old_bits_to_target(struct.unpack("<I", bits)[0]), target)
+ return bits
def target_to_average_attempts(target):
return 2**256//(target + 1)
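+
+# A hash is uniform over [0, 2**256), so it lands at or below `target` with
+# probability (target + 1)/2**256; the average number of attempts is the inverse.
+# e.g. target_to_average_attempts(2**255 - 1) == 2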
+
+human_address_type = ChecksummedType(ComposedType([
+ ('version', StructType("<B")),
+ ('pubkey_hash', ShortHashType()),
+]))
+
+pubkey_type = FixedStrType(65)
+
+def pubkey_hash_to_address(pubkey_hash, net):
+ return human_address_type.pack_base58(dict(version=net.BITCOIN_ADDRESS_VERSION, pubkey_hash=pubkey_hash))
+
+def pubkey_to_address(pubkey, net):
+ return pubkey_hash_to_address(pubkey_type.hash160(pubkey), net)
+
+def address_to_pubkey_hash(address, net):
+ x = human_address_type.unpack_base58(address)
+ if x['version'] != net.BITCOIN_ADDRESS_VERSION:
+ raise ValueError('address not for this net!')
+ return x['pubkey_hash']
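+
+# Illustrative round trip (pubkey_hash here is any 160-bit integer, purely hypothetical):
+#   addr = pubkey_hash_to_address(pubkey_hash, Mainnet)   # version byte 0 -> a '1...' address
+#   assert address_to_pubkey_hash(addr, Mainnet) == pubkey_hash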
+
+class Mainnet(object):
+ BITCOIN_P2P_PREFIX = 'f9beb4d9'.decode('hex')
+ BITCOIN_P2P_PORT = 8333
+ BITCOIN_ADDRESS_VERSION = 0
+
+class Testnet(object):
+ BITCOIN_P2P_PREFIX = 'fabfb5da'.decode('hex')
+ BITCOIN_P2P_PORT = 18333
+ BITCOIN_ADDRESS_VERSION = 111
+
from twisted.internet import protocol, reactor
from . import data as bitcoin_data
-import p2pool.util
+from p2pool.util import variable, datachunker, deferral
class BaseProtocol(protocol.Protocol):
def connectionMade(self):
- self.dataReceived = p2pool.util.DataChunker(self.dataReceiver())
+ self.dataReceived = datachunker.DataChunker(self.dataReceiver())
def dataReceiver(self):
while True:
raise AttributeError(attr)
class Protocol(BaseProtocol):
- def __init__(self, testnet=False):
- if testnet:
- self._prefix = 'fabfb5da'.decode('hex')
- else:
- self._prefix = 'f9beb4d9'.decode('hex')
+ def __init__(self, net):
+ self._prefix = net.BITCOIN_P2P_PREFIX
version = 0
self.version = self.version_after
# connection ready
- self.check_order = p2pool.util.GenericDeferrer(2**256, lambda id, order: self.send_checkorder(id=id, order=order))
- self.submit_order = p2pool.util.GenericDeferrer(2**256, lambda id, order: self.send_submitorder(id=id, order=order))
- self.get_block = p2pool.util.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
- self.get_block_header = p2pool.util.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
+ self.check_order = deferral.GenericDeferrer(2**256, lambda id, order: self.send_checkorder(id=id, order=order))
+ self.submit_order = deferral.GenericDeferrer(2**256, lambda id, order: self.send_submitorder(id=id, order=order))
+ self.get_block = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
+ self.get_block_header = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
if hasattr(self.factory, 'resetDelay'):
self.factory.resetDelay()
maxDelay = 15
- def __init__(self, testnet=False):
- self.testnet = testnet
- self.conn = p2pool.util.Variable(None)
+ def __init__(self, net):
+ self.net = net
+ self.conn = variable.Variable(None)
- self.new_block = p2pool.util.Event()
- self.new_tx = p2pool.util.Event()
+ self.new_block = variable.Event()
+ self.new_tx = variable.Event()
def buildProtocol(self, addr):
- p = self.protocol(self.testnet)
+ p = self.protocol(self.net)
p.factory = self
return p
from bitcoin import data as bitcoin_data
-chain_id_type = bitcoin_data.ComposedType([
- ('last_p2pool_block_hash', bitcoin_data.HashType()),
- ('bits', bitcoin_data.StructType('<I')),
-])
-
share_data_type = bitcoin_data.ComposedType([
- ('last_p2pool_block_hash', bitcoin_data.HashType()),
('previous_p2pool_share_hash', bitcoin_data.HashType()),
+ ('bits2', bitcoin_data.FixedStrType(4)),
('nonce', bitcoin_data.VarStrType()),
])
def share_info_to_gentx_and_shares(share_info, chain, net):
return generate_transaction(
- last_p2pool_block_hash=share_info['share_data']['last_p2pool_block_hash'],
previous_share2=chain.share2s[share_info['share_data']['previous_p2pool_share_hash']],
nonce=share_info['share_data']['nonce'],
new_script=share_info['new_script'],
dests = sorted(amounts.iterkeys(), key=lambda script: (script == new_script, script))
assert dests[-1] == new_script
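+    # NOTE: the two lines below are pseudocode - 'share(x ago)' is a placeholder for
+    # looking up the share x steps back in the chain, so this hunk does not parse as-is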
+ pre_target = sum(bitcoin_data.target_to_average_attempts(share(x ago).target) for x in xrange(1000))/(share(1000 ago).timestamp - share(1 ago).timestamp)
+ bits2 = bitcoin_data.compress_target_to_bits(pre_target)
+
return dict(
version=1,
tx_ins=[dict(
last_p2pool_block_hash=last_p2pool_block_hash,
previous_p2pool_share_hash=previous_share2.share.hash if previous_share2 is not None else 2**256 - 1,
nonce=nonce,
+ bits2=bits2,
),
)),
)],
self.heads.add(share.hash)
if share.previous_hash in self.heads:
self.heads.remove(share.previous_hash)
+
+ def get_chain(self, start):
+ share_hash_to_get = start
+ while share_hash_to_get in self.shares:
+ share = self.shares[share_hash_to_get]
+ yield share
+ share_hash_to_get = share.previous_hash
+
+ def best(self):
+ return max(self.heads, key=self.score_chain)
+
+    def score_chain(self, start):
+        length = len(list(self.get_chain(start)))
+
+        score = 0
+        for share in itertools.islice(self.get_chain(start), 1000):
+            # weight each share by its expected work; assumes shares expose a .target attribute
+            score += bitcoin_data.target_to_average_attempts(share.target)
+
+        return (min(length, 1000), score)
if __name__ == '__main__':
class FakeShare(object):
# TARGET_MULTIPLIER needs to be less than the current difficulty to prevent miner clients from missing shares
-class Testnet(object):
+class Mainnet(bitcoin_data.Mainnet):
+ TARGET_MULTIPLIER = SPREAD = 600
+ ROOT_BLOCK = 0x6c9cb0589a44808d9a9361266a4ffb9fea2e2cf4d70bb2118b5
+ SCRIPT = '4104ffd03de44a6e11b9917f3a29f9443283d9871c9d743ef30d5eddcd37094b64d1b3d8090496b53256786bf5c82932ec23c3b74d9f05a6f95a8b5529352656664bac'.decode('hex')
+ IDENTIFIER = 0x7452839666e1f8f8
+ PREFIX = '2d4224bf18c87b87'.decode('hex')
+ ADDRS_TABLE = 'addrs'
+ P2P_PORT = 9333
+
+class Testnet(bitcoin_data.Testnet):
TARGET_MULTIPLIER = SPREAD = 30
ROOT_BLOCK = 0xd5070cd4f2987ad2191af71393731a2b143f094f7b84c9e6aa9a6a
SCRIPT = '410403ad3dee8ab3d8a9ce5dd2abfbe7364ccd9413df1d279bf1a207849310465b0956e5904b1155ecd17574778f9949589ebfd4fb33ce837c241474a225cf08d85dac'.decode('hex')
IDENTIFIER = 0x1ae3479e4eb6700a
PREFIX = 'd19778c812754854'.decode('hex')
ADDRS_TABLE = 'addrs_testnet'
-
-class Main(object):
- TARGET_MULTIPLIER = SPREAD = 600
- ROOT_BLOCK = 0x11a22c6e314b1a3f44cbbf50246187a37756ea8af4d41c43a8d6
- SCRIPT = '4104ffd03de44a6e11b9917f3a29f9443283d9871c9d743ef30d5eddcd37094b64d1b3d8090496b53256786bf5c82932ec23c3b74d9f05a6f95a8b5529352656664bac'.decode('hex')
- IDENTIFIER = 0x7452839666e1f8f8
- PREFIX = '2d4224bf18c87b87'.decode('hex')
- ADDRS_TABLE = 'addrs'
+ P2P_PORT = 19333
import os
import random
import sqlite3
+import struct
import subprocess
import sys
import traceback
from twisted.web import server
import bitcoin.p2p, bitcoin.getwork, bitcoin.data
-import db
-import expiring_dict
-import jsonrpc
-import p2p
+from util import db, expiring_dict, jsonrpc, variable, deferral
+import p2pool.p2p as p2p
import p2pool.data as p2pool
-import util
import worker_interface
try:
self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
self.share2s = {} # hash -> share2
- self.highest = util.Variable(None) # hash
+ self.highest = variable.Variable(None) # hash
self.requesting = set()
self.request_map = {}
print
block_hash = block['header']['previous_block']
+@deferral.retry('Error getting work from bitcoind:', 1)
@defer.inlineCallbacks
def getwork(bitcoind):
- while True:
- try:
- # a block could arrive in between these two queries
- getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
- try:
- getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
- finally:
- # get rid of residual errors
- getwork_df.addErrback(lambda fail: None)
- height_df.addErrback(lambda fail: None)
- except:
- print
- print 'Error getting work from bitcoind:'
- traceback.print_exc()
- print
-
-
- yield util.sleep(1)
-
- continue
- defer.returnValue((getwork, height))
+ # a block could arrive in between these two queries
+ getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
+ try:
+ getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
+ finally:
+ # get rid of residual errors
+ getwork_df.addErrback(lambda fail: None)
+ height_df.addErrback(lambda fail: None)
+ defer.returnValue((getwork, height))
@defer.inlineCallbacks
def main(args):
try:
- net = p2pool.Testnet if args.testnet else p2pool.Main
-
print 'p2pool (version %s)' % (__version__,)
print
# connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
- factory = bitcoin.p2p.ClientFactory(args.testnet)
+ factory = bitcoin.p2p.ClientFactory(args.net)
reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
while True:
print
else:
break
- yield util.sleep(1)
+ yield deferral.sleep(1)
print ' ...success!'
print ' Payout script:', my_script.encode('hex')
block = yield (yield factory.getProtocol()).get_block(block_hash)
print 'Got block %x' % (block_hash,)
defer.returnValue(block)
- get_block = util.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
+ get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
- get_raw_transaction = util.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
+ get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
return chains.setdefault(chain_id_data, Chain(chain_id_data))
# information affecting work that should trigger a long-polling update
- current_work = util.Variable(None)
+ current_work = variable.Variable(None)
# information affecting work that should not trigger a long-polling update
- current_work2 = util.Variable(None)
+ current_work2 = variable.Variable(None)
share_dbs = [db.SQLiteDict(sqlite3.connect(filename, isolation_level=None), 'shares') for filename in args.store_shares]
@defer.inlineCallbacks
def set_real_work():
work, height = yield getwork(bitcoind)
- last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, net))
+ last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, args.net))
chain = get_chain(p2pool.chain_id_type.pack(dict(last_p2pool_block_hash=last_p2pool_block_hash, bits=work.bits)))
current_work.set(dict(
version=work.version,
share2.flag_shared()
def p2p_share(share, peer=None):
- if share.hash <= conv.bits_to_target(share.header['bits']):
+ if share.hash <= bitcoin.data.bits_to_target(share.header['bits']):
print
print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
#print share.__dict__
print 'No bitcoind connection! Erp!'
chain = get_chain(share.chain_id_data)
- res = chain.accept(share, net)
+ res = chain.accept(share, args.net)
if res == 'good':
share2 = chain.share2s[share.hash]
ip, port = x.split(':')
return ip, int(port)
else:
- return x, {False: 9333, True: 19333}[args.testnet]
+ return x, args.net.P2P_PORT
- if args.testnet:
- nodes = [('72.14.191.28', 19333)]
- else:
- nodes = [('72.14.191.28', 9333)]
+ nodes = [('72.14.191.28', args.net.P2P_PORT)]
try:
- nodes.append(((yield reactor.resolve('p2pool.forre.st')), {False: 9333, True: 19333}[args.testnet]))
+ nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
except:
traceback.print_exc()
p2p_node = p2p.Node(
current_work=current_work,
port=args.p2pool_port,
- net=net,
- addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), net.ADDRS_TABLE),
+ net=args.net,
+ addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
mode=0 if args.low_bandwidth else 1,
preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
)
new_script=my_script,
subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
nonce=struct.pack("<Q", random.randrange(2**64)),
- net=net,
+ net=args.net,
)
print 'Generating, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
transactions = [generate_tx] + [tx.tx for tx in extra_txs]
- merkle_root = bitcoin.p2p.merkle_hash(transactions)
+ merkle_root = bitcoin.data.merkle_hash(transactions)
merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
- ba = conv.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['bits'])
- return ba.getwork(net.TARGET_MULTIPLIER)
+ ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['bits'])
+ return ba.getwork(args.net.TARGET_MULTIPLIER)
def got_response(data):
# match up with transactions
- header = conv.decode_data(data)
+ header = bitcoin.getwork.decode_data(data)
transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
if transactions is None:
print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
while True:
try:
block = get_block.call_now(start_hash)
- except util.NotNowError:
+ except deferral.NotNowError:
break
yield start_hash, block
start_hash = block['header']['previous_block']
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument('--testnet',
help='use the testnet; make sure you change the ports too',
- action='store_true', default=False, dest='testnet')
+ action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--store-shares', metavar='FILENAME',
help='write shares to a database (not needed for normal usage)',
type=str, action='append', default=[], dest='store_shares')
args = parser.parse_args()
if args.bitcoind_p2p_port is None:
- args.bitcoind_p2p_port = {False: 8333, True: 18333}[args.testnet]
+ args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
if args.p2pool_port is None:
- args.p2pool_port = {False: 9333, True: 19333}[args.testnet]
+ args.p2pool_port = args.net.P2P_PORT
reactor.callWhenRunning(main, args)
reactor.run()
from bitcoin import p2p as bitcoin_p2p
from bitcoin import data as bitcoin_data
from p2pool import data as p2pool_data
-import util
+from util import deferral, variable, dicts
# mode
# 0: send hash first (high latency, low bandwidth)
def _think(self):
while self.connected2:
self.send_ping()
- yield util.sleep(random.expovariate(1/100))
+ yield deferral.sleep(random.expovariate(1/100))
@defer.inlineCallbacks
def _think2(self):
while self.connected2:
self.send_addrme(port=self.node.port)
#print 'sending addrme'
- yield util.sleep(random.expovariate(1/100))
+ yield deferral.sleep(random.expovariate(1/100))
def handle_version(self, version, services, addr_to, addr_from, nonce, sub_version, mode, state):
self.other_version = version
self.other_services = services
- self.other_mode_var = util.Variable(mode)
+ self.other_mode_var = variable.Variable(mode)
if nonce == self.node.nonce:
#print 'Detected connection to self, disconnecting from %s:%i' % (self.transport.getPeer().host, self.transport.getPeer().port)
('last_seen', bitcoin_data.StructType('<Q')),
])
-class AddrStore(util.DictWrapper):
+class AddrStore(dicts.DictWrapper):
def encode_key(self, (address, port)):
return addrdb_key.pack(dict(address=address, port=port))
def decode_key(self, encoded_key):
self.net = net
self.addr_store = AddrStore(addr_store)
self.preferred_addrs = preferred_addrs
- self.mode_var = util.Variable(mode)
+ self.mode_var = variable.Variable(mode)
self.desired_peers = desired_peers
self.max_attempts = max_attempts
self.current_work = current_work
except:
traceback.print_exc()
- yield util.sleep(random.expovariate(1/5))
+ yield deferral.sleep(random.expovariate(1/5))
@defer.inlineCallbacks
def _think2(self):
except:
traceback.print_exc()
- yield util.sleep(random.expovariate(1/20))
+ yield deferral.sleep(random.expovariate(1/20))
def stop(self):
if not self.running:
+++ /dev/null
-from __future__ import division
-
-import collections
-import hashlib
-import itertools
-import random
-
-from twisted.internet import defer, reactor
-from twisted.python import failure
-from twisted.web import resource, server
-
-class DeferredResource(resource.Resource):
- def render(self, request):
- def finish(x):
- if request._disconnected:
- return
- if x is not None:
- request.write(x)
- request.finish()
-
- def finish_error(fail):
- if request._disconnected:
- return
- request.setResponseCode(500) # won't do anything if already written to
- request.write('---ERROR---')
- request.finish()
- fail.printTraceback()
-
- defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
- return server.NOT_DONE_YET
-
-class Event(object):
- def __init__(self):
- self.observers = {}
- self.one_time_observers = {}
- self.id_generator = itertools.count()
-
- def watch(self, func):
- id = self.id_generator.next()
- self.observers[id] = func
- return id
- def unwatch(self, id):
- self.observers.pop(id)
-
- def watch_one_time(self, func):
- id = self.id_generator.next()
- self.one_time_observers[id] = func
- return id
- def unwatch_one_time(self, id):
- self.one_time_observers.pop(id)
-
- def happened(self, event=None):
- for func in self.observers.itervalues():
- func(event)
-
- one_time_observers = self.one_time_observers
- self.one_time_observers = {}
- for func in one_time_observers.itervalues():
- func(event)
-
- def get_deferred(self):
- df = defer.Deferred()
- self.watch_one_time(df.callback)
- return df
-
-class Variable(object):
- def __init__(self, value):
- self.value = value
- self.changed = Event()
-
- def set(self, value):
- if value == self.value:
- return
-
- self.value = value
- self.changed.happened(value)
-
- def get_not_none(self):
- if self.value is not None:
- return defer.succeed(self.value)
- else:
- df = defer.Deferred()
- self.changed.watch_one_time(df.callback)
- return df
-
-def sleep(t):
- d = defer.Deferred()
- reactor.callLater(t, d.callback, None)
- return d
-
-def median(x):
- # don't really need a complex algorithm here
- y = sorted(x)
- left = (len(y) - 1)//2
- right = len(y)//2
- return (y[left] + y[right])/2
-
-class StringBuffer(object):
- 'Buffer manager with great worst-case behavior'
-
- def __init__(self, data=''):
- self.buf = collections.deque([data])
- self.buf_len = len(data)
- self.pos = 0
-
- def __len__(self):
- return self.buf_len - self.pos
-
- def add(self, data):
- self.buf.append(data)
- self.buf_len += len(data)
-
- def get(self, wants):
- if self.buf_len - self.pos < wants:
- raise IndexError('not enough data')
- data = []
- while wants:
- seg = self.buf[0][self.pos:self.pos+wants]
- self.pos += len(seg)
- while self.buf and self.pos >= len(self.buf[0]):
- x = self.buf.popleft()
- self.buf_len -= len(x)
- self.pos -= len(x)
-
- data.append(seg)
- wants -= len(seg)
- return ''.join(data)
-
-def _DataChunker(receiver):
- wants = receiver.next()
- buf = StringBuffer()
-
- while True:
- if len(buf) >= wants:
- wants = receiver.send(buf.get(wants))
- else:
- buf.add((yield))
-def DataChunker(receiver):
- '''
- Produces a function that accepts data that is input into a generator
- (receiver) in response to the receiver yielding the size of data to wait on
- '''
- x = _DataChunker(receiver)
- x.next()
- return x.send
-
-class ReplyMatcher(object):
- def __init__(self, func, timeout=5):
- self.func = func
- self.timeout = timeout
- self.map = {}
-
- def __call__(self, id):
- try:
- self.func(id)
- uniq = random.randrange(2**256)
- df = defer.Deferred()
- def timeout():
- df, timer = self.map[id].pop(uniq)
- df.errback(failure.Failure(defer.TimeoutError()))
- if not self.map[id]:
- del self.map[id]
- self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
- return df
- except:
- import traceback
- traceback.print_exc()
-
- def got_response(self, id, resp):
- if id not in self.map:
- return
- for df, timer in self.map.pop(id).itervalues():
- timer.cancel()
- df.callback(resp)
-
-class GenericDeferrer(object):
- def __init__(self, max_id, func, timeout=5):
- self.max_id = max_id
- self.func = func
- self.timeout = timeout
- self.map = {}
-
- def __call__(self, *args, **kwargs):
- while True:
- id = random.randrange(self.max_id)
- if id not in self.map:
- break
- df = defer.Deferred()
- def timeout():
- self.map.pop(id)
- df.errback(failure.Failure(defer.TimeoutError()))
- timer = reactor.callLater(self.timeout, timeout)
- self.func(id, *args, **kwargs)
- self.map[id] = df, timer
- return df
-
- def got_response(self, id, resp):
- if id not in self.map:
- return
- df, timer = self.map.pop(id)
- timer.cancel()
- df.callback(resp)
-
-class NotNowError(Exception):
- pass
-
-class DeferredCacher(object):
- def __init__(self, func, backing=None):
- if backing is None:
- backing = {}
-
- self.func = func
- self.backing = backing
- self.waiting = {}
-
- @defer.inlineCallbacks
- def __call__(self, key):
- if key in self.waiting:
- yield self.waiting[key]
-
- if key in self.backing:
- defer.returnValue(self.backing[key])
- else:
- self.waiting[key] = defer.Deferred()
- try:
- value = yield self.func(key)
- finally:
- self.waiting.pop(key).callback(None)
-
- self.backing[key] = value
- defer.returnValue(value)
-
- def call_now(self, key):
- if key in self.waiting:
- raise NotNowError()
-
- if key in self.backing:
- return self.backing[key]
- else:
- self.waiting[key] = defer.Deferred()
- def cb(value):
- self.backing[key] = value
- self.waiting.pop(key).callback(None)
- def eb(fail):
- self.waiting.pop(key).callback(None)
- fail.printTraceback()
- self.func(key).addCallback(cb).addErrback(eb)
- raise NotNowError()
-
-def pubkey_to_address(pubkey, testnet):
- if len(pubkey) != 65:
- raise ValueError('invalid pubkey')
- version = 111 if testnet else 0
- key_hash = chr(version) + hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
- checksum = hashlib.sha256(hashlib.sha256(key_hash).digest()).digest()[:4]
- return base58_encode(key_hash + checksum)
-
-def base58_encode(data):
- return '1'*(len(data) - len(data.lstrip(chr(0)))) + natural_to_string(string_to_natural(data), '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz')
-
-def natural_to_string(n, alphabet=None, min_width=1):
- if alphabet is None:
- s = '%x' % (n,)
- if len(s) % 2:
- s = '\0' + x
- return s.decode('hex').rjust(min_width, '\x00')
- res = []
- while n:
- n, x = divmod(n, len(alphabet))
- res.append(alphabet[x])
- res.reverse()
- return ''.join(res).rjust(min_width, '\x00')
-
-def string_to_natural(s, alphabet=None):
- if alphabet is None:
- s = s.encode('hex')
- return int(s, 16)
- if not s or (s != alphabet[0] and s.startswith(alphabet[0])):
- raise ValueError()
- return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s)))
-
-
-class DictWrapper(object):
- def encode_key(self, key):
- return key
- def decode_key(self, encoded_key):
- return encoded_key
- def encode_value(self, value):
- return value
- def decode_value(self, encoded_value):
- return encoded_value
-
- def __init__(self, inner):
- self.inner = inner
-
- def __len__(self):
- return len(self.inner)
-
- def __contains__(self, key):
- return self.encode_key(key) in self.inner
-
- def __getitem__(self, key):
- return self.decode_value(self.inner[self.encode_key(key)])
- def __setitem__(self, key, value):
- self.inner[self.encode_key(key)] = self.encode_value(value)
- def __delitem__(self, key):
- del self.inner[self.encode_key(key)]
-
- def __iter__(self):
- for encoded_key in self.inner:
- yield self.decode_key(encoded_key)
- def iterkeys(self):
- return iter(self)
- def keys(self):
- return list(self.iterkeys())
-
- def itervalue(self):
- for encoded_value in self.inner.itervalues():
- yield self.decode_value(encoded_value)
- def values(self):
- return list(self.itervalue())
-
- def iteritems(self):
- for key, value in self.inner.iteritems():
- yield self.decode_key(key), self.decode_value(value)
- def items(self):
- return list(self.iteritems())
-
-def update_dict(d, **replace):
- d = d.copy()
- for k, v in replace.iteritems():
- if v is None:
- del d[k]
- else:
- d[k] = v
- return d
--- /dev/null
+def natural_to_string(n, alphabet=None, min_width=0):
+ if alphabet is None:
+ s = '%x' % (n,)
+ if len(s) % 2:
+ s = '0' + s
+ return s.decode('hex').lstrip('\x00').rjust(min_width, '\x00')
+ assert len(set(alphabet)) == len(alphabet)
+ res = []
+ while n:
+ n, x = divmod(n, len(alphabet))
+ res.append(alphabet[x])
+ res.reverse()
+ return ''.join(res).rjust(min_width, '\x00')
+
+def string_to_natural(s, alphabet=None):
+ if alphabet is None:
+ s = s.encode('hex')
+ return int("0"+s, 16)
+ assert len(set(alphabet)) == len(alphabet)
+ if not s or (s != alphabet[0] and s.startswith(alphabet[0])):
+ raise ValueError()
+ return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s)))
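+
+# Examples (with no alphabet, strings are treated as big-endian byte strings):
+#   natural_to_string(255) == '\xff'
+#   natural_to_string(5, '01') == '101' and string_to_natural('101', '01') == 5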
+
+import random
+
+def generate_alphabet():
+ if random.randrange(2):
+ return None
+ else:
+ a = map(chr, xrange(256))
+ random.shuffle(a)
+ return a[:random.randrange(2, len(a))]
+
+if __name__ == '__main__':
+ while True:
+ alphabet = generate_alphabet()
+ for i in xrange(1000):
+ n = random.randrange(100000000000000000000000000000)
+ s = natural_to_string(n, alphabet)
+ n2 = string_to_natural(s, alphabet)
+ print n, s.encode('hex'), n2
+ assert n == n2
--- /dev/null
+import collections
+
+class StringBuffer(object):
+ 'Buffer manager with great worst-case behavior'
+
+ def __init__(self, data=''):
+ self.buf = collections.deque([data])
+ self.buf_len = len(data)
+ self.pos = 0
+
+ def __len__(self):
+ return self.buf_len - self.pos
+
+ def add(self, data):
+ self.buf.append(data)
+ self.buf_len += len(data)
+
+ def get(self, wants):
+ if self.buf_len - self.pos < wants:
+ raise IndexError('not enough data')
+ data = []
+ while wants:
+ seg = self.buf[0][self.pos:self.pos+wants]
+ self.pos += len(seg)
+ while self.buf and self.pos >= len(self.buf[0]):
+ x = self.buf.popleft()
+ self.buf_len -= len(x)
+ self.pos -= len(x)
+
+ data.append(seg)
+ wants -= len(seg)
+ return ''.join(data)
+
+def _DataChunker(receiver):
+ wants = receiver.next()
+ buf = StringBuffer()
+
+ while True:
+ if len(buf) >= wants:
+ wants = receiver.send(buf.get(wants))
+ else:
+ buf.add((yield))
+def DataChunker(receiver):
+ '''
+ Produces a function that accepts data that is input into a generator
+ (receiver) in response to the receiver yielding the size of data to wait on
+ '''
+ x = _DataChunker(receiver)
+ x.next()
+ return x.send
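+
+# Illustrative usage (length_prefixed and handle are hypothetical, not part of this module):
+#   import struct
+#   def length_prefixed(handle):
+#       while True:
+#           length, = struct.unpack('<I', (yield 4))  # first wait for a 4-byte length prefix
+#           handle((yield length))                    # then wait for that many payload bytes
+#   feed = DataChunker(length_prefixed(handle))
+#   feed(data)  # call with each incoming chunk, in arbitrarily sized pieces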
--- /dev/null
+from __future__ import division
+
+import random
+import traceback
+
+from twisted.internet import defer, reactor
+from twisted.python import failure
+
+def sleep(t):
+ d = defer.Deferred()
+ reactor.callLater(t, d.callback, None)
+ return d
+
+def retry(message, delay):
+ '''
+ @retry('Error getting block:', 1)
+ @defer.inlineCallbacks
+ def get_block(hash):
+ ...
+ '''
+
+ def retry2(func):
+ @defer.inlineCallbacks
+ def f(*args, **kwargs):
+ while True:
+ try:
+ result = yield func(*args, **kwargs)
+ except:
+ print
+ print message
+ traceback.print_exc()
+ print
+
+ yield sleep(delay)
+ else:
+ defer.returnValue(result)
+ return f
+ return retry2
+
+class ReplyMatcher(object):
+ '''
+ Converts request/got response interface to deferred interface
+ '''
+
+ def __init__(self, func, timeout=5):
+ self.func = func
+ self.timeout = timeout
+ self.map = {}
+
+ def __call__(self, id):
+ self.func(id)
+ uniq = random.randrange(2**256)
+ df = defer.Deferred()
+ def timeout():
+ df, timer = self.map[id].pop(uniq)
+ df.errback(failure.Failure(defer.TimeoutError()))
+ if not self.map[id]:
+ del self.map[id]
+ self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
+ return df
+
+ def got_response(self, id, resp):
+ if id not in self.map:
+ return
+ for df, timer in self.map.pop(id).itervalues():
+ timer.cancel()
+ df.callback(resp)
+
+class GenericDeferrer(object):
+ '''
+ Converts query with identifier/got response interface to deferred interface
+ '''
+
+ def __init__(self, max_id, func, timeout=5):
+ self.max_id = max_id
+ self.func = func
+ self.timeout = timeout
+ self.map = {}
+
+ def __call__(self, *args, **kwargs):
+ while True:
+ id = random.randrange(self.max_id)
+ if id not in self.map:
+ break
+ df = defer.Deferred()
+ def timeout():
+ self.map.pop(id)
+ df.errback(failure.Failure(defer.TimeoutError()))
+ timer = reactor.callLater(self.timeout, timeout)
+ self.func(id, *args, **kwargs)
+ self.map[id] = df, timer
+ return df
+
+ def got_response(self, id, resp):
+ if id not in self.map:
+ return
+ df, timer = self.map.pop(id)
+ timer.cancel()
+ df.callback(resp)
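+
+# Illustrative wiring (send_query and the reply plumbing are hypothetical):
+#   query = GenericDeferrer(2**256, lambda id, x: send_query(id=id, data=x))
+#   df = query(x)                  # picks a fresh random id, sends, returns a Deferred
+#   query.got_response(id, resp)   # fires df; otherwise it errbacks with TimeoutError after 5s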
+
+class NotNowError(Exception):
+ pass
+
+class DeferredCacher(object):
+ '''
+ like memoize, but for functions that return Deferreds
+
+ @DeferredCacher
+ def f(x):
+ ...
+ return df
+
+ @DeferredCacher.with_backing(bsddb.hashopen(...))
+ def f(x):
+ ...
+ return df
+ '''
+
+ @classmethod
+ def with_backing(cls, backing):
+ return lambda func: cls(func, backing)
+
+ def __init__(self, func, backing=None):
+ if backing is None:
+ backing = {}
+
+ self.func = func
+ self.backing = backing
+ self.waiting = {}
+
+ @defer.inlineCallbacks
+ def __call__(self, key):
+ if key in self.waiting:
+ yield self.waiting[key]
+
+ if key in self.backing:
+ defer.returnValue(self.backing[key])
+ else:
+ self.waiting[key] = defer.Deferred()
+ try:
+ value = yield self.func(key)
+ finally:
+ self.waiting.pop(key).callback(None)
+
+ self.backing[key] = value
+ defer.returnValue(value)
+
+ def call_now(self, key):
+ if key in self.waiting:
+ raise NotNowError()
+
+ if key in self.backing:
+ return self.backing[key]
+ else:
+ self.waiting[key] = defer.Deferred()
+ def cb(value):
+ self.backing[key] = value
+ self.waiting.pop(key).callback(None)
+ def eb(fail):
+ self.waiting.pop(key).callback(None)
+ fail.printTraceback()
+ self.func(key).addCallback(cb).addErrback(eb)
+ raise NotNowError()
--- /dev/null
+from __future__ import division
+
+from twisted.internet import defer
+from twisted.web import resource, server
+
+class DeferredResource(resource.Resource):
+ def render(self, request):
+ def finish(x):
+ if request._disconnected:
+ return
+ if x is not None:
+ request.write(x)
+ request.finish()
+
+ def finish_error(fail):
+ if request._disconnected:
+ return
+ request.setResponseCode(500) # won't do anything if already written to
+ request.write('---ERROR---')
+ request.finish()
+ fail.printTraceback()
+
+ defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
+ return server.NOT_DONE_YET
--- /dev/null
+class DictWrapper(object):
+ def encode_key(self, key):
+ return key
+ def decode_key(self, encoded_key):
+ return encoded_key
+ def encode_value(self, value):
+ return value
+ def decode_value(self, encoded_value):
+ return encoded_value
+
+ def __init__(self, inner):
+ self.inner = inner
+
+ def __len__(self):
+ return len(self.inner)
+
+ def __contains__(self, key):
+ return self.encode_key(key) in self.inner
+
+ def __getitem__(self, key):
+ return self.decode_value(self.inner[self.encode_key(key)])
+ def __setitem__(self, key, value):
+ self.inner[self.encode_key(key)] = self.encode_value(value)
+ def __delitem__(self, key):
+ del self.inner[self.encode_key(key)]
+
+ def __iter__(self):
+ for encoded_key in self.inner:
+ yield self.decode_key(encoded_key)
+ def iterkeys(self):
+ return iter(self)
+ def keys(self):
+ return list(self.iterkeys())
+
+ def itervalue(self):
+ for encoded_value in self.inner.itervalues():
+ yield self.decode_value(encoded_value)
+ def values(self):
+ return list(self.itervalue())
+
+ def iteritems(self):
+ for key, value in self.inner.iteritems():
+ yield self.decode_key(key), self.decode_value(value)
+ def items(self):
+ return list(self.iteritems())
+
+def update_dict(d, **replace):
+ d = d.copy()
+ for k, v in replace.iteritems():
+ if v is None:
+ del d[k]
+ else:
+ d[k] = v
+ return d
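+
+# e.g. update_dict(dict(a=1, b=2), b=3, a=None) == dict(b=3) -- a None value deletes the key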
from twisted.internet import defer
from twisted.web import client
-import util
+import deferred_resource
class Error(Exception):
def __init__(self, code, message, data=None):
return lambda *params: self.callRemote(attr[len('rpc_'):], *params)
raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr))
-class Server(util.DeferredResource):
+class Server(deferred_resource.DeferredResource):
extra_headers = None
@defer.inlineCallbacks
if id_ is None:
return
+ #print (df.result.type, df.result.value, df.result.tb)
+ #print df.result.__dict__
try:
result = yield df
- except Error, e:
- raise e
- except Exception, e:
+ #except Error, e:
+        #    raise e
+ except Exception:
print 'Squelched JSON method error:'
traceback.print_exc()
raise Error(-32099, u'Unknown error')
--- /dev/null
+from __future__ import division
+
+import random
+
+def median(x, use_float=True):
+ # there exist better algorithms...
+ y = sorted(x)
+ left = (len(y) - 1)//2
+ right = len(y)//2
+ sum = y[left] + y[right]
+ if use_float:
+ return sum/2
+ else:
+ return sum//2
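+
+# e.g. median([1, 2, 3, 4]) == 2.5 and median([1, 2, 3, 4], use_float=False) == 2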
+
+def shuffled(x):
+ x = list(x)
+ random.shuffle(x)
+ return x
--- /dev/null
+import itertools
+
+from twisted.internet import defer
+
+class Event(object):
+ def __init__(self):
+ self.observers = {}
+ self.one_time_observers = {}
+ self.id_generator = itertools.count()
+
+ def watch(self, func):
+ id = self.id_generator.next()
+ self.observers[id] = func
+ return id
+ def unwatch(self, id):
+ self.observers.pop(id)
+
+ def watch_one_time(self, func):
+ id = self.id_generator.next()
+ self.one_time_observers[id] = func
+ return id
+ def unwatch_one_time(self, id):
+ self.one_time_observers.pop(id)
+
+ def happened(self, event=None):
+ for func in self.observers.itervalues():
+ func(event)
+
+ one_time_observers = self.one_time_observers
+ self.one_time_observers = {}
+ for func in one_time_observers.itervalues():
+ func(event)
+
+ def get_deferred(self):
+ df = defer.Deferred()
+ self.watch_one_time(df.callback)
+ return df
+
+class Variable(object):
+ def __init__(self, value):
+ self.value = value
+ self.changed = Event()
+
+ def set(self, value):
+ if value == self.value:
+ return
+
+ self.value = value
+ self.changed.happened(value)
+
+ def get_not_none(self):
+ if self.value is not None:
+ return defer.succeed(self.value)
+ else:
+ df = defer.Deferred()
+ self.changed.watch_one_time(df.callback)
+ return df
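+
+# Illustrative usage (not part of the original module):
+#   v = Variable(None)
+#   v.changed.watch(some_callback)   # some_callback is hypothetical; called with each new value
+#   v.set(5)                         # notifies watchers; setting the same value again is a no-op
+#   v.get_not_none()                 # Deferred that fires with 5 immediately, since value is set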
from __future__ import division
+import json
+import traceback
+
from twisted.internet import defer
-import json
-import jsonrpc
-import util
+from util import jsonrpc, deferred_resource
-class LongPollingWorkerInterface(util.DeferredResource):
+class LongPollingWorkerInterface(deferred_resource.DeferredResource):
def __init__(self, work, compute):
self.work = work
self.compute = compute
self.putChild('', self)
def rpc_getwork(self, data=None):
-        if data is not None:
-            return self.response_callback(data)
-        return self.compute(self.work.value)
+        try:
+            if data is not None:
+                return self.response_callback(data)
+            return self.compute(self.work.value)
+        except ValueError:
+            traceback.print_exc()