import StringIO
import json
import random
+import sys
from twisted.internet import defer
import p2pool
from p2pool.bitcoin import getwork
-from p2pool.util import jsonrpc, variable
+from p2pool.util import expiring_dict, jsonrpc, variable
class _Page(jsonrpc.Server):
def __init__(self, parent, long_poll):
def get_work(self, request):
raise NotImplementedError()
-
- def got_response(self, block_header):
- print self.got_response, "called with", block_header
class WorkerInterface(object):
def __init__(self, worker_bridge):
self.work_cache = {} # request_process_func(request) -> blockattempt
self.work_cache_times = self.worker_bridge.new_work_event.times
+
+ self.merkle_root_to_handler = expiring_dict.ExpiringDict(300)
def attach_to(self, res):
res.putChild('', _Page(self, long_poll=False))
request.setHeader('X-Is-P2Pool', 'true')
if data is not None:
- defer.returnValue(self.worker_bridge.got_response(getwork.decode_data(data), request))
+ header = getwork.decode_data(data)
+ if header['merkle_root'] not in self.merkle_root_to_handler:
+ print >>sys.stderr, '''Couldn't link returned work's merkle root with its handler. This should only happen if this process was recently restarted!'''
+ defer.returnValue(False)
+ defer.returnValue(self.merkle_root_to_handler[header['merkle_root']](header, request))
if p2pool.DEBUG:
id = random.randrange(1000, 10000)
if key in self.work_cache:
res, orig_timestamp = self.work_cache.pop(key)
else:
- res = self.worker_bridge.get_work(*key)
+ res, handler = self.worker_bridge.get_work(*key)
+ assert res.merkle_root not in self.merkle_root_to_handler
+ self.merkle_root_to_handler[res.merkle_root] = handler
orig_timestamp = res.timestamp
if res.timestamp + 12 < orig_timestamp + 600:
def __init__(self):
worker_interface.WorkerBridge.__init__(self)
self.new_work_event = current_work.changed
-
- self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
self.recent_shares_ts_work = []
def _get_payout_script_from_username(self, user):
transactions = [generate_tx] + list(current_work2.value['transactions'])
merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
- self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), mm_later, target, current_work2.value['merkle_branch']
+
+ getwork_time = time.time()
+ merkle_branch = current_work2.value['merkle_branch']
print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
bitcoin_data.target_to_difficulty(target),
len(current_work2.value['transactions']),
)
- return bitcoin_getwork.BlockAttempt(
+ ba = bitcoin_getwork.BlockAttempt(
version=current_work.value['version'],
previous_block=current_work.value['previous_block'],
merkle_root=merkle_root,
bits=current_work.value['bits'],
share_target=target,
)
-
- def got_response(self, header, request):
- # match up with transactions
- if header['merkle_root'] not in self.merkle_root_to_transactions:
- print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
- return False
- share_info, transactions, getwork_time, mm_later, target, merkle_branch = self.merkle_root_to_transactions[header['merkle_root']]
-
- pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
- on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
-
- try:
- if pow_hash <= header['bits'].target or p2pool.DEBUG:
- if factory.conn.value is not None:
- factory.conn.value.send_block(block=dict(header=header, txs=transactions))
- else:
- print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
- if pow_hash <= header['bits'].target:
- print
- print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
- print
- recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
- except:
- log.err(None, 'Error while processing potential block:')
- for aux_work, index, hashes in mm_later:
- try:
- if pow_hash <= aux_work['target'] or p2pool.DEBUG:
- df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
- pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
- bitcoin_data.aux_pow_type.pack(dict(
- merkle_tx=dict(
- tx=transactions[0],
- block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
- merkle_branch=merkle_branch,
- index=0,
- ),
- merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
- index=index,
- parent_block_header=header,
- )).encode('hex'),
- )
- @df.addCallback
- def _(result):
- if result != (pow_hash <= aux_work['target']):
- print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
- else:
- print 'Merged block submittal result: %s' % (result,)
- @df.addErrback
- def _(err):
- log.err(err, 'Error submitting merged block:')
- except:
- log.err(None, 'Error while processing merged mining POW:')
-
- if pow_hash <= share_info['bits'].target:
- share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
- print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
- request.getUser(),
- p2pool_data.format_hash(share.hash),
- p2pool_data.format_hash(share.previous_hash),
- time.time() - getwork_time,
- ' DEAD ON ARRIVAL' if not on_time else '',
- )
- my_share_hashes.add(share.hash)
- if not on_time:
- my_doa_share_hashes.add(share.hash)
+ def got_response(header, request):
+ assert header['merkle_root'] == merkle_root
- tracker.add(share)
- if not p2pool.DEBUG:
- tracker.verified.add(share)
- set_real_work2()
+ pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
+ on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
try:
if pow_hash <= header['bits'].target or p2pool.DEBUG:
- for peer in p2p_node.peers.itervalues():
- peer.sendShares([share])
- shared_share_hashes.add(share.hash)
+ if factory.conn.value is not None:
+ factory.conn.value.send_block(block=dict(header=header, txs=transactions))
+ else:
+ print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
+ if pow_hash <= header['bits'].target:
+ print
+ print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
+ print
+ recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
except:
- log.err(None, 'Error forwarding block solution:')
-
- if pow_hash <= target:
- reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
- if request.getPassword() == vip_pass:
- reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
- self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
- while len(self.recent_shares_ts_work) > 50:
- self.recent_shares_ts_work.pop(0)
- recent_shares_ts_work2.append((time.time(), bitcoin_data.target_to_average_attempts(target), not on_time))
-
-
- if pow_hash > target:
- print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
- print ' Hash: %56x' % (pow_hash,)
- print ' Target: %56x' % (target,)
+ log.err(None, 'Error while processing potential block:')
+
+ for aux_work, index, hashes in mm_later:
+ try:
+ if pow_hash <= aux_work['target'] or p2pool.DEBUG:
+ df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
+ pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
+ bitcoin_data.aux_pow_type.pack(dict(
+ merkle_tx=dict(
+ tx=transactions[0],
+ block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
+ merkle_branch=merkle_branch,
+ index=0,
+ ),
+ merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
+ index=index,
+ parent_block_header=header,
+ )).encode('hex'),
+ )
+ @df.addCallback
+ def _(result):
+ if result != (pow_hash <= aux_work['target']):
+ print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
+ else:
+ print 'Merged block submittal result: %s' % (result,)
+ @df.addErrback
+ def _(err):
+ log.err(err, 'Error submitting merged block:')
+ except:
+ log.err(None, 'Error while processing merged mining POW:')
+
+ if pow_hash <= share_info['bits'].target:
+ share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
+ print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
+ request.getUser(),
+ p2pool_data.format_hash(share.hash),
+ p2pool_data.format_hash(share.previous_hash),
+ time.time() - getwork_time,
+ ' DEAD ON ARRIVAL' if not on_time else '',
+ )
+ my_share_hashes.add(share.hash)
+ if not on_time:
+ my_doa_share_hashes.add(share.hash)
+
+ tracker.add(share)
+ if not p2pool.DEBUG:
+ tracker.verified.add(share)
+ set_real_work2()
+
+ try:
+ if pow_hash <= header['bits'].target or p2pool.DEBUG:
+ for peer in p2p_node.peers.itervalues():
+ peer.sendShares([share])
+ shared_share_hashes.add(share.hash)
+ except:
+ log.err(None, 'Error forwarding block solution:')
+
+ if pow_hash <= target:
+ reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
+ if request.getPassword() == vip_pass:
+ reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
+ self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
+ while len(self.recent_shares_ts_work) > 50:
+ self.recent_shares_ts_work.pop(0)
+ recent_shares_ts_work2.append((time.time(), bitcoin_data.target_to_average_attempts(target), not on_time))
+
+
+ if pow_hash > target:
+ print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
+ print ' Hash: %56x' % (pow_hash,)
+ print ' Target: %56x' % (target,)
+
+ return on_time
- return on_time
+ return ba, got_response
web_root = resource.Resource()
worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)