3 from __future__ import division
15 from twisted.internet import defer, reactor
16 from twisted.web import server
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
# Version detection: run svnversion on the directory that holds the launched script
# (sys.argv[0]) and keep its stripped stdout as the version string.
24 __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
# Fallback value.
# NOTE(review): the try/except that presumably chooses between these two assignments
# is not visible in this excerpt - confirm.
26 __version__ = 'unknown'
# Create an empty chain for one packed chain id.
#   chain_id_data: packed chain identifier; it is unpacked with p2pool.chain_id_type below.
29 def __init__(self, chain_id_data):
31 self.chain_id_data = chain_id_data
# Keep the last_p2pool_block_hash field of the unpacked chain id.
32 self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
34 self.share2s = {} # hash -> share2
35 self.highest = variable.Variable(None) # hash
# NOTE(review): presumably hashes with an outstanding request; nothing visible in this
# excerpt reads or writes it after this point - confirm.
37 self.requesting = set()
# Validate a share against this chain and record the checked result in share2s.
#   share: candidate share; its chain_id_data must equal the chain id of this chain.
#   net:   network definition, passed straight through to share.check.
# Raises ValueError when the share carries a different chain id; share.check may raise too.
# NOTE(review): several branch bodies (duplicate share, unknown parent, share2 mismatch)
# and the return statements are not visible in this excerpt.
40 def accept(self, share, net):
41 if share.chain_id_data != self.chain_id_data:
42 raise ValueError('share does not belong to this chain')
# A share is already recorded under this hash.
44 if share.hash in self.share2s:
# A share without a parent gets previous_height -1, so the height computed below is 0.
47 if share.previous_share_hash is None:
48 previous_height, previous_share2 = -1, None
49 elif share.previous_share_hash not in self.share2s:
# Parent is recorded: continue from its height.
52 previous_share2 = self.share2s[share.previous_share_hash]
53 previous_height = previous_share2.height
55 height = previous_height + 1
57 share2 = share.check(self, height, previous_share2, net) # raises exceptions
# Identity test: the checked result must wrap this very share object.
59 if share2.share is not share:
62 self.share2s[share.hash] = share2
# Promote only on a strictly greater height, so on a tie the current highest stays.
64 if self.highest.value is None or height > self.share2s[self.highest.value].height:
65 self.highest.set(share.hash)
# Return the share2 stored under the current highest hash, or None while no share
# has been promoted to highest yet.
69 def get_highest_share2(self):
70 return self.share2s[self.highest.value] if self.highest.value is not None else None
# Follow previous_share_hash links starting at share_hash, collecting each visited
# hash in blocks.
# NOTE(review): the initialisation of blocks, the loop header, the loop exits and the
# return statement are not visible in this excerpt.
72 def get_down(self, share_hash):
76 blocks.append(share_hash)
# The hash is appended before this membership test, so a hash that is not recorded
# in share2s has already been added to blocks at this point.
77 if share_hash not in self.share2s:
79 share2 = self.share2s[share_hash]
# A share without a parent has nothing further to follow.
80 if share2.share.previous_share_hash is None:
82 share_hash = share2.share.previous_share_hash
# Search backwards from current_block_hash for a block that looks like a p2pool block
# and deliver its hash through defer.returnValue.
#   current_block_hash: hash to start from.
#   get_block:          callable returning a deferred block for a given hash.
#   net:                network definition providing ROOT_BLOCK, IDENTIFIER and SCRIPT.
# NOTE(review): the loop header, the try statements and the initialisation of payouts
# are not visible in this excerpt.
86 @defer.inlineCallbacks
87 def get_last_p2pool_block_hash(current_block_hash, get_block, net):
88 block_hash = current_block_hash
# Reaching the configured root block ends the search with that hash.
90 if block_hash == net.ROOT_BLOCK:
91 defer.returnValue(block_hash)
93 block = yield get_block(block_hash)
# The script of the first input of the first transaction is parsed as p2pool coinbase data.
97 coinbase_data = block['txs'][0]['tx_ins'][0]['script']
99 coinbase = p2pool.coinbase_type.unpack(coinbase_data)
# Script too short to unpack.
100 except bitcoin.data.EarlyEnd:
# Three conditions must hold: matching identifier, declared subsidy equal to the sum of
# the outputs of the first transaction, and net.SCRIPT receiving at least subsidy//64.
104 if coinbase['identifier'] == net.IDENTIFIER:
# Total the output values per script.
106 for tx_out in block['txs'][0]['tx_outs']:
107 payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
108 subsidy = sum(payouts.itervalues())
109 if coinbase['subsidy'] == subsidy:
# Floor division: this file imports division from __future__, so // is used on purpose.
110 if payouts.get(net.SCRIPT, 0) >= subsidy//64:
111 defer.returnValue(block_hash)
# Diagnostics for a failure while matching the block.
114 print 'Error matching block:'
115 print 'block:', block
116 traceback.print_exc()
# Step to the parent block.
118 block_hash = block['header']['previous_block']
# Ask bitcoind for work and for the current block number, and deliver the pair
# (BlockAttempt, height) through defer.returnValue.
#   bitcoind: RPC proxy exposing rpc_getwork and rpc_getblocknumber.
# Wrapped in deferral.retry with the given message and the argument 1.
# NOTE(review): the 1 is presumably a retry delay in seconds - confirm in util.deferral.
120 @deferral.retry('Error getting work from bitcoind:', 1)
121 @defer.inlineCallbacks
122 def getwork(bitcoind):
123 # a block could arrive in between these two queries
# Both requests are issued before either result is awaited.
124 getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
126 getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
128 # get rid of residual errors
# The errbacks return None, which marks any failure left on either deferred as handled.
# NOTE(review): the try/finally presumably wrapping these lines is not visible in this excerpt.
129 getwork_df.addErrback(lambda fail: None)
130 height_df.addErrback(lambda fail: None)
131 defer.returnValue((getwork, height))
# Obtain the payout script from bitcoind with a check_order request that uses the
# null order, and deliver it through defer.returnValue.
#   factory: bitcoin p2p client factory; getProtocol yields the connected protocol.
# NOTE(review): the loop header, the try/except/else statements and the bodies of the
# failure branches are not visible in this excerpt.
133 @defer.inlineCallbacks
134 def get_payout_script(factory):
137 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
# Any reply other than success is reported as an error.
138 if res['reply'] != 'success':
140 print 'Error getting payout script:'
144 my_script = res['script']
# Same message again, this time followed by a traceback.
147 print 'Error getting payout script:'
148 traceback.print_exc()
151 defer.returnValue(my_script)
# Pause for deferral.sleep(1) after this point.
# NOTE(review): presumably one second between attempts - confirm.
152 yield deferral.sleep(1)
# Start of the startup coroutine.
# NOTE(review): the def line and its enclosing try are not visible in this excerpt; the
# name main is taken from the reactor.callWhenRunning(main, args) call later in the file.
154 @defer.inlineCallbacks
# Startup banner.
157 print 'p2pool (version %s)' % (__version__,)
160 # connect to bitcoind over JSON-RPC and do initial getwork
161 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
# NOTE(review): this line prints the RPC username and password in clear text to stdout.
162 print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
163 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
# One getwork round trip doubles as the RPC connectivity test.
164 work, height = yield getwork(bitcoind)
166 print ' Current block hash: %x height: %i' % (work.previous_block, height)
169 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
170 print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
171 factory = bitcoin.p2p.ClientFactory(args.net)
172 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
# The script returned here is later passed as new_script when generating transactions.
173 my_script = yield get_payout_script(factory)
175 print ' Payout script:', my_script.encode('hex')
# Fetch one block through the bitcoin p2p protocol, log its hash and deliver the block.
#   block_hash: integer hash (it is formatted with %x below).
178 @defer.inlineCallbacks
179 def real_get_block(block_hash):
180 block = yield (yield factory.getProtocol()).get_block(block_hash)
181 print 'Got block %x' % (block_hash,)
182 defer.returnValue(block)
# Cached front end for the fetcher, backed by ExpiringDict(3600).
# NOTE(review): 3600 is presumably an expiry in seconds - confirm in util.expiring_dict.
183 get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
# Cached lookup of raw transactions over RPC; the integer hash is sent as a hex string.
185 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
# Height tracker built on the p2p factory; it is handed to tracker.think elsewhere.
187 ht = bitcoin.p2p.HeightTracker(factory)
189 tracker = p2pool.OkayTracker(args.net)
# Chain objects keyed by packed chain id, held in an expiring dict.
190 chains = expiring_dict.ExpiringDict(300)
# Return the Chain for chain_id_data, creating it on first use.
# Note that Chain(chain_id_data) is constructed on every call, because the argument to
# setdefault is evaluated before the lookup; the new object is discarded on a hit.
191 def get_chain(chain_id_data):
192 return chains.setdefault(chain_id_data, Chain(chain_id_data))
194 # information affecting work that should trigger a long-polling update
195 current_work = variable.Variable(None)
196 # information affecting work that should not trigger a long-polling update
197 current_work2 = variable.Variable(None)
# Refresh current_work and current_work2 from bitcoind and the share tracker.
# NOTE(review): the def line, some dict entries and the closing brackets are not visible
# in this excerpt; the name set_real_work is taken from the call at the end of the block.
199 @defer.inlineCallbacks
201 work, height = yield getwork(bitcoind)
# Only best is used in the lines visible here; desired is not.
202 best, desired = tracker.think(ht)
# Fields that should wake long-polling workers when they change.
204 current_work.set(dict(
205 version=work.version,
206 previous_block=work.previous_block,
211 best_share_hash=best,
# Fields that should not trigger a long-polling update.
213 current_work2.set(dict(
214 timestamp=work.timestamp,
217 print 'Initializing work...'
# Populate both variables once before anything reads them.
218 yield set_real_work()
221 # setup p2p logic and join p2pool network
# Send share to the connected peers, with special handling for ignore_peer.
#   share:       share to send.
#   ignore_peer: peer singled out by the identity test below, or None.
# NOTE(review): the body of the ignore_peer branch is not visible in this excerpt;
# presumably it skips that peer - confirm.
223 def share_share(share, ignore_peer=None):
224 for peer in p2p_node.peers.itervalues():
225 if peer is ignore_peer:
227 peer.send_share(share)
# Handler for a share received from the p2pool network (installed as handle_share).
#   share: the received share.
#   peer:  sending peer, or None.
# NOTE(review): the lines that add the share to the tracker, the return after the
# duplicate message, the else headers and the final update of current_work are not
# visible in this excerpt.
230 def p2p_share(share, peer=None):
# Shares already known to the tracker are only reported.
231 if share.hash in tracker.shares:
232 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
235 #print "Received share %x" % (share.hash,)
# Re-evaluate the tracker: best is the preferred share hash, desired holds
# (peer, share hash) pairs to request.
238 best, desired = tracker.think(ht)
239 for peer2, share_hash in desired:
240 print "Requesting parent share %x" % (share_hash,)
241 peer2.send_getshares(hashes=[share_hash])
# A share that carries a generation transaction and whose hash is at or below the
# target in its header is passed to bitcoind as a block.
243 if share.gentx is not None:
244 if share.hash <= share.header['target']:
246 print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
248 if factory.conn.value is not None:
249 factory.conn.value.send_block(block=share.as_block())
251 print 'No bitcoind connection! Erp!'
# Work on a copy of the current work dict with the new best share hash filled in.
253 w = dict(current_work.value)
254 w['best_share_hash'] = best
257 if best == share.hash:
258 print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
260 print 'Accepted share, not highest. Hash: %x' % (share.hash,)
# Handler for an announced share hash (installed as handle_share_hash): report hashes
# the tracker already holds, and request the share from the announcing peer otherwise.
# NOTE(review): the else header between the two branches is not visible in this excerpt.
262 def p2p_share_hash(share_hash, peer):
263 if share_hash in tracker.shares:
264 print "Got share hash, already have, ignoring. Hash: %x" % (share_hash,)
266 print "Got share hash, requesting! Hash: %x" % (share_hash,)
267 peer.send_getshares(hashes=[share_hash])
# Handler installed as handle_get_to_best: send peer the shares of the highest chain
# for chain_id_data, taking into account the hashes the peer reports in have.
# NOTE(review): the early exit for an empty chain, the initialisation of have2, the loop
# over have and the body of the have2 test are not visible in this excerpt.
269 def p2p_get_to_best(chain_id_data, have, peer):
270 chain = get_chain(chain_id_data)
# Nothing has been promoted to highest in this chain yet.
271 if chain.highest.value is None:
274 chain_hashes = chain.get_down(chain.highest.value)
# have2 accumulates everything reachable via get_down from a hash in hash_.
278 have2 |= set(chain.get_down(hash_))
# Iterate in the reverse of the order produced by get_down.
280 for share_hash in reversed(chain_hashes):
281 if share_hash in have2:
283 peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
# Handler installed as handle_get_shares: send peer the full form of every requested
# share that the tracker holds; hashes the tracker does not hold are silently skipped.
285 def p2p_get_shares(share_hashes, peer):
286 for share_hash in share_hashes:
287 if share_hash in tracker.shares:
288 peer.send_share(tracker.shares[share_hash], full=True)
# Build the list of bootstrap addresses, create the p2pool node and install the handlers.
# NOTE(review): the def line of parse, the opening of the nodes list, the try/except
# around the DNS lookup and the p2p_node constructor call are not visible in this excerpt.
290 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
# Address parsing helper: split ADDR:PORT on the colon ...
294 ip, port = x.split(':')
# ... or fall back to the network default port when only an address was given.
297 return x, args.net.P2P_PORT
# Built-in bootstrap addresses on the network default port.
300 ('72.14.191.28', args.net.P2P_PORT),
301 ('62.204.197.159', args.net.P2P_PORT),
# One more bootstrap address, resolved through DNS at startup.
304 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
306 traceback.print_exc()
# Keyword arguments of the node constructor.
309 current_work=current_work,
310 port=args.p2pool_port,
# Address store kept in addrs.dat next to the launched script; isolation_level=None puts
# the sqlite3 connection in autocommit mode.
312 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
# mode is 0 when --low-bandwidth was given and 1 otherwise.
313 mode=0 if args.low_bandwidth else 1,
# Addresses given on the command line come first, followed by the built-in ones.
314 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
# Route incoming network events to the handlers defined earlier.
316 p2p_node.handle_share = p2p_share
317 p2p_node.handle_share_hash = p2p_share_hash
318 p2p_node.handle_get_to_best = p2p_get_to_best
319 p2p_node.handle_get_shares = p2p_get_shares
323 # send share when the chain changes to their chain
# Watcher for current_work: walk the known chain of the new best share and pass shares
# on to peers through share_share, naming the peer stored on the share as the one to ignore.
# NOTE(review): the lines between the loop header and the share_share call (original
# lines 327-328) are not visible in this excerpt; presumably they filter which shares
# are sent - confirm.
324 def work_changed(new_work):
325 #print 'Work changed:', new_work
326 for share in tracker.get_chain_known(new_work['best_share_hash']):
329 share_share(share, share.peer)
# Register the watcher so it runs whenever current_work changes.
330 current_work.changed.watch(work_changed)
335 # start listening for workers with a JSON-RPC server
337 print 'Listening for workers on port %i...' % (args.worker_port,)
# Maps a merkle root handed out to a worker back to the transactions behind it.
341 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
# Work generator: builds a block attempt for a worker from a work state dict.
# NOTE(review): the def line and some arguments of generate_transaction are not visible
# in this excerpt; the name compute is taken from the WorkerInterface call later in the
# file, and state is the dict read below.
# Only pooled transactions whose is_good check passes are included.
344 extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
345 generate_tx = p2pool.generate_transaction(
347 previous_share_hash=state['best_share_hash'],
348 new_script=my_script,
# 50*100000000 shifted right by height//210000, plus value_in - value_out summed over
# the extra transactions. The shift binds more loosely than * and //.
349 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
# Random 64-bit value packed as a little-endian unsigned integer.
350 nonce=struct.pack("<Q", random.randrange(2**64)),
351 block_target=state['target'],
# Diagnostics derived from the target2 field of the share data in the generated coinbase.
354 print 'Generating!', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']//1000000
355 print "Target: %x" % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
356 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
# The generated transaction always comes first.
357 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
358 merkle_root = bitcoin.data.merkle_hash(transactions)
359 merkle_root_to_transactions[merkle_root] = transactions # kept until the ExpiringDict(300) created above drops it; NOTE(review): this note used to say 1000 seconds, which did not match the 300
# Start from the timestamp reported by bitcoind ...
361 timestamp = current_work2.value['timestamp']
362 if state['best_share_hash'] is not None:
# ... and raise it to the median timestamp of up to 11 shares of the best chain plus one,
# when that is later.
363 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
364 if timestamp2 > timestamp:
365 print "Toff", timestamp2 - timestamp
366 timestamp = timestamp2
367 ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
368 #print "SENT", 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
# The getwork handed to the worker uses target2 rather than the block target in state.
369 return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
# Handle a solution submitted by a worker.
#   data: getwork data as returned by the worker; decoded into a block header below.
# NOTE(review): the try/except around the body, the return statements, the else headers
# and the lines that hand the share on after GOT SHARE are not visible in this excerpt.
371 def got_response(data):
372 # match up with transactions
373 header = bitcoin.getwork.decode_data(data)
# Look up the transactions recorded for this merkle root when the work was generated.
374 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
375 if transactions is None:
376 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
378 block = dict(header=header, txs=transactions)
379 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
# A header hash at or below the target in the header is passed to bitcoind as a block.
380 if hash_ <= block['header']['target']:
382 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
384 if factory.conn.value is not None:
385 factory.conn.value.send_block(block=block)
387 print 'No bitcoind connection! Erp!'
# Build a share from the same block data.
388 share = p2pool.Share.from_block(block)
389 print 'GOT SHARE! %x' % (share.hash,)
394 print 'Error processing data received from worker:'
395 traceback.print_exc()
# Serve workers over HTTP, wiring in the work variable, the generator and this handler.
401 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
# Generator over (block hash, block) pairs, starting at start_hash and following
# previous_block links, using only what get_block.call_now can supply immediately.
# NOTE(review): the loop header, the try statement and the body of the NotNowError
# handler are not visible in this excerpt; presumably the generator stops there - confirm.
408 def get_blocks(start_hash):
411 block = get_block.call_now(start_hash)
412 except deferral.NotNowError:
414 yield start_hash, block
415 start_hash = block['header']['previous_block']
# Pool of transactions seen on the bitcoin network, keyed by transaction hash.
417 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
# Constructor of the transaction wrapper stored in tx_pool.
#   tx:            transaction dict with tx_ins and tx_outs.
#   seen_at_block: value of previous_block in current_work when the transaction was seen.
# NOTE(review): the class header and the assignments of self.tx and self.value_in
# are not visible in this excerpt.
420 def __init__(self, tx, seen_at_block):
421 self.hash = bitcoin.data.tx_type.hash256(tx)
423 self.seen_at_block = seen_at_block
# Hashes this transaction touches: its own hash plus the hash of every output it spends.
424 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
426 #print "%x %r" % (seen_at_block, tx)
427 #for mention in self.mentions:
428 # print "%x" % mention
# Stays False until the parent lookup started below sets it.
430 self.parents_all_in_blocks = False
433 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
# Starts the lookup without waiting for it; the deferred it returns is not kept.
434 self._find_parents_in_blocks()
# Look up every transaction this one spends from, add the spent output values to
# value_in, and set parents_all_in_blocks once the checks pass.
# NOTE(review): the try/except around the lookup and the body of the parent_blocks test
# are not visible in this excerpt; presumably that body returns early - confirm.
436 @defer.inlineCallbacks
437 def _find_parents_in_blocks(self):
438 for tx_in in self.tx['tx_ins']:
440 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
# Value of the specific output being spent.
# NOTE(review): the key here is txouts, while the rest of this file reads tx_outs -
# confirm it matches the shape returned by rpc_getrawtransaction.
443 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
444 #print raw_transaction
# An empty parent_blocks entry is treated specially.
445 if not raw_transaction['parent_blocks']:
447 self.parents_all_in_blocks = True
# Eligibility check for including this transaction in generated work.
# NOTE(review): the def line and every return statement are not visible in this excerpt;
# the name is_good is taken from the tx.is_good() call elsewhere in the file.
# First condition: the parent lookup must have set parents_all_in_blocks.
450 if not self.parents_all_in_blocks:
# Scan at most 10 immediately available blocks, starting at the current previous_block.
457 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
# Reached the block that was current when this transaction was first seen.
458 if block_hash == self.seen_at_block:
460 for tx in block['txs']:
# Same construction as self.mentions: own hash plus the hashes of the spent outputs.
461 mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
# Any overlap means a transaction in that block touches the same hashes as this one.
462 if mentions & self.mentions:
# Watcher for transactions announced by bitcoind: fetch the transaction and add it to
# tx_pool, tagged with the previous_block value of the current work.
# NOTE(review): the def line and the try/except statements are not visible in this
# excerpt; the name new_tx is taken from the watch call at the end of the block.
466 @defer.inlineCallbacks
# NOTE(review): an assert is removed when Python runs with -O, so this type check is
# not a reliable guard.
469 assert isinstance(tx_hash, (int, long))
470 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
# Keyed by the hash computed from the fetched transaction, not by the announced hash.
471 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
473 traceback.print_exc()
474 factory.new_tx.watch(new_tx)
# Watcher for blocks announced by bitcoind.
# NOTE(review): its body (original line 477) is not visible in this excerpt.
476 def new_block(block):
478 factory.new_block.watch(new_block)
480 print 'Started successfully!'
# NOTE(review): the loop header and the work refresh that presumably surround this
# sleep are not visible in this excerpt.
484 yield deferral.sleep(1)
# Traceback for an exception caught by a handler whose header is not visible here.
489 traceback.print_exc()
# Command line definition and hand-off to the reactor.
# NOTE(review): the enclosing def line is not visible in this excerpt.
494 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
495 parser.add_argument('--version', action='version', version=__version__)
# Selects the network definition object stored in args.net: Testnet with the flag,
# Mainnet without it.
496 parser.add_argument('--testnet',
497 help='use the testnet; make sure you change the ports too',
498 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
500 p2pool_group = parser.add_argument_group('p2pool interface')
# Default is None here; the network specific port is filled in after parsing.
501 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
502 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
503 type=int, action='store', default=None, dest='p2pool_port')
# Repeatable option: every use appends one address to args.p2pool_nodes.
504 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
505 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
506 type=str, action='append', default=[], dest='p2pool_nodes')
507 parser.add_argument('-l', '--low-bandwidth',
508 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
509 action='store_true', default=False, dest='low_bandwidth')
511 worker_group = parser.add_argument_group('worker interface')
512 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
513 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
514 type=int, action='store', default=9332, dest='worker_port')
516 bitcoind_group = parser.add_argument_group('bitcoind interface')
517 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
518 help='connect to a bitcoind at this address (default: 127.0.0.1)',
519 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
520 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
521 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
522 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
# Default is None here; the network specific port is filled in after parsing.
523 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
524 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
525 type=int, action='store', default=None, dest='bitcoind_p2p_port')
# Two required positional arguments, in this order: RPC username, then RPC password.
527 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
528 help='bitcoind RPC interface username',
529 type=str, action='store', dest='bitcoind_rpc_username')
530 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
531 help='bitcoind RPC interface password',
532 type=str, action='store', dest='bitcoind_rpc_password')
534 args = parser.parse_args()
# Ports left unset take their value from the selected network definition.
536 if args.bitcoind_p2p_port is None:
537 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
539 if args.p2pool_port is None:
540 args.p2pool_port = args.net.P2P_PORT
# Schedule main(args) to run once the reactor has started.
542 reactor.callWhenRunning(main, args)