3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
21 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
22 from util import db, expiring_dict, jsonrpc, variable, deferral, math
23 from . import p2p, worker_interface, skiplists
24 import p2pool.data as p2pool
25 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work and block height from bitcoind over JSON-RPC.

    Both RPCs are fired before either result is awaited, so a block arriving
    between the two queries skews the pair as little as possible.
    """
    work_deferred = bitcoind.rpc_getwork()
    height_deferred = bitcoind.rpc_getblocknumber()
    work = bitcoin.getwork.BlockAttempt.from_getwork((yield work_deferred))
    height = yield height_deferred
    # attach no-op errbacks so any residual failure isn't logged as unhandled
    work_deferred.addErrback(lambda fail: None)
    height_deferred.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, via a p2p checkorder message, for a payout script.

    Returns the script on success, None if the IP transaction was denied,
    and raises ValueError on any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    # defer.returnValue raises, so each branch below is terminal
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fall back to paying bitcoind's 'p2pool' account address."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
56 @defer.inlineCallbacks
62 print 'p2pool (version %s)' % (p2pool_init.__version__,)
65 # connect to bitcoind over JSON-RPC and do initial getwork
66 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
67 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
68 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
69 temp_work, temp_height = yield getwork(bitcoind)
71 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
74 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
75 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
76 factory = bitcoin.p2p.ClientFactory(args.net)
77 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
78 my_script = yield get_payout_script(factory)
79 if args.pubkey_hash is None:
81 print ' IP transaction denied ... falling back to sending to address.'
82 my_script = yield get_payout_script2(bitcoind, args.net)
84 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
86 print ' Payout script:', my_script.encode('hex')
89 ht = bitcoin.p2p.HeightTracker(factory)
91 tracker = p2pool.OkayTracker(args.net)
92 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    # Return the cached Chain for this id, creating and caching one if absent.
    # Note: a fresh Chain is constructed even on a cache hit (setdefault
    # evaluates its default eagerly) - kept to preserve behavior exactly.
    fresh = Chain(chain_id_data)
    return chains.setdefault(chain_id_data, fresh)
96 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
98 # information affecting work that should trigger a long-polling update
99 current_work = variable.Variable(None)
100 # information affecting work that should not trigger a long-polling update
101 current_work2 = variable.Variable(None)
103 work_updated = variable.Event()
105 requested = expiring_dict.ExpiringDict(300)
107 @defer.inlineCallbacks
108 def set_real_work1():
109 work, height = yield getwork(bitcoind)
110 changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
111 current_work.set(dict(
112 version=work.version,
113 previous_block=work.previous_block,
116 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
118 current_work2.set(dict(
119 clock_offset=time.time() - work.timestamp,
124 def set_real_work2():
125 best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
127 t = dict(current_work.value)
128 t['best_share_hash'] = best
132 for peer2, share_hash in desired:
133 #if share_hash not in tracker.tails: # was received in the time tracker.think was running
135 last_request_time, count = requested.get(share_hash, (None, 0))
136 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
138 potential_peers = set()
139 for head in tracker.tails[share_hash]:
140 potential_peers.update(peer_heads.get(head, set()))
141 potential_peers = [peer for peer in potential_peers if peer.connected2]
142 if count == 0 and peer2 is not None and peer2.connected2:
145 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
149 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
153 stops=list(set(tracker.heads) | set(
154 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
157 requested[share_hash] = t, count + 1
159 print 'Initializing work...'
160 yield set_real_work1()
164 start_time = time.time() - current_work2.value['clock_offset']
166 # setup p2p logic and join p2pool network
def share_share(share, ignore_peer=None):
    # Relay a share to every connected peer, optionally skipping its source
    # so we don't echo a share straight back to whoever sent it.
    for other in p2p_node.peers.itervalues():
        if other is not ignore_peer:
            #if p2pool_init.DEBUG:
            #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), other.addr)
            other.send_shares([share])
177 def p2p_shares(shares, peer=None):
179 print 'Processing %i shares...' % (len(shares),)
183 if share.hash in tracker.shares:
184 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
188 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
191 #for peer2, share_hash in desired:
192 # print 'Requesting parent share %x' % (share_hash,)
193 # peer2.send_getshares(hashes=[share_hash], parents=2000)
195 if share.bitcoin_hash <= share.header['target']:
197 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
199 if factory.conn.value is not None:
200 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
202 print 'No bitcoind connection! Erp!'
204 if shares and peer is not None:
205 peer_heads.setdefault(shares[0].hash, set()).add(peer)
211 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
213 def p2p_share_hashes(share_hashes, peer):
216 for share_hash in share_hashes:
217 if share_hash in tracker.shares:
219 last_request_time, count = requested.get(share_hash, (None, 0))
220 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
222 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
223 get_hashes.append(share_hash)
224 requested[share_hash] = t, count + 1
226 if share_hashes and peer is not None:
227 peer_heads.setdefault(share_hashes[0], set()).add(peer)
229 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
def p2p_get_shares(share_hashes, parents, stops, peer):
    """Serve a peer's getshares request.

    Walks up to `parents` ancestors of each requested share hash, stopping at
    any hash the requester already has, and sends the collected shares back.
    """
    # cap total work so one request can never ask for more than ~1000 shares
    parents = min(parents, 1000//len(share_hashes))
    # NOTE(review): the set()/accumulator/break-body lines below were
    # reconstructed from the visible loop skeleton - confirm against history.
    stops = set(stops)
    shares = []
    for share_hash in share_hashes:
        for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
            if share.hash in stops:
                break
            shares.append(share)
    peer.send_shares(shares, full=True)
242 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
246 ip, port = x.split(':')
249 return x, args.net.P2P_PORT
252 ('72.14.191.28', args.net.P2P_PORT),
253 ('62.204.197.159', args.net.P2P_PORT),
254 ('142.58.248.28', args.net.P2P_PORT),
255 ('94.23.34.145', args.net.P2P_PORT),
259 'dabuttonfactory.com',
262 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
264 log.err(None, 'Error resolving bootstrap node IP:')
267 current_work=current_work,
268 port=args.p2pool_port,
270 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
271 mode=0 if args.low_bandwidth else 1,
272 preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
274 p2p_node.handle_shares = p2p_shares
275 p2p_node.handle_share_hashes = p2p_share_hashes
276 p2p_node.handle_get_shares = p2p_get_shares
280 # send share when the chain changes to their chain
281 def work_changed(new_work):
282 #print 'Work changed:', new_work
283 for share in tracker.get_chain_known(new_work['best_share_hash']):
286 share_share(share, share.peer)
287 current_work.changed.watch(work_changed)
292 @defer.inlineCallbacks
296 is_lan, lan_ip = yield ipdiscover.get_local_ip()
299 pm = yield portmapper.get_port_mapper()
300 yield pm._upnp.add_port_mapping(lan_ip, args.net.P2P_PORT, args.net.P2P_PORT, 'p2pool', 'TCP')
303 yield deferral.sleep(random.expovariate(1/120))
308 # start listening for workers with a JSON-RPC server
310 print 'Listening for workers on port %i...' % (args.worker_port,)
314 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
315 run_identifier = struct.pack('<Q', random.randrange(2**64))
317 def compute(state, payout_script):
318 if payout_script is None:
319 payout_script = my_script
320 if state['best_share_hash'] is None and args.net.PERSIST:
321 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
322 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
323 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
326 for tx in pre_extra_txs:
327 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
328 if size + this_size > 500000:
333 # XXX assuming generate_tx is smallish here..
334 generate_tx = p2pool.generate_transaction(
336 previous_share_hash=state['best_share_hash'],
337 new_script=payout_script,
338 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
339 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
340 block_target=state['target'],
343 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
344 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
345 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
346 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
347 merkle_root = bitcoin.data.merkle_hash(transactions)
348 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
350 timestamp = int(time.time() - current_work2.value['clock_offset'])
351 if state['best_share_hash'] is not None:
352 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
353 if timestamp2 > timestamp:
354 print 'Toff', timestamp2 - timestamp
355 timestamp = timestamp2
356 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
357 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
358 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
359 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
364 def got_response(data):
366 # match up with transactions
367 header = bitcoin.getwork.decode_data(data)
368 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
369 if transactions is None:
370 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
372 block = dict(header=header, txs=transactions)
373 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
374 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
376 print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
378 if factory.conn.value is not None:
379 factory.conn.value.send_block(block=block)
381 print 'No bitcoind connection! Erp!'
382 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
384 print 'Received invalid share from worker - %x/%x' % (hash_, target)
386 share = p2pool.Share.from_block(block)
387 my_shares.add(share.hash)
388 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
389 good = share.previous_hash == current_work.value['best_share_hash']
390 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
392 # eg. good = share.hash == current_work.value['best_share_hash'] here
395 log.err(None, 'Error processing data received from worker:')
398 web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
401 if current_work.value['best_share_hash'] is not None:
402 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
403 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
404 return json.dumps(att_s)
405 return json.dumps(None)
408 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
409 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
411 for script in sorted(weights, key=lambda s: weights[s]):
412 res[script.encode('hex')] = weights[script]/total_weight
413 return json.dumps(res)
class WebInterface(resource.Resource):
    """Minimal twisted.web resource serving func()'s value with a fixed MIME type."""

    def __init__(self, func, mime_type):
        # func: zero-argument callable producing the response body
        # (call sites pass e.g. get_rate and a lambda taking no arguments)
        self.func, self.mime_type = func, mime_type

    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        # NOTE(review): this return line was reconstructed; func() taking no
        # arguments matches the visible call sites - confirm upstream.
        return self.func()
423 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
424 web_root.putChild('users', WebInterface(get_users, 'application/json'))
426 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
428 reactor.listenTCP(args.worker_port, server.Site(web_root))
435 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
436 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
439 def __init__(self, tx, seen_at_block):
440 self.hash = bitcoin.data.tx_type.hash256(tx)
442 self.seen_at_block = seen_at_block
443 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
445 #print '%x %r' % (seen_at_block, tx)
446 #for mention in self.mentions:
447 # print '%x' % mention
449 self.parents_all_in_blocks = False
452 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
453 self._find_parents_in_blocks()
455 @defer.inlineCallbacks
456 def _find_parents_in_blocks(self):
457 for tx_in in self.tx['tx_ins']:
459 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
462 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
463 #print raw_transaction
464 if not raw_transaction['parent_blocks']:
466 self.parents_all_in_blocks = True
469 if not self.parents_all_in_blocks:
475 @defer.inlineCallbacks
478 assert isinstance(tx_hash, (int, long))
479 #print 'REQUESTING', tx_hash
480 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
482 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
484 log.err(None, 'Error handling tx:')
485 # disable for now, for testing impact on stales
486 #factory.new_tx.watch(new_tx)
488 def new_block(block_hash):
489 work_updated.happened()
490 factory.new_block.watch(new_block)
492 print 'Started successfully!'
495 ht.updated.watch(lambda x: set_real_work2())
497 @defer.inlineCallbacks
500 flag = work_updated.get_deferred()
502 yield set_real_work1()
505 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/20))], fireOnOneCallback=True)
507 @defer.inlineCallbacks
514 yield deferral.sleep(random.expovariate(1/20))
519 counter = skiplists.CountsSkipList(tracker, run_identifier)
522 yield deferral.sleep(3)
524 if current_work.value['best_share_hash'] is not None:
525 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
527 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
528 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
529 matching_in_chain = counter(current_work.value['best_share_hash'], height)
530 shares_in_chain = my_shares & matching_in_chain
531 stale_shares = my_shares - matching_in_chain
532 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
535 weights.get(my_script, 0)/total_weight*100,
536 math.format(weights.get(my_script, 0)/total_weight*att_s),
537 len(shares_in_chain) + len(stale_shares),
541 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
542 #for k, v in weights.iteritems():
543 # print k.encode('hex'), v/total_weight
547 log.err(None, 'Fatal error:')
# Command-line interface. Arguments are grouped by the part of p2pool they
# configure: general options, the p2pool p2p interface, the worker (miner)
# interface, and the bitcoind connection.
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--charts',
    help='generate charts on the web interface (requires PIL and pygame)',
    action='store_const', const=True, default=False, dest='charts')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# NOTE(review): the two options below are added to `parser` rather than
# `p2pool_group`, so they appear under the general options in --help;
# presumably intentional, but worth confirming.
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')
parser.add_argument('--disable-upnp',
    help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
    action='store_false', default=True, dest='upnp')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional arguments: RPC credentials are required, in username/password order
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')

args = parser.parse_args()
606 p2pool_init.DEBUG = True
class TeePipe(object):
    """File-like object that fans every write/flush out to several files.

    Used to mirror stdout/stderr both to the console and to debug.log.
    """
    def __init__(self, outputs):
        self.outputs = outputs

    def write(self, data):
        for output in self.outputs:
            output.write(data)

    def flush(self):
        for output in self.outputs:
            output.flush()
class TimestampingPipe(object):
    """File-like wrapper that prefixes each complete line with a timestamp.

    Data is buffered until a newline arrives; each finished line is written
    to the inner file as '<HH:MM:SS.ffffff> <line>\\n' and flushed at once.
    """
    def __init__(self, inner_file):
        self.inner_file = inner_file
        self.buf = ''  # partial line awaiting its terminating newline
        self.softspace = 0  # required by the Python 2 print statement protocol

    def write(self, data):
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
            self.inner_file.flush()
        # NOTE(review): buffer carry-over and flush() were reconstructed from
        # the visible write() skeleton - confirm against upstream history.
        self.buf = lines[-1]

    def flush(self):
        # intentionally a no-op: each complete line is already flushed eagerly
        pass
630 sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
632 if args.bitcoind_p2p_port is None:
633 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
635 if args.p2pool_port is None:
636 args.p2pool_port = args.net.P2P_PORT
638 if args.address is not None:
640 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
642 raise ValueError('error parsing address: ' + repr(e))
644 args.pubkey_hash = None
646 reactor.callWhenRunning(main, args)