3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
21 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
22 from util import db, expiring_dict, jsonrpc, variable, deferral, math
23 from . import p2p, worker_interface, skiplists
24 import p2pool.data as p2pool
25 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current getwork job and block height from bitcoind.

    Returns a deferred firing with (BlockAttempt, height). Both RPCs are
    started before either result is waited on, so the window in which a new
    block can arrive between them is as small as possible.
    """
    # a block could arrive in between these two queries
    work_df = bitcoind.rpc_getwork()
    height_df = bitcoind.rpc_getblocknumber()
    work = bitcoin.getwork.BlockAttempt.from_getwork((yield work_df))
    height = yield height_df
    # get rid of residual errors
    work_df.addErrback(lambda fail: None)
    height_df.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, over the bitcoin p2p connection, for a payout script.

    Sends a null checkorder message. Fires with the script on 'success',
    with None when the IP transaction is 'denied', and raises ValueError
    for any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    # defer.returnValue raises, so each branch below terminates the generator
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fall back to paying out to the address of bitcoind's 'p2pool' account."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
# NOTE(review): the 'def main(args):' header and several lines are elided from
# this excerpt; the statements below run inside main().
print 'p2pool (version %s)' % (p2pool_init.__version__,)

# connect to bitcoind over JSON-RPC and do initial getwork
url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
temp_work, temp_height = yield getwork(bitcoind)
print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)

# connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
factory = bitcoin.p2p.ClientFactory(args.net)
reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
my_script = yield get_payout_script(factory)
if args.pubkey_hash is None:
    # NOTE(review): a guard (likely 'if my_script is None:') is elided here
    print ' IP transaction denied ... falling back to sending to address.'
    my_script = yield get_payout_script2(bitcoind, args.net)
# NOTE(review): an 'else:' is elided; this path is taken when a pubkey hash
# was supplied via --address on the command line
my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
print ' Payout script:', my_script.encode('hex')

# tracks bitcoin block heights via the p2p connection
ht = bitcoin.p2p.HeightTracker(factory)

# tracker holds the share DAG; chains expire after 300s without touches
tracker = p2pool.OkayTracker(args.net)
chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    return chains.setdefault(chain_id_data, Chain(chain_id_data))

peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it

# information affecting work that should trigger a long-polling update
current_work = variable.Variable(None)
# information affecting work that should not trigger a long-polling update
current_work2 = variable.Variable(None)

work_updated = variable.Event()

# share_hash -> (last_request_time, request_count), used for request backoff
requested = expiring_dict.ExpiringDict(300)
@defer.inlineCallbacks
def set_real_work1():
    """Poll bitcoind for fresh work and publish it into current_work/current_work2."""
    work, height = yield getwork(bitcoind)
    # changed: True when bitcoind's chain tip moved since the last poll (or first poll)
    changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
    current_work.set(dict(
        version=work.version,
        previous_block=work.previous_block,
        # NOTE(review): several dict entries (e.g. target/height) are elided here
        best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
    # NOTE(review): closing of the current_work.set(...) call is elided
    current_work2.set(dict(
        # offset between local clock and bitcoind's notion of time
        clock_offset=time.time() - work.timestamp,
    # NOTE(review): remainder of set_real_work1 is elided from this excerpt
def set_real_work2():
    """Re-evaluate the best share chain and request any missing parent shares from peers."""
    best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
    t = dict(current_work.value)
    t['best_share_hash'] = best
    # NOTE(review): lines elided (likely current_work.set(t) and rebinding t to time.time())
    for peer2, share_hash in desired:
        if share_hash not in tracker.tails: # was received in the time tracker.think was running
            # NOTE(review): loop-skip statement ('continue') elided here
        last_request_time, count = requested.get(share_hash, (None, 0))
        # exponential backoff: skip hashes we asked for too recently
        if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
            # NOTE(review): loop-skip statement ('continue') elided here
        # gather connected peers known to have a chain containing this share
        potential_peers = set()
        for head in tracker.tails[share_hash]:
            potential_peers.update(peer_heads.get(head, set()))
        potential_peers = [peer for peer in potential_peers if peer.connected2]
        if count == 0 and peer2 is not None and peer2.connected2:
            # NOTE(review): branch body elided (presumably prefers peer2 on first try)
        peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
        # NOTE(review): lines elided
        print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
        # NOTE(review): peer.send_getshares(...) call partially elided; stops
        # limits the response to shares we don't already have
        stops=list(set(tracker.heads) | set(
            tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
        # NOTE(review): closing of the call elided
        requested[share_hash] = t, count + 1
print 'Initializing work...'
yield set_real_work1()
# NOTE(review): lines elided (likely a set_real_work2() call)
# node start time, expressed in bitcoind's clock
start_time = time.time() - current_work2.value['clock_offset']

# setup p2p logic and join p2pool network
def share_share(share, ignore_peer=None):
    """Broadcast a share to every connected peer except ignore_peer (its source)."""
    for peer in p2p_node.peers.itervalues():
        if peer is ignore_peer:
            # NOTE(review): loop-skip statement ('continue') elided here
        #if p2pool_init.DEBUG:
        # print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
        peer.send_shares([share])
def p2p_shares(shares, peer=None):
    """Handle a batch of shares received from the p2p network (peer=None for local)."""
    # NOTE(review): lines elided before this print (possibly a batch-size check)
    print 'Processing %i shares...' % (len(shares),)
    # NOTE(review): the per-share loop header and tracker bookkeeping are elided
    if share.hash in tracker.shares:
        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
        # NOTE(review): loop-skip statement ('continue') elided here
    #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
    # NOTE(review): tracker add/think calls elided here
    #for peer2, share_hash in desired:
    # print 'Requesting parent share %x' % (share_hash,)
    # peer2.send_getshares(hashes=[share_hash], parents=2000)
    # a share whose hash also meets the bitcoin target is a full block
    if share.bitcoin_hash <= share.header['target']:
        print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
        if factory.conn.value is not None:
            factory.conn.value.send_block(block=share.as_block(tracker, args.net))
        # NOTE(review): an 'else:' is elided before this print
        print 'No bitcoind connection! Erp!'
    if shares and peer is not None:
        # remember that this peer knows of the head of this batch
        peer_heads.setdefault(shares[0].hash, set()).add(peer)
    # NOTE(review): lines elided (likely set_real_work2() and timing)
    print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
def p2p_share_hashes(share_hashes, peer):
    """Handle share hashes advertised by a peer; request the ones we don't have."""
    # NOTE(review): initialization elided (likely t = time.time(); get_hashes = [])
    for share_hash in share_hashes:
        if share_hash in tracker.shares:
            # NOTE(review): loop-skip statement ('continue') elided here
        last_request_time, count = requested.get(share_hash, (None, 0))
        # exponential backoff: skip hashes requested too recently
        if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
            # NOTE(review): loop-skip statement ('continue') elided here
        print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
        get_hashes.append(share_hash)
        requested[share_hash] = t, count + 1

    if share_hashes and peer is not None:
        peer_heads.setdefault(share_hashes[0], set()).add(peer)
    # NOTE(review): a guard (likely 'if get_hashes:') is elided here
    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
def p2p_get_shares(share_hashes, parents, stops, peer):
    """Serve a peer's request for shares plus up to 'parents' ancestors of each."""
    # cap the total response: at most ~1000 shares across all requested hashes
    parents = min(parents, 1000//len(share_hashes))
    # NOTE(review): initialization elided (likely stops = set(stops); shares = [])
    for share_hash in share_hashes:
        for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
            if share.hash in stops:
                # NOTE(review): 'break' and the shares.append(share) lines are elided
    peer.send_shares(shares, full=True)
print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)

# NOTE(review): the definition of the address parser ('def parse(x):') is
# partially elided; it splits 'ADDR:PORT' and defaults the port
ip, port = x.split(':')
# NOTE(review): line elided (likely 'return ip, int(port)' branch)
return x, args.net.P2P_PORT

# NOTE(review): the 'nodes = set([' opening is elided; builtin bootstrap peers:
('72.14.191.28', args.net.P2P_PORT),
('62.204.197.159', args.net.P2P_PORT),
('142.58.248.28', args.net.P2P_PORT),
('94.23.34.145', args.net.P2P_PORT),
# NOTE(review): closing and the DNS bootstrap loop header are elided; hostnames:
'dabuttonfactory.com',
# NOTE(review): a try: is elided around the resolution below
nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
# NOTE(review): the except handler opening is elided
log.err(None, 'Error resolving bootstrap node IP:')

# NOTE(review): the 'p2p_node = p2p.Node(' opening is elided
current_work=current_work,
port=args.p2pool_port,
# NOTE(review): further constructor arguments elided
# persist known peer addresses next to the executable, per-network table
addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
mode=0 if args.low_bandwidth else 1,
preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
# NOTE(review): closing paren and node start are elided
p2p_node.handle_shares = p2p_shares
p2p_node.handle_share_hashes = p2p_share_hashes
p2p_node.handle_get_shares = p2p_get_shares
# send share when the chain changes to their chain
def work_changed(new_work):
    #print 'Work changed:', new_work
    for share in tracker.get_chain_known(new_work['best_share_hash']):
        # NOTE(review): a stop condition for already-shared shares is elided here
        share_share(share, share.peer)
current_work.changed.watch(work_changed)

# NOTE(review): UPnP thread wrapper lines are elided around this section
@defer.inlineCallbacks
# NOTE(review): function header and loop header elided
is_lan, lan_ip = yield ipdiscover.get_local_ip()
# NOTE(review): a guard is elided here
pm = yield portmapper.get_port_mapper()
# forward the p2pool p2p port from the router via UPnP
yield pm._upnp.add_port_mapping(lan_ip, args.net.P2P_PORT, args.net.P2P_PORT, 'p2pool', 'TCP')
# NOTE(review): error handling elided
if p2pool_init.DEBUG:
    # NOTE(review): branch body elided (likely a log line)
# retry UPnP mapping on a randomized ~2 minute interval
yield deferral.sleep(random.expovariate(1/120))
# NOTE(review): thread start elided

# start listening for workers with a JSON-RPC server

print 'Listening for workers on port %i...' % (args.worker_port,)

# NOTE(review): lines elided
# merkle_root -> txs that were included when that work was handed out
merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
# random tag put into generated coinbases so we can recognize our own shares
run_identifier = struct.pack('<Q', random.randrange(2**64))
def compute(state, payout_script):
    """Build a getwork BlockAttempt for a miner from the current share-chain state.

    state: snapshot of current_work.value; payout_script: miner-specific script
    or None to use this node's script. Raises jsonrpc.Error while the share
    chain is still downloading on PERSIST networks.
    """
    if payout_script is None:
        payout_script = my_script
    if state['best_share_hash'] is None and args.net.PERSIST:
        raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
    # candidate transactions from the mempool, capped by the merkle branch limit
    pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
    pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
    # NOTE(review): accumulator initialization elided (likely extra_txs = []; size = 0)
    for tx in pre_extra_txs:
        this_size = len(bitcoin.data.tx_type.pack(tx.tx))
        # keep the block comfortably under bitcoin's size limits
        if size + this_size > 500000:
            # NOTE(review): 'break' and the append/size update lines are elided
    # XXX assuming generate_tx is smallish here..
    generate_tx = p2pool.generate_transaction(
        # NOTE(review): leading arguments (e.g. tracker) are elided
        previous_share_hash=state['best_share_hash'],
        new_script=payout_script,
        # block subsidy (halves every 210000 blocks) plus fees of included txs
        subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
        nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
        block_target=state['target'],
        # NOTE(review): remaining arguments and the closing paren are elided
    print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
    #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
    #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
    transactions = [generate_tx] + [tx.tx for tx in extra_txs]
    merkle_root = bitcoin.data.merkle_hash(transactions)
    merkle_root_to_transactions[merkle_root] = transactions # will stay for ~300 seconds (merkle_root_to_transactions is an ExpiringDict(300))

    # timestamp in bitcoind's clock, clamped above the median of the last 11 shares
    timestamp = int(time.time() - current_work2.value['clock_offset'])
    if state['best_share_hash'] is not None:
        timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
        if timestamp2 > timestamp:
            print 'Toff', timestamp2 - timestamp
            timestamp = timestamp2
    target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
    # record hand-out time keyed by nonce so share age can be reported later
    times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
    #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
    return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
def got_response(data):
    """Handle a getwork submission from a miner; link it back to its transactions."""
    # NOTE(review): a try: is elided around this body
    # match up with transactions
    header = bitcoin.getwork.decode_data(data)
    transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
    if transactions is None:
        print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
        # NOTE(review): an early return is elided here
    block = dict(header=header, txs=transactions)
    hash_ = bitcoin.data.block_header_type.hash256(block['header'])
    # a submission meeting the bitcoin target is a full block
    if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
        if factory.conn.value is not None:
            factory.conn.value.send_block(block=block)
        # NOTE(review): an 'else:' is elided before this print
        print 'No bitcoind connection! Erp!'
    target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
    # NOTE(review): a share-target check (likely 'if hash_ > target:') is elided
    print 'Received invalid share from worker - %x/%x' % (hash_, target)
    # NOTE(review): an early return is elided here
    share = p2pool.Share.from_block(block)
    my_shares.add(share.hash)
    print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
    good = share.previous_hash == current_work.value['best_share_hash']
    # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
    # NOTE(review): the local p2p_shares([share]) call is elided here
    # eg. good = share.hash == current_work.value['best_share_hash'] here
    # NOTE(review): return and the except handler opening are elided
    log.err(None, 'Error processing data received from worker:')
    # NOTE(review): the handler's return is elided

web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)

# NOTE(review): the 'def get_rate():' header is elided; body serves pool H/s as JSON
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
    return json.dumps(att_s)
return json.dumps(None)

# NOTE(review): the 'def get_users():' header is elided; body serves per-user weights as JSON
height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
# NOTE(review): result dict initialization is elided
for script in sorted(weights, key=lambda s: weights[s]):
    res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
return json.dumps(res)
class WebInterface(resource.Resource):
    """Minimal twisted.web resource serving func() under a fixed MIME type."""
    def __init__(self, func, mime_type):
        # func: no-argument callable producing the response body
        self.func, self.mime_type = func, mime_type

    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        # NOTE(review): the return of self.func() is elided from this excerpt

web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
web_root.putChild('users', WebInterface(get_users, 'application/json'))
# NOTE(review): a guard (likely 'if args.charts:') is elided here
web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))

reactor.listenTCP(args.worker_port, server.Site(web_root))
tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
# memoized bitcoind getrawtransaction lookups, keyed by tx hash
get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))

# NOTE(review): the 'class Tx(object):' header is elided; the methods below belong to it
def __init__(self, tx, seen_at_block):
    # tx: decoded transaction dict; seen_at_block: block hash current when first seen
    self.hash = bitcoin.data.tx_type.hash256(tx)
    # NOTE(review): line elided (likely 'self.tx = tx' — self.tx is read below)
    self.seen_at_block = seen_at_block
    # every hash this tx mentions: itself plus the outputs it spends
    self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])

    #print '%x %r' % (seen_at_block, tx)
    #for mention in self.mentions:
    # print '%x' % mention

    # NOTE(review): lines elided; value_in initialization not visible here
    self.parents_all_in_blocks = False
    self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
    self._find_parents_in_blocks()

@defer.inlineCallbacks
def _find_parents_in_blocks(self):
    """Resolve this tx's inputs via bitcoind, totalling value_in and checking parents are mined."""
    for tx_in in self.tx['tx_ins']:
        # NOTE(review): a try: is elided around the lookup below
        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
        # NOTE(review): the except handler is elided
        self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
        #print raw_transaction
        if not raw_transaction['parent_blocks']:
            # NOTE(review): an early return is elided here
    self.parents_all_in_blocks = True

# NOTE(review): the 'def is_good(self):' header is elided
if not self.parents_all_in_blocks:
    # NOTE(review): branch body elided (likely 'return False')

@defer.inlineCallbacks
# NOTE(review): the 'def new_tx(tx_hash):' header and a try: are elided
assert isinstance(tx_hash, (int, long))
#print 'REQUESTING', tx_hash
tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
# NOTE(review): line elided
tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
# NOTE(review): the except handler opening is elided
log.err(None, 'Error handling tx:')
# disable for now, for testing impact on stales
#factory.new_tx.watch(new_tx)
def new_block(block_hash):
    # any new bitcoin block invalidates the current work; trigger a refresh
    work_updated.happened()
factory.new_block.watch(new_block)

print 'Started successfully!'

# NOTE(review): lines elided
# re-evaluate the best share chain whenever bitcoin height info updates
ht.updated.watch(set_real_work2)

@defer.inlineCallbacks
# NOTE(review): worker-thread function header and loop header are elided
flag = work_updated.get_deferred()
# NOTE(review): a try: is elided
yield set_real_work1()
# NOTE(review): the except handler is elided
# refresh when an update fires or after a randomized ~20s interval, whichever first
yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/20))], fireOnOneCallback=True)

@defer.inlineCallbacks
# NOTE(review): second worker-thread function header and loop body are partially elided
yield deferral.sleep(random.expovariate(1/20))

# NOTE(review): thread starts elided
# counts our own shares (tagged with run_identifier) present in a chain
counter = skiplists.CountsSkipList(tracker, run_identifier)

# NOTE(review): the status-printing loop header is elided
yield deferral.sleep(3)
# NOTE(review): a try: is elided
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    # NOTE(review): a minimum-height guard is elided here
    att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
    weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
    matching_in_chain = counter(current_work.value['best_share_hash'], height)
    shares_in_chain = my_shares & matching_in_chain
    stale_shares = my_shares - matching_in_chain
    print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
        # NOTE(review): leading format arguments are elided
        weights.get(my_script, 0)/total_weight*100,
        math.format(weights.get(my_script, 0)/total_weight*att_s),
        len(shares_in_chain) + len(stale_shares),
        # NOTE(review): remaining arguments and the closing paren are elided
    #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
    #for k, v in weights.iteritems():
    # print k.encode('hex'), v/total_weight

# NOTE(review): the except handler opening is elided
log.err(None, 'Fatal error:')
# command-line interface definition
# NOTE(review): the enclosing context (e.g. 'if __name__ == ...' / run()) is
# elided from this excerpt
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--charts',
    help='generate charts on the web interface (requires PIL and pygame)',
    action='store_const', const=True, default=False, dest='charts')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')
parser.add_argument('--disable-upnp',
    help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
    action='store_false', default=True, dest='upnp')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# positional arguments: no flag strings given, so argparse treats dest as a
# required positional (the metavar is what shows in usage/help)
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')
args = parser.parse_args()

# NOTE(review): a guard (likely 'if args.debug:') is elided before this section
p2pool_init.DEBUG = True
class TeePipe(object):
    """File-like object that fans writes/flushes out to several underlying streams."""
    def __init__(self, outputs):
        self.outputs = outputs
    def write(self, data):
        for output in self.outputs:
            # NOTE(review): loop body elided (likely output.write(data))
    # NOTE(review): 'def flush(self):' header elided
    for output in self.outputs:
        # NOTE(review): loop body elided (likely output.flush())
class TimestampingPipe(object):
    """File-like wrapper prefixing each complete output line with a wall-clock timestamp."""
    def __init__(self, inner_file):
        self.inner_file = inner_file
        # NOTE(review): buffer initialization elided (self.buf is read in write())

    def write(self, data):
        # buffer partial lines; only timestamp and emit completed ones
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
            self.inner_file.flush()
        # NOTE(review): retention of the trailing partial line is elided
# tee all output to stderr and a debug.log next to the executable, timestamped
sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = args.net.P2P_PORT

if args.address is not None:
    # NOTE(review): a try: is elided around the conversion below
    args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
    # NOTE(review): the except handler opening is elided
    raise ValueError('error parsing address: ' + repr(e))
# NOTE(review): an 'else:' is elided before this line
args.pubkey_hash = None

reactor.callWhenRunning(main, args)