1 from __future__ import division
16 if '--iocp' in sys.argv:
17 from twisted.internet import iocpreactor
19 from twisted.internet import defer, reactor, protocol, task
20 from twisted.web import server
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface, height_tracker
26 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
27 from . import p2p, networks, web
28 import p2pool, p2pool.data as p2pool_data
30 @deferral.retry('Error getting work from bitcoind:', 3)
31 @defer.inlineCallbacks
32 def getwork(bitcoind):
34 work = yield bitcoind.rpc_getmemorypool()
35 except jsonrpc.Error, e:
36 if e.code == -32601: # Method not found
37 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
38 raise deferral.RetrySilentlyException()
40 packed_transactions = [x.decode('hex') for x in work['transactions']]
41 defer.returnValue(dict(
42 version=work['version'],
43 previous_block_hash=int(work['previousblockhash'], 16),
44 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
45 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
46 subsidy=work['coinbasevalue'],
48 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
49 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
52 @defer.inlineCallbacks
53 def main(args, net, datadir_path, merged_urls, worker_endpoint):
55 print 'p2pool (version %s)' % (p2pool.__version__,)
58 # connect to bitcoind over JSON-RPC and do initial getmemorypool
59 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
60 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
61 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
62 @deferral.retry('Error while checking Bitcoin connection:', 1)
63 @defer.inlineCallbacks
65 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
66 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
67 raise deferral.RetrySilentlyException()
68 temp_work = yield getwork(bitcoind)
69 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
70 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
71 raise deferral.RetrySilentlyException()
72 defer.returnValue(temp_work)
73 temp_work = yield check()
75 block_height_var = variable.Variable(None)
76 @defer.inlineCallbacks
78 block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
80 task.LoopingCall(poll_height).start(60*60)
83 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
84 print ' Current block height: %i' % (block_height_var.value,)
87 # connect to bitcoind over bitcoin-p2p
88 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
89 factory = bitcoin_p2p.ClientFactory(net.PARENT)
90 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
91 yield factory.getProtocol() # waits until handshake is successful
95 print 'Determining payout address...'
96 if args.pubkey_hash is None:
97 address_path = os.path.join(datadir_path, 'cached_payout_address')
99 if os.path.exists(address_path):
100 with open(address_path, 'rb') as f:
101 address = f.read().strip('\r\n')
102 print ' Loaded cached address: %s...' % (address,)
106 if address is not None:
107 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
108 if not res['isvalid'] or not res['ismine']:
109 print ' Cached address is either invalid or not controlled by local bitcoind!'
113 print ' Getting payout address from bitcoind...'
114 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
116 with open(address_path, 'wb') as f:
119 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
121 my_pubkey_hash = args.pubkey_hash
122 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
125 my_share_hashes = set()
126 my_doa_share_hashes = set()
128 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
129 shared_share_hashes = set()
130 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
131 known_verified = set()
132 print "Loading shares..."
133 for i, (mode, contents) in enumerate(ss.get_shares()):
135 if contents.hash in tracker.shares:
137 shared_share_hashes.add(contents.hash)
138 contents.time_seen = 0
139 tracker.add(contents)
140 if len(tracker.shares) % 1000 == 0 and tracker.shares:
141 print " %i" % (len(tracker.shares),)
142 elif mode == 'verified_hash':
143 known_verified.add(contents)
145 raise AssertionError()
146 print " ...inserting %i verified shares..." % (len(known_verified),)
147 for h in known_verified:
148 if h not in tracker.shares:
149 ss.forget_verified_share(h)
151 tracker.verified.add(tracker.shares[h])
152 print " ...done loading %i shares!" % (len(tracker.shares),)
154 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
155 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
156 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
158 print 'Initializing work...'
163 bitcoind_work = variable.Variable(None)
165 @defer.inlineCallbacks
167 work = yield getwork(bitcoind)
168 bitcoind_work.set(dict(
169 version=work['version'],
170 previous_block=work['previous_block_hash'],
172 coinbaseflags=work['coinbaseflags'],
174 transactions=work['transactions'],
175 merkle_link=work['merkle_link'],
176 subsidy=work['subsidy'],
177 clock_offset=time.time() - work['time'],
178 last_update=time.time(),
180 yield poll_bitcoind()
182 @defer.inlineCallbacks
185 flag = factory.new_block.get_deferred()
187 yield poll_bitcoind()
190 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
195 best_block_header = variable.Variable(None)
196 def handle_header(new_header):
197 # check that header matches current target
198 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
200 bitcoind_best_block = bitcoind_work.value['previous_block']
201 if (best_block_header.value is None
203 new_header['previous_block'] == bitcoind_best_block and
204 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
205 ) # new is child of current and previous is current
207 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
208 best_block_header.value['previous_block'] != bitcoind_best_block
209 )): # new is current and previous is not a child of current
210 best_block_header.set(new_header)
211 @defer.inlineCallbacks
213 handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
214 bitcoind_work.changed.watch(lambda _: poll_header())
219 merged_work = variable.Variable({})
221 @defer.inlineCallbacks
222 def set_merged_work(merged_url, merged_userpass):
223 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
225 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
226 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
227 hash=int(auxblock['hash'], 16),
228 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
229 merged_proxy=merged_proxy,
231 yield deferral.sleep(1)
232 for merged_url, merged_userpass in merged_urls:
233 set_merged_work(merged_url, merged_userpass)
235 @merged_work.changed.watch
236 def _(new_merged_work):
237 print 'Got new merged mining work!'
241 current_work = variable.Variable(None)
243 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
244 requested = expiring_dict.ExpiringDict(300)
245 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
247 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
249 t = dict(bitcoind_work.value)
251 if (best_block_header.value is not None and
252 best_block_header.value['previous_block'] == t['previous_block'] and
253 net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
254 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
255 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
257 version=best_block_header.value['version'],
258 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
259 bits=best_block_header.value['bits'], # not always true
261 time=best_block_header.value['timestamp'] + 600, # better way?
263 merkle_link=bitcoin_data.calculate_merkle_link([0], 0),
264 subsidy=net.PARENT.SUBSIDY_FUNC(block_height_var.value),
265 clock_offset=current_work.value['clock_offset'],
266 last_update=current_work.value['last_update'],
269 t['best_share_hash'] = best
270 t['mm_chains'] = merged_work.value
274 for peer2, share_hash in desired:
275 if share_hash not in tracker.tails: # was received in the time tracker.think was running
277 last_request_time, count = requested.get(share_hash, (None, 0))
278 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
280 potential_peers = set()
281 for head in tracker.tails[share_hash]:
282 potential_peers.update(peer_heads.get(head, set()))
283 potential_peers = [peer for peer in potential_peers if peer.connected2]
284 if count == 0 and peer2 is not None and peer2.connected2:
287 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
291 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
295 stops=list(set(tracker.heads) | set(
296 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
299 requested[share_hash] = t, count + 1
300 bitcoind_work.changed.watch(lambda _: compute_work())
301 merged_work.changed.watch(lambda _: compute_work())
302 best_block_header.changed.watch(lambda _: compute_work())
307 lp_signal = variable.Event()
309 @current_work.transitioned.watch
310 def _(before, after):
311 # trigger LP if version/previous_block/bits changed or transactions changed from nothing
312 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
319 # setup p2p logic and join p2pool network
321 class Node(p2p.Node):
322 def handle_shares(self, shares, peer):
324 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
328 if share.hash in tracker.shares:
329 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
334 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
338 if shares and peer is not None:
339 peer_heads.setdefault(shares[0].hash, set()).add(peer)
345 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
347 def handle_share_hashes(self, hashes, peer):
350 for share_hash in hashes:
351 if share_hash in tracker.shares:
353 last_request_time, count = requested.get(share_hash, (None, 0))
354 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
356 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
357 get_hashes.append(share_hash)
358 requested[share_hash] = t, count + 1
360 if hashes and peer is not None:
361 peer_heads.setdefault(hashes[0], set()).add(peer)
363 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
365 def handle_get_shares(self, hashes, parents, stops, peer):
366 parents = min(parents, 1000//len(hashes))
369 for share_hash in hashes:
370 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
371 if share.hash in stops:
374 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
def handle_bestblock(self, header, peer):
    # A peer claims this header is its best block. Verify the header's
    # proof-of-work against its own 'bits' target before trusting it; a
    # header failing its own PoW is a protocol violation by the peer.
    if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
        raise p2p.PeerMisbehavingError('received block header fails PoW test')
    handle_header(header)
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    # Push a solved block to the local bitcoind over its P2P connection.
    # If the connection is down, log the block hash and let the retry
    # decorator try again shortly.
    if factory.conn.value is None:
        header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
        raise deferral.RetrySilentlyException()
    factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    # Submit the serialized block through bitcoind's JSON-RPC interface and
    # compare the reported outcome against what the header's PoW predicts.
    result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
    meets_target = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
    # Warn on either surprise: a valid-looking block that was rejected
    # (unless the caller asked to ignore that) or an invalid one accepted.
    surprising_reject = not result and meets_target and not ignore_failure
    surprising_accept = result and not meets_target
    if surprising_reject or surprising_accept:
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, meets_target)
def submit_block(block, ignore_failure):
    # Submit a solved block through both channels: the P2P connection
    # (fast propagation) and JSON-RPC (reports acceptance/rejection).
    submit_block_p2p(block)
    submit_block_rpc(block, ignore_failure)
401 @tracker.verified.added.watch
403 if share.pow_hash <= share.header['bits'].target:
404 submit_block(share.as_block(tracker), ignore_failure=True)
406 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
409 if (get_height_rel_highest(share.header['previous_block']) > -5 or
410 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
411 broadcast_share(share.hash)
413 reactor.callLater(5, spread) # so get_height_rel_highest can update
415 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
417 @defer.inlineCallbacks
420 ip, port = x.split(':')
421 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
423 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
426 if os.path.exists(os.path.join(datadir_path, 'addrs')):
428 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
429 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
431 print >>sys.stderr, 'error parsing addrs'
432 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
434 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
436 print >>sys.stderr, "error reading addrs.txt"
437 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
440 if addr not in addrs:
441 addrs[addr] = (0, time.time(), time.time())
445 connect_addrs = set()
446 for addr_df in map(parse, args.p2pool_nodes):
448 connect_addrs.add((yield addr_df))
453 best_share_hash_func=lambda: current_work.value['best_share_hash'],
454 port=args.p2pool_port,
457 connect_addrs=connect_addrs,
458 max_incoming_conns=args.p2pool_conns,
463 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
464 f.write(json.dumps(p2p_node.addr_store.items()))
465 task.LoopingCall(save_addrs).start(60)
467 @best_block_header.changed.watch
469 for peer in p2p_node.peers.itervalues():
470 peer.send_bestblock(header=header)
472 def broadcast_share(share_hash):
474 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
475 if share.hash in shared_share_hashes:
477 shared_share_hashes.add(share.hash)
480 for peer in p2p_node.peers.itervalues():
481 peer.sendShares([share for share in shares if share.peer is not peer])
483 # send share when the chain changes to their chain
484 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
487 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
489 if share.hash in tracker.verified.shares:
490 ss.add_verified_hash(share.hash)
491 task.LoopingCall(save_shares).start(60)
497 @defer.inlineCallbacks
501 is_lan, lan_ip = yield ipdiscover.get_local_ip()
503 pm = yield portmapper.get_port_mapper()
504 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
505 except defer.TimeoutError:
509 log.err(None, 'UPnP error:')
510 yield deferral.sleep(random.expovariate(1/120))
513 # start listening for workers with a JSON-RPC server
515 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
519 removed_unstales_var = variable.Variable((0, 0, 0))
520 removed_doa_unstales_var = variable.Variable(0)
521 @tracker.verified.removed.watch
523 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
524 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
525 removed_unstales_var.set((
526 removed_unstales_var.value[0] + 1,
527 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
528 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
530 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
531 removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
    total_mine = len(my_share_hashes)
    total_mine_doa = len(my_doa_share_hashes)

    # Counts from the live chain plus shares already pruned from the tracker
    # (accumulated in the removed_*_var variables by the removal watcher).
    delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
    mine_in_chain = delta.my_count + removed_unstales_var.value[0]
    mine_doa_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
    orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
    doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]

    # Shares we produced that never made it into the chain are stale; of
    # those, the ones we flagged dead-on-arrival are DOAs, the rest orphans.
    stale = total_mine - mine_in_chain
    stale_doa = total_mine_doa - mine_doa_in_chain

    return (stale - stale_doa, stale_doa), total_mine, (orphans_recorded_in_chain, doas_recorded_in_chain)
549 pseudoshare_received = variable.Event()
550 share_received = variable.Event()
551 local_rate_monitor = math.RateMonitor(10*60)
553 class WorkerBridge(worker_interface.WorkerBridge):
555 worker_interface.WorkerBridge.__init__(self)
556 self.new_work_event = lp_signal
557 self.recent_shares_ts_work = []
559 def get_user_details(self, request):
560 user = request.getUser() if request.getUser() is not None else ''
562 desired_pseudoshare_target = None
564 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
566 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
570 desired_share_target = 2**256 - 1
572 user, min_diff_str = user.rsplit('/', 1)
574 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
578 if random.uniform(0, 100) < args.worker_fee:
579 pubkey_hash = my_pubkey_hash
582 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
584 pubkey_hash = my_pubkey_hash
586 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
def preprocess_request(self, request):
    # Resolve the worker's credentials once per request; downstream work
    # generation only needs the payout pubkey hash and the two targets,
    # so the user name is dropped here.
    _, pubkey_hash, share_target, pseudoshare_target = self.get_user_details(request)
    return pubkey_hash, share_target, pseudoshare_target
592 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
593 if len(p2p_node.peers) == 0 and net.PERSIST:
594 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
595 if current_work.value['best_share_hash'] is None and net.PERSIST:
596 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
597 if time.time() > current_work.value['last_update'] + 60:
598 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
600 if current_work.value['mm_chains']:
601 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
602 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
603 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
604 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
608 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
614 share_info, generate_tx = p2pool_data.Share.generate_transaction(
617 previous_share_hash=current_work.value['best_share_hash'],
618 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
619 nonce=random.randrange(2**32),
620 pubkey_hash=pubkey_hash,
621 subsidy=current_work.value['subsidy'],
622 donation=math.perfect_round(65535*args.donation_percentage/100),
623 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
624 'orphan' if orphans > orphans_recorded_in_chain else
625 'doa' if doas > doas_recorded_in_chain else
627 )(*get_stale_counts()),
630 block_target=current_work.value['bits'].target,
631 desired_timestamp=int(time.time() - current_work.value['clock_offset']),
632 desired_target=desired_share_target,
633 ref_merkle_link=dict(branch=[], index=0),
637 if desired_pseudoshare_target is None:
639 if len(self.recent_shares_ts_work) == 50:
640 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
642 target = min(target, int(2**256/hash_rate))
644 target = desired_pseudoshare_target
645 target = max(target, share_info['bits'].target)
646 for aux_work in current_work.value['mm_chains'].itervalues():
647 target = max(target, aux_work['target'])
648 target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
650 transactions = [generate_tx] + list(current_work.value['transactions'])
651 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
652 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work.value['merkle_link'])
654 getwork_time = time.time()
655 merkle_link = current_work.value['merkle_link']
657 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
658 bitcoin_data.target_to_difficulty(target),
659 bitcoin_data.target_to_difficulty(share_info['bits'].target),
660 current_work.value['subsidy']*1e-8, net.PARENT.SYMBOL,
661 len(current_work.value['transactions']),
664 bits = current_work.value['bits']
665 previous_block = current_work.value['previous_block']
666 ba = bitcoin_getwork.BlockAttempt(
667 version=current_work.value['version'],
668 previous_block=current_work.value['previous_block'],
669 merkle_root=merkle_root,
670 timestamp=current_work.value['time'],
671 bits=current_work.value['bits'],
675 received_header_hashes = set()
677 def got_response(header, request):
678 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
679 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
681 if pow_hash <= header['bits'].target or p2pool.DEBUG:
682 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
683 if pow_hash <= header['bits'].target:
685 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
688 log.err(None, 'Error while processing potential block:')
690 user, _, _, _ = self.get_user_details(request)
691 assert header['merkle_root'] == merkle_root
692 assert header['previous_block'] == previous_block
693 assert header['bits'] == bits
695 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
697 for aux_work, index, hashes in mm_later:
699 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
700 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
701 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
702 bitcoin_data.aux_pow_type.pack(dict(
705 block_hash=header_hash,
706 merkle_link=merkle_link,
708 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
709 parent_block_header=header,
714 if result != (pow_hash <= aux_work['target']):
715 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
717 print 'Merged block submittal result: %s' % (result,)
720 log.err(err, 'Error submitting merged block:')
722 log.err(None, 'Error while processing merged mining POW:')
724 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
725 min_header = dict(header);del min_header['merkle_root']
726 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
727 share = p2pool_data.Share(net, None, dict(
728 min_header=min_header, share_info=share_info, hash_link=hash_link,
729 ref_merkle_link=dict(branch=[], index=0),
730 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
732 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
734 p2pool_data.format_hash(share.hash),
735 p2pool_data.format_hash(share.previous_hash),
736 time.time() - getwork_time,
737 ' DEAD ON ARRIVAL' if not on_time else '',
739 my_share_hashes.add(share.hash)
741 my_doa_share_hashes.add(share.hash)
745 tracker.verified.add(share)
749 if pow_hash <= header['bits'].target or p2pool.DEBUG:
750 for peer in p2p_node.peers.itervalues():
751 peer.sendShares([share])
752 shared_share_hashes.add(share.hash)
754 log.err(None, 'Error forwarding block solution:')
756 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
758 if pow_hash > target:
759 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
760 print ' Hash: %56x' % (pow_hash,)
761 print ' Target: %56x' % (target,)
762 elif header_hash in received_header_hashes:
763 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
765 received_header_hashes.add(header_hash)
767 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
768 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
769 while len(self.recent_shares_ts_work) > 50:
770 self.recent_shares_ts_work.pop(0)
771 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
775 return ba, got_response
777 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
779 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
780 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
782 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
784 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
792 print 'Started successfully!'
793 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
794 if args.donation_percentage > 0.51:
795 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
796 elif args.donation_percentage < 0.49:
797 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
799 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
800 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
804 if hasattr(signal, 'SIGALRM'):
805 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
806 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
808 signal.siginterrupt(signal.SIGALRM, False)
809 task.LoopingCall(signal.alarm, 30).start(1)
811 if args.irc_announce:
812 from twisted.words.protocols import irc
813 class IRCClient(irc.IRCClient):
814 nickname = 'p2pool%02i' % (random.randrange(100),)
815 channel = net.ANNOUNCE_CHANNEL
816 def lineReceived(self, line):
819 irc.IRCClient.lineReceived(self, line)
821 irc.IRCClient.signedOn(self)
822 self.factory.resetDelay()
823 self.join(self.channel)
@defer.inlineCallbacks
def new_share(share):
    # Announce on IRC only shares that are actual blocks (pow meets the
    # block target) and that are fresh (timestamp within 10 minutes of now).
    if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
        # Random exponential delay so multiple p2pool nodes don't all
        # announce the same block simultaneously.
        yield deferral.sleep(random.expovariate(1/60))
        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
        # Skip if some other node already announced it (seen via privmsg).
        if message not in self.recent_messages:
            self.say(self.channel, message)
            self._remember_message(message)
832 self.watch_id = tracker.verified.added.watch(new_share)
833 self.recent_messages = []
def _remember_message(self, message):
    # Record a message and trim the history in place (external aliases of
    # the list see the update) so at most the 100 newest entries remain.
    self.recent_messages.append(message)
    del self.recent_messages[:-100]
def privmsg(self, user, channel, message):
    # Track chatter on our announce channel so we don't repeat a block
    # announcement another node already made.
    if channel != self.channel:
        return
    self._remember_message(message)
def connectionLost(self, reason):
    # Detach the new-share announcer when the IRC link drops so it doesn't
    # fire on a dead connection; the reconnecting factory will re-register
    # a fresh watcher on the next connection.
    tracker.verified.added.unwatch(self.watch_id)
    print 'IRC connection lost:', reason.getErrorMessage()
844 class IRCClientFactory(protocol.ReconnectingClientFactory):
846 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
    # Periodic status reporter: roughly every 3 seconds, assemble a multi-line
    # summary (chain shares, peers, local and pool hash rates, stale rates,
    # payout) and print it when it changes. NOTE(review): the enclosing
    # function definition, while-loop and try framing are not visible in this
    # view; indentation reconstructed.
    @defer.inlineCallbacks
        yield deferral.sleep(3)
        # Warn loudly if bitcoind has not delivered fresh work for a minute.
        if time.time() > current_work.value['last_update'] + 60:
            print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
        height = tracker.get_height(current_work.value['best_share_hash'])
        this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
            len(tracker.verified.shares),
            sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
        # Local miner statistics from the recent work-datum window.
        datums, dt = local_rate_monitor.get_datums_in_last()
        my_att_s = sum(datum['work']/dt for datum in datums)
        this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
            math.format(int(my_att_s)),
            math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
            math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
        # Pool-wide statistics, corrected for the observed stale-share rate.
        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
            shares, stale_orphan_shares, stale_doa_shares,
            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
            math.format(int(real_att_s)),
            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
        # Surface any chain-level warnings prominently on stderr.
        for warning in p2pool_data.get_warnings(tracker, current_work, net):
            print >>sys.stderr, '#'*40
            print >>sys.stderr, '>>> Warning: ' + warning
            print >>sys.stderr, '#'*40
        # Only reprint when the status text changed or 15 seconds have passed.
        if this_str != last_str or time.time() > last_time + 15:
            last_time = time.time()
    # NOTE(review): sits in an except clause (not visible) guarding main().
    log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser whose @file expansion splits each line of the referenced
    file into multiple arguments (via convert_arg_line_to_args) and recurses
    into files referenced from files."""
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
                # NOTE(review): the else/try framing around this branch is not
                # visible in this view; IOError presumably lands in
                # sys.exc_info() below.
                args_file = open(arg_string[1:])
                for arg_line in args_file.read().splitlines():
                    for arg in self.convert_arg_line_to_args(arg_line):
                        arg_strings.append(arg)
                # Recurse so files referenced from files are expanded too.
                arg_strings = self._read_args_from_files(arg_strings)
                new_arg_strings.extend(arg_strings)
        err = sys.exc_info()[1]
        # return the modified argument list
        return new_arg_strings
940 def convert_arg_line_to_args(self, arg_line):
941 return [arg for arg in arg_line.split() if arg.strip()]
944 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
946 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
947 parser.add_argument('--version', action='version', version=p2pool.__version__)
948 parser.add_argument('--net',
949 help='use specified network (default: bitcoin)',
950 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
951 parser.add_argument('--testnet',
952 help='''use the network's testnet''',
953 action='store_const', const=True, default=False, dest='testnet')
954 parser.add_argument('--debug',
955 help='enable debugging mode',
956 action='store_const', const=True, default=False, dest='debug')
957 parser.add_argument('-a', '--address',
958 help='generate payouts to this address (default: <address requested from bitcoind>)',
959 type=str, action='store', default=None, dest='address')
960 parser.add_argument('--datadir',
961 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
962 type=str, action='store', default=None, dest='datadir')
963 parser.add_argument('--logfile',
964 help='''log to this file (default: data/<NET>/log)''',
965 type=str, action='store', default=None, dest='logfile')
966 parser.add_argument('--merged',
967 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
968 type=str, action='append', default=[], dest='merged_urls')
969 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
970 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
971 type=float, action='store', default=0.5, dest='donation_percentage')
972 parser.add_argument('--iocp',
973 help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
974 action='store_true', default=False, dest='iocp')
975 parser.add_argument('--irc-announce',
976 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
977 action='store_true', default=False, dest='irc_announce')
978 parser.add_argument('--no-bugreport',
979 help='disable submitting caught exceptions to the author',
980 action='store_true', default=False, dest='no_bugreport')
982 p2pool_group = parser.add_argument_group('p2pool interface')
983 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
984 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
985 type=int, action='store', default=None, dest='p2pool_port')
986 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
987 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
988 type=str, action='append', default=[], dest='p2pool_nodes')
989 parser.add_argument('--disable-upnp',
990 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
991 action='store_false', default=True, dest='upnp')
992 p2pool_group.add_argument('--max-conns', metavar='CONNS',
993 help='maximum incoming connections (default: 40)',
994 type=int, action='store', default=40, dest='p2pool_conns')
996 worker_group = parser.add_argument_group('worker interface')
997 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
998 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
999 type=str, action='store', default=None, dest='worker_endpoint')
1000 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
1001 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
1002 type=float, action='store', default=0, dest='worker_fee')
1004 bitcoind_group = parser.add_argument_group('bitcoind interface')
1005 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
1006 help='connect to this address (default: 127.0.0.1)',
1007 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
1008 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
1009 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
1010 type=int, action='store', default=None, dest='bitcoind_rpc_port')
1011 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
1012 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
1013 type=int, action='store', default=None, dest='bitcoind_p2p_port')
1015 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
1016 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
1017 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    args = parser.parse_args()

    # NOTE(review): a few lines between parse_args and here are not visible in
    # this view (presumably --debug/--iocp handling).
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]

    # Per-network data directory, defaulting to <script dir>/data/<net_name>.
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)

    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    # Right-align into (username, password): one token given -> it is the
    # password and the username stays None (defaulted to '' further below).
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

    if args.bitcoind_rpc_password is None:
        # No password on the command line: read credentials/ports from the
        # coin's configuration file.
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            # Prepend a dummy [x] section so RawConfigParser accepts the
            # section-less bitcoin.conf format.
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            # Fill in only the values the user did not supply explicitly.
            # NOTE(review): the closing ']:' of this list literal is not
            # visible in this view.
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
            if args.bitcoind_rpc_password is None:
                parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''

    # Fall back to per-network default ports where nothing was specified.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT

    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT

    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT

    # Worker endpoint: omitted -> all interfaces on the default port; a bare
    # port -> all interfaces on that port; ADDR:PORT -> that interface.
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
        # NOTE(review): the 'else:' introducing this branch is not visible in
        # this view.
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    # Resolve the payout address to a pubkey hash, or leave it None so a fresh
    # address is later requested from bitcoind.
    if args.address is not None:
            # NOTE(review): the 'try:' opening this block is not visible here.
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
        # NOTE(review): the 'else:' for the no-address case is not visible here.
        args.pubkey_hash = None
1087 def separate_url(url):
1088 s = urlparse.urlsplit(url)
1089 if '@' not in s.netloc:
1090 parser.error('merged url netloc must contain an "@"')
1091 userpass, new_netloc = s.netloc.rsplit('@', 1)
1092 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    # Parse every --merged URL into a (url, userpass) pair.
    merged_urls = map(separate_url, args.merged_urls)

    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')

    # Tee stdout/stderr through a timestamping pipe into the logfile; stderr
    # lines are additionally prefixed with '> ' to distinguish them.
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 reopens the logfile so external log rotation works.
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            # NOTE(review): the logfile.reopen() call between these two prints
            # is not visible in this view.
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # Also reopen every 5 seconds as a fallback for platforms without SIGUSR1.
    task.LoopingCall(logfile.reopen).start(5)
    # Best-effort crash reporter: a Twisted log observer that POSTs error
    # events to the author's collection endpoint, rate-limited to one report
    # every 5 seconds. NOTE(review): several lines (the __init__ header, early
    # 'return's, and the client.getPage( call head) are not visible in this
    # view.
    class ErrorReporter(object):
            # Timestamp of the last report sent, for rate limiting.
            self.last_sent = None
        def emit(self, eventDict):
            # Only error events are reported.
            if not eventDict["isError"]:
            # Rate limit: skip if a report went out within the last 5 seconds.
            if self.last_sent is not None and time.time() < self.last_sent + 5:
            self.last_sent = time.time()
            if 'failure' in eventDict:
                # Include the full traceback when a Failure object is attached.
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            # Fire-and-forget HTTP POST; addBoth swallows any outcome.
            from twisted.web import client
                url='http://u.forre.st/p2pool_error.cgi',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
            ).addBoth(lambda x: None)
    # Install the crash reporter unless the user opted out with --no-bugreport.
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)

    # Start the node proper once the reactor is running.
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)