3 from __future__ import division
15 from twisted.internet import defer, reactor, task
16 from twisted.web import server
17 from twisted.python import log
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch current work and block height from bitcoind over JSON-RPC.

    Fires both RPCs concurrently and returns (BlockAttempt, height).
    Note: a block could arrive in between the two queries, so the pair
    is not guaranteed to be mutually consistent.
    """
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        work = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # Always attach no-op errbacks, even when one of the yields above
        # raised; otherwise a later failure on the not-yet-consumed deferred
        # would be logged as an unhandled error. (The original attached them
        # only on the success path, after both yields.)
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    # Probe bitcoind over the bitcoin p2p protocol with a null "checkorder";
    # a 'success' reply carries the script bitcoind would accept payment to.
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        # NOTE(review): orig. lines 45-46 are elided from this view - the body
        # of the 'denied' branch and any trailing defer.returnValue cannot be
        # confirmed from here; the raise below presumably belonged to a final
        # else: branch in the full source. Verify against the complete file.
        raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    # Fallback payout-script source: ask bitcoind (JSON-RPC) for the address
    # of its 'p2pool' account and turn that address into an output script.
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
# NOTE(review): the decorated `def main(args):` line (orig. ~55-56) is elided
# from this view; the statements below are the body of that coroutine.
    print 'p2pool (version %s)' % (p2pool_init.__version__,)

    # connect to bitcoind over JSON-RPC and do initial getwork
    url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
    print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
    bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
    temp_work, temp_height = yield getwork(bitcoind)
    print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)

    # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
    print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
    factory = bitcoin.p2p.ClientFactory(args.net)
    reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
    my_script = yield get_payout_script(factory)
    if args.pubkey_hash is None:
        # NOTE(review): orig. line 75 elided - presumably a check on whether
        # the checkorder probe actually produced a script. Confirm in full file.
        print 'IP transaction denied ... falling back to sending to address.'
        my_script = yield get_payout_script2(bitcoind, args.net)
    # NOTE(review): orig. line 78 elided - the assignment below presumably sat
    # in the branch taken when an explicit pubkey hash was supplied.
    my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
    print ' Payout script:', my_script.encode('hex')
84 @defer.inlineCallbacks
85 def real_get_block(block_hash):
86 block = yield (yield factory.getProtocol()).get_block(block_hash)
87 print 'Got block %x' % (block_hash,)
88 defer.returnValue(block)
    # Memoize block fetches; entries expire after an hour.
    get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))

    # Raw transactions come from bitcoind's JSON-RPC side; short-lived cache.
    get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))

    ht = bitcoin.p2p.HeightTracker(factory)

    tracker = p2pool.OkayTracker(args.net)
    chains = expiring_dict.ExpiringDict(300)
    def get_chain(chain_id_data):
        # NOTE(review): `Chain` is not defined anywhere in this chunk - confirm
        # it still exists in the full file before relying on this helper.
        return chains.setdefault(chain_id_data, Chain(chain_id_data))

    # information affecting work that should trigger a long-polling update
    current_work = variable.Variable(None)
    # information affecting work that should not trigger a long-polling update
    current_work2 = variable.Variable(None)

    # NOTE(review): `requested` is created on an elided line (orig. ~105). It is
    # used below to dedupe getshares requests; clearing it every 60s lets
    # unanswered requests be retried.
    task.LoopingCall(requested.clear).start(60)
    @defer.inlineCallbacks
    def set_real_work1():
        # Poll bitcoind and publish: fields that should trigger a long-poll
        # push to miners go into current_work, the rest into current_work2.
        work, height = yield getwork(bitcoind)
        current_work.set(dict(
            version=work.version,
            previous_block=work.previous_block,
            # NOTE(review): orig. lines 114-115 elided (additional fields).
            best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
        # NOTE(review): orig. line 117 elided (closing of the dict/set call).
        current_work2.set(dict(
        # NOTE(review): orig. lines 119-121 elided (fields + closing parens).
    def set_real_work2():
        # Re-evaluate the best share chain, publish the new best hash, and
        # request any advertised parent shares we don't have yet.
        best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])

        t = dict(current_work.value)
        t['best_share_hash'] = best
        # NOTE(review): orig. lines 127-128 elided (presumably current_work.set(t)).

        for peer2, share_hash in desired:
            # NOTE(review): orig. lines 130-131 elided.
            if (peer2.nonce, share_hash) in requested:
                # NOTE(review): orig. line 133 elided (presumably `continue`).
            print 'Requesting parent share %x' % (share_hash % 2**32,)
            peer2.send_getshares(
                # NOTE(review): orig. lines 136-137 elided (hashes=/parents= args).
                # Stops: current heads plus a short-distance parent of each head,
                # bounding how far back the peer will walk.
                stops=list(set(tracker.heads) | set(
                    tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                # NOTE(review): orig. lines 140-141 elided (closing parens).
            requested.add((peer2.nonce, share_hash))
    print 'Initializing work...'
    # Prime both work variables once before the network/worker services start.
    yield set_real_work1()
    yield set_real_work2()
    # setup p2p logic and join p2pool network
    def share_share(share, ignore_peer=None):
        # Broadcast `share` to every connected peer except the one it came from.
        for peer in p2p_node.peers.itervalues():
            if peer is ignore_peer:
                # NOTE(review): orig. line 154 elided (presumably `continue`).
            peer.send_shares([share])
    def p2p_shares(shares, peer=None):
        # Handle a batch of shares from the p2p network (peer is None for
        # locally-found shares - see the 'MINE:' prefixes below).
        # NOTE(review): orig. line 159 elided.
        print "Processing %i shares..." % (len(shares),)
        # NOTE(review): orig. lines 161-163 elided (per-share loop header).
            if share.hash in tracker.shares:
                #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
                # NOTE(review): orig. lines 166-168 elided.

            #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
            # NOTE(review): orig. lines 170-171 elided.
            #for peer2, share_hash in desired:
            #    print 'Requesting parent share %x' % (share_hash,)
            #    peer2.send_getshares(hashes=[share_hash], parents=2000)

            if share.bitcoin_hash <= share.header['target']:
                # This share also meets the full bitcoin target: it is a block.
                print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
                # NOTE(review): orig. lines 179 and 182 elided around the branch below.
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    print 'No bitcoind connection! Erp!'

        # NOTE(review): orig. lines 184-187 elided.
        best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])

        if best == share.hash:
            print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
        # NOTE(review): orig. line 192 elided (presumably `else:`).
            print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)

        w = dict(current_work.value)
        w['best_share_hash'] = best
        # NOTE(review): orig. lines 197-199 elided (presumably current_work.set(w)).

        print "... done processing %i shares." % (len(shares),)
    def p2p_share_hashes(share_hashes, peer):
        # Peers advertise share hashes; request the ones we don't already have.
        # NOTE(review): orig. line 203 elided (presumably `get_hashes = []`).
        for share_hash in share_hashes:
            if share_hash in tracker.shares:
                pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
            # NOTE(review): orig. line 207 elided (presumably `else:`).
                print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
                get_hashes.append(share_hash)
        # NOTE(review): orig. line 210 elided.
        peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
    def p2p_get_shares(share_hashes, parents, stops, peer):
        # Serve a peer's getshares request, walking up to `parents` ancestors
        # of each requested share; the cap bounds the total response size.
        parents = min(parents, 1000//len(share_hashes))
        # NOTE(review): orig. lines 215-216 elided (presumably stops-set and
        # `shares = []` initialization).
        for share_hash in share_hashes:
            for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                if share.hash in stops:
                    # NOTE(review): orig. lines 220-221 elided (presumably
                    # `break` and the shares.append call).
        peer.send_shares(shares, full=True)
    print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)

    # NOTE(review): orig. lines 225-227 elided - presumably `def parse(x):`
    # plus the branch handling addresses given with an explicit `:port`.
            ip, port = x.split(':')
            # NOTE(review): orig. lines 229-230 elided.
            return x, args.net.P2P_PORT

    # Hard-coded bootstrap peers, plus one resolved via DNS (best effort).
    # NOTE(review): orig. lines 232-233 elided (list opening).
        ('72.14.191.28', args.net.P2P_PORT),
        ('62.204.197.159', args.net.P2P_PORT),
    # NOTE(review): orig. lines 236-237 elided.
        nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
    # NOTE(review): orig. lines 239-240 elided (presumably the except clause).
        print 'Error resolving bootstrap node IP:'

    # NOTE(review): orig. lines 242-245 elided (presumably the p2p.Node(...)
    # constructor call these keyword arguments belong to).
        current_work=current_work,
        port=args.p2pool_port,
        # NOTE(review): orig. line 248 elided.
        # Peer addresses persist across restarts in a SQLite file next to the script.
        addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
        mode=0 if args.low_bandwidth else 1,
        preferred_addrs=map(parse, args.p2pool_nodes) + nodes,

    # Wire the protocol callbacks defined above into the node.
    p2p_node.handle_shares = p2p_shares
    p2p_node.handle_share_hashes = p2p_share_hashes
    p2p_node.handle_get_shares = p2p_get_shares
    # send share when the chain changes to their chain
    def work_changed(new_work):
        #print 'Work changed:', new_work
        # Walk the new best chain and rebroadcast its shares, skipping the
        # peer each share originally came from.
        for share in tracker.get_chain_known(new_work['best_share_hash']):
            # NOTE(review): orig. lines 263-264 elided (presumably the
            # loop-termination condition).
            share_share(share, share.peer)
    current_work.changed.watch(work_changed)
    # start listening for workers with a JSON-RPC server

    print 'Listening for workers on port %i...' % (args.worker_port,)

    # NOTE(review): orig. lines 274-276 elided.
    # Maps merkle roots of work we handed out back to the transaction lists
    # needed to reassemble a full block when a solution comes back.
    merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
    # Random per-run tag embedded in generated coinbase nonces; lets us
    # recognize our own shares later (see the CountsSkipList usage).
    run_identifier = struct.pack('<Q', random.randrange(2**64))
    def compute(state, all_targets):
        # Build a BlockAttempt (getwork job) for a miner from current state.
        # Pick known-good mempool transactions to include alongside the
        # generated (coinbase) transaction.
        pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
        pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
        # NOTE(review): orig. lines 283-284 elided (presumably extra_txs/size init).
        for tx in pre_extra_txs:
            this_size = len(bitcoin.data.tx_type.pack(tx.tx))
            if size + this_size > 500000:
                # NOTE(review): orig. lines 288-291 elided (presumably break /
                # append / size accumulation).
        # XXX assuming generate_tx is smallish here..
        generate_tx = p2pool.generate_transaction(
            # NOTE(review): orig. line 294 elided (leading argument(s)).
            previous_share_hash=state['best_share_hash'],
            new_script=my_script,
            # Block subsidy per the halving schedule, plus fees of included txs.
            subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
            # run_identifier prefix marks this share as ours.
            nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
            block_target=state['target'],
        # NOTE(review): orig. lines 300-301 elided (remaining args / close paren).
        print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
        #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
        #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
        transactions = [generate_tx] + [tx.tx for tx in extra_txs]
        merkle_root = bitcoin.data.merkle_hash(transactions)
        merkle_root_to_transactions[merkle_root] = transactions # kept for 300 seconds (the ExpiringDict above is created with a 300s expiry)

        # Enforce timestamp monotonicity: never below median-of-last-11 + 1.
        timestamp = current_work2.value['time']
        if state['best_share_hash'] is not None:
            timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
            if timestamp2 > timestamp:
                print 'Toff', timestamp2 - timestamp
                timestamp = timestamp2
        target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
        # NOTE(review): orig. line 316 elided (possibly a condition guarding the clamp).
        # Cap the share target (larger target = easier work).
        target2 = min(2**256//2**32 - 1, target2)
        # NOTE(review): `times` is created on an elided line; keyed by coinbase nonce.
        times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
        #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
        return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
    def got_response(data):
        # A miner returned attempted work; rebuild the block and decide whether
        # it's a full bitcoin block, a valid p2pool share, or junk.
        # NOTE(review): orig. line 326 elided (presumably `try:`).
        # match up with transactions
        header = bitcoin.getwork.decode_data(data)
        transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
        if transactions is None:
            print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
            # NOTE(review): orig. line 332 elided (presumably an early return).
        block = dict(header=header, txs=transactions)
        hash_ = bitcoin.data.block_header_type.hash256(block['header'])
        if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
            # Meets the full bitcoin target (or DEBUG mode): submit upstream.
            print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
            # NOTE(review): orig. lines 338 and 341 elided around the branch below.
            if factory.conn.value is not None:
                factory.conn.value.send_block(block=block)
                print 'No bitcoind connection! Erp!'
        target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
        # NOTE(review): orig. line 344 elided (presumably the share-target check).
            print 'Received invalid share from worker - %x/%x' % (hash_, target)
            # NOTE(review): orig. line 346 elided (presumably a return).
        share = p2pool.Share.from_block(block)
        # NOTE(review): `my_shares` and `times` are created on elided lines.
        my_shares.add(share.hash)
        print 'GOT SHARE! %x %x' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce]
        # NOTE(review): orig. lines 350-352 elided (share submission / return and
        # the except clause the following message belongs to).
        print 'Error processing data received from worker:'
        # NOTE(review): orig. lines 354-360 elided - including `def get_rate():`,
        # whose body the next three lines belong to.
        if current_work.value['best_share_hash'] is not None:
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
            # NOTE(review): orig. lines 364-365 elided (presumably the return value).

    # Expose the worker (getwork) interface over HTTP/JSON-RPC.
    reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
    def get_blocks(start_hash):
        # Generator walking the chain backwards from start_hash using only
        # already-cached blocks (call_now raises NotNowError on a miss).
        # NOTE(review): orig. lines 374-375 elided (loop header / `try:`).
            block = get_block.call_now(start_hash)
        except deferral.NotNowError:
            # NOTE(review): orig. line 378 elided (presumably `break`).
            yield start_hash, block
            start_hash = block['header']['previous_block']

    # Mempool wrapper objects (class Tx below); entries expire after 10 minutes
    # and expiry is NOT refreshed by lookups (get_touches=False).
    tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        def __init__(self, tx, seen_at_block):
            # Wrap a mempool transaction, remembering the chain tip at first sight.
            self.hash = bitcoin.data.tx_type.hash256(tx)
            # NOTE(review): orig. line 387 elided (presumably `self.tx = tx`,
            # which later methods rely on).
            self.seen_at_block = seen_at_block
            # Every hash this tx touches: itself plus each input's source tx.
            self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])

            #print '%x %r' % (seen_at_block, tx)
            #for mention in self.mentions:
            #    print '%x' % mention

            # NOTE(review): orig. line 394 elided.
            self.parents_all_in_blocks = False
            # NOTE(review): orig. lines 396-397 elided (presumably value_in init).
            self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
            self._find_parents_in_blocks()
        @defer.inlineCallbacks
        def _find_parents_in_blocks(self):
            # Asynchronously accumulate value_in from each input's source tx and
            # mark the tx once every parent is confirmed in some block.
            for tx_in in self.tx['tx_ins']:
                # NOTE(review): orig. line 404 elided (presumably `try:`).
                raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                # NOTE(review): orig. lines 406-407 elided.
                self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                #print raw_transaction
                if not raw_transaction['parent_blocks']:
                    # NOTE(review): orig. line 411 elided (presumably `return`).
            self.parents_all_in_blocks = True
        # NOTE(review): orig. lines 413-414 elided - presumably `def is_good(self):`,
        # to which the statements below belong.
            if not self.parents_all_in_blocks:
                # NOTE(review): orig. lines 416-421 elided.
            # Scan the 10 most recent cached blocks for overlap with this tx's
            # mentions, to detect confirmation or conflict.
            for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
                if block_hash == self.seen_at_block:
                    # NOTE(review): orig. line 424 elided (presumably `break`).
                for tx in block['txs']:
                    mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
                    if mentions & self.mentions:
                        # NOTE(review): orig. lines 428-430 elided (result handling).
    @defer.inlineCallbacks
    # NOTE(review): orig. lines 432-433 elided - the `def new_tx(tx_hash):`
    # line that the body below belongs to.
        assert isinstance(tx_hash, (int, long))
        #print "REQUESTING", tx_hash
        # Fetch the announced transaction over p2p and admit it to tx_pool.
        tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
        # NOTE(review): orig. line 437 elided.
        tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
        # NOTE(review): orig. lines 439-440 elided (presumably the except clause
        # the message below belongs to).
        print 'Error handling tx:'
        # NOTE(review): orig. lines 442-443 elided.
    factory.new_tx.watch(new_tx)
    @defer.inlineCallbacks
    def new_block(block):
        # A new bitcoin block arrived: refresh work from bitcoind.
        yield set_real_work1()
        # NOTE(review): orig. line 449 elided (presumably set_real_work2()).
    factory.new_block.watch(new_block)
    print 'Started successfully!'

    # Background upkeep coroutines follow. Their def lines, loop headers, and
    # scheduling calls are partially elided from this view (orig. 453-508).
    @defer.inlineCallbacks
    # NOTE(review): orig. lines 456-457 elided (def + loop header).
        # Jittered ~1s-mean poll of bitcoind for fresh work.
        yield deferral.sleep(random.expovariate(1/1))
        # NOTE(review): orig. line 459 elided.
        yield set_real_work1()
    # NOTE(review): orig. lines 461-464 elided.

    @defer.inlineCallbacks
    # NOTE(review): orig. lines 466-467 elided (def + loop header).
        yield deferral.sleep(random.expovariate(1/1))
        # NOTE(review): orig. line 469 elided.
        yield set_real_work2()
    # NOTE(review): orig. lines 471-476 elided.

    # Counts our own shares (tagged with run_identifier) along the chain.
    counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
    # NOTE(review): orig. lines 478-479 elided (def + loop header of the
    # status-printing coroutine below).
        yield deferral.sleep(random.expovariate(1/1))
        # NOTE(review): orig. line 481 elided.
        if current_work.value['best_share_hash'] is not None:
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            # NOTE(review): orig. line 484 elided.
            att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
            weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
            count = counter(current_work.value['best_share_hash'], height, 2**100)
            print 'Pool: %i mhash/s in %i shares Recent: %.02f%% >%i mhash/s Known: %i shares (so %i stales)' % (
                # NOTE(review): orig. lines 489-490 elided (first format args).
                weights.get(my_script, 0)/total_weight*100,
                weights.get(my_script, 0)/total_weight*att_s//1000000,
                # NOTE(review): orig. line 493 elided.
                len(my_shares) - count,
            # NOTE(review): orig. line 495 elided (closing paren).
            #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
            #for k, v in weights.iteritems():
            #    print k.encode('hex'), v/total_weight
    # Command-line interface. NOTE(review): the enclosing `def run():` line
    # (orig. ~505-508) is elided from this view.
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    # --testnet swaps the whole network-parameters object, not just ports.
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')

    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')

    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')

    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')

    # Positional (required) credentials for the JSON-RPC interface.
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    args = parser.parse_args()

    # NOTE(review): orig. lines 556-557 elided (presumably `if args.debug:`).
        p2pool_init.DEBUG = True

    # Fill in net-dependent port defaults (mainnet vs testnet).
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT

    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT

    if args.address is not None:
        # NOTE(review): orig. line 567 elided (presumably `try:`).
        args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        # NOTE(review): orig. line 569 elided (presumably the except clause
        # binding `e`, which the re-raise below uses).
        raise ValueError("error parsing address: " + repr(e))
    # NOTE(review): orig. line 571 elided (presumably `else:`).
        args.pubkey_hash = None

    # Defer entry into main until the reactor is running.
    reactor.callWhenRunning(main, args)