3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
21 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
22 from util import db, expiring_dict, jsonrpc, variable, deferral, math
23 from . import p2p, worker_interface, skiplists
24 import p2pool.data as p2pool
25 import p2pool as p2pool_init
27 @deferral.retry('Error getting work from bitcoind:', 3)
28 @defer.inlineCallbacks
29 def getwork(bitcoind):
30 # a block could arrive in between these two queries
31 getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
33 getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
35 # get rid of residual errors
36 getwork_df.addErrback(lambda fail: None)
37 height_df.addErrback(lambda fail: None)
38 defer.returnValue((getwork, height))
40 @deferral.retry('Error getting payout script from bitcoind:', 1)
41 @defer.inlineCallbacks
42 def get_payout_script(factory):
43 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
44 if res['reply'] == 'success':
45 defer.returnValue(res['script'])
46 elif res['reply'] == 'denied':
47 defer.returnValue(None)
49 raise ValueError('Unexpected reply: %r' % (res,))
51 @deferral.retry('Error creating payout script:', 10)
52 @defer.inlineCallbacks
53 def get_payout_script2(bitcoind, net):
54 defer.returnValue(bitcoin.data.pubkey_hash_to_script2(bitcoin.data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net)))
56 @defer.inlineCallbacks
62 print 'p2pool (version %s)' % (p2pool_init.__version__,)
65 # connect to bitcoind over JSON-RPC and do initial getwork
66 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
67 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
68 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
69 temp_work, temp_height = yield getwork(bitcoind)
71 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
74 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
75 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
76 factory = bitcoin.p2p.ClientFactory(args.net)
77 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
78 my_script = yield get_payout_script(factory)
79 if args.pubkey_hash is None:
81 print ' IP transaction denied ... falling back to sending to address.'
82 my_script = yield get_payout_script2(bitcoind, args.net)
84 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
86 print ' Payout script:', my_script.encode('hex')
89 ht = bitcoin.p2p.HeightTracker(factory)
91 tracker = p2pool.OkayTracker(args.net)
92 chains = expiring_dict.ExpiringDict(300)
93 def get_chain(chain_id_data):
94 return chains.setdefault(chain_id_data, Chain(chain_id_data))
96 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
98 # information affecting work that should trigger a long-polling update
99 current_work = variable.Variable(None)
100 # information affecting work that should not trigger a long-polling update
101 current_work2 = variable.Variable(None)
103 work_updated = variable.Event()
104 tracker_updated = variable.Event()
106 requested = expiring_dict.ExpiringDict(300)
108 @defer.inlineCallbacks
109 def set_real_work1():
110 work, height = yield getwork(bitcoind)
111 # XXX call tracker_updated
112 current_work.set(dict(
113 version=work.version,
114 previous_block=work.previous_block,
117 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
119 current_work2.set(dict(
120 clock_offset=time.time() - work.timestamp,
123 def set_real_work2():
124 best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
126 t = dict(current_work.value)
127 t['best_share_hash'] = best
131 for peer2, share_hash in desired:
132 #if share_hash not in tracker.tails: # was received in the time tracker.think was running
134 last_request_time, count = requested.get(share_hash, (None, 0))
135 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
137 potential_peers = set()
138 for head in tracker.tails[share_hash]:
139 potential_peers.update(peer_heads.get(head, set()))
140 potential_peers = [peer for peer in potential_peers if peer.connected2]
141 if count == 0 and peer2 is not None and peer2.connected2:
144 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
148 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
152 stops=list(set(tracker.heads) | set(
153 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
156 requested[share_hash] = t, count + 1
158 print 'Initializing work...'
159 yield set_real_work1()
163 start_time = time.time() - current_work2.value['clock_offset']
165 # setup p2p logic and join p2pool network
167 def share_share(share, ignore_peer=None):
168 for peer in p2p_node.peers.itervalues():
169 if peer is ignore_peer:
171 #if p2pool_init.DEBUG:
172 # print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
173 peer.send_shares([share])
176 def p2p_shares(shares, peer=None):
178 print 'Processing %i shares...' % (len(shares),)
182 if share.hash in tracker.shares:
183 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
187 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
190 #for peer2, share_hash in desired:
191 # print 'Requesting parent share %x' % (share_hash,)
192 # peer2.send_getshares(hashes=[share_hash], parents=2000)
194 if share.bitcoin_hash <= share.header['target']:
196 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
198 if factory.conn.value is not None:
199 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
201 print 'No bitcoind connection! Erp!'
203 if shares and peer is not None:
204 peer_heads.setdefault(shares[0].hash, set()).add(peer)
207 tracker_updated.happened()
210 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
212 def p2p_share_hashes(share_hashes, peer):
215 for share_hash in share_hashes:
216 if share_hash in tracker.shares:
218 last_request_time, count = requested.get(share_hash, (None, 0))
219 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
221 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
222 get_hashes.append(share_hash)
223 requested[share_hash] = t, count + 1
225 if share_hashes and peer is not None:
226 peer_heads.setdefault(share_hashes[0], set()).add(peer)
228 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
230 def p2p_get_shares(share_hashes, parents, stops, peer):
231 parents = min(parents, 1000//len(share_hashes))
234 for share_hash in share_hashes:
235 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
236 if share.hash in stops:
239 peer.send_shares(shares, full=True)
241 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
245 ip, port = x.split(':')
248 return x, args.net.P2P_PORT
251 ('72.14.191.28', args.net.P2P_PORT),
252 ('62.204.197.159', args.net.P2P_PORT),
253 ('142.58.248.28', args.net.P2P_PORT),
254 ('94.23.34.145', args.net.P2P_PORT),
258 'dabuttonfactory.com',
261 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
263 log.err(None, 'Error resolving bootstrap node IP:')
266 current_work=current_work,
267 port=args.p2pool_port,
269 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
270 mode=0 if args.low_bandwidth else 1,
271 preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
273 p2p_node.handle_shares = p2p_shares
274 p2p_node.handle_share_hashes = p2p_share_hashes
275 p2p_node.handle_get_shares = p2p_get_shares
279 # send share when the chain changes to their chain
280 def work_changed(new_work):
281 #print 'Work changed:', new_work
282 for share in tracker.get_chain_known(new_work['best_share_hash']):
285 share_share(share, share.peer)
286 current_work.changed.watch(work_changed)
291 @defer.inlineCallbacks
295 is_lan, lan_ip = yield ipdiscover.get_local_ip()
298 pm = yield portmapper.get_port_mapper()
299 yield pm._upnp.add_port_mapping(lan_ip, args.net.P2P_PORT, args.net.P2P_PORT, 'p2pool', 'TCP')
302 yield deferral.sleep(random.expovariate(1/120))
307 # start listening for workers with a JSON-RPC server
309 print 'Listening for workers on port %i...' % (args.worker_port,)
313 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
314 run_identifier = struct.pack('<Q', random.randrange(2**64))
316 def compute(state, payout_script):
317 if payout_script is None:
318 payout_script = my_script
319 if state['best_share_hash'] is None and args.net.PERSIST:
320 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
321 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
322 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
325 for tx in pre_extra_txs:
326 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
327 if size + this_size > 500000:
332 # XXX assuming generate_tx is smallish here..
333 generate_tx = p2pool.generate_transaction(
335 previous_share_hash=state['best_share_hash'],
336 new_script=payout_script,
337 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
338 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
339 block_target=state['target'],
342 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
343 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
344 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
345 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
346 merkle_root = bitcoin.data.merkle_hash(transactions)
347 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
349 timestamp = int(time.time() - current_work2.value['clock_offset'])
350 if state['best_share_hash'] is not None:
351 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
352 if timestamp2 > timestamp:
353 print 'Toff', timestamp2 - timestamp
354 timestamp = timestamp2
355 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
356 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
357 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
358 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
363 def got_response(data):
365 # match up with transactions
366 header = bitcoin.getwork.decode_data(data)
367 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
368 if transactions is None:
369 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
371 block = dict(header=header, txs=transactions)
372 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
373 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
375 print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
377 if factory.conn.value is not None:
378 factory.conn.value.send_block(block=block)
380 print 'No bitcoind connection! Erp!'
381 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
383 print 'Received invalid share from worker - %x/%x' % (hash_, target)
385 share = p2pool.Share.from_block(block)
386 my_shares.add(share.hash)
387 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
388 good = share.previous_hash == current_work.value['best_share_hash']
389 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
391 # eg. good = share.hash == current_work.value['best_share_hash'] here
394 log.err(None, 'Error processing data received from worker:')
397 web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
400 if current_work.value['best_share_hash'] is not None:
401 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
402 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
403 return json.dumps(att_s)
404 return json.dumps(None)
407 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
408 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
410 for script in sorted(weights, key=lambda s: weights[s]):
411 res[script.encode('hex')] = weights[script]/total_weight
412 return json.dumps(res)
414 class WebInterface(resource.Resource):
415 def __init__(self, func, mime_type):
416 self.func, self.mime_type = func, mime_type
418 def render_GET(self, request):
419 request.setHeader('Content-Type', self.mime_type)
422 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
423 web_root.putChild('users', WebInterface(get_users, 'application/json'))
425 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
427 reactor.listenTCP(args.worker_port, server.Site(web_root))
434 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
435 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
438 def __init__(self, tx, seen_at_block):
439 self.hash = bitcoin.data.tx_type.hash256(tx)
441 self.seen_at_block = seen_at_block
442 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
444 #print '%x %r' % (seen_at_block, tx)
445 #for mention in self.mentions:
446 # print '%x' % mention
448 self.parents_all_in_blocks = False
451 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
452 self._find_parents_in_blocks()
454 @defer.inlineCallbacks
455 def _find_parents_in_blocks(self):
456 for tx_in in self.tx['tx_ins']:
458 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
461 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
462 #print raw_transaction
463 if not raw_transaction['parent_blocks']:
465 self.parents_all_in_blocks = True
468 if not self.parents_all_in_blocks:
474 @defer.inlineCallbacks
477 assert isinstance(tx_hash, (int, long))
478 #print 'REQUESTING', tx_hash
479 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
481 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
483 log.err(None, 'Error handling tx:')
484 # disable for now, for testing impact on stales
485 #factory.new_tx.watch(new_tx)
487 def new_block(block_hash):
488 work_updated.happened()
489 factory.new_block.watch(new_block)
491 print 'Started successfully!'
494 @defer.inlineCallbacks
497 flag = work_updated.get_deferred()
499 yield set_real_work1()
502 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
504 @defer.inlineCallbacks
507 flag = tracker_updated.get_deferred()
512 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
517 counter = skiplists.CountsSkipList(tracker, run_identifier)
520 yield deferral.sleep(random.expovariate(1/1))
522 if current_work.value['best_share_hash'] is not None:
523 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
525 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
526 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
527 matching_in_chain = counter(current_work.value['best_share_hash'], height)
528 shares_in_chain = my_shares & matching_in_chain
529 stale_shares = my_shares - matching_in_chain
530 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
533 weights.get(my_script, 0)/total_weight*100,
534 math.format(weights.get(my_script, 0)/total_weight*att_s),
535 len(shares_in_chain) + len(stale_shares),
539 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
540 #for k, v in weights.iteritems():
541 # print k.encode('hex'), v/total_weight
545 log.err(None, 'Fatal error:')
549 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
550 parser.add_argument('--version', action='version', version=p2pool_init.__version__)
551 parser.add_argument('--testnet',
552 help='use the testnet',
553 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
554 parser.add_argument('--debug',
555 help='debugging mode',
556 action='store_const', const=True, default=False, dest='debug')
557 parser.add_argument('-a', '--address',
558 help='generate to this address (defaults to requesting one from bitcoind)',
559 type=str, action='store', default=None, dest='address')
560 parser.add_argument('--charts',
561 help='generate charts on the web interface (requires PIL and pygame)',
562 action='store_const', const=True, default=False, dest='charts')
564 p2pool_group = parser.add_argument_group('p2pool interface')
565 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
566 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
567 type=int, action='store', default=None, dest='p2pool_port')
568 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
569 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
570 type=str, action='append', default=[], dest='p2pool_nodes')
571 parser.add_argument('-l', '--low-bandwidth',
572 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
573 action='store_true', default=False, dest='low_bandwidth')
574 parser.add_argument('--disable-upnp',
575 help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
576 action='store_false', default=True, dest='upnp')
578 worker_group = parser.add_argument_group('worker interface')
579 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
580 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
581 type=int, action='store', default=9332, dest='worker_port')
583 bitcoind_group = parser.add_argument_group('bitcoind interface')
584 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
585 help='connect to a bitcoind at this address (default: 127.0.0.1)',
586 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
587 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
588 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
589 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
590 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
591 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
592 type=int, action='store', default=None, dest='bitcoind_p2p_port')
594 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
595 help='bitcoind RPC interface username',
596 type=str, action='store', dest='bitcoind_rpc_username')
597 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
598 help='bitcoind RPC interface password',
599 type=str, action='store', dest='bitcoind_rpc_password')
601 args = parser.parse_args()
604 p2pool_init.DEBUG = True
605 class TeePipe(object):
606 def __init__(self, outputs):
607 self.outputs = outputs
608 def write(self, data):
609 for output in self.outputs:
612 for output in self.outputs:
614 class TimestampingPipe(object):
615 def __init__(self, inner_file):
616 self.inner_file = inner_file
619 def write(self, data):
620 buf = self.buf + data
621 lines = buf.split('\n')
622 for line in lines[:-1]:
623 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
624 self.inner_file.flush()
628 sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
630 if args.bitcoind_p2p_port is None:
631 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
633 if args.p2pool_port is None:
634 args.p2pool_port = args.net.P2P_PORT
636 if args.address is not None:
638 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
640 raise ValueError('error parsing address: ' + repr(e))
642 args.pubkey_hash = None
644 reactor.callWhenRunning(main, args)