3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
21 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
22 from util import db, expiring_dict, jsonrpc, variable, deferral, math
23 from . import p2p, worker_interface, skiplists
24 import p2pool.data as p2pool
25 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current getwork template and block height from bitcoind.

    Both RPCs are issued concurrently; a block could arrive in between
    the two responses, so the (work, height) pair is not guaranteed to
    be mutually consistent.
    """
    work_deferred = bitcoind.rpc_getwork()
    height_deferred = bitcoind.rpc_getblocknumber()
    work = bitcoin.getwork.BlockAttempt.from_getwork((yield work_deferred))
    height = yield height_deferred
    # swallow residual errors so already-fired deferreds don't report
    # unhandled failures later
    work_deferred.addErrback(lambda fail: None)
    height_deferred.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Request a payout script from bitcoind via a null checkorder message.

    Returns the script on 'success', None on 'denied', and raises
    ValueError for any other reply.
    """
    protocol = yield factory.getProtocol()
    result = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = result['reply']
    if reply == 'success':
        defer.returnValue(result['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (result,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout script: pay to bitcoind's 'p2pool' account address.

    Fetches the account address over RPC, decodes it for the given
    network, and returns the corresponding output script.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
56 @defer.inlineCallbacks
62 print 'p2pool (version %s)' % (p2pool_init.__version__,)
65 # connect to bitcoind over JSON-RPC and do initial getwork
66 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
67 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
68 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
69 temp_work, temp_height = yield getwork(bitcoind)
71 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
74 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
75 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
76 factory = bitcoin.p2p.ClientFactory(args.net)
77 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
78 my_script = yield get_payout_script(factory)
79 if args.pubkey_hash is None:
81 print ' IP transaction denied ... falling back to sending to address.'
82 my_script = yield get_payout_script2(bitcoind, args.net)
84 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
86 print ' Payout script:', my_script.encode('hex')
89 ht = bitcoin.p2p.HeightTracker(factory)
91 tracker = p2pool.OkayTracker(args.net)
92 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    """Return the cached chain object for chain_id_data, creating it on first use."""
    # NOTE(review): 'Chain' is not defined anywhere in this file as
    # visible here -- this looks like dead code left over from an older
    # chain-based design. Confirm before relying on it.
    return chains.setdefault(chain_id_data, Chain(chain_id_data))
96 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
98 # information affecting work that should trigger a long-polling update
99 current_work = variable.Variable(None)
100 # information affecting work that should not trigger a long-polling update
101 current_work2 = variable.Variable(None)
103 work_updated = variable.Event()
104 tracker_updated = variable.Event()
106 requested = expiring_dict.ExpiringDict(300)
108 @defer.inlineCallbacks
109 def set_real_work1():
110 work, height = yield getwork(bitcoind)
111 # XXX call tracker_updated
112 current_work.set(dict(
113 version=work.version,
114 previous_block=work.previous_block,
117 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
119 current_work2.set(dict(
120 clock_offset=time.time() - work.timestamp,
123 def set_real_work2():
124 best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
126 t = dict(current_work.value)
127 t['best_share_hash'] = best
131 for peer2, share_hash in desired:
132 #if share_hash not in tracker.tails: # was received in the time tracker.think was running
134 last_request_time, count = requested.get(share_hash, (None, 0))
135 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
137 potential_peers = set()
138 for head in tracker.tails[share_hash]:
139 potential_peers.update(peer_heads.get(head, set()))
140 potential_peers = [peer for peer in potential_peers if peer.connected2]
141 if count == 0 and peer2 is not None and peer2.connected2:
144 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
148 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
152 stops=list(set(tracker.heads) | set(
153 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
156 requested[share_hash] = t, count + 1
158 print 'Initializing work...'
159 yield set_real_work1()
163 start_time = time.time() - current_work2.value['clock_offset']
165 # setup p2p logic and join p2pool network
167 def share_share(share, ignore_peer=None):
168 for peer in p2p_node.peers.itervalues():
169 if peer is ignore_peer:
171 #if p2pool_init.DEBUG:
172 # print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
173 peer.send_shares([share])
176 def p2p_shares(shares, peer=None):
178 print 'Processing %i shares...' % (len(shares),)
182 if share.hash in tracker.shares:
183 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
187 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
190 #for peer2, share_hash in desired:
191 # print 'Requesting parent share %x' % (share_hash,)
192 # peer2.send_getshares(hashes=[share_hash], parents=2000)
194 if share.bitcoin_hash <= share.header['target']:
196 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
198 if factory.conn.value is not None:
199 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
201 print 'No bitcoind connection! Erp!'
203 if shares and peer is not None:
204 peer_heads.setdefault(shares[0].hash, set()).add(peer)
207 tracker_updated.happened()
210 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
212 def p2p_share_hashes(share_hashes, peer):
215 for share_hash in share_hashes:
216 if share_hash in tracker.shares:
218 last_request_time, count = requested.get(share_hash, (None, 0))
219 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
221 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
222 get_hashes.append(share_hash)
223 requested[share_hash] = t, count + 1
225 if share_hashes and peer is not None:
226 peer_heads.setdefault(share_hashes[0], set()).add(peer)
228 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
230 def p2p_get_shares(share_hashes, parents, stops, peer):
231 parents = min(parents, 1000//len(share_hashes))
234 for share_hash in share_hashes:
235 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
236 if share.hash in stops:
239 peer.send_shares(shares, full=True)
241 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
245 ip, port = x.split(':')
248 return x, args.net.P2P_PORT
251 ('72.14.191.28', args.net.P2P_PORT),
252 ('62.204.197.159', args.net.P2P_PORT),
253 ('142.58.248.28', args.net.P2P_PORT),
254 ('94.23.34.145', args.net.P2P_PORT),
258 'dabuttonfactory.com',
261 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
263 log.err(None, 'Error resolving bootstrap node IP:')
266 current_work=current_work,
267 port=args.p2pool_port,
269 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
270 mode=0 if args.low_bandwidth else 1,
271 preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
273 p2p_node.handle_shares = p2p_shares
274 p2p_node.handle_share_hashes = p2p_share_hashes
275 p2p_node.handle_get_shares = p2p_get_shares
279 # send share when the chain changes to their chain
280 def work_changed(new_work):
281 #print 'Work changed:', new_work
282 for share in tracker.get_chain_known(new_work['best_share_hash']):
285 share_share(share, share.peer)
286 current_work.changed.watch(work_changed)
291 @defer.inlineCallbacks
295 is_lan, lan_ip = yield ipdiscover.get_local_ip()
298 pm = yield portmapper.get_port_mapper()
299 yield pm._upnp.add_port_mapping(lan_ip, args.net.P2P_PORT, args.net.P2P_PORT, 'p2pool', 'TCP')
302 yield deferral.sleep(random.expovariate(1/120))
307 # start listening for workers with a JSON-RPC server
309 print 'Listening for workers on port %i...' % (args.worker_port,)
313 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
314 run_identifier = struct.pack('<Q', random.randrange(2**64))
316 def compute(state, payout_script):
317 if payout_script is None:
318 payout_script = my_script
319 if state['best_share_hash'] is None and args.net.PERSIST:
320 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
321 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
322 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
325 for tx in pre_extra_txs:
326 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
327 if size + this_size > 500000:
332 # XXX assuming generate_tx is smallish here..
333 generate_tx = p2pool.generate_transaction(
335 previous_share_hash=state['best_share_hash'],
336 new_script=payout_script,
337 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
338 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
339 block_target=state['target'],
342 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
343 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
344 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
345 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
346 merkle_root = bitcoin.data.merkle_hash(transactions)
347 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
349 timestamp = int(time.time() - current_work2.value['clock_offset'])
350 if state['best_share_hash'] is not None:
351 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
352 if timestamp2 > timestamp:
353 print 'Toff', timestamp2 - timestamp
354 timestamp = timestamp2
355 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
356 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
357 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
358 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
363 def got_response(data):
365 # match up with transactions
366 header = bitcoin.getwork.decode_data(data)
367 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
368 if transactions is None:
369 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
371 block = dict(header=header, txs=transactions)
372 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
373 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
375 print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
377 if factory.conn.value is not None:
378 factory.conn.value.send_block(block=block)
380 print 'No bitcoind connection! Erp!'
381 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
383 print 'Received invalid share from worker - %x/%x' % (hash_, target)
385 share = p2pool.Share.from_block(block)
386 my_shares.add(share.hash)
387 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
388 good = share.previous_hash == current_work.value['best_share_hash']
389 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
391 # eg. good = share.hash == current_work.value['best_share_hash'] here
394 log.err(None, 'Error processing data received from worker:')
397 web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
400 if current_work.value['best_share_hash'] is not None:
401 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
402 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
403 return json.dumps(att_s)
404 return json.dumps(None)
407 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
408 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
410 for script in sorted(weights, key=lambda s: weights[s]):
411 res[script.encode('hex')] = weights[script]/total_weight
412 return json.dumps(res)
414 class WebInterface(resource.Resource):
415 def __init__(self, func, mime_type):
416 self.func, self.mime_type = func, mime_type
418 def render_GET(self, request):
419 request.setHeader('Content-Type', self.mime_type)
422 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
423 web_root.putChild('users', WebInterface(get_users, 'application/json'))
425 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
427 reactor.listenTCP(args.worker_port, server.Site(web_root))
434 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
435 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
438 def __init__(self, tx, seen_at_block):
439 self.hash = bitcoin.data.tx_type.hash256(tx)
441 self.seen_at_block = seen_at_block
442 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
444 #print '%x %r' % (seen_at_block, tx)
445 #for mention in self.mentions:
446 # print '%x' % mention
448 self.parents_all_in_blocks = False
451 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
452 self._find_parents_in_blocks()
454 @defer.inlineCallbacks
455 def _find_parents_in_blocks(self):
456 for tx_in in self.tx['tx_ins']:
458 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
461 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
462 #print raw_transaction
463 if not raw_transaction['parent_blocks']:
465 self.parents_all_in_blocks = True
468 if not self.parents_all_in_blocks:
474 @defer.inlineCallbacks
477 assert isinstance(tx_hash, (int, long))
478 #print 'REQUESTING', tx_hash
479 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
481 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
483 log.err(None, 'Error handling tx:')
484 # disable for now, for testing impact on stales
485 #factory.new_tx.watch(new_tx)
def new_block(block_hash):
    """Callback for factory.new_block: a new bitcoin block was announced,
    so fire the work_updated event to trigger a getwork refresh."""
    work_updated.happened()
489 factory.new_block.watch(new_block)
491 print 'Started successfully!'
494 ht.updated.watch(lambda x: set_real_work2())
496 @defer.inlineCallbacks
499 flag = work_updated.get_deferred()
501 yield set_real_work1()
504 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
506 @defer.inlineCallbacks
509 flag = tracker_updated.get_deferred()
514 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
519 counter = skiplists.CountsSkipList(tracker, run_identifier)
522 yield deferral.sleep(3)
524 if current_work.value['best_share_hash'] is not None:
525 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
527 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
528 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
529 matching_in_chain = counter(current_work.value['best_share_hash'], height)
530 shares_in_chain = my_shares & matching_in_chain
531 stale_shares = my_shares - matching_in_chain
532 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
535 weights.get(my_script, 0)/total_weight*100,
536 math.format(weights.get(my_script, 0)/total_weight*att_s),
537 len(shares_in_chain) + len(stale_shares),
541 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
542 #for k, v in weights.iteritems():
543 # print k.encode('hex'), v/total_weight
547 log.err(None, 'Fatal error:')
# Command-line interface. Option order is preserved deliberately: the two
# bare (positional) bitcoind RPC credential arguments at the end must stay
# last and in username-then-password order.
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
# general options
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--charts',
    help='generate charts on the web interface (requires PIL and pygame)',
    action='store_const', const=True, default=False, dest='charts')

# p2pool-network options (port defaults resolved later from args.net)
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')
parser.add_argument('--disable-upnp',
    help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
    action='store_false', default=True, dest='upnp')

# miner-facing JSON-RPC interface
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

# upstream bitcoind connection (RPC for getwork, p2p for block submission)
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# positional arguments: RPC credentials (required, in this order)
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')
603 args = parser.parse_args()
606 p2pool_init.DEBUG = True
607 class TeePipe(object):
608 def __init__(self, outputs):
609 self.outputs = outputs
610 def write(self, data):
611 for output in self.outputs:
614 for output in self.outputs:
616 class TimestampingPipe(object):
617 def __init__(self, inner_file):
618 self.inner_file = inner_file
621 def write(self, data):
622 buf = self.buf + data
623 lines = buf.split('\n')
624 for line in lines[:-1]:
625 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
626 self.inner_file.flush()
630 sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
632 if args.bitcoind_p2p_port is None:
633 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
635 if args.p2pool_port is None:
636 args.p2pool_port = args.net.P2P_PORT
638 if args.address is not None:
640 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
642 raise ValueError('error parsing address: ' + repr(e))
644 args.pubkey_hash = None
646 reactor.callWhenRunning(main, args)