3 from __future__ import division
14 from twisted.internet import defer, reactor
15 from twisted.web import server
16 from twisted.python import log
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    # Fetch the current work unit and block height from bitcoind over
    # JSON-RPC. Fires with (bitcoin.getwork.BlockAttempt, height).
    # Failures are retried by the deferral.retry decorator.
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    # get rid of residual errors
    # NOTE(review): as written these errbacks are added only after both
    # deferreds have already been consumed successfully, so they never
    # fire; confirm whether an except path was intended here.
    getwork_df.addErrback(lambda fail: None)
    height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    # Ask bitcoind, over the bitcoin p2p protocol, for a payout script by
    # sending a null checkorder (IP-transaction) request.
    #
    # Fires with the payout script on success, or None when bitcoind denies
    # the IP transaction - the caller detects None and falls back to an
    # address-based script. Any other reply is genuinely unexpected and
    # raises ValueError.
    #
    # Fix: the previous version raised 'Unexpected reply' for the 'denied'
    # case (which the caller explicitly handles) and never called
    # defer.returnValue at all, so even a successful reply produced None.
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        defer.returnValue(res['script'])
    elif res['reply'] == 'denied':
        defer.returnValue(None)
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    # Fallback payout script: ask bitcoind for its 'p2pool' account
    # address and turn it into a pay-to-pubkey-hash output script.
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
53 @defer.inlineCallbacks
56 print 'p2pool (version %s)' % (p2pool_init.__version__,)
59 # connect to bitcoind over JSON-RPC and do initial getwork
60 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
61 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
62 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
63 temp_work, temp_height = yield getwork(bitcoind)
65 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
68 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
69 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
70 factory = bitcoin.p2p.ClientFactory(args.net)
71 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
72 my_script = yield get_payout_script(factory)
73 if args.pubkey_hash is None:
75 print 'IP transaction denied ... falling back to sending to address.'
76 my_script = yield get_payout_script2(bitcoind, args.net)
78 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
80 print ' Payout script:', my_script.encode('hex')
83 ht = bitcoin.p2p.HeightTracker(factory)
85 tracker = p2pool.OkayTracker(args.net)
86 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    # Look up (or lazily create) the Chain object for this chain id;
    # entries expire from `chains` after 300 seconds of disuse.
    # NOTE(review): `Chain` is not defined anywhere in this chunk -
    # confirm it exists elsewhere or whether this helper is dead code.
    return chains.setdefault(chain_id_data, Chain(chain_id_data))
90 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
92 # information affecting work that should trigger a long-polling update
93 current_work = variable.Variable(None)
94 # information affecting work that should not trigger a long-polling update
95 current_work2 = variable.Variable(None)
97 work_updated = variable.Event()
98 tracker_updated = variable.Event()
100 requested = expiring_dict.ExpiringDict(300)
102 @defer.inlineCallbacks
103 def set_real_work1():
104 work, height = yield getwork(bitcoind)
105 # XXX call tracker_updated
106 current_work.set(dict(
107 version=work.version,
108 previous_block=work.previous_block,
111 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
113 current_work2.set(dict(
114 clock_offset=time.time() - work.timestamp,
117 @defer.inlineCallbacks
118 def set_real_work2():
119 best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
121 t = dict(current_work.value)
122 t['best_share_hash'] = best
125 for peer2, share_hash in desired:
126 last_request_time, count = requested.get(share_hash, (None, 0))
127 if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
129 potential_peers = set()
130 for head in tracker.tails[share_hash]:
131 potential_peers.update(peer_heads.get(head, set()))
132 potential_peers = [peer for peer in potential_peers if peer.connected2]
133 if count == 0 and peer2 is not None and peer2.connected2:
136 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
140 print 'Requesting parent share %x from %s' % (share_hash % 2**32, '%s:%i' % peer.addr)
144 stops=list(set(tracker.heads) | set(
145 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
148 requested[share_hash] = time.time(), count + 1
150 print 'Initializing work...'
151 yield set_real_work1()
152 yield set_real_work2()
155 start_time = time.time() - current_work2.value['clock_offset']
157 # setup p2p logic and join p2pool network
159 def share_share(share, ignore_peer=None):
160 for peer in p2p_node.peers.itervalues():
161 if peer is ignore_peer:
163 peer.send_shares([share])
166 def p2p_shares(shares, peer=None):
168 print "Processing %i shares..." % (len(shares),)
172 if share.hash in tracker.shares:
173 #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
177 #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.addr if share.peer is not None else None)
180 #for peer2, share_hash in desired:
181 # print 'Requesting parent share %x' % (share_hash,)
182 # peer2.send_getshares(hashes=[share_hash], parents=2000)
184 if share.bitcoin_hash <= share.header['target']:
186 print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
188 if factory.conn.value is not None:
189 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
191 print 'No bitcoind connection! Erp!'
193 if shares and peer is not None:
194 peer_heads.setdefault(shares[0].hash, set()).add(peer)
197 tracker_updated.happened()
200 print "... done processing %i shares." % (len(shares),)
202 def p2p_share_hashes(share_hashes, peer):
204 for share_hash in share_hashes:
205 if share_hash in tracker.shares:
206 pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
208 print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
209 get_hashes.append(share_hash)
211 if share_hashes and peer is not None:
212 peer_heads.setdefault(share_hashes[0], set()).add(peer)
214 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
216 def p2p_get_shares(share_hashes, parents, stops, peer):
217 parents = min(parents, 1000//len(share_hashes))
220 for share_hash in share_hashes:
221 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
222 if share.hash in stops:
225 peer.send_shares(shares, full=True)
227 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
231 ip, port = x.split(':')
234 return x, args.net.P2P_PORT
237 ('72.14.191.28', args.net.P2P_PORT),
238 ('62.204.197.159', args.net.P2P_PORT),
241 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
243 log.err(None, 'Error resolving bootstrap node IP:')
246 current_work=current_work,
247 port=args.p2pool_port,
249 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
250 mode=0 if args.low_bandwidth else 1,
251 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
253 p2p_node.handle_shares = p2p_shares
254 p2p_node.handle_share_hashes = p2p_share_hashes
255 p2p_node.handle_get_shares = p2p_get_shares
259 # send share when the chain changes to their chain
260 def work_changed(new_work):
261 #print 'Work changed:', new_work
262 for share in tracker.get_chain_known(new_work['best_share_hash']):
265 share_share(share, share.peer)
266 current_work.changed.watch(work_changed)
271 # start listening for workers with a JSON-RPC server
273 print 'Listening for workers on port %i...' % (args.worker_port,)
277 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
278 run_identifier = struct.pack('<Q', random.randrange(2**64))
280 def compute(state, all_targets):
281 if state['best_share_hash'] is None:
282 raise jsonrpc.Error(-12345, u"p2pool is downloading shares")
283 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
284 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
287 for tx in pre_extra_txs:
288 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
289 if size + this_size > 500000:
294 # XXX assuming generate_tx is smallish here..
295 generate_tx = p2pool.generate_transaction(
297 previous_share_hash=state['best_share_hash'],
298 new_script=my_script,
299 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
300 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
301 block_target=state['target'],
304 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
305 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
306 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
307 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
308 merkle_root = bitcoin.data.merkle_hash(transactions)
309 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
311 timestamp = int(time.time() - current_work2.value['clock_offset'])
312 if state['best_share_hash'] is not None:
313 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
314 if timestamp2 > timestamp:
315 print 'Toff', timestamp2 - timestamp
316 timestamp = timestamp2
317 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
319 target2 = min(2**256//2**32 - 1, target2)
320 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
321 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
322 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
327 def got_response(data):
329 # match up with transactions
330 header = bitcoin.getwork.decode_data(data)
331 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
332 if transactions is None:
333 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
335 block = dict(header=header, txs=transactions)
336 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
337 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
339 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
341 if factory.conn.value is not None:
342 factory.conn.value.send_block(block=block)
344 print 'No bitcoind connection! Erp!'
345 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
347 print 'Received invalid share from worker - %x/%x' % (hash_, target)
349 share = p2pool.Share.from_block(block)
350 my_shares.add(share.hash)
351 print 'GOT SHARE! %x prev %x age %.2fs' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash % 2**32, time.time() - times[share.nonce]) + (" DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "")
352 good = share.previous_hash == current_work.value['best_share_hash']
353 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
355 # eg. good = share.hash == current_work.value['best_share_hash'] here
358 log.err(None, 'Error processing data received from worker:')
362 if current_work.value['best_share_hash'] is not None:
363 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
364 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
367 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
374 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
375 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
378 def __init__(self, tx, seen_at_block):
379 self.hash = bitcoin.data.tx_type.hash256(tx)
381 self.seen_at_block = seen_at_block
382 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
384 #print '%x %r' % (seen_at_block, tx)
385 #for mention in self.mentions:
386 # print '%x' % mention
388 self.parents_all_in_blocks = False
391 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
392 self._find_parents_in_blocks()
394 @defer.inlineCallbacks
395 def _find_parents_in_blocks(self):
396 for tx_in in self.tx['tx_ins']:
398 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
401 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
402 #print raw_transaction
403 if not raw_transaction['parent_blocks']:
405 self.parents_all_in_blocks = True
408 if not self.parents_all_in_blocks:
414 @defer.inlineCallbacks
417 assert isinstance(tx_hash, (int, long))
418 #print "REQUESTING", tx_hash
419 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
421 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
423 log.err(None, 'Error handling tx:')
424 # disable for now, for testing impact on stales
425 #factory.new_tx.watch(new_tx)
def new_block(block_hash):
    # Callback wired to factory.new_block: a new bitcoin block was
    # announced over p2p, so nudge the poller to refetch work right away.
    # block_hash itself is unused - getwork is re-requested in full.
    work_updated.happened()
429 factory.new_block.watch(new_block)
431 print 'Started successfully!'
434 @defer.inlineCallbacks
437 flag = work_updated.get_deferred()
439 yield set_real_work1()
442 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
444 @defer.inlineCallbacks
447 flag = tracker_updated.get_deferred()
449 yield set_real_work2()
452 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
457 counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
460 yield deferral.sleep(random.expovariate(1/1))
462 if current_work.value['best_share_hash'] is not None:
463 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
465 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
466 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
467 count = counter(current_work.value['best_share_hash'], height, 2**100)
468 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale)' % (
471 weights.get(my_script, 0)/total_weight*100,
472 math.format(weights.get(my_script, 0)/total_weight*att_s),
474 len(my_shares) - count,
476 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
477 #for k, v in weights.iteritems():
478 # print k.encode('hex'), v/total_weight
482 log.err(None, 'Fatal error:')
# Command-line interface: build the argument parser and parse sys.argv.
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')
# Options for the p2pool node-to-node network.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')
# Options for the miner-facing JSON-RPC (getwork) interface.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')
# Options for talking to the local bitcoind (RPC and p2p).
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')
# RPC credentials are required positional arguments (no flag prefix).
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')
args = parser.parse_args()
535 p2pool_init.DEBUG = True
536 class TimestampingPipe(object):
537 def __init__(self, inner_file):
538 self.inner_file = inner_file
541 def write(self, data):
542 buf = self.buf + data
543 lines = buf.split('\n')
544 for line in lines[:-1]:
545 self.inner_file.write("%s %s\n" % (time.strftime("%H:%M:%S"), line))
548 self.inner_file.flush()
549 sys.stdout = TimestampingPipe(sys.stdout)
550 sys.stderr = TimestampingPipe(sys.stderr)
551 log.DefaultObserver.stderr = sys.stderr
# Fill in port defaults that depend on the selected network
# (mainnet vs. testnet constants live on args.net).
if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
if args.p2pool_port is None:
    args.p2pool_port = args.net.P2P_PORT
559 if args.address is not None:
561 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
563 raise ValueError("error parsing address: " + repr(e))
565 args.pubkey_hash = None
567 reactor.callWhenRunning(main, args)