3 from __future__ import division
14 from twisted.internet import defer, reactor, task
15 from twisted.web import server
16 from twisted.python import log
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current getwork job and chain height from bitcoind.

    Issues rpc_getwork and rpc_getblocknumber concurrently and returns
    (BlockAttempt, height) via the deferred. Retried up to 3 times by
    deferral.retry. A block could arrive in between these two queries,
    so the pair may be slightly inconsistent.
    """
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    finally:
        # get rid of residual errors - the finally ensures that if the first
        # yield raised, the second deferred's failure is still consumed
        # instead of being logged as unhandled
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind over the p2p 'checkorder' message for a payout script.

    NOTE(review): the body of the 'denied' branch is on lines not visible in
    this chunk - presumably it yields a None/fallback result; confirm against
    the full source.
    """
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
    # any other reply is a protocol violation
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a pay-to-pubkey-hash payout script from bitcoind's 'p2pool' account address."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
# NOTE(review): the enclosing 'def main(args):' line is not visible in this
# chunk; the statements below run inside it.
print 'p2pool (version %s)' % (p2pool_init.__version__,)
# connect to bitcoind over JSON-RPC and do initial getwork
url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
temp_work, temp_height = yield getwork(bitcoind)
print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
# connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
factory = bitcoin.p2p.ClientFactory(args.net)
reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
my_script = yield get_payout_script(factory)
if args.pubkey_hash is None:
    # NOTE(review): a guard checking whether my_script is None appears to be
    # on a line not visible in this chunk
    print 'IP transaction denied ... falling back to sending to address.'
    my_script = yield get_payout_script2(bitcoind, args.net)
    # else-branch (explicit address given on the command line); the 'else:'
    # line itself is not visible here
    my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
print ' Payout script:', my_script.encode('hex')
@defer.inlineCallbacks
def real_get_block(block_hash):
    # fetch a full block from bitcoind over the p2p connection
    block = yield (yield factory.getProtocol()).get_block(block_hash)
    print 'Got block %x' % (block_hash,)
    defer.returnValue(block)
# memoize fetched blocks for 3600 seconds so repeated walks don't re-download
get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
# cache raw transactions fetched over RPC for 100 seconds
get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
# tracks bitcoind's chain height via the p2p connection
ht = bitcoin.p2p.HeightTracker(factory)
# tracker of the p2pool share chain
tracker = p2pool.OkayTracker(args.net)
chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    # NOTE(review): 'Chain' is defined on lines not visible in this chunk
    return chains.setdefault(chain_id_data, Chain(chain_id_data))
# information affecting work that should trigger a long-polling update
current_work = variable.Variable(None)
# information affecting work that should not trigger a long-polling update
current_work2 = variable.Variable(None)
# 'requested' (its definition is not visible here) is cleared every 60 seconds
task.LoopingCall(requested.clear).start(60)
@defer.inlineCallbacks
def set_real_work1():
    # poll bitcoind and publish fresh work; setting current_work triggers
    # long-polling updates to miners
    work, height = yield getwork(bitcoind)
    current_work.set(dict(
        version=work.version,
        previous_block=work.previous_block,
        # NOTE(review): further dict entries and the closing parentheses are
        # on lines not visible in this chunk
        best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
    # clock_offset converts local time to bitcoind's notion of time
    current_work2.set(dict(
        clock_offset=time.time() - work.timestamp,
def set_real_work2():
    # recompute the best share head and request any parent shares we lack
    best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
    t = dict(current_work.value)
    t['best_share_hash'] = best
    # NOTE(review): the current_work.set(t) call appears to be on a line not
    # visible in this chunk
    for peer2, share_hash in desired:
        # skip requests already made recently ('requested' is cleared every 60s)
        if (peer2.nonce, share_hash) in requested:
        print 'Requesting parent share %x' % (share_hash % 2**32,)
        peer2.send_getshares(
            # stops: current heads plus their ~10th parents, bounding the reply
            stops=list(set(tracker.heads) | set(
                tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
        requested.add((peer2.nonce, share_hash))
print 'Initializing work...'
yield set_real_work1()
yield set_real_work2()
# start time expressed in bitcoind's clock, for later rate calculations
start_time = time.time() - current_work2.value['clock_offset']
# setup p2p logic and join p2pool network
def share_share(share, ignore_peer=None):
    # broadcast a share to every connected peer except the one it came from
    for peer in p2p_node.peers.itervalues():
        if peer is ignore_peer:
            # NOTE(review): the 'continue' for this guard is on a line not
            # visible in this chunk
        peer.send_shares([share])
def p2p_shares(shares, peer=None):
    # process a batch of shares from the network (peer=None means our own)
    print "Processing %i shares..." % (len(shares),)
    # NOTE(review): the loop over 'shares' binding 'share' is on lines not
    # visible in this chunk
    if share.hash in tracker.shares:
        #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
    #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
    #for peer2, share_hash in desired:
    #    print 'Requesting parent share %x' % (share_hash,)
    #    peer2.send_getshares(hashes=[share_hash], parents=2000)
    # a share that also meets the bitcoin target is a full block - submit it
    if share.bitcoin_hash <= share.header['target']:
        print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
        if factory.conn.value is not None:
            factory.conn.value.send_block(block=share.as_block(tracker, args.net))
        # else-branch: no live p2p connection to bitcoind
        print 'No bitcoind connection! Erp!'
    best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
    if best == share.hash:
        print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
    # else-branch: accepted but not the new head
    print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)
    # publish the new best share so miners get fresh work
    w = dict(current_work.value)
    w['best_share_hash'] = best
    # NOTE(review): the current_work.set(w) call is on a line not visible here
    print "... done processing %i shares." % (len(shares),)
def p2p_share_hashes(share_hashes, peer):
    # given advertised hashes, request the shares we don't already have
    # NOTE(review): the initialization of 'get_hashes' and the 'else:' pairing
    # the membership test are on lines not visible in this chunk
    for share_hash in share_hashes:
        if share_hash in tracker.shares:
            pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
        print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
        get_hashes.append(share_hash)
    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
def p2p_get_shares(share_hashes, parents, stops, peer):
    # serve a peer's getshares request, capping the reply at ~1000 shares total
    parents = min(parents, 1000//len(share_hashes))
    # NOTE(review): the initialization of 'shares' (and likely a set() over
    # 'stops') is on lines not visible in this chunk
    for share_hash in share_hashes:
        for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
            if share.hash in stops:
    peer.send_shares(shares, full=True)
print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
# helper that parses ADDR[:PORT] into (ip, port); its 'def parse(x):' line and
# surrounding try/except are not visible in this chunk
    ip, port = x.split(':')
    return x, args.net.P2P_PORT
# builtin bootstrap node addresses
    ('72.14.191.28', args.net.P2P_PORT),
    ('62.204.197.159', args.net.P2P_PORT),
nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
# best-effort: a DNS failure for the bootstrap host is only logged
print 'Error resolving bootstrap node IP:'
# keyword arguments of the p2p.Node(...) constructor; its opening line is not
# visible in this chunk
    current_work=current_work,
    port=args.p2pool_port,
    # persistent peer address store kept next to the script
    addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
    mode=0 if args.low_bandwidth else 1,
    preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
# wire up the network event handlers defined above
p2p_node.handle_shares = p2p_shares
p2p_node.handle_share_hashes = p2p_share_hashes
p2p_node.handle_get_shares = p2p_get_shares
# send share when the chain changes to their chain
def work_changed(new_work):
    #print 'Work changed:', new_work
    for share in tracker.get_chain_known(new_work['best_share_hash']):
        # NOTE(review): a guard that stops at already-shared shares appears to
        # be on lines not visible in this chunk
        share_share(share, share.peer)
current_work.changed.watch(work_changed)
# start listening for workers with a JSON-RPC server
print 'Listening for workers on port %i...' % (args.worker_port,)
# maps merkle roots handed out to miners back to their full transaction lists
merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
# random per-run tag embedded in generated coinbase nonces, used to identify
# our own shares
run_identifier = struct.pack('<Q', random.randrange(2**64))
def compute(state, all_targets):
    # build a getwork job (BlockAttempt) for a miner from the current state
    pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
    pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
    # greedily pack transactions up to ~500kB
    # NOTE(review): the initialization of 'size' and 'extra_txs' is on lines
    # not visible in this chunk
    for tx in pre_extra_txs:
        this_size = len(bitcoin.data.tx_type.pack(tx.tx))
        if size + this_size > 500000:
    # XXX assuming generate_tx is smallish here..
    generate_tx = p2pool.generate_transaction(
        previous_share_hash=state['best_share_hash'],
        new_script=my_script,
        # block subsidy (halving every 210000 blocks) plus collected fees
        subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
        nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
        block_target=state['target'],
    print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
    #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
    #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
    transactions = [generate_tx] + [tx.tx for tx in extra_txs]
    merkle_root = bitcoin.data.merkle_hash(transactions)
    merkle_root_to_transactions[merkle_root] = transactions # will stay for 300 seconds (the ExpiringDict timeout above)
    # use bitcoind-relative time, but never below the median of the last 11 shares
    timestamp = int(time.time() - current_work2.value['clock_offset'])
    if state['best_share_hash'] is not None:
        timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
        if timestamp2 > timestamp:
            print 'Toff', timestamp2 - timestamp
            timestamp = timestamp2
    # share target, clamped so it is never easier than difficulty 1
    target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
    target2 = min(2**256//2**32 - 1, target2)
    # record issue time by nonce, so got_response can report round-trip time
    times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
    #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
    return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
def got_response(data):
    # handle solved work returned by a miner
    # match up with transactions
    header = bitcoin.getwork.decode_data(data)
    transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
    if transactions is None:
        print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
    block = dict(header=header, txs=transactions)
    hash_ = bitcoin.data.block_header_type.hash256(block['header'])
    # meets the bitcoin target (or DEBUG mode): submit as a full block
    if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
        print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
        if factory.conn.value is not None:
            factory.conn.value.send_block(block=block)
        # else-branch: no live p2p connection to bitcoind
        print 'No bitcoind connection! Erp!'
    target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
    # the work didn't even meet the share target
    print 'Received invalid share from worker - %x/%x' % (hash_, target)
    share = p2pool.Share.from_block(block)
    my_shares.add(share.hash)
    print 'GOT SHARE! %x %x' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce]
    # error boundary: malformed data from a worker is only logged
    print 'Error processing data received from worker:'
# rate estimate served to miners; NOTE(review): the 'def get_rate():' line is
# not visible in this chunk
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
def get_blocks(start_hash):
    # generator: walk the block chain backwards from start_hash yielding
    # (hash, block) pairs for blocks already available from the cache
        # NOTE(review): the enclosing loop and 'try:' for this call are on
        # lines not visible in this chunk
        block = get_block.call_now(start_hash)
    except deferral.NotNowError:
        yield start_hash, block
        start_hash = block['header']['previous_block']
tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
# NOTE(review): the 'class Tx(object):' header is on a line not visible here
def __init__(self, tx, seen_at_block):
    # remember the tx, where we first saw it, and every tx hash it references
    self.hash = bitcoin.data.tx_type.hash256(tx)
    self.seen_at_block = seen_at_block
    self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
    #print '%x %r' % (seen_at_block, tx)
    #for mention in self.mentions:
    #    print '%x' % mention
    self.parents_all_in_blocks = False
    self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
    # kick off asynchronous resolution of this tx's inputs
    self._find_parents_in_blocks()
@defer.inlineCallbacks
def _find_parents_in_blocks(self):
    # resolve each input's parent transaction to accumulate value_in and to
    # learn whether every parent is already confirmed in a block
    for tx_in in self.tx['tx_ins']:
        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
        self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
        #print raw_transaction
        if not raw_transaction['parent_blocks']:
    self.parents_all_in_blocks = True
# fragment of Tx.is_good(); its 'def' line is not visible in this chunk
if not self.parents_all_in_blocks:
# scan recent blocks for transactions that conflict with this tx's mentions
for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
    if block_hash == self.seen_at_block:
    for tx in block['txs']:
        mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
        if mentions & self.mentions:
@defer.inlineCallbacks
# NOTE(review): the 'def new_tx(tx_hash):' line is not visible in this chunk
assert isinstance(tx_hash, (int, long))
#print "REQUESTING", tx_hash
tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
# best-effort: failures fetching/handling a tx are only logged
print 'Error handling tx:'
factory.new_tx.watch(new_tx)
@defer.inlineCallbacks
def new_block(block):
    # on a new bitcoin block, refresh work from bitcoind
    yield set_real_work1()
    # NOTE(review): a following set_real_work2() call appears to be on a line
    # not visible in this chunk
factory.new_block.watch(new_block)
print 'Started successfully!'
@defer.inlineCallbacks
# background poller: periodically refresh work from bitcoind
# NOTE(review): the enclosing 'def' and 'while True:' lines are not visible
# in this chunk; the sleep uses an exponential delay with mean 1 second
yield deferral.sleep(random.expovariate(1/1))
yield set_real_work1()
@defer.inlineCallbacks
# background poller: periodically re-evaluate the best share
yield deferral.sleep(random.expovariate(1/1))
yield set_real_work2()
# counts our recent shares (tagged with run_identifier) for the status line
counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
yield deferral.sleep(random.expovariate(1/1))
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
    weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
    count = counter(current_work.value['best_share_hash'], height, 2**100)
    print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Known: %i shares (so %i stales)' % (
        # NOTE(review): several of the format arguments are on lines not
        # visible in this chunk
        weights.get(my_script, 0)/total_weight*100,
        math.format(weights.get(my_script, 0)/total_weight*att_s),
        len(my_shares) - count,
    #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
    #for k, v in weights.iteritems():
    #    print k.encode('hex'), v/total_weight
# command-line entry point; NOTE(review): the enclosing 'def run():' line is
# not visible in this chunk
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional (no flag) arguments: RPC credentials
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')
args = parser.parse_args()
# NOTE(review): the 'if args.debug:' guard for this assignment is on a line
# not visible in this chunk
p2pool_init.DEBUG = True
# fill in port defaults that depend on the selected network
if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
if args.p2pool_port is None:
    args.p2pool_port = args.net.P2P_PORT
if args.address is not None:
    # NOTE(review): the try/except wrapping this conversion is on lines not
    # visible in this chunk
    args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
    raise ValueError("error parsing address: " + repr(e))
# else-branch: no --address given; main() will request one from bitcoind
args.pubkey_hash = None
# start main() once the reactor is running
reactor.callWhenRunning(main, args)