3 from __future__ import division
15 from twisted.internet import defer, reactor, task
16 from twisted.web import server
17 from twisted.python import log
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
25 os.chdir(os.path.abspath(os.path.dirname(sys.argv[0])))
# Derive the version string from `git describe`; fall back to 'unknown' when
# git cannot be started, exits with an error, or prints nothing (for example
# when running outside a git checkout).
try:
    _git_describe = subprocess.Popen(['git', 'describe', '--always'], stdout=subprocess.PIPE)
    # communicate() drains the pipe and waits for the child, so the process
    # is reaped and its return code becomes available.
    __version__ = _git_describe.communicate()[0].strip()
except OSError:
    # raised by Popen when the git executable cannot be launched
    __version__ = 'unknown'
else:
    if _git_describe.returncode != 0 or not __version__:
        __version__ = 'unknown'
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work unit and block height from bitcoind.

    Both RPC requests (``rpc_getwork`` and ``rpc_getblocknumber``) are issued
    before either result is awaited.

    Returns (through the Deferred) a ``(work, height)`` tuple, where ``work``
    is built by ``bitcoin.getwork.BlockAttempt.from_getwork`` and ``height``
    is the value returned by ``rpc_getblocknumber``.

    Any failure propagates to the ``deferral.retry`` wrapper.
    """
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        work = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # get rid of residual errors: if the first request fails we never
        # wait on the second one, and a failed Deferred nobody consumes would
        # otherwise be reported as an unhandled error. This must run on the
        # failure path too, hence the finally.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, over the bitcoin p2p connection, for a payout script.

    Sends ``check_order`` with ``bitcoin.p2p.Protocol.null_order`` on the
    protocol obtained from ``factory.getProtocol()``.

    Returns (through the Deferred) ``res['script']`` when the reply is
    'success', or None when the reply is 'denied'.

    Raises ValueError for any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        my_script = res['script']
    elif reply == 'denied':
        # NOTE(review): None is inferred from the caller, which prints
        # 'IP transaction denied ... falling back to sending to address.'
        # and then requests a script another way - confirm against callers.
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(my_script)
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from bitcoind's 'p2pool' account address.

    Requests the address with ``rpc_getaccountaddress('p2pool')``, converts it
    to a pubkey hash for ``net`` and returns (through the Deferred) the result
    of ``bitcoin.data.pubkey_hash_to_script2`` for that hash.
    """
    account_address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(account_address, net)
    payout_script = bitcoin.data.pubkey_hash_to_script2(pubkey_hash)
    defer.returnValue(payout_script)
61 @defer.inlineCallbacks
64 print 'p2pool (version %s)' % (__version__,)
67 # connect to bitcoind over JSON-RPC and do initial getwork
url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
# The RPC password is deliberately left out of this banner: stdout is
# routinely captured in logs and terminals, and echoing the credential there
# would leak it. The password is still passed to the proxy below.
print('''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username))
bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
71 temp_work, temp_height = yield getwork(bitcoind)
73 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
76 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
77 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
78 factory = bitcoin.p2p.ClientFactory(args.net)
79 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
80 my_script = yield get_payout_script(factory)
81 if args.pubkey_hash is None:
83 print 'IP transaction denied ... falling back to sending to address.'
84 my_script = yield get_payout_script2(bitcoind, args.net)
86 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
88 print ' Payout script:', my_script.encode('hex')
@defer.inlineCallbacks
def real_get_block(block_hash):
    """Fetch the block with hash ``block_hash`` through ``factory``'s protocol.

    Prints a one-line notice once the block has arrived and returns the block
    through the Deferred.
    """
    protocol = yield factory.getProtocol()
    fetched_block = yield protocol.get_block(block_hash)
    print('Got block %x' % (block_hash,))
    defer.returnValue(fetched_block)
96 get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
98 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
100 ht = bitcoin.p2p.HeightTracker(factory)
102 tracker = p2pool.OkayTracker(args.net)
103 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    """Return the entry of ``chains`` stored under ``chain_id_data``.

    A new ``Chain(chain_id_data)`` is built on every call and handed to
    ``chains.setdefault`` as the default value (presumably only stored when
    the key is absent, as with dict.setdefault - confirm for ExpiringDict).
    """
    candidate = Chain(chain_id_data)
    return chains.setdefault(chain_id_data, candidate)
107 # information affecting work that should trigger a long-polling update
108 current_work = variable.Variable(None)
109 # information affecting work that should not trigger a long-polling update
110 current_work2 = variable.Variable(None)
113 task.LoopingCall(requested.clear).start(60)
115 @defer.inlineCallbacks
116 def set_real_work1():
117 work, height = yield getwork(bitcoind)
118 current_work.set(dict(
119 version=work.version,
120 previous_block=work.previous_block,
123 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
125 current_work2.set(dict(
129 def set_real_work2():
130 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
132 t = dict(current_work.value)
133 t['best_share_hash'] = best
136 for peer2, share_hash in desired:
139 if (peer2.nonce, share_hash) in requested:
141 print 'Requesting parent share %x' % (share_hash,)
142 peer2.send_getshares(
145 stops=list(set(tracker.heads) | set(
146 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
149 requested.add((peer2.nonce, share_hash))
151 print 'Initializing work...'
152 yield set_real_work1()
153 yield set_real_work2()
156 # setup p2p logic and join p2pool network
158 def share_share(share, ignore_peer=None):
159 for peer in p2p_node.peers.itervalues():
160 if peer is ignore_peer:
162 peer.send_shares([share])
165 def p2p_shares(shares, peer=None):
167 if share.hash in tracker.shares:
168 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
171 #print 'Received share %x from %r' % (share.hash, share.peer.transport.getPeer() if share.peer is not None else None)
174 #for peer2, share_hash in desired:
175 # print 'Requesting parent share %x' % (share_hash,)
176 # peer2.send_getshares(hashes=[share_hash], parents=2000)
178 if share.gentx is not None:
179 if share.bitcoin_hash <= share.header['target']:
181 print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash, share.bitcoin_hash,)
183 if factory.conn.value is not None:
184 factory.conn.value.send_block(block=share.as_block())
186 print 'No bitcoind connection! Erp!'
188 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
190 if best == share.hash:
191 print 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash,)
193 print 'Accepted share, not best. Hash: %x' % (share.hash,)
195 w = dict(current_work.value)
196 w['best_share_hash'] = best
def p2p_share_hashes(share_hashes, peer):
    """Handle share hashes announced by ``peer``.

    Hashes already present in ``tracker.shares`` are reported and ignored.
    The remaining ones are requested from the announcing peer in a single
    ``send_getshares`` call with ``parents=0`` and no stops.
    Nothing is sent when every announced hash is already known.
    """
    get_hashes = []
    for share_hash in share_hashes:
        if share_hash in tracker.shares:
            print('Got share hash, already have, ignoring. Hash: %x' % (share_hash,))
            continue
        print('Got share hash, requesting! Hash: %x' % (share_hash,))
        get_hashes.append(share_hash)
    # avoid a pointless round trip when there is nothing new to ask for
    if get_hashes:
        peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
def p2p_get_shares(share_hashes, parents, stops, peer):
    """Answer a peer's request for shares.

    share_hashes: hashes whose chains the peer asked for.
    parents: how many ancestors of each hash the peer asked for; capped so
        that at most about 1000 chain steps are walked for the whole request.
    stops: hashes at which walking a chain ends (the stop share itself is not
        sent).
    peer: the requesting peer; the collected shares are sent back to it with
        ``send_shares(..., full=True)``.

    An empty ``share_hashes`` is ignored: there is nothing to look up, and
    the cap computation below would otherwise divide by zero on input that
    comes straight from a remote peer.
    """
    if not share_hashes:
        return
    parents = min(parents, 1000//len(share_hashes))
    # set membership keeps the per-share stop check cheap
    stops = set(stops)
    shares = []
    for share_hash in share_hashes:
        for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
            if share.hash in stops:
                break
            shares.append(share)
    peer.send_shares(shares, full=True)
221 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
225 ip, port = x.split(':')
228 return x, args.net.P2P_PORT
231 ('72.14.191.28', args.net.P2P_PORT),
232 ('62.204.197.159', args.net.P2P_PORT),
235 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
238 print 'Error resolving bootstrap node IP:'
243 current_work=current_work,
244 port=args.p2pool_port,
246 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
247 mode=0 if args.low_bandwidth else 1,
248 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
250 p2p_node.handle_shares = p2p_shares
251 p2p_node.handle_share_hashes = p2p_share_hashes
252 p2p_node.handle_get_shares = p2p_get_shares
256 # send share when the chain changes to their chain
257 def work_changed(new_work):
258 #print 'Work changed:', new_work
259 for share in tracker.get_chain_known(new_work['best_share_hash']):
262 share_share(share, share.peer)
263 current_work.changed.watch(work_changed)
268 # start listening for workers with a JSON-RPC server
270 print 'Listening for workers on port %i...' % (args.worker_port,)
274 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
276 def compute(state, all_targets):
277 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
278 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
281 for tx in pre_extra_txs:
282 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
283 if size + this_size > 500000:
288 # XXX assuming generate_tx is smallish here..
289 generate_tx = p2pool.generate_transaction(
291 previous_share_hash=state['best_share_hash'],
292 new_script=my_script,
293 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
294 nonce=struct.pack('<Q', random.randrange(2**64)),
295 block_target=state['target'],
299 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
300 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
301 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
302 merkle_root = bitcoin.data.merkle_hash(transactions)
303 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
305 timestamp = current_work2.value['time']
306 if state['best_share_hash'] is not None:
307 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
308 if timestamp2 > timestamp:
309 print 'Toff', timestamp2 - timestamp
310 timestamp = timestamp2
311 ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
312 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
313 target = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
315 target = min(2**256//2**32 - 1, target)
316 return ba.getwork(target)
318 def got_response(data):
320 # match up with transactions
321 header = bitcoin.getwork.decode_data(data)
322 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
323 if transactions is None:
324 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
326 block = dict(header=header, txs=transactions)
327 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
328 if hash_ <= block['header']['target']:
330 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
332 if factory.conn.value is not None:
333 factory.conn.value.send_block(block=block)
335 print 'No bitcoind connection! Erp!'
336 share = p2pool.Share.from_block(block)
337 print 'GOT SHARE! %x' % (share.hash,)
341 print 'Error processing data received from worker:'
349 if current_work.value['best_share_hash'] is not None:
350 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
351 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
354 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
def get_blocks(start_hash):
    """Generate ``(block_hash, block)`` pairs walking back from ``start_hash``.

    Each block is obtained with ``get_block.call_now``; after yielding it the
    walk continues with ``block['header']['previous_block']``. The generator
    ends as soon as ``call_now`` raises ``deferral.NotNowError`` (presumably
    meaning the block is not available without waiting - confirm against
    DeferredCacher).
    """
    while True:
        try:
            block = get_block.call_now(start_hash)
        except deferral.NotNowError:
            break
        yield start_hash, block
        start_hash = block['header']['previous_block']
370 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
373 def __init__(self, tx, seen_at_block):
374 self.hash = bitcoin.data.tx_type.hash256(tx)
376 self.seen_at_block = seen_at_block
377 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
379 #print '%x %r' % (seen_at_block, tx)
380 #for mention in self.mentions:
381 # print '%x' % mention
383 self.parents_all_in_blocks = False
386 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
387 self._find_parents_in_blocks()
389 @defer.inlineCallbacks
390 def _find_parents_in_blocks(self):
391 for tx_in in self.tx['tx_ins']:
393 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
396 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
397 #print raw_transaction
398 if not raw_transaction['parent_blocks']:
400 self.parents_all_in_blocks = True
403 if not self.parents_all_in_blocks:
410 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
411 if block_hash == self.seen_at_block:
413 for tx in block['txs']:
414 mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
415 if mentions & self.mentions:
419 @defer.inlineCallbacks
422 assert isinstance(tx_hash, (int, long))
423 #print "REQUESTING", tx_hash
424 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
426 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
429 print 'Error handling tx:'
432 factory.new_tx.watch(new_tx)
434 @defer.inlineCallbacks
435 def new_block(block):
436 yield set_real_work1()
438 factory.new_block.watch(new_block)
440 print 'Started successfully!'
443 @defer.inlineCallbacks
446 yield deferral.sleep(random.expovariate(1/1))
448 yield set_real_work1()
453 @defer.inlineCallbacks
456 yield deferral.sleep(random.expovariate(1/1))
458 yield set_real_work2()
466 yield deferral.sleep(random.expovariate(1/1))
468 if current_work.value['best_share_hash'] is not None:
469 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
470 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
472 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
473 print 'Pool rate: %i mhash/s %i shares Contribution: %.02f%% >%i mhash/s' % (
476 weights.get(my_script, 0)/total_weight*100,
477 weights.get(my_script, 0)/total_weight*att_s//1000000,
490 defer.setDebugging(True)
492 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
493 parser.add_argument('--version', action='version', version=__version__)
494 parser.add_argument('--testnet',
495 help='use the testnet',
496 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
497 parser.add_argument('-a', '--address',
498 help='generate to this address (defaults to requesting one from bitcoind)',
499 type=str, action='store', default=None, dest='address')
501 p2pool_group = parser.add_argument_group('p2pool interface')
502 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
503 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
504 type=int, action='store', default=None, dest='p2pool_port')
505 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
506 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
507 type=str, action='append', default=[], dest='p2pool_nodes')
508 parser.add_argument('-l', '--low-bandwidth',
509 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
510 action='store_true', default=False, dest='low_bandwidth')
512 worker_group = parser.add_argument_group('worker interface')
513 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
514 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
515 type=int, action='store', default=9332, dest='worker_port')
517 bitcoind_group = parser.add_argument_group('bitcoind interface')
518 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
519 help='connect to a bitcoind at this address (default: 127.0.0.1)',
520 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
521 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
522 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
523 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
524 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
525 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
526 type=int, action='store', default=None, dest='bitcoind_p2p_port')
528 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
529 help='bitcoind RPC interface username',
530 type=str, action='store', dest='bitcoind_rpc_username')
531 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
532 help='bitcoind RPC interface password',
533 type=str, action='store', dest='bitcoind_rpc_password')
535 args = parser.parse_args()
537 if args.bitcoind_p2p_port is None:
538 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
540 if args.p2pool_port is None:
541 args.p2pool_port = args.net.P2P_PORT
543 if args.address is not None:
545 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
547 raise ValueError("error parsing address: " + repr(e))
549 args.pubkey_hash = None
551 reactor.callWhenRunning(main, args)