from __future__ import division
-import ConfigParser
-import StringIO
import base64
import json
import os
import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
-from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
+from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
-def getwork(bitcoind):
+def getwork(bitcoind, use_getblocktemplate=False):
+ def go():
+ if use_getblocktemplate:
+ return bitcoind.rpc_getblocktemplate(dict(mode='template'))
+ else:
+ return bitcoind.rpc_getmemorypool()
try:
- work = yield bitcoind.rpc_getmemorypool()
- except jsonrpc.Error, e:
- if e.code == -32601: # Method not found
+ work = yield go()
+ except jsonrpc.Error_for_code(-32601): # Method not found
+ use_getblocktemplate = not use_getblocktemplate
+ try:
+ work = yield go()
+ except jsonrpc.Error_for_code(-32601): # Method not found
print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
raise deferral.RetrySilentlyException()
- raise
- packed_transactions = [x.decode('hex') for x in work['transactions']]
+ packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
+ if 'height' not in work:
+ work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
+ elif p2pool.DEBUG:
+ assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
defer.returnValue(dict(
version=work['version'],
previous_block=int(work['previousblockhash'], 16),
transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
- merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
subsidy=work['coinbasevalue'],
- time=work['time'],
+ time=work['time'] if 'time' in work else work['curtime'],
bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
- clock_offset=time.time() - work['time'],
+ height=work['height'],
last_update=time.time(),
+ use_getblocktemplate=use_getblocktemplate,
))
@defer.inlineCallbacks
print 'p2pool (version %s)' % (p2pool.__version__,)
print
+ traffic_happened = variable.Event()
+
+ @defer.inlineCallbacks
+ def connect_p2p():
+ # connect to bitcoind over bitcoin-p2p
+ print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
+ factory = bitcoin_p2p.ClientFactory(net.PARENT)
+ reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
+ yield factory.getProtocol() # waits until handshake is successful
+ print ' ...success!'
+ print
+ defer.returnValue(factory)
+
+ if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
+ factory = yield connect_p2p()
+
# connect to bitcoind over JSON-RPC and do initial getmemorypool
- url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
+ url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
@deferral.retry('Error while checking Bitcoin connection:', 1)
if not (yield net.PARENT.RPC_CHECK(bitcoind)):
print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
raise deferral.RetrySilentlyException()
- temp_work = yield getwork(bitcoind)
- if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
- print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
+ if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
+ print >>sys.stderr, ' Bitcoin version too old! Upgrade to 0.6.4 or newer!'
raise deferral.RetrySilentlyException()
- defer.returnValue(temp_work)
- temp_work = yield check()
+ yield check()
+ temp_work = yield getwork(bitcoind)
+
+ if not args.testnet:
+ factory = yield connect_p2p()
block_height_var = variable.Variable(None)
@defer.inlineCallbacks
yield poll_height()
task.LoopingCall(poll_height).start(60*60)
+ bitcoind_warning_var = variable.Variable(None)
+ @defer.inlineCallbacks
+ def poll_warnings():
+ errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
+ bitcoind_warning_var.set(errors if errors != '' else None)
+ yield poll_warnings()
+ task.LoopingCall(poll_warnings).start(20*60)
+
print ' ...success!'
print ' Current block hash: %x' % (temp_work['previous_block'],)
print ' Current block height: %i' % (block_height_var.value,)
print
- # connect to bitcoind over bitcoin-p2p
- print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
- factory = bitcoin_p2p.ClientFactory(net.PARENT)
- reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
- yield factory.getProtocol() # waits until handshake is successful
- print ' ...success!'
- print
-
print 'Determining payout address...'
if args.pubkey_hash is None:
address_path = os.path.join(datadir_path, 'cached_payout_address')
while True:
flag = factory.new_block.get_deferred()
try:
- bitcoind_work.set((yield getwork(bitcoind)))
+ bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
except:
log.err()
yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
def poll_header():
handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
bitcoind_work.changed.watch(lambda _: poll_header())
- yield poll_header()
+ yield deferral.retry('Error while requesting best block header:')(poll_header)()
# BEST SHARE
+ known_txs_var = variable.Variable({}) # hash -> tx
+ mining_txs_var = variable.Variable({}) # hash -> tx
get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
- requested = expiring_dict.ExpiringDict(300)
- peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
best_share_var = variable.Variable(None)
+ desired_var = variable.Variable(None)
def set_best_share():
- best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
+ best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
best_share_var.set(best)
-
- t = time.time()
- for peer2, share_hash in desired:
- if share_hash not in tracker.tails: # was received in the time tracker.think was running
- continue
- last_request_time, count = requested.get(share_hash, (None, 0))
- if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
- continue
- potential_peers = set()
- for head in tracker.tails[share_hash]:
- potential_peers.update(peer_heads.get(head, set()))
- potential_peers = [peer for peer in potential_peers if peer.connected2]
- if count == 0 and peer2 is not None and peer2.connected2:
- peer = peer2
- else:
- peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
- if peer is None:
- continue
-
- print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
- peer.send_getshares(
- hashes=[share_hash],
- parents=2000,
- stops=list(set(tracker.heads) | set(
- tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
- ))[:100],
- )
- requested[share_hash] = t, count + 1
+ desired_var.set(desired)
bitcoind_work.changed.watch(lambda _: set_best_share())
set_best_share()
-
print ' ...success!'
print
# setup p2p logic and join p2pool network
+ # update mining_txs according to getwork results
+ @bitcoind_work.changed.run_and_watch
+ def _(_=None):
+ new_mining_txs = {}
+ new_known_txs = dict(known_txs_var.value)
+ for tx in bitcoind_work.value['transactions']:
+ tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))
+ new_mining_txs[tx_hash] = tx
+ new_known_txs[tx_hash] = tx
+ mining_txs_var.set(new_mining_txs)
+ known_txs_var.set(new_known_txs)
+ # forward transactions seen to bitcoind
+ @known_txs_var.transitioned.watch
+ def _(before, after):
+ for tx_hash in set(after) - set(before):
+ factory.conn.value.send_tx(tx=after[tx_hash])
+
class Node(p2p.Node):
def handle_shares(self, shares, peer):
if len(shares) > 5:
tracker.add(share)
- if shares and peer is not None:
- peer_heads.setdefault(shares[0].hash, set()).add(peer)
-
if new_count:
set_best_share()
if len(shares) > 5:
print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
+ @defer.inlineCallbacks
def handle_share_hashes(self, hashes, peer):
- t = time.time()
- get_hashes = []
- for share_hash in hashes:
- if share_hash in tracker.items:
- continue
- last_request_time, count = requested.get(share_hash, (None, 0))
- if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
- continue
- print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
- get_hashes.append(share_hash)
- requested[share_hash] = t, count + 1
-
- if hashes and peer is not None:
- peer_heads.setdefault(hashes[0], set()).add(peer)
- if get_hashes:
- peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
+ new_hashes = [x for x in hashes if x not in tracker.items]
+ if not new_hashes:
+ return
+ try:
+ shares = yield peer.get_shares(
+ hashes=new_hashes,
+ parents=0,
+ stops=[],
+ )
+ except:
+ log.err(None, 'in handle_share_hashes:')
+ else:
+ self.handle_shares(shares, peer)
def handle_get_shares(self, hashes, parents, stops, peer):
parents = min(parents, 1000//len(hashes))
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
if factory.conn.value is None:
- print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
+ print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
raise deferral.RetrySilentlyException()
factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
- success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
+ if bitcoind_work.value['use_getblocktemplate']:
+ result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
+ success = result is None
+ else:
+ result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
+ success = result
success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
if (not success and success_expected and not ignore_failure) or (success and not success_expected):
- print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
+ print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
def submit_block(block, ignore_failure):
submit_block_p2p(block)
@tracker.verified.added.watch
def _(share):
if share.pow_hash <= share.header['bits'].target:
- submit_block(share.as_block(tracker), ignore_failure=True)
+ block = share.as_block(tracker, known_txs_var.value)
+ if block is None:
+ print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
+ return
+ submit_block(block, ignore_failure=True)
print
print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
print
addr_store=addrs,
connect_addrs=connect_addrs,
max_incoming_conns=args.p2pool_conns,
+ traffic_happened=traffic_happened,
+ known_txs_var=known_txs_var,
+ mining_txs_var=mining_txs_var,
)
p2p_node.start()
shares.append(share)
for peer in list(p2p_node.peers.itervalues()):
- yield peer.sendShares([share for share in shares if share.peer is not peer])
+ yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
# send share when the chain changes to their chain
best_share_var.changed.watch(broadcast_share)
ss.add_verified_hash(share.hash)
task.LoopingCall(save_shares).start(60)
+ @apply
+ @defer.inlineCallbacks
+ def download_shares():
+ while True:
+ desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
+ peer2, share_hash = random.choice(desired)
+
+ if len(p2p_node.peers) == 0:
+ yield deferral.sleep(1)
+ continue
+ peer = random.choice(p2p_node.peers.values())
+
+ print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
+ try:
+ shares = yield peer.get_shares(
+ hashes=[share_hash],
+ parents=500,
+ stops=[],
+ )
+ except:
+ log.err(None, 'in download_shares:')
+ continue
+
+ if not shares:
+ yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
+ continue
+ p2p_node.handle_shares(shares, peer)
+
print ' ...success!'
print
get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
- web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
+ web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
yield deferral.sleep(random.expovariate(1/60))
message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
- if message not in self.recent_messages:
+ if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
self.say(self.channel, message)
self._remember_message(message)
self.watch_id = tracker.verified.added.watch(new_share)
while True:
yield deferral.sleep(3)
try:
- if time.time() > bitcoind_work.value['last_update'] + 60:
- print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - bitcoind_work.value['last_update']),)
-
height = tracker.get_height(best_share_var.value)
this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
height,
math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
)
- for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
+ for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
print >>sys.stderr, '#'*40
print >>sys.stderr, '>>> Warning: ' + warning
print >>sys.stderr, '#'*40
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
type=int, action='store', default=None, dest='bitcoind_rpc_port')
+ bitcoind_group.add_argument('--bitcoind-rpc-ssl',
+ help='connect to JSON-RPC interface using SSL',
+ action='store_true', default=False, dest='bitcoind_rpc_ssl')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
type=int, action='store', default=None, dest='bitcoind_p2p_port')
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
if args.bitcoind_rpc_password is None:
- if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
- parser.error('This network has no configuration file function. Manually enter your RPC password.')
conf_path = net.PARENT.CONF_FILE_FUNC()
if not os.path.exists(conf_path):
parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
'''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
'''\r\n'''
'''server=1\r\n'''
- '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
- with open(conf_path, 'rb') as f:
- cp = ConfigParser.RawConfigParser()
- cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
- for conf_name, var_name, var_type in [
- ('rpcuser', 'bitcoind_rpc_username', str),
- ('rpcpassword', 'bitcoind_rpc_password', str),
- ('rpcport', 'bitcoind_rpc_port', int),
- ('port', 'bitcoind_p2p_port', int),
- ]:
- if getattr(args, var_name) is None and cp.has_option('x', conf_name):
- setattr(args, var_name, var_type(cp.get('x', conf_name)))
+ '''rpcpassword=%x\r\n'''
+ '''\r\n'''
+ '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
+ conf = open(conf_path, 'rb').read()
+ contents = {}
+ for line in conf.splitlines(True):
+ if '#' in line:
+ line = line[:line.index('#')]
+ if '=' not in line:
+ continue
+ k, v = line.split('=', 1)
+ contents[k.strip()] = v.strip()
+ for conf_name, var_name, var_type in [
+ ('rpcuser', 'bitcoind_rpc_username', str),
+ ('rpcpassword', 'bitcoind_rpc_password', str),
+ ('rpcport', 'bitcoind_rpc_port', int),
+ ('port', 'bitcoind_p2p_port', int),
+ ]:
+ if getattr(args, var_name) is None and conf_name in contents:
+ setattr(args, var_name, var_type(contents[conf_name]))
if args.bitcoind_rpc_password is None:
parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')