3 from __future__ import division
15 from twisted.internet import defer, reactor, task
16 from twisted.web import server
17 from twisted.python import log
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
25 @deferral.retry('Error getting work from bitcoind:', 3)
26 @defer.inlineCallbacks
27 def getwork(bitcoind):
28 # a block could arrive in between these two queries
29 getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
31 getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
33 # get rid of residual errors
34 getwork_df.addErrback(lambda fail: None)
35 height_df.addErrback(lambda fail: None)
36 defer.returnValue((getwork, height))
38 @deferral.retry('Error getting payout script from bitcoind:', 1)
39 @defer.inlineCallbacks
40 def get_payout_script(factory):
41 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
42 if res['reply'] == 'success':
43 my_script = res['script']
44 elif res['reply'] == 'denied':
47 raise ValueError('Unexpected reply: %r' % (res,))
49 @deferral.retry('Error creating payout script:', 10)
50 @defer.inlineCallbacks
51 def get_payout_script2(bitcoind, net):
52 defer.returnValue(bitcoin.data.pubkey_hash_to_script2(bitcoin.data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net)))
54 @defer.inlineCallbacks
57 print 'p2pool (version %s)' % (p2pool_init.__version__,)
60 # connect to bitcoind over JSON-RPC and do initial getwork
61 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
62 print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
63 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
64 temp_work, temp_height = yield getwork(bitcoind)
66 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
69 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
70 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
71 factory = bitcoin.p2p.ClientFactory(args.net)
72 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
73 my_script = yield get_payout_script(factory)
74 if args.pubkey_hash is None:
76 print 'IP transaction denied ... falling back to sending to address.'
77 my_script = yield get_payout_script2(bitcoind, args.net)
79 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
81 print ' Payout script:', my_script.encode('hex')
84 @defer.inlineCallbacks
85 def real_get_block(block_hash):
86 block = yield (yield factory.getProtocol()).get_block(block_hash)
87 print 'Got block %x' % (block_hash,)
88 defer.returnValue(block)
89 get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
91 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
93 ht = bitcoin.p2p.HeightTracker(factory)
95 tracker = p2pool.OkayTracker(args.net)
96 chains = expiring_dict.ExpiringDict(300)
97 def get_chain(chain_id_data):
98 return chains.setdefault(chain_id_data, Chain(chain_id_data))
100 # information affecting work that should trigger a long-polling update
101 current_work = variable.Variable(None)
102 # information affecting work that should not trigger a long-polling update
103 current_work2 = variable.Variable(None)
106 task.LoopingCall(requested.clear).start(60)
108 @defer.inlineCallbacks
109 def set_real_work1():
110 work, height = yield getwork(bitcoind)
111 current_work.set(dict(
112 version=work.version,
113 previous_block=work.previous_block,
116 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
118 current_work2.set(dict(
122 def set_real_work2():
123 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
125 t = dict(current_work.value)
126 t['best_share_hash'] = best
129 for peer2, share_hash in desired:
132 if (peer2.nonce, share_hash) in requested:
134 print 'Requesting parent share %x' % (share_hash % 2**32,)
135 peer2.send_getshares(
138 stops=list(set(tracker.heads) | set(
139 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
142 requested.add((peer2.nonce, share_hash))
144 print 'Initializing work...'
145 yield set_real_work1()
146 yield set_real_work2()
149 # setup p2p logic and join p2pool network
151 def share_share(share, ignore_peer=None):
152 for peer in p2p_node.peers.itervalues():
153 if peer is ignore_peer:
155 peer.send_shares([share])
158 def p2p_shares(shares, peer=None):
160 print "Processing %i shares..." % (len(shares),)
163 if share.hash in tracker.shares:
164 #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
167 #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
170 #for peer2, share_hash in desired:
171 # print 'Requesting parent share %x' % (share_hash,)
172 # peer2.send_getshares(hashes=[share_hash], parents=2000)
174 if share.bitcoin_hash <= share.header['target']:
176 print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
178 if factory.conn.value is not None:
179 factory.conn.value.send_block(block=share.as_block(tracker, net))
181 print 'No bitcoind connection! Erp!'
186 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
188 if best == share.hash:
189 print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
191 print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)
193 w = dict(current_work.value)
194 w['best_share_hash'] = best
198 print "... done processing %i shares." % (len(shares),)
200 def p2p_share_hashes(share_hashes, peer):
202 for share_hash in share_hashes:
203 if share_hash in tracker.shares:
204 pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
206 print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
207 get_hashes.append(share_hash)
209 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
211 def p2p_get_shares(share_hashes, parents, stops, peer):
212 parents = min(parents, 1000//len(share_hashes))
215 for share_hash in share_hashes:
216 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
217 if share.hash in stops:
220 peer.send_shares(shares, full=True)
222 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
226 ip, port = x.split(':')
229 return x, args.net.P2P_PORT
232 ('72.14.191.28', args.net.P2P_PORT),
233 ('62.204.197.159', args.net.P2P_PORT),
236 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
239 print 'Error resolving bootstrap node IP:'
244 current_work=current_work,
245 port=args.p2pool_port,
247 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
248 mode=0 if args.low_bandwidth else 1,
249 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
251 p2p_node.handle_shares = p2p_shares
252 p2p_node.handle_share_hashes = p2p_share_hashes
253 p2p_node.handle_get_shares = p2p_get_shares
257 # send share when the chain changes to their chain
258 def work_changed(new_work):
259 #print 'Work changed:', new_work
260 for share in tracker.get_chain_known(new_work['best_share_hash']):
263 share_share(share, share.peer)
264 current_work.changed.watch(work_changed)
269 # start listening for workers with a JSON-RPC server
271 print 'Listening for workers on port %i...' % (args.worker_port,)
275 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
276 run_identifier = struct.pack('<Q', random.randrange(2**64))
278 def compute(state, all_targets):
280 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
281 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
284 for tx in pre_extra_txs:
285 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
286 if size + this_size > 500000:
291 # XXX assuming generate_tx is smallish here..
292 generate_tx = p2pool.generate_transaction(
294 previous_share_hash=state['best_share_hash'],
295 new_script=my_script,
296 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
297 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
298 block_target=state['target'],
301 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
302 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
303 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
304 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
305 merkle_root = bitcoin.data.merkle_hash(transactions)
306 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
308 timestamp = current_work2.value['time']
309 if state['best_share_hash'] is not None:
310 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
311 if timestamp2 > timestamp:
312 print 'Toff', timestamp2 - timestamp
313 timestamp = timestamp2
314 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
316 target2 = min(2**256//2**32 - 1, target2)
317 print "TOOK", time.time() - start
318 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
319 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
320 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
325 def got_response(data):
327 # match up with transactions
328 header = bitcoin.getwork.decode_data(data)
329 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
330 if transactions is None:
331 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
333 block = dict(header=header, txs=transactions)
334 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
335 if hash_ <= block['header']['target']:
337 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
339 if factory.conn.value is not None:
340 factory.conn.value.send_block(block=block)
342 print 'No bitcoind connection! Erp!'
343 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
345 print 'Received invalid share from worker - %x/%x' % (hash_, target)
347 share = p2pool.Share.from_block(block)
348 my_shares.add(share.hash)
349 print 'GOT SHARE! %x %x' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce]
353 print 'Error processing data received from worker:'
361 if current_work.value['best_share_hash'] is not None:
362 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
363 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
366 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
373 def get_blocks(start_hash):
376 block = get_block.call_now(start_hash)
377 except deferral.NotNowError:
379 yield start_hash, block
380 start_hash = block['header']['previous_block']
382 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
385 def __init__(self, tx, seen_at_block):
386 self.hash = bitcoin.data.tx_type.hash256(tx)
388 self.seen_at_block = seen_at_block
389 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
391 #print '%x %r' % (seen_at_block, tx)
392 #for mention in self.mentions:
393 # print '%x' % mention
395 self.parents_all_in_blocks = False
398 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
399 self._find_parents_in_blocks()
401 @defer.inlineCallbacks
402 def _find_parents_in_blocks(self):
403 for tx_in in self.tx['tx_ins']:
405 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
408 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
409 #print raw_transaction
410 if not raw_transaction['parent_blocks']:
412 self.parents_all_in_blocks = True
415 if not self.parents_all_in_blocks:
422 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
423 if block_hash == self.seen_at_block:
425 for tx in block['txs']:
426 mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
427 if mentions & self.mentions:
431 @defer.inlineCallbacks
434 assert isinstance(tx_hash, (int, long))
435 #print "REQUESTING", tx_hash
436 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
438 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
441 print 'Error handling tx:'
444 factory.new_tx.watch(new_tx)
446 @defer.inlineCallbacks
447 def new_block(block):
448 yield set_real_work1()
450 factory.new_block.watch(new_block)
452 print 'Started successfully!'
455 @defer.inlineCallbacks
458 yield deferral.sleep(random.expovariate(1/1))
460 yield set_real_work1()
465 @defer.inlineCallbacks
468 yield deferral.sleep(random.expovariate(1/1))
470 yield set_real_work2()
477 counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
480 yield deferral.sleep(random.expovariate(1/1))
482 if current_work.value['best_share_hash'] is not None:
483 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
485 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
486 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
487 count = counter(current_work.value['best_share_hash'], height, 2**100)
488 print 'Pool: %i mhash/s in %i shares Recent: %.02f%% >%i mhash/s Known: %i shares (so %i stales)' % (
491 weights.get(my_script, 0)/total_weight*100,
492 weights.get(my_script, 0)/total_weight*att_s//1000000,
494 len(my_shares) - count,
496 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
497 #for k, v in weights.iteritems():
498 # print k.encode('hex'), v/total_weight
509 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
510 parser.add_argument('--version', action='version', version=p2pool_init.__version__)
511 parser.add_argument('--testnet',
512 help='use the testnet',
513 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
514 parser.add_argument('--debug',
515 help='debugging mode',
516 action='store_const', const=True, default=False, dest='debug')
517 parser.add_argument('-a', '--address',
518 help='generate to this address (defaults to requesting one from bitcoind)',
519 type=str, action='store', default=None, dest='address')
521 p2pool_group = parser.add_argument_group('p2pool interface')
522 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
523 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
524 type=int, action='store', default=None, dest='p2pool_port')
525 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
526 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
527 type=str, action='append', default=[], dest='p2pool_nodes')
528 parser.add_argument('-l', '--low-bandwidth',
529 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
530 action='store_true', default=False, dest='low_bandwidth')
532 worker_group = parser.add_argument_group('worker interface')
533 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
534 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
535 type=int, action='store', default=9332, dest='worker_port')
537 bitcoind_group = parser.add_argument_group('bitcoind interface')
538 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
539 help='connect to a bitcoind at this address (default: 127.0.0.1)',
540 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
541 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
542 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
543 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
544 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
545 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
546 type=int, action='store', default=None, dest='bitcoind_p2p_port')
548 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
549 help='bitcoind RPC interface username',
550 type=str, action='store', dest='bitcoind_rpc_username')
551 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
552 help='bitcoind RPC interface password',
553 type=str, action='store', dest='bitcoind_rpc_password')
555 args = parser.parse_args()
558 p2pool_init.DEBUG = True
560 if args.bitcoind_p2p_port is None:
561 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
563 if args.p2pool_port is None:
564 args.p2pool_port = args.net.P2P_PORT
566 if args.address is not None:
568 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
570 raise ValueError("error parsing address: " + repr(e))
572 args.pubkey_hash = None
574 reactor.callWhenRunning(main, args)