import struct
import sys
import time
+import json
+import signal
from twisted.internet import defer, reactor
-from twisted.web import server
+from twisted.web import server, resource
from twisted.python import log
+from nattraverso import portmapper, ipdiscover
import bitcoin.p2p, bitcoin.getwork, bitcoin.data
from util import db, expiring_dict, jsonrpc, variable, deferral, math
@defer.inlineCallbacks
def main(args):
try:
+ if args.charts:
+ from . import draw
+
print 'p2pool (version %s)' % (p2pool_init.__version__,)
print
my_script = yield get_payout_script(factory)
if args.pubkey_hash is None:
if my_script is None:
- print 'IP transaction denied ... falling back to sending to address.'
+ print ' IP transaction denied ... falling back to sending to address.'
my_script = yield get_payout_script2(bitcoind, args.net)
else:
my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
print ' Payout script:', my_script.encode('hex')
print
- ht = bitcoin.p2p.HeightTracker(factory)
+ print 'Loading cached block headers...'
+ ht = bitcoin.p2p.HeightTracker(factory, args.net.HEADERSTORE_FILENAME)
+ print ' ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
+ print
tracker = p2pool.OkayTracker(args.net)
chains = expiring_dict.ExpiringDict(300)
current_work2 = variable.Variable(None)
work_updated = variable.Event()
- tracker_updated = variable.Event()
requested = expiring_dict.ExpiringDict(300)
@defer.inlineCallbacks
def set_real_work1():
work, height = yield getwork(bitcoind)
- # XXX call tracker_updated
+ changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
current_work.set(dict(
version=work.version,
previous_block=work.previous_block,
current_work2.set(dict(
clock_offset=time.time() - work.timestamp,
))
+ if changed:
+ set_real_work2()
- @defer.inlineCallbacks
def set_real_work2():
- best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
+ best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
t = dict(current_work.value)
t['best_share_hash'] = best
current_work.set(t)
+ t = time.time()
for peer2, share_hash in desired:
if share_hash not in tracker.tails: # was received in the time tracker.think was running
continue
last_request_time, count = requested.get(share_hash, (None, 0))
- if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
+ if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
continue
potential_peers = set()
for head in tracker.tails[share_hash]:
tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
))[:100],
)
- requested[share_hash] = time.time(), count + 1
+ requested[share_hash] = t, count + 1
print 'Initializing work...'
yield set_real_work1()
- yield set_real_work2()
+ set_real_work2()
print ' ...success!'
+ print
start_time = time.time() - current_work2.value['clock_offset']
for peer in p2p_node.peers.itervalues():
if peer is ignore_peer:
continue
- if p2pool_init.DEBUG:
- print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
+ #if p2pool_init.DEBUG:
+ # print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
peer.send_shares([share])
share.flag_shared()
peer_heads.setdefault(shares[0].hash, set()).add(peer)
if some_new:
- tracker_updated.happened()
+ set_real_work2()
if len(shares) > 5:
print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
def p2p_share_hashes(share_hashes, peer):
+ t = time.time()
get_hashes = []
for share_hash in share_hashes:
if share_hash in tracker.shares:
- pass # print 'Got share hash, already have, ignoring. Hash: %s' % (p2pool.format_hash(share_hash),)
- else:
- print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
- get_hashes.append(share_hash)
+ continue
+ last_request_time, count = requested.get(share_hash, (None, 0))
+ if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
+ continue
+ print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
+ get_hashes.append(share_hash)
+ requested[share_hash] = t, count + 1
if share_hashes and peer is not None:
peer_heads.setdefault(share_hashes[0], set()).add(peer)
else:
return x, args.net.P2P_PORT
- nodes = [
+ nodes = set([
('72.14.191.28', args.net.P2P_PORT),
('62.204.197.159', args.net.P2P_PORT),
- ]
- try:
- nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
- except:
- log.err(None, 'Error resolving bootstrap node IP:')
+ ('142.58.248.28', args.net.P2P_PORT),
+ ('94.23.34.145', args.net.P2P_PORT),
+ ])
+ for host in [
+ 'p2pool.forre.st',
+ 'dabuttonfactory.com',
+ ]:
+ try:
+ nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
+ except:
+ log.err(None, 'Error resolving bootstrap node IP:')
p2p_node = p2p.Node(
current_work=current_work,
net=args.net,
addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
mode=0 if args.low_bandwidth else 1,
- preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
+ preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
)
p2p_node.handle_shares = p2p_shares
p2p_node.handle_share_hashes = p2p_share_hashes
print ' ...success!'
print
@defer.inlineCallbacks
def upnp_thread():
    """Periodically (re-)establish a UPnP port mapping for the p2p port.

    Runs forever; any failure (no UPnP gateway, library error) is swallowed
    so that UPnP problems never affect the rest of the node. Errors are only
    logged when debugging is enabled, since many networks have no UPnP at all.
    """
    while True:
        try:
            is_lan, lan_ip = yield ipdiscover.get_local_ip()
            # Only attempt a mapping when we are actually behind a LAN/NAT.
            # BUG FIX: the previous version used `continue` here, which skipped
            # the sleep below and busy-looped calling get_local_ip().
            if is_lan:
                pm = yield portmapper.get_port_mapper()
                yield pm._upnp.add_port_mapping(lan_ip, args.net.P2P_PORT, args.net.P2P_PORT, 'p2pool', 'TCP')
        except:
            if p2pool_init.DEBUG:
                log.err(None, "UPnP error:")
        # Re-check/refresh the mapping after ~120 s on average (assumes true
        # division for 1/120 — confirm `from __future__ import division` at
        # the top of the file).
        yield deferral.sleep(random.expovariate(1/120))
+
+ if args.upnp:
+ upnp_thread()
+
# start listening for workers with a JSON-RPC server
print 'Listening for workers on port %i...' % (args.worker_port,)
block = dict(header=header, txs=transactions)
hash_ = bitcoin.data.block_header_type.hash256(block['header'])
if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
- print
- print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
- print
if factory.conn.value is not None:
factory.conn.value.send_block(block=block)
else:
print 'No bitcoind connection! Erp!'
+ if hash_ <= block['header']['target']:
+ print
+ print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
+ print
target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
if hash_ > target:
print 'Received invalid share from worker - %x/%x' % (hash_, target)
log.err(None, 'Error processing data received from worker:')
return False
+ web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
+
def get_rate():
    """Return the pool's estimated attempts/second as a JSON string.

    Returns JSON null while no share chain has been established yet.
    """
    best = current_work.value['best_share_hash']
    if best is None:
        return json.dumps(None)
    height, _ = tracker.get_height_and_last(best)
    # Average over at most the last 720 shares of the chain.
    att_s = p2pool.get_pool_attempts_per_second(tracker, best, args.net, min(height, 720))
    return json.dumps(att_s)
def get_users():
    """Return a JSON object mapping each payout address (human-readable form)
    to its fraction of the total payout weight over the recent chain.
    """
    best = current_work.value['best_share_hash']
    height, _ = tracker.get_height_and_last(best)
    # Weights over at most the last 720 shares; 2**256 caps total weight.
    weights, total_weight = tracker.get_cumulative_weights(best, min(height, 720), 2**256)
    res = {}
    # Iterate scripts from smallest to largest weight so the JSON object's
    # insertion order is ascending by share of the pool.
    for script in sorted(weights, key=lambda s: weights[s]):
        res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
    return json.dumps(res)
+
class WebInterface(resource.Resource):
    """Minimal twisted.web resource: serves func()'s result on GET with a
    fixed Content-Type.

    func is a zero-argument callable returning the (already-encoded) body;
    mime_type is the Content-Type header value to send with it.
    """
    def __init__(self, func, mime_type):
        # BUG FIX: Resource.__init__ must be called — it initializes
        # self.children; without it, twisted's child-path traversal
        # (getChildWithDefault) on these instances raises AttributeError.
        resource.Resource.__init__(self)
        self.func, self.mime_type = func, mime_type

    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        return self.func()
+
+ web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
+ web_root.putChild('users', WebInterface(get_users, 'application/json'))
+ if args.charts:
+ web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
- reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate, get_users, args.net)))
+ reactor.listenTCP(args.worker_port, server.Site(web_root))
print ' ...success!'
print
print 'Started successfully!'
print
+ ht.updated.watch(set_real_work2)
+
@defer.inlineCallbacks
def work1_thread():
while True:
yield set_real_work1()
except:
log.err()
- yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
+ yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/20))], fireOnOneCallback=True)
@defer.inlineCallbacks
def work2_thread():
    """Loop forever, periodically re-evaluating the best share chain.

    Any exception from set_real_work2 is logged and the loop continues —
    a transient failure must never stop chain maintenance.
    """
    while True:
        try:
            set_real_work2()
        except:
            log.err()
        # ~20 s mean delay between runs (assumes true division for 1/20 —
        # confirm `from __future__ import division` at the top of the file).
        yield deferral.sleep(random.expovariate(1/20))
work1_thread()
work2_thread()
counter = skiplists.CountsSkipList(tracker, run_identifier)
while True:
- yield deferral.sleep(random.expovariate(1/1))
+ yield deferral.sleep(3)
try:
if current_work.value['best_share_hash'] is not None:
height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
- if height > 5:
- att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
+ if height > 2:
+ att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
matching_in_chain = counter(current_work.value['best_share_hash'], height)
shares_in_chain = my_shares & matching_in_chain
len(shares_in_chain) + len(stale_shares),
len(stale_shares),
len(p2p_node.peers),
- )
+ ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
#weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
#for k, v in weights.iteritems():
# print k.encode('hex'), v/total_weight
parser.add_argument('-a', '--address',
help='generate to this address (defaults to requesting one from bitcoind)',
type=str, action='store', default=None, dest='address')
+ parser.add_argument('--charts',
+ help='generate charts on the web interface (requires PIL and pygame)',
+ action='store_const', const=True, default=False, dest='charts')
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
parser.add_argument('-l', '--low-bandwidth',
help='trade lower bandwidth usage for higher latency (reduced efficiency)',
action='store_true', default=False, dest='low_bandwidth')
+ parser.add_argument('--disable-upnp',
+ help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
+ action='store_false', default=True, dest='upnp')
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
if args.debug:
p2pool_init.DEBUG = True
class ReopeningFile(object):
    """File-like object that can close and reopen its underlying file.

    Intended for log rotation: an external tool moves the file aside, then
    the process calls reopen() to start writing to a fresh file at the same
    path. Only the write/flush subset of the file protocol is provided.
    """
    def __init__(self, *open_args, **open_kwargs):
        # Remember exactly how to open the file so reopen() can repeat it.
        self.open_args = open_args
        self.open_kwargs = open_kwargs
        self.inner_file = self._open()

    def _open(self):
        # Single place that performs the actual open() call.
        return open(*self.open_args, **self.open_kwargs)

    def reopen(self):
        self.inner_file.close()
        self.inner_file = self._open()

    def write(self, data):
        self.inner_file.write(data)

    def flush(self):
        self.inner_file.flush()
class TeePipe(object):
def __init__(self, outputs):
self.outputs = outputs
self.buf = lines[-1]
def flush(self):
pass
- sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
+ logfile = ReopeningFile(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')
+ sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
+ if hasattr(signal, "SIGUSR1"):
+ def sigusr1(signum, frame):
+ print '''Caught SIGUSR1, closing 'debug.log'...'''
+ logfile.reopen()
+ print '''...and reopened 'debug.log' after catching SIGUSR1.'''
+ signal.signal(signal.SIGUSR1, sigusr1)
if args.bitcoind_p2p_port is None:
args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT