3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
20 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
21 from util import db, expiring_dict, jsonrpc, variable, deferral, math
22 from . import p2p, worker_interface, skiplists, draw
23 import p2pool.data as p2pool
24 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work unit and chain height from bitcoind.

    Issues rpc_getwork and rpc_getblocknumber concurrently and returns a
    (BlockAttempt, height) pair. Retried up to 3 times on failure by the
    decorator. NOTE: a block could arrive in between the two queries, so
    the pair is not guaranteed to be mutually consistent.
    """
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # Fix: previously these errbacks were attached only after BOTH yields
        # succeeded, so a failure in the first yield propagated immediately and
        # left the other deferred's eventual error unhandled ("Unhandled error
        # in Deferred" log noise). Attaching them in a finally block guarantees
        # residual errors are always swallowed.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (over the bitcoin p2p protocol) for a payout script via a
    checkorder ("IP transaction") request.

    Returns the payout script on success, None if bitcoind denies the request,
    and raises ValueError on any other reply. Retried once on error (decorator).
    """
    # null_order lets bitcoind choose the destination key itself.
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        defer.returnValue(res['script'])
    elif res['reply'] == 'denied':
        defer.returnValue(None)
    # defer.returnValue() raises to exit the generator, so reaching this line
    # means neither branch above fired: the reply is unexpected.
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout script: request an address from bitcoind's 'p2pool'
    account and turn it into a pay-to-pubkey-hash output script.

    Retried up to 10 times on failure (decorator).
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
55 @defer.inlineCallbacks
58 print 'p2pool (version %s)' % (p2pool_init.__version__,)
61 # connect to bitcoind over JSON-RPC and do initial getwork
62 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
63 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
64 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
65 temp_work, temp_height = yield getwork(bitcoind)
67 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
70 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
71 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
72 factory = bitcoin.p2p.ClientFactory(args.net)
73 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
74 my_script = yield get_payout_script(factory)
75 if args.pubkey_hash is None:
77 print 'IP transaction denied ... falling back to sending to address.'
78 my_script = yield get_payout_script2(bitcoind, args.net)
80 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
82 print ' Payout script:', my_script.encode('hex')
85 ht = bitcoin.p2p.HeightTracker(factory)
87 tracker = p2pool.OkayTracker(args.net)
88 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    # Memoized lookup: reuse the cached Chain for this id, creating it on miss.
    # NOTE(review): 'Chain' is not defined anywhere in this view of the file —
    # this looks like stale/dead code; confirm Chain exists before relying on it.
    return chains.setdefault(chain_id_data, Chain(chain_id_data))
92 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
94 # information affecting work that should trigger a long-polling update
95 current_work = variable.Variable(None)
96 # information affecting work that should not trigger a long-polling update
97 current_work2 = variable.Variable(None)
99 work_updated = variable.Event()
100 tracker_updated = variable.Event()
102 requested = expiring_dict.ExpiringDict(300)
104 @defer.inlineCallbacks
105 def set_real_work1():
106 work, height = yield getwork(bitcoind)
107 # XXX call tracker_updated
108 current_work.set(dict(
109 version=work.version,
110 previous_block=work.previous_block,
113 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
115 current_work2.set(dict(
116 clock_offset=time.time() - work.timestamp,
119 @defer.inlineCallbacks
120 def set_real_work2():
121 best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
123 t = dict(current_work.value)
124 t['best_share_hash'] = best
127 for peer2, share_hash in desired:
128 if share_hash not in tracker.tails: # was received in the time tracker.think was running
130 last_request_time, count = requested.get(share_hash, (None, 0))
131 if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
133 potential_peers = set()
134 for head in tracker.tails[share_hash]:
135 potential_peers.update(peer_heads.get(head, set()))
136 potential_peers = [peer for peer in potential_peers if peer.connected2]
137 if count == 0 and peer2 is not None and peer2.connected2:
140 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
144 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
148 stops=list(set(tracker.heads) | set(
149 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
152 requested[share_hash] = time.time(), count + 1
154 print 'Initializing work...'
155 yield set_real_work1()
156 yield set_real_work2()
159 start_time = time.time() - current_work2.value['clock_offset']
161 # setup p2p logic and join p2pool network
163 def share_share(share, ignore_peer=None):
164 for peer in p2p_node.peers.itervalues():
165 if peer is ignore_peer:
167 if p2pool_init.DEBUG:
168 print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
169 peer.send_shares([share])
172 def p2p_shares(shares, peer=None):
174 print 'Processing %i shares...' % (len(shares),)
178 if share.hash in tracker.shares:
179 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
183 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
186 #for peer2, share_hash in desired:
187 # print 'Requesting parent share %x' % (share_hash,)
188 # peer2.send_getshares(hashes=[share_hash], parents=2000)
190 if share.bitcoin_hash <= share.header['target']:
192 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
194 if factory.conn.value is not None:
195 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
197 print 'No bitcoind connection! Erp!'
199 if shares and peer is not None:
200 peer_heads.setdefault(shares[0].hash, set()).add(peer)
203 tracker_updated.happened()
206 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
208 def p2p_share_hashes(share_hashes, peer):
210 for share_hash in share_hashes:
211 if share_hash in tracker.shares:
212 pass # print 'Got share hash, already have, ignoring. Hash: %s' % (p2pool.format_hash(share_hash),)
214 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
215 get_hashes.append(share_hash)
217 if share_hashes and peer is not None:
218 peer_heads.setdefault(share_hashes[0], set()).add(peer)
220 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
222 def p2p_get_shares(share_hashes, parents, stops, peer):
223 parents = min(parents, 1000//len(share_hashes))
226 for share_hash in share_hashes:
227 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
228 if share.hash in stops:
231 peer.send_shares(shares, full=True)
233 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
237 ip, port = x.split(':')
240 return x, args.net.P2P_PORT
243 ('72.14.191.28', args.net.P2P_PORT),
244 ('62.204.197.159', args.net.P2P_PORT),
247 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
249 log.err(None, 'Error resolving bootstrap node IP:')
252 current_work=current_work,
253 port=args.p2pool_port,
255 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
256 mode=0 if args.low_bandwidth else 1,
257 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
259 p2p_node.handle_shares = p2p_shares
260 p2p_node.handle_share_hashes = p2p_share_hashes
261 p2p_node.handle_get_shares = p2p_get_shares
265 # send share when the chain changes to their chain
266 def work_changed(new_work):
267 #print 'Work changed:', new_work
268 for share in tracker.get_chain_known(new_work['best_share_hash']):
271 share_share(share, share.peer)
272 current_work.changed.watch(work_changed)
277 # start listening for workers with a JSON-RPC server
279 print 'Listening for workers on port %i...' % (args.worker_port,)
283 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
284 run_identifier = struct.pack('<Q', random.randrange(2**64))
286 def compute(state, payout_script):
287 if payout_script is None:
288 payout_script = my_script
289 if state['best_share_hash'] is None and args.net.PERSIST:
290 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
291 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
292 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
295 for tx in pre_extra_txs:
296 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
297 if size + this_size > 500000:
302 # XXX assuming generate_tx is smallish here..
303 generate_tx = p2pool.generate_transaction(
305 previous_share_hash=state['best_share_hash'],
306 new_script=payout_script,
307 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
308 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
309 block_target=state['target'],
312 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
313 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
314 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
315 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
316 merkle_root = bitcoin.data.merkle_hash(transactions)
317 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
319 timestamp = int(time.time() - current_work2.value['clock_offset'])
320 if state['best_share_hash'] is not None:
321 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
322 if timestamp2 > timestamp:
323 print 'Toff', timestamp2 - timestamp
324 timestamp = timestamp2
325 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
326 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
327 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
328 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
333 def got_response(data):
335 # match up with transactions
336 header = bitcoin.getwork.decode_data(data)
337 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
338 if transactions is None:
339 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
341 block = dict(header=header, txs=transactions)
342 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
343 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
345 print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
347 if factory.conn.value is not None:
348 factory.conn.value.send_block(block=block)
350 print 'No bitcoind connection! Erp!'
351 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
353 print 'Received invalid share from worker - %x/%x' % (hash_, target)
355 share = p2pool.Share.from_block(block)
356 my_shares.add(share.hash)
357 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
358 good = share.previous_hash == current_work.value['best_share_hash']
359 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
361 # eg. good = share.hash == current_work.value['best_share_hash'] here
364 log.err(None, 'Error processing data received from worker:')
367 web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
370 if current_work.value['best_share_hash'] is not None:
371 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
372 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
373 return json.dumps(att_s)
374 return json.dumps(None)
377 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
378 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
380 for script in sorted(weights, key=lambda s: weights[s]):
381 res[script.encode('hex')] = weights[script]/total_weight
382 return json.dumps(res)
class WebInterface(resource.Resource):
    """Minimal twisted.web resource that serves a function's result with a
    fixed Content-Type (used below for /rate, /users and /chain_img)."""
    def __init__(self, func, mime_type):
        # func: zero-argument callable producing the response body
        # mime_type: value for the Content-Type header
        self.func, self.mime_type = func, mime_type

    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        # NOTE(review): this chunk appears truncated here — the method
        # presumably ends with 'return self.func()'; confirm against the
        # full file before editing.
392 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
393 web_root.putChild('users', WebInterface(get_users, 'application/json'))
394 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
396 reactor.listenTCP(args.worker_port, server.Site(web_root))
403 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
404 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
407 def __init__(self, tx, seen_at_block):
408 self.hash = bitcoin.data.tx_type.hash256(tx)
410 self.seen_at_block = seen_at_block
411 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
413 #print '%x %r' % (seen_at_block, tx)
414 #for mention in self.mentions:
415 # print '%x' % mention
417 self.parents_all_in_blocks = False
420 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
421 self._find_parents_in_blocks()
423 @defer.inlineCallbacks
424 def _find_parents_in_blocks(self):
425 for tx_in in self.tx['tx_ins']:
427 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
430 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
431 #print raw_transaction
432 if not raw_transaction['parent_blocks']:
434 self.parents_all_in_blocks = True
437 if not self.parents_all_in_blocks:
443 @defer.inlineCallbacks
446 assert isinstance(tx_hash, (int, long))
447 #print 'REQUESTING', tx_hash
448 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
450 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
452 log.err(None, 'Error handling tx:')
453 # disable for now, for testing impact on stales
454 #factory.new_tx.watch(new_tx)
def new_block(block_hash):
    # A new bitcoin block makes the current work stale; poke the work-update
    # event so the polling loop refreshes getwork immediately.
    work_updated.happened()
458 factory.new_block.watch(new_block)
460 print 'Started successfully!'
463 @defer.inlineCallbacks
466 flag = work_updated.get_deferred()
468 yield set_real_work1()
471 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
473 @defer.inlineCallbacks
476 flag = tracker_updated.get_deferred()
478 yield set_real_work2()
481 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
486 counter = skiplists.CountsSkipList(tracker, run_identifier)
489 yield deferral.sleep(random.expovariate(1/1))
491 if current_work.value['best_share_hash'] is not None:
492 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
494 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
495 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
496 matching_in_chain = counter(current_work.value['best_share_hash'], height)
497 shares_in_chain = my_shares & matching_in_chain
498 stale_shares = my_shares - matching_in_chain
499 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
502 weights.get(my_script, 0)/total_weight*100,
503 math.format(weights.get(my_script, 0)/total_weight*att_s),
504 len(shares_in_chain) + len(stale_shares),
508 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
509 #for k, v in weights.iteritems():
510 # print k.encode('hex'), v/total_weight
514 log.err(None, 'Fatal error:')
# Command-line interface definition. Arguments are grouped by which subsystem
# they configure: the p2pool p2p network, the miner-facing worker RPC server,
# and the bitcoind connection (RPC + p2p).
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')

# p2pool node <-> p2pool node networking
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')

# miner-facing JSON-RPC ("worker") server
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

# upstream bitcoind connection (JSON-RPC for getwork, p2p for block submission)
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# positional credentials for the bitcoind RPC interface
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')

args = parser.parse_args()
567 p2pool_init.DEBUG = True
568 class TeePipe(object):
569 def __init__(self, outputs):
570 self.outputs = outputs
571 def write(self, data):
572 for output in self.outputs:
575 for output in self.outputs:
577 class TimestampingPipe(object):
578 def __init__(self, inner_file):
579 self.inner_file = inner_file
582 def write(self, data):
583 buf = self.buf + data
584 lines = buf.split('\n')
585 for line in lines[:-1]:
586 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
587 self.inner_file.flush()
591 sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
593 if args.bitcoind_p2p_port is None:
594 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
596 if args.p2pool_port is None:
597 args.p2pool_port = args.net.P2P_PORT
599 if args.address is not None:
601 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
603 raise ValueError('error parsing address: ' + repr(e))
605 args.pubkey_hash = None
607 reactor.callWhenRunning(main, args)