3 from __future__ import division
15 from twisted.internet import defer, reactor, task
16 from twisted.web import server
17 from twisted.python import log
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    # Fetch the current block template and chain height from bitcoind over
    # JSON-RPC. Fires back (via returnValue) a tuple of
    # (bitcoin.getwork.BlockAttempt, height).
    # Both RPCs are started before either is waited on so they run in
    # parallel; a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    # get rid of residual errors: swallow any late errback so Twisted does
    # not log an "Unhandled error in Deferred" for the other query.
    # NOTE(review): two interior lines of the original are missing from this
    # excerpt (original lines 30 and 32) - these errbacks may have been
    # wrapped in a try/finally; confirm against the full source.
    getwork_df.addErrback(lambda fail: None)
    height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    # Ask the connected bitcoind, via a p2p checkorder message (the old
    # "IP transaction" mechanism), for a script to send payouts to.
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        # bitcoind supplied a payout script directly
        my_script = res['script']
    elif res['reply'] == 'denied':
        # NOTE(review): the body of this branch and the final
        # defer.returnValue(...) are missing from this excerpt (original
        # lines 45-46 and following) - the raise below was presumably in an
        # else: branch for unexpected replies. Confirm against the full
        # source before relying on this block.
        raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    # Fall back to paying out to an address from bitcoind's wallet: fetch
    # (or have bitcoind create) the 'p2pool' account address over JSON-RPC
    # and turn it into a standard pay-to-pubkey-hash output script.
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
54 @defer.inlineCallbacks
57 print 'p2pool (version %s)' % (p2pool_init.__version__,)
60 # connect to bitcoind over JSON-RPC and do initial getwork
61 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
62 print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
63 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
64 temp_work, temp_height = yield getwork(bitcoind)
66 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
69 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
70 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
71 factory = bitcoin.p2p.ClientFactory(args.net)
72 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
73 my_script = yield get_payout_script(factory)
74 if args.pubkey_hash is None:
76 print 'IP transaction denied ... falling back to sending to address.'
77 my_script = yield get_payout_script2(bitcoind, args.net)
79 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
81 print ' Payout script:', my_script.encode('hex')
84 @defer.inlineCallbacks
85 def real_get_block(block_hash):
86 block = yield (yield factory.getProtocol()).get_block(block_hash)
87 print 'Got block %x' % (block_hash,)
88 defer.returnValue(block)
89 get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
# Cached raw-transaction lookup over bitcoind's JSON-RPC interface
# (tx hash formatted as hex); entries expire after 100 seconds.
get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
# Presumably tracks bitcoin block heights from headers received over p2p -
# name-based inference; confirm against bitcoin.p2p.HeightTracker.
ht = bitcoin.p2p.HeightTracker(factory)

# Share-chain tracker for the p2pool network, plus an expiring (300s)
# cache of chain objects.
tracker = p2pool.OkayTracker(args.net)
chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    # Return the cached chain object for this id, creating it on demand.
    # NOTE(review): `Chain` is not defined anywhere in this excerpt - this
    # looks like leftover/dead code; confirm against the full source.
    return chains.setdefault(chain_id_data, Chain(chain_id_data))

# information affecting work that should trigger a long-polling update
current_work = variable.Variable(None)
# information affecting work that should not trigger a long-polling update
current_work2 = variable.Variable(None)
106 task.LoopingCall(requested.clear).start(60)
108 @defer.inlineCallbacks
109 def set_real_work1():
110 work, height = yield getwork(bitcoind)
111 current_work.set(dict(
112 version=work.version,
113 previous_block=work.previous_block,
116 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
118 current_work2.set(dict(
122 def set_real_work2():
123 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
125 t = dict(current_work.value)
126 t['best_share_hash'] = best
129 for peer2, share_hash in desired:
132 if (peer2.nonce, share_hash) in requested:
134 print 'Requesting parent share %x' % (share_hash,)
135 peer2.send_getshares(
138 stops=list(set(tracker.heads) | set(
139 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
142 requested.add((peer2.nonce, share_hash))
144 print 'Initializing work...'
145 yield set_real_work1()
146 yield set_real_work2()
149 # setup p2p logic and join p2pool network
151 def share_share(share, ignore_peer=None):
152 for peer in p2p_node.peers.itervalues():
153 if peer is ignore_peer:
155 peer.send_shares([share])
158 def p2p_shares(shares, peer=None):
160 if share.hash in tracker.shares:
161 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
164 #print 'Received share %x from %r' % (share.hash, share.peer.transport.getPeer() if share.peer is not None else None)
167 #for peer2, share_hash in desired:
168 # print 'Requesting parent share %x' % (share_hash,)
169 # peer2.send_getshares(hashes=[share_hash], parents=2000)
171 if share.gentx is not None:
172 if share.bitcoin_hash <= share.header['target']:
174 print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash, share.bitcoin_hash,)
176 if factory.conn.value is not None:
177 factory.conn.value.send_block(block=share.as_block())
179 print 'No bitcoind connection! Erp!'
184 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
186 if best == share.hash:
187 print ('MINE> ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash,)
189 print ('MINE> ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash,)
191 w = dict(current_work.value)
192 w['best_share_hash'] = best
195 def p2p_share_hashes(share_hashes, peer):
197 for share_hash in share_hashes:
198 if share_hash in tracker.shares:
199 pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
201 print 'Got share hash, requesting! Hash: %x' % (share_hash,)
202 get_hashes.append(share_hash)
204 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
206 def p2p_get_shares(share_hashes, parents, stops, peer):
207 parents = min(parents, 1000//len(share_hashes))
210 for share_hash in share_hashes:
211 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
212 if share.hash in stops:
215 peer.send_shares(shares, full=True)
217 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
221 ip, port = x.split(':')
224 return x, args.net.P2P_PORT
227 ('72.14.191.28', args.net.P2P_PORT),
228 ('62.204.197.159', args.net.P2P_PORT),
231 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
234 print 'Error resolving bootstrap node IP:'
239 current_work=current_work,
240 port=args.p2pool_port,
242 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
243 mode=0 if args.low_bandwidth else 1,
244 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
246 p2p_node.handle_shares = p2p_shares
247 p2p_node.handle_share_hashes = p2p_share_hashes
248 p2p_node.handle_get_shares = p2p_get_shares
252 # send share when the chain changes to their chain
253 def work_changed(new_work):
254 #print 'Work changed:', new_work
255 for share in tracker.get_chain_known(new_work['best_share_hash']):
258 share_share(share, share.peer)
259 current_work.changed.watch(work_changed)
264 # start listening for workers with a JSON-RPC server
266 print 'Listening for workers on port %i...' % (args.worker_port,)
270 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
271 run_identifier = struct.pack('<Q', random.randrange(2**64))
273 def compute(state, all_targets):
274 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
275 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
278 for tx in pre_extra_txs:
279 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
280 if size + this_size > 500000:
285 # XXX assuming generate_tx is smallish here..
286 generate_tx = p2pool.generate_transaction(
288 previous_share_hash=state['best_share_hash'],
289 new_script=my_script,
290 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
291 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
292 block_target=state['target'],
296 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
297 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
298 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
299 merkle_root = bitcoin.data.merkle_hash(transactions)
300 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
302 timestamp = current_work2.value['time']
303 if state['best_share_hash'] is not None:
304 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
305 if timestamp2 > timestamp:
306 print 'Toff', timestamp2 - timestamp
307 timestamp = timestamp2
308 ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
309 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
310 target = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
312 target = min(2**256//2**32 - 1, target)
313 return ba.getwork(target)
317 def got_response(data):
319 # match up with transactions
320 header = bitcoin.getwork.decode_data(data)
321 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
322 if transactions is None:
323 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
325 block = dict(header=header, txs=transactions)
326 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
327 if hash_ <= block['header']['target']:
329 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
331 if factory.conn.value is not None:
332 factory.conn.value.send_block(block=block)
334 print 'No bitcoind connection! Erp!'
335 share = p2pool.Share.from_block(block)
336 my_shares.add(share.hash)
337 #print 'GOT SHARE! %x' % (share.hash,), "DEAD ON ARRIVAL" if share.previous_hash != current_work['best_share_hash'] else ""
341 print 'Error processing data received from worker:'
349 if current_work.value['best_share_hash'] is not None:
350 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
351 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
354 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
361 def get_blocks(start_hash):
364 block = get_block.call_now(start_hash)
365 except deferral.NotNowError:
367 yield start_hash, block
368 start_hash = block['header']['previous_block']
# Recently-seen transactions keyed by tx hash (see new_tx below); entries
# expire after 600s and lookups do not refresh the timer (get_touches=False).
tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
373 def __init__(self, tx, seen_at_block):
374 self.hash = bitcoin.data.tx_type.hash256(tx)
376 self.seen_at_block = seen_at_block
377 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
379 #print '%x %r' % (seen_at_block, tx)
380 #for mention in self.mentions:
381 # print '%x' % mention
383 self.parents_all_in_blocks = False
386 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
387 self._find_parents_in_blocks()
389 @defer.inlineCallbacks
390 def _find_parents_in_blocks(self):
391 for tx_in in self.tx['tx_ins']:
393 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
396 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
397 #print raw_transaction
398 if not raw_transaction['parent_blocks']:
400 self.parents_all_in_blocks = True
403 if not self.parents_all_in_blocks:
410 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
411 if block_hash == self.seen_at_block:
413 for tx in block['txs']:
414 mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
415 if mentions & self.mentions:
419 @defer.inlineCallbacks
422 assert isinstance(tx_hash, (int, long))
423 #print "REQUESTING", tx_hash
424 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
426 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
429 print 'Error handling tx:'
432 factory.new_tx.watch(new_tx)
434 @defer.inlineCallbacks
435 def new_block(block):
436 yield set_real_work1()
438 factory.new_block.watch(new_block)
440 print 'Started successfully!'
443 @defer.inlineCallbacks
446 yield deferral.sleep(random.expovariate(1/1))
448 yield set_real_work1()
453 @defer.inlineCallbacks
456 yield deferral.sleep(random.expovariate(1/1))
458 yield set_real_work2()
465 counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
468 yield deferral.sleep(random.expovariate(1/1))
470 if current_work.value['best_share_hash'] is not None:
471 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
473 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
474 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
475 count = counter(current_work.value['best_share_hash'], height, 2**100)
476 print 'Pool: %i mhash/s in %i shares Recent: %.02f%% >%i mhash/s Known: %i shares (so %i stales)' % (
479 weights.get(my_script, 0)/total_weight*100,
480 weights.get(my_script, 0)/total_weight*att_s//1000000,
482 len(my_shares) - count,
484 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
485 #for k, v in weights.iteritems():
486 # print k.encode('hex'), v/total_weight
# Enable Deferred debugging so unhandled-error tracebacks point at where
# the Deferred was created, not just where it was garbage-collected.
defer.setDebugging(True)

parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
# selecting testnet swaps in the Testnet constants (ports, address versions)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# positional (required) arguments: bitcoind RPC credentials
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')

args = parser.parse_args()

# fill in net-dependent port defaults only after parsing, since they
# depend on which network object --testnet selected
if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = args.net.P2P_PORT
551 if args.address is not None:
553 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
555 raise ValueError("error parsing address: " + repr(e))
557 args.pubkey_hash = None
# Schedule main(args) to be called once the Twisted reactor starts.
reactor.callWhenRunning(main, args)