import ConfigParser
import StringIO
import argparse
+import base64
+import json
import os
import random
-import struct
import sys
import time
import signal
import traceback
import urlparse
+try:
+ from twisted.internet import iocpreactor
+ iocpreactor.install()
+except:
+ pass
+else:
+ print 'Using IOCP reactor!'
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover
import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
-from bitcoin import worker_interface
+from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data
version=work['version'],
previous_block_hash=int(work['previousblockhash'], 16),
transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
- merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
+ merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
subsidy=work['coinbasevalue'],
time=work['time'],
bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
# connect to bitcoind over JSON-RPC and do initial getmemorypool
url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
- bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
- good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
- if not good:
- print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
- return
- temp_work = yield getwork(bitcoind)
+ bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
@deferral.retry('Error while checking Bitcoin connection:', 1)
@defer.inlineCallbacks
def check():
    # Verify we are talking to the right bitcoind and that it is new enough,
    # retrying once a second until it succeeds; returns the first getwork.
    #
    # Fix: the original wrote `(yield net.PARENT.RPC_CHECK)(bitcoind)`, which
    # yields the RPC_CHECK function itself and calls it *outside* the yield.
    # If RPC_CHECK returns a Deferred, `not <Deferred>` is always False, so a
    # failing identity check could never trigger. Yielding the call's result
    # is correct whether RPC_CHECK returns a bool or a Deferred-of-bool
    # (inlineCallbacks passes non-Deferred values straight through).
    if not (yield net.PARENT.RPC_CHECK(bitcoind)):
        print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
        raise deferral.RetrySilentlyException()
    temp_work = yield getwork(bitcoind)
    # Refuse bitcoinds too old to enforce BIP16 (P2SH); blocks they build on
    # could be invalid on the live network.
    if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
        print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
        raise deferral.RetrySilentlyException()
    defer.returnValue(temp_work)
+ temp_work = yield check()
print ' ...success!'
print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
print
shared_share_hashes = set()
ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
known_verified = set()
- recent_blocks = []
print "Loading shares..."
for i, (mode, contents) in enumerate(ss.get_shares()):
if mode == 'share':
current_work2.set(dict(
time=work['time'],
transactions=work['transactions'],
- merkle_branch=work['merkle_branch'],
+ merkle_link=work['merkle_link'],
subsidy=work['subsidy'],
clock_offset=time.time() - work['time'],
last_update=time.time(),
))
yield set_real_work1()
- if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
- @deferral.DeferredCacher
- @defer.inlineCallbacks
- def height_cacher(block_hash):
- try:
- x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
- except jsonrpc.Error, e:
- if e.code == -5 and not p2pool.DEBUG:
- raise deferral.RetrySilentlyException()
- raise
- defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
- best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
- def get_height_rel_highest(block_hash):
- this_height = height_cacher.call_now(block_hash, 0)
- best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
- best_height_cached.set(max(best_height_cached.value, this_height, best_height))
- return this_height - best_height_cached.value
- else:
- get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
+ get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
def set_real_work2():
best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
@defer.inlineCallbacks
def set_merged_work(merged_url, merged_userpass):
- merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
+ merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
while True:
auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
break
shares.append(share)
print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
- peer.sendShares(shares)
+ return shares
+
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    # Push a solved block to bitcoind over its p2p connection (lowest latency
    # path); retried every 10s if the connection is down.
    #
    # Fix: the original error message formatted `header_hash`, a name that is
    # not defined in this scope, so the no-connection path raised NameError
    # instead of logging; it also used non-zero-padded %32x. Compute the hash
    # from the block header and use %064x like the other block log lines.
    if factory.conn.value is None:
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
        raise deferral.RetrySilentlyException()
    factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    # Submit a solved block through the getmemorypool RPC and warn when the
    # result disagrees with what the proof-of-work predicts.
    success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
    # A header that meets its own target should be accepted by bitcoind.
    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
    if (not success and success_expected and not ignore_failure) or (success and not success_expected):
        # Fix: original formatted undefined names `result`/`expected_result`,
        # raising NameError exactly when this mismatch warning should print.
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
def submit_block(block, ignore_failure):
    # Fan a solved block out over both submission paths: the p2p push first
    # (fastest), then the RPC path, which reports acceptance and carries its
    # own retry/ignore-failure handling.
    submit_block_p2p(block)
    submit_block_rpc(block, ignore_failure)
+
@tracker.verified.added.watch
def _(share):
if share.pow_hash <= share.header['bits'].target:
print
print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
print
- recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
+ if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
+ broadcast_share(share.hash)
print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
addrs = {}
- if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
+ if os.path.exists(os.path.join(datadir_path, 'addrs')):
+ try:
+ with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
+ addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
+ except:
+ print >>sys.stderr, 'error parsing addrs'
+ elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
try:
addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
except:
- print >>sys.stderr, "error reading addrs"
+ print >>sys.stderr, "error reading addrs.txt"
for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
try:
addr = yield addr_df
net=net,
addr_store=addrs,
connect_addrs=connect_addrs,
+ max_incoming_conns=args.p2pool_conns,
)
p2p_node.start()
- task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
def save_addrs():
    # Snapshot the peer address store to the 'addrs' file as a JSON list of
    # (address-tuple, data) pairs; the loader tolerates a torn/partial write.
    addrs_path = os.path.join(datadir_path, 'addrs')
    with open(addrs_path, 'wb') as addrs_file:
        addrs_file.write(json.dumps(p2p_node.addr_store.items()))
task.LoopingCall(save_addrs).start(60)
- # send share when the chain changes to their chain
- def work_changed(new_work):
- #print 'Work changed:', new_work
+ def broadcast_share(share_hash):
shares = []
- for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
+ for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
if share.hash in shared_share_hashes:
break
shared_share_hashes.add(share.hash)
for peer in p2p_node.peers.itervalues():
peer.sendShares([share for share in shares if share.peer is not peer])
- current_work.changed.watch(work_changed)
+ # send share when the chain changes to their chain
+ current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
def save_shares():
for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
- if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
- with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
- vip_pass = f.read().strip('\r\n')
- else:
- vip_pass = '%016x' % (random.randrange(2**64),)
- with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
- f.write(vip_pass)
- print ' Worker password:', vip_pass, '(only required for generating graphs)'
-
# setup worker logic
removed_unstales_var = variable.Variable((0, 0, 0))
'''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
my_shares = len(my_share_hashes)
my_doa_shares = len(my_doa_share_hashes)
- delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
+ delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
pseudoshare_received = variable.Event()
+ share_received = variable.Event()
local_rate_monitor = math.RateMonitor(10*60)
class WorkerBridge(worker_interface.WorkerBridge):
self.new_work_event = current_work.changed
self.recent_shares_ts_work = []
- def preprocess_request(self, request):
+ def get_user_details(self, request):
user = request.getUser() if request.getUser() is not None else ''
desired_pseudoshare_target = None
except: # XXX blah
pubkey_hash = my_pubkey_hash
+ return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
+
def preprocess_request(self, request):
    # Adapter for the worker_interface contract: forward everything from
    # get_user_details except the user name, which only the response path uses.
    _, pubkey_hash, share_target, pseudoshare_target = self.get_user_details(request)
    return pubkey_hash, share_target, pseudoshare_target
def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
mm_data = ''
mm_later = []
- share_info, generate_tx = p2pool_data.generate_transaction(
- tracker=tracker,
- share_data=dict(
- previous_share_hash=current_work.value['best_share_hash'],
- coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
- nonce=random.randrange(2**32),
- pubkey_hash=pubkey_hash,
- subsidy=current_work2.value['subsidy'],
- donation=math.perfect_round(65535*args.donation_percentage/100),
- stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
- 253 if orphans > orphans_recorded_in_chain else
- 254 if doas > doas_recorded_in_chain else
- 0
- )(*get_stale_counts()),
- ),
- block_target=current_work.value['bits'].target,
- desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
- desired_target=desired_share_target,
- net=net,
- )
+ if True:
+ share_info, generate_tx = p2pool_data.Share.generate_transaction(
+ tracker=tracker,
+ share_data=dict(
+ previous_share_hash=current_work.value['best_share_hash'],
+ coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
+ nonce=random.randrange(2**32),
+ pubkey_hash=pubkey_hash,
+ subsidy=current_work2.value['subsidy'],
+ donation=math.perfect_round(65535*args.donation_percentage/100),
+ stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
+ 253 if orphans > orphans_recorded_in_chain else
+ 254 if doas > doas_recorded_in_chain else
+ 0
+ )(*get_stale_counts()),
+ desired_version=1,
+ ),
+ block_target=current_work.value['bits'].target,
+ desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
+ desired_target=desired_share_target,
+ ref_merkle_link=dict(branch=[], index=0),
+ net=net,
+ )
target = net.PARENT.SANE_MAX_TARGET
if desired_pseudoshare_target is None:
transactions = [generate_tx] + list(current_work2.value['transactions'])
packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
- merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
+ merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
getwork_time = time.time()
- merkle_branch = current_work2.value['merkle_branch']
+ merkle_link = current_work2.value['merkle_link']
print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
bitcoin_data.target_to_difficulty(target),
received_header_hashes = set()
def got_response(header, request):
+ user, _, _, _ = self.get_user_details(request)
assert header['merkle_root'] == merkle_root
header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
print
print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
print
- recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
except:
log.err(None, 'Error while processing potential block:')
merkle_tx=dict(
tx=transactions[0],
block_hash=header_hash,
- merkle_branch=merkle_branch,
- index=0,
+ merkle_link=merkle_link,
),
- merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
- index=index,
+ merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
parent_block_header=header,
)).encode('hex'),
)
except:
log.err(None, 'Error while processing merged mining POW:')
- if pow_hash <= share_info['bits'].target:
+ if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
min_header = dict(header);del min_header['merkle_root']
- hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
- share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
+ hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
+ share = p2pool_data.Share(net, None, dict(
+ min_header=min_header, share_info=share_info, hash_link=hash_link,
+ ref_merkle_link=dict(branch=[], index=0),
+ ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
request.getUser(),
shared_share_hashes.add(share.hash)
except:
log.err(None, 'Error forwarding block solution:')
+
+ share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
if pow_hash > target:
print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
else:
received_header_hashes.add(header_hash)
- pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
+ pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
while len(self.recent_shares_ts_work) > 50:
self.recent_shares_ts_work.pop(0)
- local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
+ local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
return on_time
get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
- web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
- worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
+ web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
+ worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
# done!
print 'Started successfully!'
+ print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
+ if args.donation_percentage > 0.51:
+ print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
+ elif args.donation_percentage < 0.49:
+ print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
+ else:
+ print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
+ print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
print
from twisted.words.protocols import irc
class IRCClient(irc.IRCClient):
nickname = 'p2pool%02i' % (random.randrange(100),)
- channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
+ channel = net.ANNOUNCE_CHANNEL
def lineReceived(self, line):
- print repr(line)
+ if p2pool.DEBUG:
+ print repr(line)
irc.IRCClient.lineReceived(self, line)
def signedOn(self):
irc.IRCClient.signedOn(self)
self.factory.resetDelay()
self.join(self.channel)
- self.watch_id = tracker.verified.added.watch(self._new_share)
- self.announced_hashes = set()
- self.delayed_messages = {}
+ @defer.inlineCallbacks
+ def new_share(share):
+ if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
+ yield deferral.sleep(random.expovariate(1/60))
+ message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
+ if message not in self.recent_messages:
+ self.say(self.channel, message)
+ self._remember_message(message)
+ self.watch_id = tracker.verified.added.watch(new_share)
+ self.recent_messages = []
def _remember_message(self, message):
    # Append to the rolling log of channel traffic, trimming in place so only
    # the most recent 100 entries survive (oldest dropped first).
    self.recent_messages.append(message)
    del self.recent_messages[:-100]
def privmsg(self, user, channel, message):
    # Record everything said in our channel; recent_messages doubles as the
    # dedup filter so we don't repeat a block announcement someone else (or a
    # previous connection of ours) already made.
    if channel != self.channel:
        return
    self._remember_message(message)
def connectionLost(self, reason):
tracker.verified.added.unwatch(self.watch_id)
print 'IRC connection lost:', reason.getErrorMessage()
100*stale_prop,
math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
)
+
+ for warning in p2pool_data.get_warnings(tracker, current_work):
+ print >>sys.stderr, '#'*40
+ print >>sys.stderr, '>>> Warning: ' + warning
+ print >>sys.stderr, '#'*40
if this_str != last_str or time.time() > last_time + 15:
print this_str
log.err()
status_thread()
except:
- log.err(None, 'Fatal error:')
reactor.stop()
+ log.err(None, 'Fatal error:')
def run():
class FixedArgumentParser(argparse.ArgumentParser):
help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
- help='donate this percentage of work to author of p2pool (default: 0.5)',
+ help='donate this percentage of work towards the development of p2pool (default: 0.5)',
type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
help='announce any blocks found on irc://irc.freenode.net/#p2pool',
parser.add_argument('--disable-upnp',
help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
action='store_false', default=True, dest='upnp')
+ p2pool_group.add_argument('--max-conns', metavar='CONNS',
+ help='maximum incoming connections (default: 40)',
+ type=int, action='store', default=40, dest='p2pool_conns')
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',