3 from __future__ import division
15 from twisted.internet import defer, reactor
16 from twisted.web import server
17 from twisted.python import log
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    # Fetch the current block attempt plus the chain height from bitcoind.
    # Both RPCs are fired before either is waited on, so a block arriving
    # between them is less likely to produce a mismatched (work, height) pair.
    work_deferred = bitcoind.rpc_getwork()
    height_deferred = bitcoind.rpc_getblocknumber()
    work = bitcoin.getwork.BlockAttempt.from_getwork((yield work_deferred))
    height = yield height_deferred
    # silence any late failures so twisted doesn't log unhandled errors
    work_deferred.addErrback(lambda fail: None)
    height_deferred.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    # Ask bitcoind (over its p2p connection) to service a null checkorder;
    # a 'success' reply carries a script we can direct payouts to.
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        defer.returnValue(res['script'])
    elif res['reply'] == 'denied':
        # bitcoind refused the IP transaction; caller falls back to an address
        defer.returnValue(None)
    # Only reached for an unrecognized reply: defer.returnValue raises
    # internally, so both branches above exit the generator.
    # [elided in this chunk: an `else:` line sat before this raise]
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    # Fallback payout script: ask bitcoind for the 'p2pool' account address
    # and turn it into a pay-to-pubkey-hash output script.
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
# NOTE(review): the enclosing `def main(args):` header (and an opening
# `try:`) is elided from this chunk; the statements below run inside it.
    print 'p2pool (version %s)' % (p2pool_init.__version__,)

    # connect to bitcoind over JSON-RPC and do initial getwork
    url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
    print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
    bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
    temp_work, temp_height = yield getwork(bitcoind)
    print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)

    # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
    print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
    factory = bitcoin.p2p.ClientFactory(args.net)
    reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
    my_script = yield get_payout_script(factory)
    if args.pubkey_hash is None:
        # [elided in this chunk: an inner `if my_script is None:` guard]
        print 'IP transaction denied ... falling back to sending to address.'
        my_script = yield get_payout_script2(bitcoind, args.net)
    # [elided: `else:` — an explicit address was supplied on the command line]
    my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)

    print ' Payout script:', my_script.encode('hex')
    # Tracks bitcoin block heights via the p2p connection.
    ht = bitcoin.p2p.HeightTracker(factory)

    tracker = p2pool.OkayTracker(args.net)
    chains = expiring_dict.ExpiringDict(300)
    def get_chain(chain_id_data):
        # NOTE(review): `Chain` is not defined anywhere in this chunk — this
        # looks like a vestige of an older chain-based design; confirm before
        # calling.
        return chains.setdefault(chain_id_data, Chain(chain_id_data))

    peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it

    # information affecting work that should trigger a long-polling update
    current_work = variable.Variable(None)
    # information affecting work that should not trigger a long-polling update
    current_work2 = variable.Variable(None)

    work_updated = variable.Event()
    tracker_updated = variable.Event()

    # share_hash -> (last_request_time, request_count); throttles re-requests
    requested = expiring_dict.ExpiringDict(300)
    @defer.inlineCallbacks
    def set_real_work1():
        # Poll bitcoind for fresh work and publish it into the two
        # current_work variables (split so only chain-affecting fields
        # trigger long-polling updates).
        work, height = yield getwork(bitcoind)
        # XXX call tracker_updated
        current_work.set(dict(
            version=work.version,
            previous_block=work.previous_block,
            # [elided in this chunk: further fields, e.g. target/height]
            best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
        # [elided: closing of the current_work.set(...) call]
        current_work2.set(dict(
            clock_offset=time.time() - work.timestamp,
        # [elided: closing of the current_work2.set(...) call]
    @defer.inlineCallbacks
    def set_real_work2():
        # Re-evaluate which share chain is best and request any missing
        # parent shares from peers likely to have them.
        best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])

        t = dict(current_work.value)
        t['best_share_hash'] = best
        # [elided in this chunk: presumably current_work.set(t) — confirm]

        for peer2, share_hash in desired:
            if share_hash not in tracker.tails: # was received in the time tracker.think was running
                # [elided: loop `continue`]
            last_request_time, count = requested.get(share_hash, (None, 0))
            if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
                # [elided: back-off `continue` — exponential retry window]
            potential_peers = set()
            for head in tracker.tails[share_hash]:
                potential_peers.update(peer_heads.get(head, set()))
            potential_peers = [peer for peer in potential_peers if peer.connected2]
            if count == 0 and peer2 is not None and peer2.connected2:
                # [elided: body — presumably prefer the advertising peer first]
            peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2

            print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
            # [elided: peer.send_getshares(hashes=[share_hash], parents=..., ]
            stops=list(set(tracker.heads) | set(
                tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
            # [elided: closing of the stops expression and the call]
            requested[share_hash] = time.time(), count + 1
    print 'Initializing work...'
    yield set_real_work1()
    yield set_real_work2()

    # Start of run, expressed in the pool's (bitcoind-relative) clock.
    start_time = time.time() - current_work2.value['clock_offset']
    # setup p2p logic and join p2pool network
    def share_share(share, ignore_peer=None):
        # Broadcast `share` to every connected peer except `ignore_peer`
        # (typically the peer it was received from).
        for peer in p2p_node.peers.itervalues():
            if peer is ignore_peer:
                # [elided in this chunk: `continue`]
            if p2pool_init.DEBUG:
                print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
            peer.send_shares([share])
    def p2p_shares(shares, peer=None):
        # Process a batch of shares received from the network (peer set) or
        # produced locally (peer is None).
        print 'Processing %i shares...' % (len(shares),)
        # [elided in this chunk: `for share in shares:` loop header]
            if share.hash in tracker.shares:
                #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                # [elided: `continue`]

            #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)

            #for peer2, share_hash in desired:
            #    print 'Requesting parent share %x' % (share_hash,)
            #    peer2.send_getshares(hashes=[share_hash], parents=2000)

            # A share whose hash also meets the bitcoin target is a full block.
            if share.bitcoin_hash <= share.header['target']:
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                # [elided: `else:`]
                    print 'No bitcoind connection! Erp!'

        # Remember which peer knew about this head, for later parent requests.
        if shares and peer is not None:
            peer_heads.setdefault(shares[0].hash, set()).add(peer)

        # Wake the chain-evaluation loop so the new shares are considered.
        tracker_updated.happened()

        print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
    def p2p_share_hashes(share_hashes, peer):
        # A peer advertised share hashes; request the ones we don't have.
        # [elided in this chunk: `get_hashes = []` initialization]
        for share_hash in share_hashes:
            if share_hash in tracker.shares:
                pass # print 'Got share hash, already have, ignoring. Hash: %s' % (p2pool.format_hash(share_hash),)
            # [elided: `else:`]
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)

        if share_hashes and peer is not None:
            peer_heads.setdefault(share_hashes[0], set()).add(peer)
        # [elided: guard, presumably `if get_hashes:`]
            peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
    def p2p_get_shares(share_hashes, parents, stops, peer):
        # Serve a peer's getshares request; cap `parents` so the total reply
        # stays bounded regardless of how many hashes were asked for.
        parents = min(parents, 1000//len(share_hashes))
        # [elided in this chunk: `stops`/`shares` accumulator initialization]
        for share_hash in share_hashes:
            for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                if share.hash in stops:
                    # [elided: `break` and the shares.append(...) line]
        peer.send_shares(shares, full=True)
    print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)

    # [elided in this chunk: `def parse(x):` header and its ':' check —
    #  parses "ADDR[:PORT]" strings from --p2pool-node]
        ip, port = x.split(':')
        # [elided: `return ip, int(port)` and `else:`]
        return x, args.net.P2P_PORT

    # [elided: `nodes = [` — builtin bootstrap peer addresses]
        ('72.14.191.28', args.net.P2P_PORT),
        ('62.204.197.159', args.net.P2P_PORT),
    # [elided: list close, then `try:`]
        nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
    # [elided: `except:` — bootstrap resolution is best-effort]
        log.err(None, 'Error resolving bootstrap node IP:')

    # [elided: `p2p_node = p2p.Node(` call header]
        current_work=current_work,
        port=args.p2pool_port,
        # Persistent peer-address store next to the script itself.
        addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
        mode=0 if args.low_bandwidth else 1,
        preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
    # [elided: call close and p2p_node.start()]
    p2p_node.handle_shares = p2p_shares
    p2p_node.handle_share_hashes = p2p_share_hashes
    p2p_node.handle_get_shares = p2p_get_shares
    # send share when the chain changes to their chain
    def work_changed(new_work):
        #print 'Work changed:', new_work
        for share in tracker.get_chain_known(new_work['best_share_hash']):
            # [elided in this chunk: a guard that stops once already-shared
            #  ancestors are reached — confirm against full file]
            share_share(share, share.peer)
    current_work.changed.watch(work_changed)
    # start listening for workers with a JSON-RPC server

    print 'Listening for workers on port %i...' % (args.worker_port,)

    # Maps merkle roots we handed to miners back to the tx list they commit
    # to. NOTE(review): created with a 300s lifetime, but a later comment
    # says "1000 seconds" — one of the two is stale.
    merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
    # Random per-run tag embedded in generated coinbases; used to recognize
    # and count this node's own shares.
    run_identifier = struct.pack('<Q', random.randrange(2**64))
    def compute(state, payout_script):
        # Build a getwork job (BlockAttempt) for a miner, committing to a
        # freshly generated coinbase plus selected mempool transactions.
        if payout_script is None:
            payout_script = my_script
        if state['best_share_hash'] is None and args.net.PERSIST:
            raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
        # Select candidate mempool txs, bounded by the merkle-branch limit.
        pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
        pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
        # [elided in this chunk: `extra_txs = []` / `size = 0` initialization]
        for tx in pre_extra_txs:
            this_size = len(bitcoin.data.tx_type.pack(tx.tx))
            if size + this_size > 500000:
                # [elided: `break`, then extra_txs.append / size update]
        # XXX assuming generate_tx is smallish here..
        generate_tx = p2pool.generate_transaction(
            # [elided: leading kwargs, e.g. tracker=tracker]
            previous_share_hash=state['best_share_hash'],
            new_script=payout_script,
            # Halving-aware block subsidy plus collected tx fees.
            subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
            nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
            block_target=state['target'],
        # [elided: trailing kwargs and call close]
        print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
        #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
        #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
        transactions = [generate_tx] + [tx.tx for tx in extra_txs]
        merkle_root = bitcoin.data.merkle_hash(transactions)
        # NOTE(review): comment said "1000 seconds" but the dict was created
        # with a 300s lifetime above — the lifetime in code wins.
        merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds

        # Use pool-relative time, nudged forward so the share's timestamp
        # exceeds the median of its 11 predecessors (same rule bitcoin uses).
        timestamp = int(time.time() - current_work2.value['clock_offset'])
        if state['best_share_hash'] is not None:
            timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
            if timestamp2 > timestamp:
                print 'Toff', timestamp2 - timestamp
                timestamp = timestamp2
        # Share target is carried inside the coinbase script's share_data.
        target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
        times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
        #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
        return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
    def got_response(data):
        # Handle a solved job submitted by a miner: recover the tx list from
        # the merkle root, check for a full block, then record the share.
        # [elided in this chunk: enclosing `try:`]
        # match up with transactions
        header = bitcoin.getwork.decode_data(data)
        transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
        if transactions is None:
            print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
            # [elided: early return]
        block = dict(header=header, txs=transactions)
        hash_ = bitcoin.data.block_header_type.hash256(block['header'])
        # In DEBUG mode every submission is forwarded, for exercising the path.
        if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
            print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
            if factory.conn.value is not None:
                factory.conn.value.send_block(block=block)
            # [elided: `else:`]
                print 'No bitcoind connection! Erp!'
        target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
        # [elided: `if hash_ > target:` guard — submission below share target]
            print 'Received invalid share from worker - %x/%x' % (hash_, target)
            # [elided: early return]
        share = p2pool.Share.from_block(block)
        my_shares.add(share.hash)
        print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
        good = share.previous_hash == current_work.value['best_share_hash']
        # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
        # [elided: p2p_shares([share]) call, roughly]
        # eg. good = share.hash == current_work.value['best_share_hash'] here
        # [elided: `return good`, then `except:`]
            log.err(None, 'Error processing data received from worker:')
    # [elided in this chunk: `def get_rate():` header — reports pool H/s]
        if current_work.value['best_share_hash'] is not None:
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
            # [elided: return of the formatted rate]

    # [elided: `def get_users():` header — reports per-script weight shares]
        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
        # [elided: `res = {}` initialization]
        for script in sorted(weights, key=lambda s: weights[s]):
            res[script.encode('hex')] = weights[script]/total_weight
        # [elided: `return res`]

    reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate, get_users, args.net)))
    tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
    # Cached bitcoind lookups of raw transactions, keyed by tx hash.
    get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))

    # [elided in this chunk: `class Tx(object):` header — wraps a mempool tx
    #  and tracks whether all of its inputs are confirmed]
        def __init__(self, tx, seen_at_block):
            self.hash = bitcoin.data.tx_type.hash256(tx)
            # [elided: `self.tx = tx` assignment, presumably]
            self.seen_at_block = seen_at_block
            # All hashes this tx involves: itself plus every spent outpoint.
            self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])

            #print '%x %r' % (seen_at_block, tx)
            #for mention in self.mentions:
            #    print '%x' % mention

            self.parents_all_in_blocks = False
            # [elided: `self.value_in = 0` and related initialization]
            self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
            # Kicks off async input resolution; result arrives later.
            self._find_parents_in_blocks()

        @defer.inlineCallbacks
        def _find_parents_in_blocks(self):
            # Resolve each input against bitcoind; mark the tx good only if
            # every parent is already in a block.
            for tx_in in self.tx['tx_ins']:
                # [elided: `try:` around the lookup, presumably]
                raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                # [elided: `except` / early-return handling]
                self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                #print raw_transaction
                if not raw_transaction['parent_blocks']:
                    # [elided: early return — an input is unconfirmed]
            self.parents_all_in_blocks = True

        # [elided: `def is_good(self):` header]
            if not self.parents_all_in_blocks:
                # [elided: `return False` and the remaining checks]
    @defer.inlineCallbacks
    # [elided in this chunk: `def new_tx(tx_hash):` header — fetches an
    #  announced tx over p2p and adds it to the local pool]
        assert isinstance(tx_hash, (int, long))
        #print 'REQUESTING', tx_hash
        # [elided: `try:`]
        tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
        tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
        # [elided: `except:` — tx handling is best-effort]
        log.err(None, 'Error handling tx:')
    # disable for now, for testing impact on stales
    #factory.new_tx.watch(new_tx)
441 def new_block(block_hash):
442 work_updated.happened()
443 factory.new_block.watch(new_block)
    print 'Started successfully!'

    @defer.inlineCallbacks
    # [elided in this chunk: poller def header and its `while True:` loop —
    #  repeatedly refreshes work from bitcoind]
        # Grab the deferred *before* polling so an event during the poll
        # still cuts the subsequent sleep short.
        flag = work_updated.get_deferred()
        yield set_real_work1()
        # Sleep ~1s (randomized) or until a new-block event, whichever first.
        yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)

    @defer.inlineCallbacks
    # [elided: second poller def header and loop — re-evaluates best chain]
        flag = tracker_updated.get_deferred()
        yield set_real_work2()
        yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
    # Counts this run's shares in the chain via the coinbase run_identifier.
    counter = skiplist.CountsSkipList(tracker, run_identifier)

    # [elided in this chunk: status-loop def header and `while True:`]
        yield deferral.sleep(random.expovariate(1/1))
        # [elided: `try:`]
        if current_work.value['best_share_hash'] is not None:
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            # [elided: guard on height, presumably `if height > 2:`]
            att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
            weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
            count = counter(current_work.value['best_share_hash'], height, 2**100)
            print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
                # [elided: leading format arguments]
                weights.get(my_script, 0)/total_weight*100,
                math.format(weights.get(my_script, 0)/total_weight*att_s),
                # [elided: more format arguments]
                len(my_shares) - count,
                # [elided: closing of the print expression]

        #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
        #for k, v in weights.iteritems():
        #    print k.encode('hex'), v/total_weight
        # [elided: `except:` for the status body]
        log.err(None, 'Fatal error:')
# Command-line interface. NOTE(review): the enclosing scope header (e.g.
# `def run():` or `if __name__ == '__main__':`) is elided from this chunk.
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# Positional arguments: no name given, so argparse derives them from dest.
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')

args = parser.parse_args()
# [elided in this chunk: `if args.debug:` guard around this whole section]
p2pool_init.DEBUG = True
class TeePipe(object):
    # File-like object that fans writes out to several underlying streams.
    def __init__(self, outputs):
        self.outputs = outputs
    def write(self, data):
        for output in self.outputs:
            # [elided: output.write(data)]
    # [elided: `def flush(self):` header]
        for output in self.outputs:
            # [elided: output.flush()]
class TimestampingPipe(object):
    # File-like wrapper that prefixes each complete line with a timestamp.
    def __init__(self, inner_file):
        self.inner_file = inner_file
        # [elided: `self.buf = ''` — carries the trailing partial line]
    def write(self, data):
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
            self.inner_file.flush()
        # [elided: `self.buf = lines[-1]` and a flush method]
# Route stdout/stderr/twisted-log through the timestamping tee into both the
# console and a debug.log file next to the script.
sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
# Fill in network-dependent port defaults that argparse left as None.
if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = args.net.P2P_PORT

# Translate --address into a pubkey hash; None means "ask bitcoind later".
if args.address is not None:
    # [elided in this chunk: `try:`]
    args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
    # [elided: `except Exception, e:`]
    raise ValueError('error parsing address: ' + repr(e))
# [elided: `else:`]
    args.pubkey_hash = None

# Defer main() until the reactor is running.
reactor.callWhenRunning(main, args)