1 from __future__ import division
16 if '--iocp' in sys.argv:
17 from twisted.internet import iocpreactor
19 from twisted.internet import defer, reactor, protocol, task
20 from twisted.web import server
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface, height_tracker
26 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
27 from . import p2p, networks, web
28 import p2pool, p2pool.data as p2pool_data
30 @deferral.retry('Error getting work from bitcoind:', 3)
31 @defer.inlineCallbacks
32 def getwork(bitcoind):
34 work = yield bitcoind.rpc_getmemorypool()
35 except jsonrpc.Error, e:
36 if e.code == -32601: # Method not found
37 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
38 raise deferral.RetrySilentlyException()
40 packed_transactions = [x.decode('hex') for x in work['transactions']]
41 defer.returnValue(dict(
42 version=work['version'],
43 previous_block_hash=int(work['previousblockhash'], 16),
44 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
45 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
46 subsidy=work['coinbasevalue'],
48 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
49 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
52 @defer.inlineCallbacks
53 def main(args, net, datadir_path, merged_urls, worker_endpoint):
55 print 'p2pool (version %s)' % (p2pool.__version__,)
58 # connect to bitcoind over JSON-RPC and do initial getmemorypool
59 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
60 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
61 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
62 @deferral.retry('Error while checking Bitcoin connection:', 1)
63 @defer.inlineCallbacks
65 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
66 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
67 raise deferral.RetrySilentlyException()
68 temp_work = yield getwork(bitcoind)
69 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
70 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
71 raise deferral.RetrySilentlyException()
72 defer.returnValue(temp_work)
73 temp_work = yield check()
75 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
78 # connect to bitcoind over bitcoin-p2p
79 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
80 factory = bitcoin_p2p.ClientFactory(net.PARENT)
81 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
82 yield factory.getProtocol() # waits until handshake is successful
86 print 'Determining payout address...'
87 if args.pubkey_hash is None:
88 address_path = os.path.join(datadir_path, 'cached_payout_address')
90 if os.path.exists(address_path):
91 with open(address_path, 'rb') as f:
92 address = f.read().strip('\r\n')
93 print ' Loaded cached address: %s...' % (address,)
97 if address is not None:
98 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
99 if not res['isvalid'] or not res['ismine']:
100 print ' Cached address is either invalid or not controlled by local bitcoind!'
104 print ' Getting payout address from bitcoind...'
105 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
107 with open(address_path, 'wb') as f:
110 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
112 my_pubkey_hash = args.pubkey_hash
113 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
116 my_share_hashes = set()
117 my_doa_share_hashes = set()
119 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
120 shared_share_hashes = set()
121 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
122 known_verified = set()
123 print "Loading shares..."
124 for i, (mode, contents) in enumerate(ss.get_shares()):
126 if contents.hash in tracker.shares:
128 shared_share_hashes.add(contents.hash)
129 contents.time_seen = 0
130 tracker.add(contents)
131 if len(tracker.shares) % 1000 == 0 and tracker.shares:
132 print " %i" % (len(tracker.shares),)
133 elif mode == 'verified_hash':
134 known_verified.add(contents)
136 raise AssertionError()
137 print " ...inserting %i verified shares..." % (len(known_verified),)
138 for h in known_verified:
139 if h not in tracker.shares:
140 ss.forget_verified_share(h)
142 tracker.verified.add(tracker.shares[h])
143 print " ...done loading %i shares!" % (len(tracker.shares),)
145 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
146 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
147 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
149 print 'Initializing work...'
154 bitcoind_work = variable.Variable(None)
156 @defer.inlineCallbacks
158 work = yield getwork(bitcoind)
159 bitcoind_work.set(dict(
160 version=work['version'],
161 previous_block=work['previous_block_hash'],
163 coinbaseflags=work['coinbaseflags'],
165 transactions=work['transactions'],
166 merkle_link=work['merkle_link'],
167 subsidy=work['subsidy'],
168 clock_offset=time.time() - work['time'],
169 last_update=time.time(),
171 yield poll_bitcoind()
173 @defer.inlineCallbacks
176 flag = factory.new_block.get_deferred()
178 yield poll_bitcoind()
181 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
186 best_block_header = variable.Variable(None)
187 def handle_header(header):
188 # check that header matches current target
189 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) <= bitcoind_work.value['bits'].target):
191 if (best_block_header.value is None
193 header['previous_block'] == current_work.value['previous_block'] and
194 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == current_work.value['previous_block']
195 ) # new is child of current and previous is current
197 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)) == current_work.value['previous_block'] and
198 best_block_header.value['previous_block'] != current_work.value['previous_block'])
199 ): # new is current and previous is not child of current
200 best_block_header.set(header)
201 @bitcoind_work.changed.watch
202 @defer.inlineCallbacks
204 handle_header((yield factory.conn.value.get_block_header(work['previous_block'])))
205 @best_block_header.changed.watch
211 merged_work = variable.Variable({})
213 @defer.inlineCallbacks
214 def set_merged_work(merged_url, merged_userpass):
215 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
217 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
218 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
219 hash=int(auxblock['hash'], 16),
220 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
221 merged_proxy=merged_proxy,
223 yield deferral.sleep(1)
224 for merged_url, merged_userpass in merged_urls:
225 set_merged_work(merged_url, merged_userpass)
227 @merged_work.changed.watch
228 def _(new_merged_work):
229 print 'Got new merged mining work!'
233 current_work = variable.Variable(None)
235 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
236 requested = expiring_dict.ExpiringDict(300)
237 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
239 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
241 t = dict(bitcoind_work.value)
243 if (best_block_header.value is not None and
244 best_block_header.value['previous_block'] == t['previous_block'] and
245 net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
246 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
247 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
249 version=best_block_header.value['version'],
250 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
251 bits=best_block_header.value['bits'], # not always true
253 time=best_block_header.value['timestamp'] + 600, # better way?
255 merkle_link=bitcoin_data.calculate_merkle_link([0], 0),
256 subsidy=5000000000, # XXX fix this
257 clock_offset=current_work.value['clock_offset'],
258 last_update=current_work.value['last_update'],
261 t['best_share_hash'] = best
262 t['mm_chains'] = merged_work.value
266 for peer2, share_hash in desired:
267 if share_hash not in tracker.tails: # was received in the time tracker.think was running
269 last_request_time, count = requested.get(share_hash, (None, 0))
270 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
272 potential_peers = set()
273 for head in tracker.tails[share_hash]:
274 potential_peers.update(peer_heads.get(head, set()))
275 potential_peers = [peer for peer in potential_peers if peer.connected2]
276 if count == 0 and peer2 is not None and peer2.connected2:
279 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
283 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
287 stops=list(set(tracker.heads) | set(
288 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
291 requested[share_hash] = t, count + 1
292 bitcoind_work.changed.watch(lambda _: compute_work())
293 merged_work.changed.watch(lambda _: compute_work())
298 lp_signal = variable.Event()
300 @current_work.transitioned.watch
301 def _(before, after):
# trigger LP if version/previous_block/bits/best_share_hash/mm_chains changed, or if transactions changed from nothing
303 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
310 # setup p2p logic and join p2pool network
312 class Node(p2p.Node):
313 def handle_shares(self, shares, peer):
315 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
319 if share.hash in tracker.shares:
320 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
325 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
329 if shares and peer is not None:
330 peer_heads.setdefault(shares[0].hash, set()).add(peer)
336 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
338 def handle_share_hashes(self, hashes, peer):
341 for share_hash in hashes:
342 if share_hash in tracker.shares:
344 last_request_time, count = requested.get(share_hash, (None, 0))
345 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
347 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
348 get_hashes.append(share_hash)
349 requested[share_hash] = t, count + 1
351 if hashes and peer is not None:
352 peer_heads.setdefault(hashes[0], set()).add(peer)
354 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
356 def handle_get_shares(self, hashes, parents, stops, peer):
357 parents = min(parents, 1000//len(hashes))
360 for share_hash in hashes:
361 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
362 if share.hash in stops:
365 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
def handle_bestblock(self, header, peer):
    '''Check a peer-announced block header and hand it to handle_header.

    Raises p2p.PeerMisbehavingError when the header's proof-of-work hash is
    greater than the target given by the header's own 'bits' field.
    '''
    packed_header = bitcoin_data.block_header_type.pack(header)
    pow_hash = net.PARENT.POW_FUNC(packed_header)
    if pow_hash > header['bits'].target:
        raise p2p.PeerMisbehavingError('received block header fails PoW test')
    handle_header(header)
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    '''Send a block to bitcoind over the bitcoin P2P connection.

    block -- dict with a 'header' entry (used for the log message); the
             whole dict is passed to send_block.

    Raises deferral.RetrySilentlyException when factory.conn.value is None,
    after writing the block's explorer URL to stderr.
    NOTE(review): presumably the deferral.retry wrapper catches this and
    tries again later -- confirm against util.deferral.
    '''
    conn = factory.conn.value
    if conn is None:
        # %064x zero-pads the 256-bit hash to 64 hex digits, like the other
        # block-hash messages in this file; the previous %32x dropped the
        # leading zeros and so produced an unusable explorer URL.
        header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
        raise deferral.RetrySilentlyException()
    conn.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    '''Submit a block to bitcoind through the getmemorypool RPC call.

    block          -- block dict; packed and hex-encoded before sending.
    ignore_failure -- when true, a falsy RPC result is not reported even
                      though the header satisfies its own target.

    A mismatch between the RPC result and the locally computed expectation
    is written to stderr; nothing is returned.
    '''
    block_hex = bitcoin_data.block_type.pack(block).encode('hex')
    success = yield bitcoind.rpc_getmemorypool(block_hex)

    # The block should be accepted only if its header meets the target
    # encoded in the header itself.
    header = block['header']
    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) <= header['bits'].target

    unexpected_failure = not success and success_expected and not ignore_failure
    unexpected_success = success and not success_expected
    if unexpected_failure or unexpected_success:
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
388 def submit_block(block, ignore_failure):
389 submit_block_p2p(block)
390 submit_block_rpc(block, ignore_failure)
392 @tracker.verified.added.watch
394 if share.pow_hash <= share.header['bits'].target:
395 submit_block(share.as_block(tracker), ignore_failure=True)
397 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
400 if (get_height_rel_highest(share.header['previous_block']) > -5 or
401 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
402 broadcast_share(share.hash)
404 reactor.callLater(5, spread) # so get_height_rel_highest can update
406 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
408 @defer.inlineCallbacks
411 ip, port = x.split(':')
412 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
414 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
417 if os.path.exists(os.path.join(datadir_path, 'addrs')):
419 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
420 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
422 print >>sys.stderr, 'error parsing addrs'
423 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
425 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
427 print >>sys.stderr, "error reading addrs.txt"
428 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
431 if addr not in addrs:
432 addrs[addr] = (0, time.time(), time.time())
436 connect_addrs = set()
437 for addr_df in map(parse, args.p2pool_nodes):
439 connect_addrs.add((yield addr_df))
444 best_share_hash_func=lambda: current_work.value['best_share_hash'],
445 port=args.p2pool_port,
448 connect_addrs=connect_addrs,
449 max_incoming_conns=args.p2pool_conns,
454 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
455 f.write(json.dumps(p2p_node.addr_store.items()))
456 task.LoopingCall(save_addrs).start(60)
458 @best_block_header.changed.watch
460 for peer in p2p_node.peers.itervalues():
461 peer.send_bestblock(header=header)
463 def broadcast_share(share_hash):
465 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
466 if share.hash in shared_share_hashes:
468 shared_share_hashes.add(share.hash)
471 for peer in p2p_node.peers.itervalues():
472 peer.sendShares([share for share in shares if share.peer is not peer])
474 # send share when the chain changes to their chain
475 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
478 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
480 if share.hash in tracker.verified.shares:
481 ss.add_verified_hash(share.hash)
482 task.LoopingCall(save_shares).start(60)
488 @defer.inlineCallbacks
492 is_lan, lan_ip = yield ipdiscover.get_local_ip()
494 pm = yield portmapper.get_port_mapper()
495 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
496 except defer.TimeoutError:
500 log.err(None, 'UPnP error:')
501 yield deferral.sleep(random.expovariate(1/120))
504 # start listening for workers with a JSON-RPC server
506 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
510 removed_unstales_var = variable.Variable((0, 0, 0))
511 removed_doa_unstales_var = variable.Variable(0)
512 @tracker.verified.removed.watch
514 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
515 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
516 removed_unstales_var.set((
517 removed_unstales_var.value[0] + 1,
518 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
519 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
521 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
522 removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)

    orphans/doas count this node's own shares that are absent from the chain
    ending at the current best share (non-DOA and DOA respectively); total is
    the number of own shares; the last pair is the number of orphan/dead
    announcements counted in that chain. Counters kept in
    removed_unstales_var / removed_doa_unstales_var are added to the values
    read from the tracker delta.
    '''
    total_mine = len(my_share_hashes)
    total_doa = len(my_doa_share_hashes)

    chain_delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
    removed = removed_unstales_var.value  # (count, orphan announces, doa announces)

    mine_in_chain = chain_delta.my_count + removed[0]
    doa_in_chain = chain_delta.my_doa_count + removed_doa_unstales_var.value
    recorded_in_chain = (
        chain_delta.my_orphan_announce_count + removed[1],
        chain_delta.my_dead_announce_count + removed[2],
    )

    mine_missing = total_mine - mine_in_chain
    doa_missing = total_doa - doa_in_chain

    return (mine_missing - doa_missing, doa_missing), total_mine, recorded_in_chain
540 pseudoshare_received = variable.Event()
541 share_received = variable.Event()
542 local_rate_monitor = math.RateMonitor(10*60)
544 class WorkerBridge(worker_interface.WorkerBridge):
546 worker_interface.WorkerBridge.__init__(self)
547 self.new_work_event = lp_signal
548 self.recent_shares_ts_work = []
550 def get_user_details(self, request):
551 user = request.getUser() if request.getUser() is not None else ''
553 desired_pseudoshare_target = None
555 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
557 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
561 desired_share_target = 2**256 - 1
563 user, min_diff_str = user.rsplit('/', 1)
565 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
569 if random.uniform(0, 100) < args.worker_fee:
570 pubkey_hash = my_pubkey_hash
573 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
575 pubkey_hash = my_pubkey_hash
577 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
def preprocess_request(self, request):
    '''Return (pubkey_hash, desired_share_target, desired_pseudoshare_target) for request.

    The values come from self.get_user_details(request); the user name it
    also returns is discarded.
    '''
    details = self.get_user_details(request)
    _user, payout_hash, share_target, pseudoshare_target = details
    return payout_hash, share_target, pseudoshare_target
583 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
584 if len(p2p_node.peers) == 0 and net.PERSIST:
585 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
586 if current_work.value['best_share_hash'] is None and net.PERSIST:
587 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
588 if time.time() > current_work.value['last_update'] + 60:
589 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
591 if current_work.value['mm_chains']:
592 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
593 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
594 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
595 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
599 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
605 share_info, generate_tx = p2pool_data.Share.generate_transaction(
608 previous_share_hash=current_work.value['best_share_hash'],
609 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
610 nonce=random.randrange(2**32),
611 pubkey_hash=pubkey_hash,
612 subsidy=current_work.value['subsidy'],
613 donation=math.perfect_round(65535*args.donation_percentage/100),
614 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
615 'orphan' if orphans > orphans_recorded_in_chain else
616 'doa' if doas > doas_recorded_in_chain else
618 )(*get_stale_counts()),
621 block_target=current_work.value['bits'].target,
622 desired_timestamp=int(time.time() - current_work.value['clock_offset']),
623 desired_target=desired_share_target,
624 ref_merkle_link=dict(branch=[], index=0),
628 if desired_pseudoshare_target is None:
630 if len(self.recent_shares_ts_work) == 50:
631 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
633 target = min(target, int(2**256/hash_rate))
635 target = desired_pseudoshare_target
636 target = max(target, share_info['bits'].target)
637 for aux_work in current_work.value['mm_chains'].itervalues():
638 target = max(target, aux_work['target'])
639 target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
641 transactions = [generate_tx] + list(current_work.value['transactions'])
642 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
643 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work.value['merkle_link'])
645 getwork_time = time.time()
646 merkle_link = current_work.value['merkle_link']
648 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
649 bitcoin_data.target_to_difficulty(target),
650 bitcoin_data.target_to_difficulty(share_info['bits'].target),
651 current_work.value['subsidy']*1e-8, net.PARENT.SYMBOL,
652 len(current_work.value['transactions']),
655 bits = current_work.value['bits']
656 previous_block = current_work.value['previous_block']
657 ba = bitcoin_getwork.BlockAttempt(
658 version=current_work.value['version'],
659 previous_block=current_work.value['previous_block'],
660 merkle_root=merkle_root,
661 timestamp=current_work.value['time'],
662 bits=current_work.value['bits'],
666 received_header_hashes = set()
668 def got_response(header, request):
669 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
670 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
672 if pow_hash <= header['bits'].target or p2pool.DEBUG:
673 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
674 if pow_hash <= header['bits'].target:
676 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
679 log.err(None, 'Error while processing potential block:')
681 user, _, _, _ = self.get_user_details(request)
682 assert header['merkle_root'] == merkle_root
683 assert header['previous_block'] == previous_block
684 assert header['bits'] == bits
686 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
688 for aux_work, index, hashes in mm_later:
690 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
691 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
692 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
693 bitcoin_data.aux_pow_type.pack(dict(
696 block_hash=header_hash,
697 merkle_link=merkle_link,
699 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
700 parent_block_header=header,
705 if result != (pow_hash <= aux_work['target']):
706 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
708 print 'Merged block submittal result: %s' % (result,)
711 log.err(err, 'Error submitting merged block:')
713 log.err(None, 'Error while processing merged mining POW:')
715 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
716 min_header = dict(header);del min_header['merkle_root']
717 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
718 share = p2pool_data.Share(net, None, dict(
719 min_header=min_header, share_info=share_info, hash_link=hash_link,
720 ref_merkle_link=dict(branch=[], index=0),
721 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
723 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
725 p2pool_data.format_hash(share.hash),
726 p2pool_data.format_hash(share.previous_hash),
727 time.time() - getwork_time,
728 ' DEAD ON ARRIVAL' if not on_time else '',
730 my_share_hashes.add(share.hash)
732 my_doa_share_hashes.add(share.hash)
736 tracker.verified.add(share)
740 if pow_hash <= header['bits'].target or p2pool.DEBUG:
741 for peer in p2p_node.peers.itervalues():
742 peer.sendShares([share])
743 shared_share_hashes.add(share.hash)
745 log.err(None, 'Error forwarding block solution:')
747 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
749 if pow_hash > target:
750 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
751 print ' Hash: %56x' % (pow_hash,)
752 print ' Target: %56x' % (target,)
753 elif header_hash in received_header_hashes:
754 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
756 received_header_hashes.add(header_hash)
758 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
759 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
760 while len(self.recent_shares_ts_work) > 50:
761 self.recent_shares_ts_work.pop(0)
762 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
766 return ba, got_response
768 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
770 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
771 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
773 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
775 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
783 print 'Started successfully!'
784 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
785 if args.donation_percentage > 0.51:
786 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
787 elif args.donation_percentage < 0.49:
788 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
790 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
791 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
795 if hasattr(signal, 'SIGALRM'):
796 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
797 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
799 signal.siginterrupt(signal.SIGALRM, False)
800 task.LoopingCall(signal.alarm, 30).start(1)
802 if args.irc_announce:
803 from twisted.words.protocols import irc
804 class IRCClient(irc.IRCClient):
805 nickname = 'p2pool%02i' % (random.randrange(100),)
806 channel = net.ANNOUNCE_CHANNEL
807 def lineReceived(self, line):
810 irc.IRCClient.lineReceived(self, line)
812 irc.IRCClient.signedOn(self)
813 self.factory.resetDelay()
814 self.join(self.channel)
815 @defer.inlineCallbacks
816 def new_share(share):
817 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
818 yield deferral.sleep(random.expovariate(1/60))
819 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
820 if message not in self.recent_messages:
821 self.say(self.channel, message)
822 self._remember_message(message)
823 self.watch_id = tracker.verified.added.watch(new_share)
824 self.recent_messages = []
def _remember_message(self, message):
    '''Append message to self.recent_messages, keeping at most the newest 100 entries.'''
    history = self.recent_messages
    history.append(message)
    excess = len(history) - 100
    if excess > 0:
        # trim the oldest entries in place so the same list object is kept
        del history[:excess]
def privmsg(self, user, channel, message):
    '''Remember messages posted to self.channel; messages on any other channel are ignored.'''
    if channel != self.channel:
        return
    self._remember_message(message)
832 def connectionLost(self, reason):
833 tracker.verified.added.unwatch(self.watch_id)
834 print 'IRC connection lost:', reason.getErrorMessage()
835 class IRCClientFactory(protocol.ReconnectingClientFactory):
837 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
839 @defer.inlineCallbacks
844 yield deferral.sleep(3)
846 if time.time() > current_work.value['last_update'] + 60:
847 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
849 height = tracker.get_height(current_work.value['best_share_hash'])
850 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
852 len(tracker.verified.shares),
855 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
856 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
858 datums, dt = local_rate_monitor.get_datums_in_last()
859 my_att_s = sum(datum['work']/dt for datum in datums)
860 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
861 math.format(int(my_att_s)),
863 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
864 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
868 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
869 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
870 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
872 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
873 shares, stale_orphan_shares, stale_doa_shares,
874 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
875 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
876 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
878 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
879 math.format(int(real_att_s)),
881 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
884 for warning in p2pool_data.get_warnings(tracker, current_work, net):
885 print >>sys.stderr, '#'*40
886 print >>sys.stderr, '>>> Warning: ' + warning
887 print >>sys.stderr, '#'*40
889 if this_str != last_str or time.time() > last_time + 15:
892 last_time = time.time()
898 log.err(None, 'Fatal error:')
901 class FixedArgumentParser(argparse.ArgumentParser):
902 def _read_args_from_files(self, arg_strings):
903 # expand arguments referencing files
905 for arg_string in arg_strings:
907 # for regular arguments, just add them back into the list
908 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
909 new_arg_strings.append(arg_string)
911 # replace arguments referencing files with the file content
914 args_file = open(arg_string[1:])
917 for arg_line in args_file.read().splitlines():
918 for arg in self.convert_arg_line_to_args(arg_line):
919 arg_strings.append(arg)
920 arg_strings = self._read_args_from_files(arg_strings)
921 new_arg_strings.extend(arg_strings)
925 err = sys.exc_info()[1]
928 # return the modified argument list
929 return new_arg_strings
def convert_arg_line_to_args(self, arg_line):
    """Split one line read from an argument file into separate arguments.

    The line is split on runs of whitespace, so several arguments may share
    a line and an empty or all-whitespace line yields an empty list.
    """
    # split() with no separator never produces empty or whitespace-only
    # pieces, so the tokens need no further filtering.
    return arg_line.split()
935 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
937 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
938 parser.add_argument('--version', action='version', version=p2pool.__version__)
939 parser.add_argument('--net',
940 help='use specified network (default: bitcoin)',
941 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
942 parser.add_argument('--testnet',
943 help='''use the network's testnet''',
944 action='store_const', const=True, default=False, dest='testnet')
945 parser.add_argument('--debug',
946 help='enable debugging mode',
947 action='store_const', const=True, default=False, dest='debug')
948 parser.add_argument('-a', '--address',
949 help='generate payouts to this address (default: <address requested from bitcoind>)',
950 type=str, action='store', default=None, dest='address')
951 parser.add_argument('--datadir',
952 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
953 type=str, action='store', default=None, dest='datadir')
954 parser.add_argument('--logfile',
955 help='''log to this file (default: data/<NET>/log)''',
956 type=str, action='store', default=None, dest='logfile')
957 parser.add_argument('--merged',
958 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
959 type=str, action='append', default=[], dest='merged_urls')
960 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
961 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
962 type=float, action='store', default=0.5, dest='donation_percentage')
963 parser.add_argument('--iocp',
964 help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
965 action='store_true', default=False, dest='iocp')
966 parser.add_argument('--irc-announce',
967 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
968 action='store_true', default=False, dest='irc_announce')
969 parser.add_argument('--no-bugreport',
970 help='disable submitting caught exceptions to the author',
971 action='store_true', default=False, dest='no_bugreport')
# Options for p2pool's own peer-to-peer interface. Every option in this
# section, including --disable-upnp, is registered on p2pool_group so that
# --help lists them together under the 'p2pool interface' heading. Argument
# groups only affect help output; parsing, dest names and defaults are the
# same as when an option is added directly to the parser.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# store_false: args.upnp stays True unless --disable-upnp is passed.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')
987 worker_group = parser.add_argument_group('worker interface')
988 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
989 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
990 type=str, action='store', default=None, dest='worker_endpoint')
991 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
992 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
993 type=float, action='store', default=0, dest='worker_fee')
995 bitcoind_group = parser.add_argument_group('bitcoind interface')
996 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
997 help='connect to this address (default: 127.0.0.1)',
998 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
999 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
1000 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
1001 type=int, action='store', default=None, dest='bitcoind_rpc_port')
1002 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
1003 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
1004 type=int, action='store', default=None, dest='bitcoind_p2p_port')
1006 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
1007 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
1008 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
1010 args = parser.parse_args()
1015 net_name = args.net_name + ('_testnet' if args.testnet else '')
1016 net = networks.nets[net_name]
1018 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
1019 if not os.path.exists(datadir_path):
1020 os.makedirs(datadir_path)
1022 if len(args.bitcoind_rpc_userpass) > 2:
1023 parser.error('a maximum of two arguments are allowed')
1024 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
1026 if args.bitcoind_rpc_password is None:
1027 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
1028 parser.error('This network has no configuration file function. Manually enter your RPC password.')
1029 conf_path = net.PARENT.CONF_FILE_FUNC()
1030 if not os.path.exists(conf_path):
1031 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
1032 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
1035 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
1036 with open(conf_path, 'rb') as f:
1037 cp = ConfigParser.RawConfigParser()
1038 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
1039 for conf_name, var_name, var_type in [
1040 ('rpcuser', 'bitcoind_rpc_username', str),
1041 ('rpcpassword', 'bitcoind_rpc_password', str),
1042 ('rpcport', 'bitcoind_rpc_port', int),
1043 ('port', 'bitcoind_p2p_port', int),
1045 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1046 setattr(args, var_name, var_type(cp.get('x', conf_name)))
1047 if args.bitcoind_rpc_password is None:
1048 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
1050 if args.bitcoind_rpc_username is None:
1051 args.bitcoind_rpc_username = ''
1053 if args.bitcoind_rpc_port is None:
1054 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1056 if args.bitcoind_p2p_port is None:
1057 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1059 if args.p2pool_port is None:
1060 args.p2pool_port = net.P2P_PORT
1062 if args.worker_endpoint is None:
1063 worker_endpoint = '', net.WORKER_PORT
1064 elif ':' not in args.worker_endpoint:
1065 worker_endpoint = '', int(args.worker_endpoint)
1067 addr, port = args.worker_endpoint.rsplit(':', 1)
1068 worker_endpoint = addr, int(port)
1070 if args.address is not None:
1072 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1073 except Exception, e:
1074 parser.error('error parsing address: ' + repr(e))
1076 args.pubkey_hash = None
def separate_url(url):
    """Split the userinfo part out of *url*.

    Returns a ``(url_without_userinfo, userpass)`` tuple, where ``userpass``
    is everything before the last ``@`` in the URL's netloc. If the netloc
    contains no ``@``, ``parser.error`` is called with an explanatory message.
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split on the last '@' so that an '@' inside the userinfo is kept there.
    credentials, host = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=host))
    return stripped_url, credentials
1084 merged_urls = map(separate_url, args.merged_urls)
1086 if args.logfile is None:
1087 args.logfile = os.path.join(datadir_path, 'log')
1089 logfile = logging.LogFile(args.logfile)
1090 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1091 sys.stdout = logging.AbortPipe(pipe)
1092 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1093 if hasattr(signal, "SIGUSR1"):
1094 def sigusr1(signum, frame):
1095 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1097 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1098 signal.signal(signal.SIGUSR1, sigusr1)
1099 task.LoopingCall(logfile.reopen).start(5)
1101 class ErrorReporter(object):
1103 self.last_sent = None
1105 def emit(self, eventDict):
1106 if not eventDict["isError"]:
1109 if self.last_sent is not None and time.time() < self.last_sent + 5:
1111 self.last_sent = time.time()
1113 if 'failure' in eventDict:
1114 text = ((eventDict.get('why') or 'Unhandled Error')
1115 + '\n' + eventDict['failure'].getTraceback())
1117 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
1119 from twisted.web import client
1121 url='http://u.forre.st/p2pool_error.cgi',
1123 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
1125 ).addBoth(lambda x: None)
1126 if not args.no_bugreport:
1127 log.addObserver(ErrorReporter().emit)
1129 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)