3 from __future__ import division
14 from twisted.internet import defer, reactor
15 from twisted.web import server
17 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
22 import p2pool.data as p2pool
24 import worker_interface
# Best-effort version detection: ask `svnversion` about the directory that
# holds the launched script and fall back to 'unknown' if that fails.
# NOTE(review): the try/except lines guarding these two assignments are not
# visible in this extract; the handler shape below is reconstructed - confirm.
try:
    # communicate() drains and closes the pipe and waits for the child, so
    # neither a file descriptor nor an unreaped process is left behind
    # (the previous .stdout.read() did neither).
    __version__ = subprocess.Popen(
        ['svnversion', os.path.dirname(sys.argv[0])],
        stdout=subprocess.PIPE
    ).communicate()[0].strip()
except Exception:
    __version__ = 'unknown'

# When running from a frozen executable, use the executable's path as
# __file__ so that paths built from os.path.dirname(__file__) (such as the
# addrs.dat store) are resolved relative to it.
if hasattr(sys, "frozen"):
    __file__ = sys.executable
35 def __init__(self, chain_id_data):
36 self.chain_id_data = chain_id_data
37 self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
39 self.share2s = {} # hash -> share2
40 self.highest = util.Variable(None) # hash
42 self.requesting = set()
45 def accept(self, share, net):
46 if share.chain_id_data != self.chain_id_data:
47 raise ValueError('share does not belong to this chain')
49 if share.hash in self.share2s:
52 if share.previous_share_hash is None:
53 previous_height, previous_share2 = -1, None
54 elif share.previous_share_hash not in self.share2s:
57 previous_share2 = self.share2s[share.previous_share_hash]
58 previous_height = previous_share2.height
60 height = previous_height + 1
62 share2 = share.check(self, height, previous_share2, net) # raises exceptions
64 if share2.share is not share:
67 self.share2s[share.hash] = share2
69 if self.highest.value is None or height > self.share2s[self.highest.value].height:
70 self.highest.set(share.hash)
def get_highest_share2(self):
    """Return the share2 recorded for the current highest hash.

    Returns None while self.highest holds no hash.
    """
    top_hash = self.highest.value
    if top_hash is None:
        return None
    return self.share2s[top_hash]
77 def get_down(self, share_hash):
81 blocks.append(share_hash)
82 if share_hash not in self.share2s:
84 share2 = self.share2s[share_hash]
85 if share2.share.previous_share_hash is None:
87 share_hash = share2.share.previous_share_hash
91 @defer.inlineCallbacks
92 def get_last_p2pool_block_hash(current_block_hash, get_block, net):
93 block_hash = current_block_hash
95 if block_hash == net.ROOT_BLOCK:
96 defer.returnValue(block_hash)
98 block = yield get_block(block_hash)
100 traceback.print_exc()
102 coinbase_data = block['txs'][0]['tx_ins'][0]['script']
104 coinbase = p2pool.coinbase_type.unpack(coinbase_data)
105 except bitcoin.data.EarlyEnd:
109 if coinbase['identifier'] == net.IDENTIFIER:
111 for tx_out in block['txs'][0]['tx_outs']:
112 payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
113 subsidy = sum(payouts.itervalues())
114 if coinbase['subsidy'] == subsidy:
115 if payouts.get(net.SCRIPT, 0) >= subsidy//64:
116 defer.returnValue(block_hash)
119 print 'Error matching block:'
120 print 'block:', block
121 traceback.print_exc()
123 block_hash = block['header']['previous_block']
125 @defer.inlineCallbacks
126 def getwork(bitcoind):
129 # a block could arrive in between these two queries
130 getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
132 getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
134 # get rid of residual errors
135 getwork_df.addErrback(lambda fail: None)
136 height_df.addErrback(lambda fail: None)
139 print 'Error getting work from bitcoind:'
140 traceback.print_exc()
147 defer.returnValue((getwork, height))
150 @defer.inlineCallbacks
153 net = p2pool.Testnet if args.testnet else p2pool.Main
155 print 'p2pool (version %s)' % (__version__,)
158 # connect to bitcoind over JSON-RPC and do initial getwork
159 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
160 print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
161 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
163 work, height = yield getwork(bitcoind)
166 print ' Current block hash: %x height: %i' % (work.previous_block, height)
169 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
170 print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
171 factory = bitcoin.p2p.ClientFactory(args.testnet)
172 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
176 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
177 if res['reply'] != 'success':
179 print 'Error getting payout script:'
183 my_script = res['script']
186 print 'Error getting payout script:'
187 traceback.print_exc()
194 print ' Payout script:', my_script.encode('hex')
197 @defer.inlineCallbacks
198 def real_get_block(block_hash):
199 block = yield (yield factory.getProtocol()).get_block(block_hash)
200 print 'Got block %x' % (block_hash,)
201 defer.returnValue(block)
202 get_block = util.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
204 get_raw_transaction = util.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
206 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    """Return the Chain stored under chain_id_data, registering one if absent."""
    # A candidate Chain is always built first; setdefault only stores it
    # when no entry exists for this id yet.
    candidate = Chain(chain_id_data)
    return chains.setdefault(chain_id_data, candidate)
209 # information affecting work that should trigger a long-polling update
210 current_work = util.Variable(None)
211 # information affecting work that should not trigger a long-polling update
212 current_work2 = util.Variable(None)
214 share_dbs = [db.SQLiteDict(sqlite3.connect(filename, isolation_level=None), 'shares') for filename in args.store_shares]
216 @defer.inlineCallbacks
218 work, height = yield getwork(bitcoind)
219 last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, net))
220 chain = get_chain(p2pool.chain_id_type.pack(dict(last_p2pool_block_hash=last_p2pool_block_hash, bits=work.bits)))
221 current_work.set(dict(
222 version=work.version,
223 previous_block=work.previous_block,
227 highest_p2pool_share2=chain.get_highest_share2(),
228 last_p2pool_block_hash=last_p2pool_block_hash,
230 current_work2.set(dict(
231 timestamp=work.timestamp,
234 print 'Searching for last p2pool-generated block...'
235 yield set_real_work()
237 print ' Matched block %x' % (current_work.value['last_p2pool_block_hash'],)
240 # setup p2p logic and join p2pool network
242 def share_share2(share2, ignore_peer=None):
243 for peer in p2p_node.peers.itervalues():
244 if peer is ignore_peer:
246 peer.send_share(share2.share)
249 def p2p_share(share, peer=None):
250 if share.hash <= conv.bits_to_target(share.header['bits']):
252 print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
253 #print share.__dict__
255 if factory.conn is not None:
256 factory.conn.send_block(block=share.as_block())
258 print 'No bitcoind connection! Erp!'
260 chain = get_chain(share.chain_id_data)
261 res = chain.accept(share, net)
263 share2 = chain.share2s[share.hash]
266 hash_data = bitcoin.p2p.HashType().pack(share.hash)
267 share1_data = p2pool.share1.pack(share.as_share1())
268 for share_db in share_dbs:
269 share_db[hash_data] = share1_data
270 reactor.callLater(1, save)
272 if chain is current_work.value['current_chain']:
273 if share.hash == chain.highest.value:
274 print 'Accepted share, passing to peers. Height: %i Hash: %x Script: %s' % (share2.height, share.hash, share2.shares[-1].encode('hex'))
275 share_share2(share2, peer)
277 print 'Accepted share, not highest. Height: %i Hash: %x' % (share2.height, share.hash,)
279 print 'Accepted share to non-current chain. Height: %i Hash: %x' % (share2.height, share.hash,)
281 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
282 elif res == 'orphan':
283 print 'Got share referencing unknown share, requesting past shares from peer. Hash: %x' % (share.hash,)
287 chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
288 have=random.sample(chain.share2s.keys(), min(8, len(chain.share2s))) + [chain.share2s[chain.highest.value].share.hash] if chain.highest.value is not None else [],
291 raise ValueError('unknown result from chain.accept - %r' % (res,))
293 w = dict(current_work.value)
294 w['highest_p2pool_share2'] = w['current_chain'].get_highest_share2()
297 def p2p_share_hash(chain_id_data, hash, peer):
298 chain = get_chain(chain_id_data)
299 if chain is current_work.value['current_chain']:
300 if hash not in chain.share2s:
301 print "Got share hash, requesting! Hash: %x" % (hash,)
302 peer.send_getshares(chain_id=p2pool.chain_id_type.unpack(chain_id_data), hashes=[hash])
304 print "Got share hash, already have, ignoring. Hash: %x" % (hash,)
306 print "Got share hash to non-current chain, storing. Hash: %x" % (hash,)
307 chain.request_map.setdefault(hash, []).append(peer)
309 def p2p_get_to_best(chain_id_data, have, peer):
310 chain = get_chain(chain_id_data)
311 if chain.highest.value is None:
314 chain_hashes = chain.get_down(chain.highest.value)
318 have2 |= set(chain.get_down(hash_))
320 for share_hash in reversed(chain_hashes):
321 if share_hash in have2:
323 peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
def p2p_get_shares(chain_id_data, hashes, peer):
    """Send the requesting peer every requested share this node holds.

    chain_id_data -- packed chain id the request refers to
    hashes        -- share hashes the peer asked for
    peer          -- the requesting peer

    Hashes that are not in the chain's share2s are silently skipped.
    """
    chain = get_chain(chain_id_data)
    # NOTE(review): the loop header binding hash_ was absent from the extract
    # this was reviewed from; iterating over `hashes` is the reconstruction.
    for hash_ in hashes:
        if hash_ in chain.share2s:
            peer.send_share(chain.share2s[hash_].share, full=True)
331 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
335 ip, port = x.split(':')
338 return x, {False: 9333, True: 19333}[args.testnet]
341 nodes = [('72.14.191.28', 19333)]
343 nodes = [('72.14.191.28', 9333)]
345 nodes.append(((yield reactor.resolve('p2pool.forre.st')), {False: 9333, True: 19333}[args.testnet]))
347 traceback.print_exc()
350 current_work=current_work,
351 port=args.p2pool_port,
353 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), net.ADDRS_TABLE),
354 mode=0 if args.low_bandwidth else 1,
355 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
357 p2p_node.handle_share = p2p_share
358 p2p_node.handle_share_hash = p2p_share_hash
359 p2p_node.handle_get_to_best = p2p_get_to_best
360 p2p_node.handle_get_shares = p2p_get_shares
364 # send share when the chain changes to their chain
365 def work_changed(new_work):
366 #print 'Work changed:', new_work
367 chain = new_work['current_chain']
368 if chain.highest.value is not None:
369 for share_hash in chain.get_down(chain.highest.value):
370 share2 = chain.share2s[share_hash]
371 if not share2.shared:
372 print 'Sharing share of switched to chain. Hash:', share2.share.hash
374 for hash, peers in chain.request_map.iteritems():
375 if hash not in chain.share2s:
376 random.choice(peers).send_getshares(hashes=[hash])
377 current_work.changed.watch(work_changed)
382 # start listening for workers with a JSON-RPC server
384 print 'Listening for workers on port %i...' % (args.worker_port,)
388 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
391 extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
392 generate_tx, shares = p2pool.generate_transaction(
393 last_p2pool_block_hash=state['last_p2pool_block_hash'],
394 previous_share2=state['highest_p2pool_share2'],
395 new_script=my_script,
396 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
397 nonce=struct.pack("<Q", random.randrange(2**64)),
400 print 'Generating, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
401 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
402 merkle_root = bitcoin.p2p.merkle_hash(transactions)
403 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
404 ba = conv.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['bits'])
405 return ba.getwork(net.TARGET_MULTIPLIER)
407 def got_response(data):
408 # match up with transactions
409 header = conv.decode_data(data)
410 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
411 if transactions is None:
412 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
414 share = p2pool.Share(header=header, txs=transactions)
415 print 'GOT SHARE! %x' % (share.hash,)
420 print 'Error processing data received from worker:'
421 traceback.print_exc()
427 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
def get_blocks(start_hash):
    """Yield (block_hash, block) pairs, walking backwards from start_hash.

    Each block is obtained with get_block.call_now; after yielding a block
    the walk continues with its header's 'previous_block'. Iteration ends
    the first time call_now raises util.NotNowError.
    """
    # NOTE(review): the loop and try headers were absent from the extract
    # this was reviewed from; this structure is the reconstruction.
    while True:
        try:
            block = get_block.call_now(start_hash)
        except util.NotNowError:
            return
        yield start_hash, block
        start_hash = block['header']['previous_block']
443 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
446 def __init__(self, tx, seen_at_block):
447 self.hash = bitcoin.data.tx_hash(tx)
449 self.seen_at_block = seen_at_block
450 self.mentions = set([bitcoin.data.tx_hash(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
452 #print "%x %r" % (seen_at_block, tx)
453 #for mention in self.mentions:
454 # print "%x" % mention
456 self.parents_all_in_blocks = False
459 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
460 self._find_parents_in_blocks()
462 @defer.inlineCallbacks
463 def _find_parents_in_blocks(self):
464 for tx_in in self.tx['tx_ins']:
466 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
469 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
470 #print raw_transaction
471 if not raw_transaction['parent_blocks']:
473 self.parents_all_in_blocks = True
476 if not self.parents_all_in_blocks:
483 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
484 if block_hash == self.seen_at_block:
486 for tx in block['txs']:
487 mentions = set([bitcoin.data.tx_hash(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
488 if mentions & self.mentions:
493 tx_pool[bitcoin.data.tx_hash(tx)] = Tx(tx, current_work.value['previous_block'])
494 factory.new_tx.watch(new_tx)
496 def new_block(block):
498 factory.new_block.watch(new_block)
500 print 'Started successfully!'
505 traceback.print_exc()
# Command-line interface: general flags first, then one argument group per
# interface (p2pool network, worker RPC server, bitcoind).
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument(
    '--testnet',
    dest='testnet', action='store_true', default=False,
    help='use the testnet; make sure you change the ports too',
)
parser.add_argument(
    '--store-shares', metavar='FILENAME',
    dest='store_shares', type=str, action='append', default=[],
    help='write shares to a database (not needed for normal usage)',
)

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument(
    '--p2pool-port', metavar='PORT',
    dest='p2pool_port', type=int, action='store', default=None,
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
)
p2pool_group.add_argument(
    '-n', '--p2pool-node', metavar='ADDR[:PORT]',
    dest='p2pool_nodes', type=str, action='append', default=[],
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
)
parser.add_argument(
    '-l', '--low-bandwidth',
    dest='low_bandwidth', action='store_true', default=False,
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
)

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument(
    '-w', '--worker-port', metavar='PORT',
    dest='worker_port', type=int, action='store', default=9332,
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
)

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument(
    '--bitcoind-address', metavar='BITCOIND_ADDRESS',
    dest='bitcoind_address', type=str, action='store', default='127.0.0.1',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
)
bitcoind_group.add_argument(
    '--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    dest='bitcoind_rpc_port', type=int, action='store', default=8332,
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
)
bitcoind_group.add_argument(
    '--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    dest='bitcoind_p2p_port', type=int, action='store', default=None,
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
)

# The two positional arguments carry the bitcoind RPC credentials.
bitcoind_group.add_argument(
    metavar='BITCOIND_RPC_USERNAME',
    dest='bitcoind_rpc_username', type=str, action='store',
    help='bitcoind RPC interface username',
)
bitcoind_group.add_argument(
    metavar='BITCOIND_RPC_PASSWORD',
    dest='bitcoind_rpc_password', type=str, action='store',
    help='bitcoind RPC interface password',
)

args = parser.parse_args()

# Ports left unspecified take the network-specific default.
if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = 18333 if args.testnet else 8333

if args.p2pool_port is None:
    args.p2pool_port = 19333 if args.testnet else 9333

# Defer main(args) until the reactor is running.
reactor.callWhenRunning(main, args)