3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
20 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
21 from util import db, expiring_dict, jsonrpc, variable, deferral, math
22 from . import p2p, worker_interface, skiplists
23 import p2pool.data as p2pool
24 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current getwork job and block height from bitcoind.

    Fires with a (BlockAttempt, height) tuple. Retries up to 3 times on
    failure via deferral.retry.
    """
    # Issue both RPCs before yielding so they run concurrently.
    # A block could still arrive in between the two queries answering,
    # so work and height may be momentarily inconsistent.
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        work = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # Always swallow residual errors on both deferreds - previously these
        # errbacks were only attached after both yields succeeded, so a failure
        # in the first yield left the other deferred's error unhandled and
        # logged as noise.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind over its P2P interface for a payout script via checkorder.

    Fires with the payout script on success, None if the IP transaction was
    denied, and raises ValueError on any unexpected reply.
    """
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        defer.returnValue(res['script'])
    elif res['reply'] == 'denied':
        defer.returnValue(None)
    else:
        # Explicit else: previously the raise was unconditional and only
        # skipped because defer.returnValue raises internally - fragile.
        raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from bitcoind's 'p2pool' account address.

    Fallback used when the checkorder/IP-transaction path is denied.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
# NOTE(review): the enclosing 'def main(args):' header is elided from this
# excerpt; the statements below are the body of the startup coroutine.
print 'p2pool (version %s)' % (p2pool_init.__version__,)

# connect to bitcoind over JSON-RPC and do initial getwork
url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
temp_work, temp_height = yield getwork(bitcoind)
print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)

# connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
factory = bitcoin.p2p.ClientFactory(args.net)
reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
my_script = yield get_payout_script(factory)
if args.pubkey_hash is None:
    # NOTE(review): an inner guard (presumably 'if my_script is None:') is
    # elided here - confirm against the full source.
    print 'IP transaction denied ... falling back to sending to address.'
    my_script = yield get_payout_script2(bitcoind, args.net)
# NOTE(review): the 'else:' header for the explicit-address path is elided;
# this line uses the pubkey hash supplied on the command line.
my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)

print ' Payout script:', my_script.encode('hex')

# tracks bitcoin block heights via the P2P connection
ht = bitcoin.p2p.HeightTracker(factory)

# share-chain tracker and per-chain cache (entries expire after 300s)
tracker = p2pool.OkayTracker(args.net)
chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    # Return the cached chain object for this id, creating it on first use.
    # NOTE(review): 'Chain' is not defined in this excerpt - presumably
    # declared elsewhere in the file.
    return chains.setdefault(chain_id_data, Chain(chain_id_data))

peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it

# information affecting work that should trigger a long-polling update
current_work = variable.Variable(None)
# information affecting work that should not trigger a long-polling update
current_work2 = variable.Variable(None)

# fired when new bitcoin work / new share-chain state is available
work_updated = variable.Event()
tracker_updated = variable.Event()

# share_hash -> (last_request_time, request_count), expiring after 300s
requested = expiring_dict.ExpiringDict(300)
@defer.inlineCallbacks
def set_real_work1():
    # Poll bitcoind for fresh work and publish it into current_work /
    # current_work2 (the latter holds data that must not trigger long polls).
    work, height = yield getwork(bitcoind)
    # XXX call tracker_updated
    current_work.set(dict(
        version=work.version,
        previous_block=work.previous_block,
        # NOTE(review): additional dict entries (presumably target/height)
        # and the closing parentheses are elided from this excerpt.
        best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
    current_work2.set(dict(
        clock_offset=time.time() - work.timestamp,

@defer.inlineCallbacks
def set_real_work2():
    # Re-evaluate the best share chain and request missing parent shares
    # from peers that are believed to know about them.
    best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])

    t = dict(current_work.value)
    t['best_share_hash'] = best
    # NOTE(review): the current_work.set(t) call appears to be elided here.

    for peer2, share_hash in desired:
        if share_hash not in tracker.tails: # was received in the time tracker.think was running
            # NOTE(review): loop-body statement (presumably 'continue') elided.
        last_request_time, count = requested.get(share_hash, (None, 0))
        if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
            # NOTE(review): statement elided - presumably 'continue' to rate-limit
            # repeated requests with exponential backoff (10 * 1.5**count).
        potential_peers = set()
        for head in tracker.tails[share_hash]:
            potential_peers.update(peer_heads.get(head, set()))
        potential_peers = [peer for peer in potential_peers if peer.connected2]
        if count == 0 and peer2 is not None and peer2.connected2:
            # NOTE(review): branch body elided - presumably prefers asking peer2
            # directly on the first request.
        # Mostly pick a random knowledgeable peer, occasionally fall back to peer2.
        peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2

        print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
        # NOTE(review): the peer.send_getshares(...) call header is elided;
        # only its 'stops' argument lines are visible below. The stops set
        # bounds the response to known heads and their near ancestors.
        stops=list(set(tracker.heads) | set(
            tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
        requested[share_hash] = time.time(), count + 1

print 'Initializing work...'
yield set_real_work1()
yield set_real_work2()

# local wall-clock start, corrected by the bitcoind clock offset
start_time = time.time() - current_work2.value['clock_offset']
# setup p2p logic and join p2pool network

def share_share(share, ignore_peer=None):
    # Broadcast a share to every connected peer except ignore_peer
    # (typically the peer it was received from).
    for peer in p2p_node.peers.itervalues():
        if peer is ignore_peer:
            # NOTE(review): statement elided - presumably 'continue'.
        if p2pool_init.DEBUG:
            print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
        peer.send_shares([share])

def p2p_shares(shares, peer=None):
    # Handle a batch of shares received over p2p (peer is None for
    # locally-generated shares).
    print 'Processing %i shares...' % (len(shares),)
    # NOTE(review): the loop header over 'shares' (binding 'share') is
    # elided from this excerpt.
    if share.hash in tracker.shares:
        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
    #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
    #for peer2, share_hash in desired:
    # print 'Requesting parent share %x' % (share_hash,)
    # peer2.send_getshares(hashes=[share_hash], parents=2000)
    if share.bitcoin_hash <= share.header['target']:
        # This share also satisfies the bitcoin target - it is a real block.
        print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
        if factory.conn.value is not None:
            factory.conn.value.send_block(block=share.as_block(tracker, args.net))
        # NOTE(review): the 'else:' header is elided - this print is the
        # no-connection branch.
        print 'No bitcoind connection! Erp!'
    if shares and peer is not None:
        # Remember that this peer knows about the head of this batch.
        peer_heads.setdefault(shares[0].hash, set()).add(peer)
    tracker_updated.happened()
    print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)

def p2p_share_hashes(share_hashes, peer):
    # Given advertised share hashes, request the ones we don't already have.
    # NOTE(review): the 'get_hashes' list initialization is elided.
    for share_hash in share_hashes:
        if share_hash in tracker.shares:
            pass # print 'Got share hash, already have, ignoring. Hash: %s' % (p2pool.format_hash(share_hash),)
        # NOTE(review): 'else:' header elided here.
        print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
        get_hashes.append(share_hash)
    if share_hashes and peer is not None:
        peer_heads.setdefault(share_hashes[0], set()).add(peer)
    # NOTE(review): a guard (presumably 'if get_hashes:') appears elided.
    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])

def p2p_get_shares(share_hashes, parents, stops, peer):
    # Serve a peer's request for shares and up to 'parents' ancestors each.
    # NOTE(review): raises ZeroDivisionError if share_hashes is empty -
    # worth guarding against a malicious/buggy peer.
    parents = min(parents, 1000//len(share_hashes))
    # NOTE(review): initialization of 'stops' as a set and of 'shares' is
    # elided from this excerpt.
    for share_hash in share_hashes:
        for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
            if share.hash in stops:
                # NOTE(review): loop-body statements ('break'/append) elided.
    peer.send_shares(shares, full=True)
print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)

# NOTE(review): a 'def parse(x):' helper header is elided here; it turns
# 'ADDR[:PORT]' strings into (host, port) tuples.
ip, port = x.split(':')
# NOTE(review): the host:port return path is elided; this is the
# no-port fallback using the network default.
return x, args.net.P2P_PORT

# NOTE(review): the 'nodes = [...]' list header is elided - these are the
# builtin bootstrap node addresses.
('72.14.191.28', args.net.P2P_PORT),
('62.204.197.159', args.net.P2P_PORT),
# NOTE(review): a try: header is elided around the DNS resolution below.
nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
# NOTE(review): the matching except handler header is elided - resolution
# failure is logged and ignored.
log.err(None, 'Error resolving bootstrap node IP:')

# NOTE(review): the 'p2p_node = p2p.Node(' constructor header is elided;
# the following are its keyword arguments.
current_work=current_work,
port=args.p2pool_port,
addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
mode=0 if args.low_bandwidth else 1,
preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
# Wire the p2p callbacks to the handlers defined above.
p2p_node.handle_shares = p2p_shares
p2p_node.handle_share_hashes = p2p_share_hashes
p2p_node.handle_get_shares = p2p_get_shares

# send share when the chain changes to their chain
def work_changed(new_work):
    #print 'Work changed:', new_work
    for share in tracker.get_chain_known(new_work['best_share_hash']):
        # NOTE(review): loop guard lines elided; rebroadcast shares along
        # the new best chain, excluding the peer each came from.
        share_share(share, share.peer)
current_work.changed.watch(work_changed)

# start listening for workers with a JSON-RPC server

print 'Listening for workers on port %i...' % (args.worker_port,)

# merkle_root -> transaction list of recently handed-out work (expires after 300s)
merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
# random tag baked into coinbase nonces so our own shares are recognizable
run_identifier = struct.pack('<Q', random.randrange(2**64))
def compute(state, payout_script):
    # Build a getwork BlockAttempt for a miner: select memory-pool
    # transactions, create the generate (coinbase) transaction, and record
    # the merkle root so returned work can be matched back up.
    if payout_script is None:
        payout_script = my_script
    if state['best_share_hash'] is None and args.net.PERSIST:
        raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
    pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
    pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
    # NOTE(review): initialization of 'extra_txs' and the running 'size'
    # accumulator is elided from this excerpt.
    for tx in pre_extra_txs:
        this_size = len(bitcoin.data.tx_type.pack(tx.tx))
        if size + this_size > 500000:
            # NOTE(review): loop-body statements (break/append/size update)
            # are elided; 500000 bytes is the block-size budget.
    # XXX assuming generate_tx is smallish here..
    generate_tx = p2pool.generate_transaction(
        # NOTE(review): at least one keyword argument (presumably
        # tracker=tracker) is elided here.
        previous_share_hash=state['best_share_hash'],
        new_script=payout_script,
        # Block subsidy (halves every 210000 blocks) plus collected fees.
        subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
        nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
        block_target=state['target'],
    # NOTE(review): remaining arguments and closing paren elided.
    print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
    #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
    #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
    transactions = [generate_tx] + [tx.tx for tx in extra_txs]
    merkle_root = bitcoin.data.merkle_hash(transactions)
    merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds

    # Timestamp in bitcoind's clock, nudged forward if the median of the
    # last 11 shares' timestamps would otherwise be ahead of it.
    timestamp = int(time.time() - current_work2.value['clock_offset'])
    if state['best_share_hash'] is not None:
        timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
        if timestamp2 > timestamp:
            print 'Toff', timestamp2 - timestamp
            timestamp = timestamp2
    # Share target from the coinbase script; used as the miner's target.
    target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
    # NOTE(review): 'times' is defined outside this excerpt - presumably a
    # nonce -> issue-time dict used for share latency reporting.
    times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
    #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
    return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
def got_response(data):
    # Handle a getwork submission from a miner: reassemble the block,
    # submit to bitcoind if it meets the bitcoin target, and record the
    # share if it meets the share target.
    # NOTE(review): the original appears to wrap this body in try/except
    # (see the log.err at the bottom); those headers are elided here.
    # match up with transactions
    header = bitcoin.getwork.decode_data(data)
    transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
    if transactions is None:
        print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
        # NOTE(review): early-return statement elided here.
    block = dict(header=header, txs=transactions)
    hash_ = bitcoin.data.block_header_type.hash256(block['header'])
    if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
        if factory.conn.value is not None:
            factory.conn.value.send_block(block=block)
        # NOTE(review): 'else:' header elided - this print is the
        # no-connection branch.
        print 'No bitcoind connection! Erp!'
    target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
    # NOTE(review): the 'if hash_ > target:' guard around the next lines
    # appears elided.
    print 'Received invalid share from worker - %x/%x' % (hash_, target)
    share = p2pool.Share.from_block(block)
    # NOTE(review): 'my_shares' and 'times' are defined outside this excerpt.
    my_shares.add(share.hash)
    print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
    good = share.previous_hash == current_work.value['best_share_hash']
    # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
    # eg. good = share.hash == current_work.value['best_share_hash'] here
    log.err(None, 'Error processing data received from worker:')

web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)

# NOTE(review): 'def get_rate():' header elided - JSON endpoint returning
# the pool's recent attempts-per-second, or null before any share is known.
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
    return json.dumps(att_s)
return json.dumps(None)

# NOTE(review): 'def get_users():' header elided - JSON endpoint mapping
# payout scripts (hex) to their fraction of recent weight.
height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
# NOTE(review): 'res = {}' initialization elided.
for script in sorted(weights, key=lambda s: weights[s]):
    res[script.encode('hex')] = weights[script]/total_weight
return json.dumps(res)

class WebInterface(resource.Resource):
    # Minimal twisted.web resource serving func() with a fixed MIME type.
    def __init__(self, func, mime_type):
        self.func, self.mime_type = func, mime_type

    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        # NOTE(review): the 'return self.func()' line appears elided here.

web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
web_root.putChild('users', WebInterface(get_users, 'application/json'))

reactor.listenTCP(args.worker_port, server.Site(web_root))
# transaction memory pool: hash -> Tx, expiring after 600s without refresh
tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
# caches bitcoind getrawtransaction lookups keyed by tx hash
get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))

# NOTE(review): the enclosing 'class Tx(object):' header is elided from
# this excerpt; the methods below are its body.
def __init__(self, tx, seen_at_block):
    self.hash = bitcoin.data.tx_type.hash256(tx)
    # NOTE(review): a 'self.tx = tx' assignment appears elided (self.tx is
    # read below and in _find_parents_in_blocks).
    self.seen_at_block = seen_at_block
    # All hashes this tx touches: itself plus every input's previous output.
    self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])

    #print '%x %r' % (seen_at_block, tx)
    #for mention in self.mentions:
    # print '%x' % mention

    self.parents_all_in_blocks = False
    # NOTE(review): 'self.value_in' initialization appears elided; it is
    # accumulated in _find_parents_in_blocks.
    self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
    self._find_parents_in_blocks()

@defer.inlineCallbacks
def _find_parents_in_blocks(self):
    # Resolve each input's parent transaction; sum value_in and note
    # whether every parent is already confirmed in a block.
    for tx_in in self.tx['tx_ins']:
        # NOTE(review): a try: header appears elided around this lookup.
        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
        self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
        #print raw_transaction
        if not raw_transaction['parent_blocks']:
            # NOTE(review): statement elided - presumably 'return' since a
            # parent is unconfirmed.
    self.parents_all_in_blocks = True

# NOTE(review): 'def is_good(self):' header elided; this guard is part of
# its body.
if not self.parents_all_in_blocks:
    # NOTE(review): body elided - presumably 'return False'.

@defer.inlineCallbacks
# NOTE(review): 'def new_tx(tx_hash):' header (and a try:) elided here.
assert isinstance(tx_hash, (int, long))
#print 'REQUESTING', tx_hash
tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
# NOTE(review): except handler header elided.
log.err(None, 'Error handling tx:')
# disable for now, for testing impact on stales
#factory.new_tx.watch(new_tx)

def new_block(block_hash):
    # A new bitcoin block arrived: trigger a work refresh.
    work_updated.happened()
factory.new_block.watch(new_block)

print 'Started successfully!'
@defer.inlineCallbacks
# NOTE(review): the poller function header and its loop header are elided;
# this coroutine refreshes bitcoind work on event or ~1s random interval.
flag = work_updated.get_deferred()
yield set_real_work1()
yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)

@defer.inlineCallbacks
# NOTE(review): second poller header elided; same pattern for the share
# tracker state.
flag = tracker_updated.get_deferred()
yield set_real_work2()
yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)

# counts our own shares (tagged with run_identifier) along the chain
counter = skiplists.CountsSkipList(tracker, run_identifier)

# NOTE(review): the status-loop function/loop headers are elided; the body
# below prints periodic pool statistics.
yield deferral.sleep(random.expovariate(1/1))
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    # NOTE(review): a guard (presumably 'if height > 5:') appears elided.
    att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
    weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
    matching_in_chain = counter(current_work.value['best_share_hash'], height)
    shares_in_chain = my_shares & matching_in_chain
    stale_shares = my_shares - matching_in_chain
    print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
        # NOTE(review): some format arguments are elided around these.
        weights.get(my_script, 0)/total_weight*100,
        math.format(weights.get(my_script, 0)/total_weight*att_s),
        len(shares_in_chain) + len(stale_shares),
    #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
    #for k, v in weights.iteritems():
    # print k.encode('hex'), v/total_weight
# NOTE(review): except handler header elided.
log.err(None, 'Fatal error:')
# Command-line interface definition. NOTE(review): this appears to sit
# inside an elided 'run()'/'__main__' section of the file.
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
# --testnet swaps in the Testnet parameter set as the 'net' object
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# positional credentials for the bitcoind RPC interface
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')

args = parser.parse_args()
# NOTE(review): the surrounding 'if args.debug:' guard is elided here.
p2pool_init.DEBUG = True
class TeePipe(object):
    # Fans writes out to several file-like outputs (console + log file).
    def __init__(self, outputs):
        self.outputs = outputs
    def write(self, data):
        for output in self.outputs:
            # NOTE(review): 'output.write(data)' appears elided here.
    # NOTE(review): 'def flush(self):' header appears elided before this
    # second loop.
    for output in self.outputs:
        # NOTE(review): 'output.flush()' appears elided here.
class TimestampingPipe(object):
    # Wraps a file-like object, prefixing each complete line with a
    # HH:MM:SS.microseconds timestamp.
    def __init__(self, inner_file):
        self.inner_file = inner_file
        # NOTE(review): initialization of the 'self.buf' partial-line
        # buffer appears elided.
    def write(self, data):
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
            self.inner_file.flush()
        # NOTE(review): 'self.buf = lines[-1]' (keep trailing partial line)
        # and a flush method appear elided.
# Route stdout/stderr and twisted's log through the timestamping tee.
sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = args.net.P2P_PORT

if args.address is not None:
    # NOTE(review): a try: header appears elided around the parse below.
    args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
    # NOTE(review): except handler header elided.
    raise ValueError('error parsing address: ' + repr(e))
# NOTE(review): 'else:' header elided - no address given means request one
# from bitcoind later.
args.pubkey_hash = None

reactor.callWhenRunning(main, args)