3 from __future__ import division
15 from twisted.internet import defer, reactor, task
16 from twisted.web import server
17 from twisted.python import log
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work unit and block height from bitcoind.

    Both RPC requests are issued before either result is awaited, so they
    are in flight at the same time.

    Args:
        bitcoind: JSON-RPC proxy exposing ``rpc_getwork()`` and
            ``rpc_getblocknumber()``; each is yielded on, so each is
            expected to return a Deferred.

    Returns (through the inlineCallbacks Deferred):
        A ``(work, height)`` tuple, where ``work`` is the
        ``bitcoin.getwork.BlockAttempt`` built from the getwork reply.
    """
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        work = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # get rid of residual errors: this must also run when one of the
        # yields above raises, otherwise the other request's failure would
        # be left unhandled on its Deferred
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Request a payout script from bitcoind via a P2P ``check_order`` call.

    Args:
        factory: client factory whose ``getProtocol()`` yields the connected
            protocol instance.

    Returns (through the inlineCallbacks Deferred):
        ``res['script']`` when the reply is ``'success'``, or ``None`` when
        the reply is ``'denied'`` so that the caller can fall back to an
        address-based payout script.

    Raises:
        ValueError: if the reply is neither ``'success'`` nor ``'denied'``.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        # not an error: signal "no script available" to the caller
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(my_script)
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from the address of bitcoind's 'p2pool' account.

    Args:
        bitcoind: JSON-RPC proxy exposing ``rpc_getaccountaddress``.
        net: network definition passed through to the address decoder.

    Returns (through the inlineCallbacks Deferred):
        The script produced by ``bitcoin.data.pubkey_hash_to_script2``.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    payout_script = bitcoin.data.pubkey_hash_to_script2(pubkey_hash)
    defer.returnValue(payout_script)
54 @defer.inlineCallbacks
57 print 'p2pool (version %s)' % (p2pool_init.__version__,)
60 # connect to bitcoind over JSON-RPC and do initial getwork
61 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
62 print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
63 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
64 temp_work, temp_height = yield getwork(bitcoind)
66 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
69 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
70 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
71 factory = bitcoin.p2p.ClientFactory(args.net)
72 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
73 my_script = yield get_payout_script(factory)
74 if args.pubkey_hash is None:
76 print 'IP transaction denied ... falling back to sending to address.'
77 my_script = yield get_payout_script2(bitcoind, args.net)
79 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
81 print ' Payout script:', my_script.encode('hex')
@defer.inlineCallbacks
def real_get_block(block_hash):
    """Fetch the block identified by `block_hash` over the bitcoind P2P link.

    Waits for the factory's protocol, asks it for the block, logs the hash
    and returns the block through the inlineCallbacks Deferred.
    """
    protocol = yield factory.getProtocol()
    fetched = yield protocol.get_block(block_hash)
    # single parenthesised argument: prints the same text on Python 2
    print('Got block %x' % (block_hash,))
    defer.returnValue(fetched)
89 get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
91 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
93 ht = bitcoin.p2p.HeightTracker(factory)
95 tracker = p2pool.OkayTracker(args.net)
96 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    """Return the entry of `chains` for `chain_id_data`, inserting one if absent."""
    # The candidate is always constructed, exactly as when it is written
    # inline as the setdefault() default argument.
    candidate = Chain(chain_id_data)
    return chains.setdefault(chain_id_data, candidate)
100 # information affecting work that should trigger a long-polling update
101 current_work = variable.Variable(None)
102 # information affecting work that should not trigger a long-polling update
103 current_work2 = variable.Variable(None)
106 task.LoopingCall(requested.clear).start(60)
108 @defer.inlineCallbacks
109 def set_real_work1():
110 work, height = yield getwork(bitcoind)
111 current_work.set(dict(
112 version=work.version,
113 previous_block=work.previous_block,
116 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
118 current_work2.set(dict(
122 def set_real_work2():
123 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
125 t = dict(current_work.value)
126 t['best_share_hash'] = best
129 for peer2, share_hash in desired:
132 if (peer2.nonce, share_hash) in requested:
134 print 'Requesting parent share %x' % (share_hash % 2**32,)
135 peer2.send_getshares(
138 stops=list(set(tracker.heads) | set(
139 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
142 requested.add((peer2.nonce, share_hash))
144 print 'Initializing work...'
145 yield set_real_work1()
146 yield set_real_work2()
149 # setup p2p logic and join p2pool network
def share_share(share, ignore_peer=None):
    """Send `share` to every connected peer except `ignore_peer`.

    Args:
        share: the share to broadcast.
        ignore_peer: a peer to leave out (for example the one the share was
            received from), or None to send to all peers.
    """
    for peer in p2p_node.peers.itervalues():
        if peer is ignore_peer:
            # never echo the share to the excluded peer
            continue
        peer.send_shares([share])
158 def p2p_shares(shares, peer=None):
160 if share.hash in tracker.shares:
161 #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
164 #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
167 #for peer2, share_hash in desired:
168 # print 'Requesting parent share %x' % (share_hash,)
169 # peer2.send_getshares(hashes=[share_hash], parents=2000)
171 if share.bitcoin_hash <= share.header['target']:
173 print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
175 if factory.conn.value is not None:
176 factory.conn.value.send_block(block=share.as_block(tracker, net))
178 print 'No bitcoind connection! Erp!'
183 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
185 if best == share.hash:
186 print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
188 print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)
190 w = dict(current_work.value)
191 w['best_share_hash'] = best
def p2p_share_hashes(share_hashes, peer):
    """Handle share hashes announced by `peer`.

    Every hash that is not already in ``tracker.shares`` is logged and
    collected; the collected hashes are then requested from `peer` in one
    ``send_getshares`` call. Nothing is sent when no hash is missing.

    Args:
        share_hashes: iterable of announced share hashes (integers).
        peer: the announcing peer; must provide ``send_getshares``.
    """
    get_hashes = []
    for share_hash in share_hashes:
        if share_hash in tracker.shares:
            # already known, nothing to request
            continue
        print('Got share hash, requesting! Hash: %x' % (share_hash % 2**32,))
        get_hashes.append(share_hash)
    if get_hashes:
        peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
def p2p_get_shares(share_hashes, parents, stops, peer):
    """Answer a peer's request for shares.

    For each requested hash, walks ``tracker.get_chain_known`` from that
    share for at most ``parents + 1`` entries, stopping early at any share
    whose hash is listed in `stops`, and sends everything collected to
    `peer` in a single ``send_shares(..., full=True)`` call.

    Args:
        share_hashes: list of requested share hashes.
        parents: how many ancestors of each requested share are wanted; it
            is capped so the walk limits across all hashes sum to roughly
            1000, and clamped to be non-negative.
        stops: hashes at which the walk along a chain ends.
        peer: the requesting peer; must provide ``send_shares``.
    """
    if not share_hashes:
        # an empty request asks for nothing; this also avoids dividing by
        # zero below
        return
    # peer-supplied value: cap the total work and reject negative counts,
    # which islice() would refuse
    parents = max(0, min(parents, 1000//len(share_hashes)))
    stops = set(stops)
    shares = []
    for share_hash in share_hashes:
        for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
            if share.hash in stops:
                break
            shares.append(share)
    peer.send_shares(shares, full=True)
216 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
220 ip, port = x.split(':')
223 return x, args.net.P2P_PORT
226 ('72.14.191.28', args.net.P2P_PORT),
227 ('62.204.197.159', args.net.P2P_PORT),
230 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
233 print 'Error resolving bootstrap node IP:'
238 current_work=current_work,
239 port=args.p2pool_port,
241 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
242 mode=0 if args.low_bandwidth else 1,
243 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
245 p2p_node.handle_shares = p2p_shares
246 p2p_node.handle_share_hashes = p2p_share_hashes
247 p2p_node.handle_get_shares = p2p_get_shares
251 # send share when the chain changes to their chain
252 def work_changed(new_work):
253 #print 'Work changed:', new_work
254 for share in tracker.get_chain_known(new_work['best_share_hash']):
257 share_share(share, share.peer)
258 current_work.changed.watch(work_changed)
263 # start listening for workers with a JSON-RPC server
265 print 'Listening for workers on port %i...' % (args.worker_port,)
269 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
270 run_identifier = struct.pack('<Q', random.randrange(2**64))
272 def compute(state, all_targets):
275 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
276 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
279 for tx in pre_extra_txs:
280 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
281 if size + this_size > 500000:
286 # XXX assuming generate_tx is smallish here..
287 generate_tx = p2pool.generate_transaction(
289 previous_share_hash=state['best_share_hash'],
290 new_script=my_script,
291 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
292 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
293 block_target=state['target'],
296 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
297 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
298 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
299 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
300 merkle_root = bitcoin.data.merkle_hash(transactions)
301 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
303 timestamp = current_work2.value['time']
304 if state['best_share_hash'] is not None:
305 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
306 if timestamp2 > timestamp:
307 print 'Toff', timestamp2 - timestamp
308 timestamp = timestamp2
309 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
311 target2 = min(2**256//2**32 - 1, target2)
312 print "TOOK", time.time() - start
313 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
314 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
315 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
320 def got_response(data):
322 # match up with transactions
323 header = bitcoin.getwork.decode_data(data)
324 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
325 if transactions is None:
326 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
328 block = dict(header=header, txs=transactions)
329 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
330 if hash_ <= block['header']['target']:
332 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
334 if factory.conn.value is not None:
335 factory.conn.value.send_block(block=block)
337 print 'No bitcoind connection! Erp!'
338 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
340 print 'Received invalid share from worker - %x/%x' % (hash_, target)
342 share = p2pool.Share.from_block(block)
343 my_shares.add(share.hash)
344 print 'GOT SHARE! %x %x' % (share.hash, 0 if share.previous_hash is None else share.previous_hash), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce]
348 print 'Error processing data received from worker:'
356 if current_work.value['best_share_hash'] is not None:
357 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
358 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
361 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
def get_blocks(start_hash):
    """Yield ``(block_hash, block)`` pairs walking back from `start_hash`.

    Each block is obtained with ``get_block.call_now``; the walk follows
    ``block['header']['previous_block']`` and ends as soon as
    ``deferral.NotNowError`` is raised (presumably meaning the block is not
    immediately available from the cache - confirm against DeferredCacher).
    """
    while True:
        try:
            block = get_block.call_now(start_hash)
        except deferral.NotNowError:
            # cannot get this block synchronously; stop the walk here
            break
        yield start_hash, block
        start_hash = block['header']['previous_block']
377 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
380 def __init__(self, tx, seen_at_block):
381 self.hash = bitcoin.data.tx_type.hash256(tx)
383 self.seen_at_block = seen_at_block
384 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
386 #print '%x %r' % (seen_at_block, tx)
387 #for mention in self.mentions:
388 # print '%x' % mention
390 self.parents_all_in_blocks = False
393 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
394 self._find_parents_in_blocks()
396 @defer.inlineCallbacks
397 def _find_parents_in_blocks(self):
398 for tx_in in self.tx['tx_ins']:
400 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
403 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
404 #print raw_transaction
405 if not raw_transaction['parent_blocks']:
407 self.parents_all_in_blocks = True
410 if not self.parents_all_in_blocks:
417 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
418 if block_hash == self.seen_at_block:
420 for tx in block['txs']:
421 mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
422 if mentions & self.mentions:
426 @defer.inlineCallbacks
429 assert isinstance(tx_hash, (int, long))
430 #print "REQUESTING", tx_hash
431 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
433 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
436 print 'Error handling tx:'
439 factory.new_tx.watch(new_tx)
441 @defer.inlineCallbacks
442 def new_block(block):
443 yield set_real_work1()
445 factory.new_block.watch(new_block)
447 print 'Started successfully!'
450 @defer.inlineCallbacks
453 yield deferral.sleep(random.expovariate(1/1))
455 yield set_real_work1()
460 @defer.inlineCallbacks
463 yield deferral.sleep(random.expovariate(1/1))
465 yield set_real_work2()
472 counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
475 yield deferral.sleep(random.expovariate(1/1))
477 if current_work.value['best_share_hash'] is not None:
478 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
480 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
481 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
482 count = counter(current_work.value['best_share_hash'], height, 2**100)
483 print 'Pool: %i mhash/s in %i shares Recent: %.02f%% >%i mhash/s Known: %i shares (so %i stales)' % (
486 weights.get(my_script, 0)/total_weight*100,
487 weights.get(my_script, 0)/total_weight*att_s//1000000,
489 len(my_shares) - count,
491 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
492 #for k, v in weights.iteritems():
493 # print k.encode('hex'), v/total_weight
# Turn on Twisted's Deferred debugging support.
defer.setDebugging(True)

# --- command-line interface -------------------------------------------------
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# The two credentials are positional arguments (no option strings given).
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')

args = parser.parse_args()

# Ports left unset on the command line default to the selected network's.
if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = args.net.P2P_PORT

# Resolve the optional payout address into a pubkey hash. A bad address is
# reported as a ValueError; with no address, pubkey_hash stays None so that
# main() obtains a payout script from bitcoind instead.
if args.address is not None:
    try:
        args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
    except Exception as e:
        raise ValueError("error parsing address: " + repr(e))
else:
    args.pubkey_hash = None

# Schedule main(args) to run once the reactor is running.
reactor.callWhenRunning(main, args)