3 from __future__ import division
14 from twisted.internet import defer, reactor, task
15 from twisted.web import server
16 from twisted.python import log
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current getwork block attempt and the block height from bitcoind.

    Fires both JSON-RPC queries concurrently to save a round trip. A block
    could arrive in between the two queries, so the pair is not guaranteed
    to describe the same chain state.

    Returns (via the deferred): (bitcoin.getwork.BlockAttempt, height).
    Retried up to 3 times by the @deferral.retry decorator on failure.
    """
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # get rid of residual errors - attach the no-op errbacks even when one
        # of the yields above raised, so the other deferred's eventual failure
        # doesn't get logged as an unhandled error. (Previously these were only
        # attached on the success path.)
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
# Ask bitcoind, over the bitcoin p2p protocol, for a payout script by
# sending a null checkorder (IP-transaction) request.
# NOTE(review): partial view - the body of the 'denied' branch and the
# final defer.returnValue(...) are not visible in this chunk; confirm
# against the full file before relying on the control flow below.
res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
if res['reply'] == 'success':
my_script = res['script']
elif res['reply'] == 'denied':
# any reply other than 'success'/'denied' is unexpected and is an error
raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fall back to a bitcoind wallet address for payouts.

    Asks bitcoind (JSON-RPC) for the address of its 'p2pool' account and
    converts it into a standard pay-to-pubkey-hash output script.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
# NOTE(review): chunked view - the enclosing `def main(args):` line is not
# visible here; everything below through the status loop runs inside it.
print 'p2pool (version %s)' % (p2pool_init.__version__,)
# connect to bitcoind over JSON-RPC and do initial getwork
url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
# initial getwork doubles as a connectivity test; result is also printed
temp_work, temp_height = yield getwork(bitcoind)
print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
# connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
factory = bitcoin.p2p.ClientFactory(args.net)
reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
my_script = yield get_payout_script(factory)
# payout-script selection: use checkorder result, else wallet address, else
# the --address the user gave us.
# NOTE(review): intermediate condition lines of this if/else ladder are not
# visible in this chunk - verify the branch structure against the full file.
if args.pubkey_hash is None:
print 'IP transaction denied ... falling back to sending to address.'
my_script = yield get_payout_script2(bitcoind, args.net)
my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
print ' Payout script:', my_script.encode('hex')
@defer.inlineCallbacks
def real_get_block(block_hash):
# fetch a full block from bitcoind over the p2p connection
block = yield (yield factory.getProtocol()).get_block(block_hash)
print 'Got block %x' % (block_hash,)
defer.returnValue(block)
# cache fetched blocks for an hour so repeated lookups don't hit the network
get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
# raw transactions are fetched over JSON-RPC and cached briefly
get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
ht = bitcoin.p2p.HeightTracker(factory)
tracker = p2pool.OkayTracker(args.net)
chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
# NOTE(review): `Chain` is not defined anywhere in this chunk - confirm
# it exists in the full file, otherwise this raises NameError when called.
return chains.setdefault(chain_id_data, Chain(chain_id_data))
# information affecting work that should trigger a long-polling update
current_work = variable.Variable(None)
# information affecting work that should not trigger a long-polling update
current_work2 = variable.Variable(None)
work_updated = variable.Event()
tracker_updated = variable.Event()
# NOTE(review): `requested` (presumably a set of pending share requests) is
# created on a line not visible here; it is cleared every 60 seconds.
task.LoopingCall(requested.clear).start(60)
@defer.inlineCallbacks
def set_real_work1():
# refresh bitcoind-derived work state (new block template / height)
work, height = yield getwork(bitcoind)
# XXX call tracker_updated
# NOTE(review): both dict(...) calls below are truncated in this chunk -
# additional keyword arguments and the closing parens are not visible.
current_work.set(dict(
version=work.version,
previous_block=work.previous_block,
best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
current_work2.set(dict(
clock_offset=time.time() - work.timestamp,
@defer.inlineCallbacks
def set_real_work2():
# re-evaluate the share tracker: pick the best share chain tip and find
# out which parent shares we still need to request from peers
best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
t = dict(current_work.value)
t['best_share_hash'] = best
# # instead, trigger main thread to call set_work
# best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
# if best == share.hash:
# print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
# print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)
# w = dict(current_work.value)
# w['best_share_hash'] = best
# current_work.set(w)
for peer2, share_hash in desired:
# skip shares we've already asked this peer for recently
if (peer2.nonce, share_hash) in requested:
print 'Requesting parent share %x' % (share_hash % 2**32,)
# NOTE(review): the send_getshares(...) call is truncated here - the
# hashes=/parents= arguments are on lines not visible in this chunk.
peer2.send_getshares(
stops=list(set(tracker.heads) | set(
# stop at points at most 10 shares deep under each current head
tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
requested.add((peer2.nonce, share_hash))
print 'Initializing work...'
yield set_real_work1()
yield set_real_work2()
# wall-clock start, corrected by the offset between us and bitcoind
start_time = time.time() - current_work2.value['clock_offset']
# setup p2p logic and join p2pool network
def share_share(share, ignore_peer=None):
# broadcast a share to every connected peer except the one it came from
for peer in p2p_node.peers.itervalues():
if peer is ignore_peer:
peer.send_shares([share])
def p2p_shares(shares, peer=None):
# handle a batch of shares received from a peer (or locally, peer=None)
print "Processing %i shares..." % (len(shares),)
if share.hash in tracker.shares:
#print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
#print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
#for peer2, share_hash in desired:
# print 'Requesting parent share %x' % (share_hash,)
# peer2.send_getshares(hashes=[share_hash], parents=2000)
# a share that also satisfies the bitcoin target is a full block -
# submit it to bitcoind immediately
if share.bitcoin_hash <= share.header['target']:
print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
if factory.conn.value is not None:
factory.conn.value.send_block(block=share.as_block(tracker, args.net))
print 'No bitcoind connection! Erp!'
# let the tracker poller know new shares arrived
tracker_updated.happened()
print "... done processing %i shares." % (len(shares),)
def p2p_share_hashes(share_hashes, peer):
# given advertised share hashes, request the ones we don't have yet
for share_hash in share_hashes:
if share_hash in tracker.shares:
pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
get_hashes.append(share_hash)
peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
def p2p_get_shares(share_hashes, parents, stops, peer):
# serve a peer's getshares request, capping total shares sent to ~1000
# NOTE(review): this divides by len(share_hashes) - an empty request from
# a peer would raise ZeroDivisionError; confirm callers never send [].
parents = min(parents, 1000//len(share_hashes))
for share_hash in share_hashes:
for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
if share.hash in stops:
peer.send_shares(shares, full=True)
print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
# parse an ADDR[:PORT] bootstrap-node string; default port is the net's P2P_PORT
# NOTE(review): the enclosing `def parse(x):` line and its branch conditions
# are not visible in this chunk.
ip, port = x.split(':')
return x, args.net.P2P_PORT
# hard-coded bootstrap nodes, plus a DNS-resolved one (best effort)
('72.14.191.28', args.net.P2P_PORT),
('62.204.197.159', args.net.P2P_PORT),
nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
print 'Error resolving bootstrap node IP:'
# NOTE(review): the p2p.Node(...) constructor call that these keyword
# arguments belong to starts on a line not visible here.
current_work=current_work,
port=args.p2pool_port,
# persist known peer addresses next to the script in addrs.dat
addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
mode=0 if args.low_bandwidth else 1,
preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
# wire the handlers defined above into the p2p node
p2p_node.handle_shares = p2p_shares
p2p_node.handle_share_hashes = p2p_share_hashes
p2p_node.handle_get_shares = p2p_get_shares
# send share when the chain changes to their chain
def work_changed(new_work):
#print 'Work changed:', new_work
# re-broadcast shares along the new best chain (back to their origin peer
# is skipped inside share_share)
for share in tracker.get_chain_known(new_work['best_share_hash']):
share_share(share, share.peer)
current_work.changed.watch(work_changed)
# start listening for workers with a JSON-RPC server
print 'Listening for workers on port %i...' % (args.worker_port,)
# merkle root -> transaction list, so returned work can be matched back up
merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
# random per-run tag embedded in generated coinbase nonces
run_identifier = struct.pack('<Q', random.randrange(2**64))
def compute(state, all_targets):
# build a getwork job for a miner from the current state
# select memory-pool transactions that are known-good, capped by the
# merkle-branch limit and a ~500kB block size budget
pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
for tx in pre_extra_txs:
this_size = len(bitcoin.data.tx_type.pack(tx.tx))
if size + this_size > 500000:
# XXX assuming generate_tx is smallish here..
# NOTE(review): this generate_transaction(...) call is truncated in this
# chunk - more keyword arguments and the closing paren are not visible.
generate_tx = p2pool.generate_transaction(
previous_share_hash=state['best_share_hash'],
new_script=my_script,
# block subsidy (50 BTC halving every 210000 blocks) plus collected fees
subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
block_target=state['target'],
print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
#print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
#, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
transactions = [generate_tx] + [tx.tx for tx in extra_txs]
merkle_root = bitcoin.data.merkle_hash(transactions)
merkle_root_to_transactions[merkle_root] = transactions # will stay for 300 seconds (the ExpiringDict's timeout)
# timestamp must not go backwards relative to the median of the last 11 shares
timestamp = int(time.time() - current_work2.value['clock_offset'])
if state['best_share_hash'] is not None:
timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
if timestamp2 > timestamp:
print 'Toff', timestamp2 - timestamp
timestamp = timestamp2
# share target, clamped so it is representable as a bitcoin target
target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
target2 = min(2**256//2**32 - 1, target2)
# record send time keyed by nonce, used to report getwork latency later
times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
#print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
def got_response(data):
# handle a solved getwork submission from a miner
# match up with transactions
header = bitcoin.getwork.decode_data(data)
transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
if transactions is None:
print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
block = dict(header=header, txs=transactions)
hash_ = bitcoin.data.block_header_type.hash256(block['header'])
# meets the bitcoin target (or DEBUG forces it): submit as a real block
if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
if factory.conn.value is not None:
factory.conn.value.send_block(block=block)
print 'No bitcoind connection! Erp!'
target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
print 'Received invalid share from worker - %x/%x' % (hash_, target)
share = p2pool.Share.from_block(block)
my_shares.add(share.hash)
print 'GOT SHARE! %x prev %x' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash % 2**32), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce], "s since getwork"
good = share.previous_hash == current_work.value['best_share_hash']
# maybe revert back to tracker being non-blocking so 'good' can be more accurate?
# eg. good = share.hash == current_work.value['best_share_hash'] here
print 'Error processing data received from worker:'
# NOTE(review): the enclosing `def get_rate():` line is not visible here.
if current_work.value['best_share_hash'] is not None:
height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
def get_blocks(start_hash):
# generator: walk backwards from start_hash yielding (hash, block) pairs
# for blocks already present in the get_block cache
# NOTE(review): the surrounding loop/else lines are not visible in this chunk.
block = get_block.call_now(start_hash)
except deferral.NotNowError:
yield start_hash, block
start_hash = block['header']['previous_block']
# memory pool: tx hash -> Tx wrapper, entries expire after 10 minutes
tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
def __init__(self, tx, seen_at_block):
# NOTE(review): the enclosing `class Tx(object):` line is not visible here;
# this wrapper tracks a mempool transaction and whether its inputs are
# confirmed in blocks.
self.hash = bitcoin.data.tx_type.hash256(tx)
self.seen_at_block = seen_at_block
# hashes this tx "mentions": itself plus every input's previous output
self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
#print '%x %r' % (seen_at_block, tx)
#for mention in self.mentions:
# print '%x' % mention
self.parents_all_in_blocks = False
self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
self._find_parents_in_blocks()
@defer.inlineCallbacks
def _find_parents_in_blocks(self):
# resolve each input against bitcoind to accumulate value_in and decide
# whether every parent is already confirmed in a block
for tx_in in self.tx['tx_ins']:
raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
#print raw_transaction
if not raw_transaction['parent_blocks']:
self.parents_all_in_blocks = True
# NOTE(review): the enclosing `def is_good(self):` line is not visible here.
if not self.parents_all_in_blocks:
# scan the last 10 known blocks for conflicts with this tx's mentions
for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
if block_hash == self.seen_at_block:
for tx in block['txs']:
mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
if mentions & self.mentions:
@defer.inlineCallbacks
# NOTE(review): the enclosing `def new_tx(tx_hash):` line is not visible here.
assert isinstance(tx_hash, (int, long))
#print "REQUESTING", tx_hash
tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
print 'Error handling tx:'
# disable for now, for testing impact on stales
#factory.new_tx.watch(new_tx)
def new_block(block):
# a new bitcoin block invalidates current work - wake the work poller
work_updated.happened()
factory.new_block.watch(new_block)
print 'Started successfully!'
# work poller: refresh bitcoind work state whenever work_updated fires,
# or after a randomized (~1s mean) timeout
# NOTE(review): the enclosing loop/function lines are not visible here.
@defer.inlineCallbacks
yield work_updated.get_deferred(random.expovariate(1/1))
except defer.TimeoutError:
yield set_real_work1()
# tracker poller: re-run chain selection whenever tracker_updated fires
@defer.inlineCallbacks
yield tracker_updated.get_deferred(random.expovariate(1/1))
except defer.TimeoutError:
yield set_real_work2()
# status loop: periodically print pool and local miner statistics
counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
yield deferral.sleep(random.expovariate(1/1))
if current_work.value['best_share_hash'] is not None:
height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
count = counter(current_work.value['best_share_hash'], height, 2**100)
# NOTE(review): this print's format arguments continue on lines not
# visible in this chunk.
print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale)' % (
weights.get(my_script, 0)/total_weight*100,
math.format(weights.get(my_script, 0)/total_weight*att_s),
len(my_shares) - count,
#weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
#for k, v in weights.iteritems():
# print k.encode('hex'), v/total_weight
# command-line interface definition
# NOTE(review): the enclosing `def run():` (or equivalent) line is not
# visible in this chunk.
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
# --testnet swaps the whole network-parameters object, not just a flag
parser.add_argument('--testnet',
help='use the testnet',
action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
help='debugging mode',
action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
help='generate to this address (defaults to requesting one from bitcoind)',
type=str, action='store', default=None, dest='address')
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
help='trade lower bandwidth usage for higher latency (reduced efficiency)',
action='store_true', default=False, dest='low_bandwidth')
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
type=int, action='store', default=9332, dest='worker_port')
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
help='connect to a bitcoind at this address (default: 127.0.0.1)',
type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
type=int, action='store', default=8332, dest='bitcoind_rpc_port')
# default of None is resolved later from args.net.BITCOIN_P2P_PORT
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional arguments: RPC credentials are required
bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
help='bitcoind RPC interface username',
type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
help='bitcoind RPC interface password',
type=str, action='store', dest='bitcoind_rpc_password')
args = parser.parse_args()
# NOTE(review): the `if args.debug:` guard line is not visible in this chunk.
p2pool_init.DEBUG = True
class TimestampingPipe(object):
# file-like wrapper that prefixes each complete line with a HH:MM:SS
# timestamp; used to wrap stdout/stderr in debug mode.
# NOTE(review): partial view - the line initializing `self.buf` and any
# flush() method are not visible here.
def __init__(self, inner_file):
self.inner_file = inner_file
def write(self, data):
# buffer partial lines; only timestamp-and-emit completed ones
buf = self.buf + data
lines = buf.split('\n')
for line in lines[:-1]:
self.inner_file.write("%s %s\n" % (time.strftime("%H:%M:%S"), line))
sys.stdout = TimestampingPipe(sys.stdout)
sys.stderr = TimestampingPipe(sys.stderr)
# resolve port defaults from the selected network parameters
if args.bitcoind_p2p_port is None:
args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
if args.p2pool_port is None:
args.p2pool_port = args.net.P2P_PORT
# translate --address into a pubkey hash up front so bad addresses fail fast
# NOTE(review): the try/except lines around the conversion are not visible
# in this chunk.
if args.address is not None:
args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
raise ValueError("error parsing address: " + repr(e))
args.pubkey_hash = None
# hand off to the twisted reactor; main(args) starts once it is running
reactor.callWhenRunning(main, args)