import StringIO
import argparse
import base64
+import json
import os
import random
import sys
import traceback
import urlparse
+try:
+ from twisted.internet import iocpreactor
+ iocpreactor.install()
+except:
+ pass
+else:
+ print 'Using IOCP reactor!'
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
shared_share_hashes = set()
ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
known_verified = set()
- recent_blocks = []
print "Loading shares..."
for i, (mode, contents) in enumerate(ss.get_shares()):
if mode == 'share':
print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
return shares
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    # Push a solved block straight to bitcoind over its P2P connection.
    # Wrapped in deferral.retry; RetrySilentlyException retries without a
    # traceback when no bitcoind connection is up yet.
    if factory.conn.value is None:
        # Fix: the original referenced 'header_hash', which is not defined in
        # this scope (NameError on the error path); derive it from the block.
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
        raise deferral.RetrySilentlyException()
    factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    # Submit a solved block to bitcoind over JSON-RPC (getmemorypool).
    success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
    # Submittal should succeed exactly when the header meets the network target.
    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
    if (not success and success_expected and not ignore_failure) or (success and not success_expected):
        # Fix: the original printed undefined names 'result'/'expected_result'
        # (NameError); the values computed above are success/success_expected.
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
def submit_block(block, ignore_failure):
    # Fan a solved block out over both channels: direct P2P (fast path) and
    # JSON-RPC (authoritative result / failure reporting).
    submit_block_p2p(block)
    submit_block_rpc(block, ignore_failure)
@tracker.verified.added.watch
def _(share):
if share.pow_hash <= share.header['bits'].target:
print
print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
print
- recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
+ if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
+ broadcast_share(share.hash)
print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
addrs = {}
# Prefer the JSON-encoded 'addrs' file; fall back to the legacy
# repr-per-line 'addrs.txt' written by older versions.
if os.path.exists(os.path.join(datadir_path, 'addrs')):
    try:
        with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
            # Stored as a list of [key, value] pairs; keys must be rebuilt
            # as tuples to be usable as dict keys.
            addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
    except:
        print >>sys.stderr, 'error parsing addrs'
elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
    try:
        # NOTE(review): eval() of file contents -- tolerable only because
        # addrs.txt lives in our own datadir; superseded by the JSON format.
        addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
    except:
        print >>sys.stderr, "error reading addrs.txt"
for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
try:
addr = yield addr_df
)
p2p_node.start()

def save_addrs():
    # Persist the peer address store as JSON (list of (addr, info) pairs),
    # replacing the old repr-per-line addrs.txt format.
    with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
        f.write(json.dumps(p2p_node.addr_store.items()))
task.LoopingCall(save_addrs).start(60)
def broadcast_share(share_hash):
    # Send the newest shares of share_hash's chain to all peers: walk back at
    # most 5 shares, stopping early at the first share already broadcast
    # (tracked in the module-level shared_share_hashes set).
    shares = []
    for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
        if share.hash in shared_share_hashes:
            break
        shared_share_hashes.add(share.hash)
        shares.append(share)  # NOTE(review): restored from elided diff context -- confirm upstream

    for peer in p2p_node.peers.itervalues():
        # Don't echo a share back to the peer we received it from.
        peer.sendShares([share for share in shares if share.peer is not peer])

# send share when the chain changes to their chain
current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
def save_shares():
for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
- if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
- with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
- vip_pass = f.read().strip('\r\n')
- else:
- vip_pass = '%016x' % (random.randrange(2**64),)
- with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
- f.write(vip_pass)
-
# setup worker logic
removed_unstales_var = variable.Variable((0, 0, 0))
'''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
my_shares = len(my_share_hashes)
my_doa_shares = len(my_doa_share_hashes)
- delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
+ delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
self.new_work_event = current_work.changed
self.recent_shares_ts_work = []
- def preprocess_request(self, request):
+ def get_user_details(self, request):
user = request.getUser() if request.getUser() is not None else ''
desired_pseudoshare_target = None
except: # XXX blah
pubkey_hash = my_pubkey_hash
+ return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
+
def preprocess_request(self, request):
    # Legacy interface shim: identical to get_user_details but drops the
    # user-name component for callers that only need the work parameters.
    user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
    return pubkey_hash, desired_share_target, desired_pseudoshare_target
def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
received_header_hashes = set()
def got_response(header, request):
+ user, _, _, _ = self.get_user_details(request)
assert header['merkle_root'] == merkle_root
header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
print
print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
print
- recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
except:
log.err(None, 'Error while processing potential block:')
else:
received_header_hashes.add(header_hash)
- pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser(), request.getPassword() == vip_pass)
+ pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
while len(self.recent_shares_ts_work) > 50:
self.recent_shares_ts_work.pop(0)
- local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
+ local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
return on_time
get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)

# recent_blocks argument dropped from get_web_root per the diff -- the web UI
# no longer receives the locally-tracked block list.
web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
)
# Old inline desired-version majority check replaced (per diff) by the
# centralized p2pool_data.get_warnings helper.
for warning in p2pool_data.get_warnings(tracker, current_work):
    print >>sys.stderr, '#'*40
    print >>sys.stderr, '>>> Warning: ' + warning
    print >>sys.stderr, '#'*40
if this_str != last_str or time.time() > last_time + 15: