3 from __future__ import division
14 from twisted.internet import defer, reactor, task
15 from twisted.web import server
16 from twisted.python import log
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work and block height from bitcoind over JSON-RPC.

    Fires rpc_getwork and rpc_getblocknumber concurrently and returns a
    Deferred that fires with (BlockAttempt, height). Transient failures are
    retried (up to 3 s backoff) by the deferral.retry decorator.
    """
    # Issue both requests before waiting so they run concurrently.
    # NOTE: a block could arrive in between these two queries, so the height
    # may not correspond exactly to the work returned.
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # Get rid of residual errors: silence both deferreds so a failure
        # doesn't produce an unhandled-error warning. Doing this in a
        # finally fixes the original ordering bug where a failure in the
        # first yield skipped adding the errback to the other deferred.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, via a p2p checkorder message, for a script to send
    payouts to.

    Waits for a live protocol from the client factory, sends a null
    checkorder, and inspects the reply. Retried by deferral.retry.
    """
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        # bitcoind granted the IP transaction and returned a payout script.
        my_script = res['script']
    elif res['reply'] == 'denied':
        # NOTE(review): the original body of this branch and the final
        # defer.returnValue are not visible in this view — presumably the
        # denied case returns None so the caller can fall back to an
        # address-based script; confirm against the full file.
        raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from bitcoind's 'p2pool' account address.

    Fallback path when the checkorder/IP-transaction route is denied:
    fetch the account address over RPC, decode it to a pubkey hash for the
    given network, and wrap that in a standard pay-to-pubkey-hash script.
    Retried by deferral.retry.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
53 @defer.inlineCallbacks
56 print 'p2pool (version %s)' % (p2pool_init.__version__,)
59 # connect to bitcoind over JSON-RPC and do initial getwork
60 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
61 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
62 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
63 temp_work, temp_height = yield getwork(bitcoind)
65 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
68 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
69 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
70 factory = bitcoin.p2p.ClientFactory(args.net)
71 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
72 my_script = yield get_payout_script(factory)
73 if args.pubkey_hash is None:
75 print 'IP transaction denied ... falling back to sending to address.'
76 my_script = yield get_payout_script2(bitcoind, args.net)
78 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
80 print ' Payout script:', my_script.encode('hex')
@defer.inlineCallbacks
def real_get_block(block_hash):
    """Fetch a full block from bitcoind over the p2p connection."""
    # Wait for a live protocol, then request the block by hash.
    block = yield (yield factory.getProtocol()).get_block(block_hash)
    print 'Got block %x' % (block_hash,)
    defer.returnValue(block)
# Cache fetched blocks for an hour so repeated lookups don't re-hit the
# p2p connection.
get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
90 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
92 ht = bitcoin.p2p.HeightTracker(factory)
94 tracker = p2pool.OkayTracker(args.net)
95 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    # Memoized lookup: reuse the Chain for this id if one is cached,
    # otherwise create it; entries age out with the ExpiringDict (300 s).
    # NOTE(review): Chain is not defined in this view — confirm it is
    # imported or defined elsewhere in the file.
    return chains.setdefault(chain_id_data, Chain(chain_id_data))
99 # information affecting work that should trigger a long-polling update
100 current_work = variable.Variable(None)
101 # information affecting work that should not trigger a long-polling update
102 current_work2 = variable.Variable(None)
104 work_updated = variable.Event()
105 tracker_updated = variable.Event()
108 task.LoopingCall(requested.clear).start(60)
110 @defer.inlineCallbacks
111 def set_real_work1():
112 work, height = yield getwork(bitcoind)
113 # XXX call tracker_updated
114 current_work.set(dict(
115 version=work.version,
116 previous_block=work.previous_block,
119 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
121 current_work2.set(dict(
122 clock_offset=time.time() - work.timestamp,
125 @defer.inlineCallbacks
126 def set_real_work2():
127 best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
129 t = dict(current_work.value)
130 t['best_share_hash'] = best
138 # # instead, trigger main thread to call set_work
139 # best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
141 # if best == share.hash:
142 # print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
144 # print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)
146 # w = dict(current_work.value)
147 # w['best_share_hash'] = best
148 # current_work.set(w)
150 for peer2, share_hash in desired:
153 if (peer2.nonce, share_hash) in requested:
155 print 'Requesting parent share %x' % (share_hash % 2**32,)
156 peer2.send_getshares(
159 stops=list(set(tracker.heads) | set(
160 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
163 requested.add((peer2.nonce, share_hash))
165 print 'Initializing work...'
166 yield set_real_work1()
167 yield set_real_work2()
170 start_time = time.time() - current_work2.value['clock_offset']
172 # setup p2p logic and join p2pool network
174 def share_share(share, ignore_peer=None):
175 for peer in p2p_node.peers.itervalues():
176 if peer is ignore_peer:
178 peer.send_shares([share])
181 def p2p_shares(shares, peer=None):
183 print "Processing %i shares..." % (len(shares),)
187 if share.hash in tracker.shares:
188 #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
192 #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
195 #for peer2, share_hash in desired:
196 # print 'Requesting parent share %x' % (share_hash,)
197 # peer2.send_getshares(hashes=[share_hash], parents=2000)
199 if share.bitcoin_hash <= share.header['target']:
201 print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
203 if factory.conn.value is not None:
204 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
206 print 'No bitcoind connection! Erp!'
209 tracker_updated.happened()
212 print "... done processing %i shares." % (len(shares),)
214 def p2p_share_hashes(share_hashes, peer):
216 for share_hash in share_hashes:
217 if share_hash in tracker.shares:
218 pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
220 print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
221 get_hashes.append(share_hash)
223 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
225 def p2p_get_shares(share_hashes, parents, stops, peer):
226 parents = min(parents, 1000//len(share_hashes))
229 for share_hash in share_hashes:
230 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
231 if share.hash in stops:
234 peer.send_shares(shares, full=True)
236 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
240 ip, port = x.split(':')
243 return x, args.net.P2P_PORT
246 ('72.14.191.28', args.net.P2P_PORT),
247 ('62.204.197.159', args.net.P2P_PORT),
250 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
253 print 'Error resolving bootstrap node IP:'
258 current_work=current_work,
259 port=args.p2pool_port,
261 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
262 mode=0 if args.low_bandwidth else 1,
263 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
265 p2p_node.handle_shares = p2p_shares
266 p2p_node.handle_share_hashes = p2p_share_hashes
267 p2p_node.handle_get_shares = p2p_get_shares
271 # send share when the chain changes to their chain
272 def work_changed(new_work):
273 #print 'Work changed:', new_work
274 for share in tracker.get_chain_known(new_work['best_share_hash']):
277 share_share(share, share.peer)
278 current_work.changed.watch(work_changed)
283 # start listening for workers with a JSON-RPC server
285 print 'Listening for workers on port %i...' % (args.worker_port,)
289 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
290 run_identifier = struct.pack('<Q', random.randrange(2**64))
292 def compute(state, all_targets):
293 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
294 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
297 for tx in pre_extra_txs:
298 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
299 if size + this_size > 500000:
304 # XXX assuming generate_tx is smallish here..
305 generate_tx = p2pool.generate_transaction(
307 previous_share_hash=state['best_share_hash'],
308 new_script=my_script,
309 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
310 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
311 block_target=state['target'],
314 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
315 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
316 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
317 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
318 merkle_root = bitcoin.data.merkle_hash(transactions)
319 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
321 timestamp = int(time.time() - current_work2.value['clock_offset'])
322 if state['best_share_hash'] is not None:
323 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
324 if timestamp2 > timestamp:
325 print 'Toff', timestamp2 - timestamp
326 timestamp = timestamp2
327 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
329 target2 = min(2**256//2**32 - 1, target2)
330 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
331 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
332 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
337 def got_response(data):
339 # match up with transactions
340 header = bitcoin.getwork.decode_data(data)
341 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
342 if transactions is None:
343 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
345 block = dict(header=header, txs=transactions)
346 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
347 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
349 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
351 if factory.conn.value is not None:
352 factory.conn.value.send_block(block=block)
354 print 'No bitcoind connection! Erp!'
355 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
357 print 'Received invalid share from worker - %x/%x' % (hash_, target)
359 share = p2pool.Share.from_block(block)
360 my_shares.add(share.hash)
361 print 'GOT SHARE! %x prev %x' % (share.hash % 2**32, 0 if share.previous_hash is None else share.previous_hash % 2**32), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce], "s since getwork"
362 good = share.previous_hash == current_work.value['best_share_hash']
363 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
365 # eg. good = share.hash == current_work.value['best_share_hash'] here
369 print 'Error processing data received from worker:'
375 if current_work.value['best_share_hash'] is not None:
376 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
377 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
380 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
387 def get_blocks(start_hash):
390 block = get_block.call_now(start_hash)
391 except deferral.NotNowError:
393 yield start_hash, block
394 start_hash = block['header']['previous_block']
396 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
399 def __init__(self, tx, seen_at_block):
400 self.hash = bitcoin.data.tx_type.hash256(tx)
402 self.seen_at_block = seen_at_block
403 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
405 #print '%x %r' % (seen_at_block, tx)
406 #for mention in self.mentions:
407 # print '%x' % mention
409 self.parents_all_in_blocks = False
412 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
413 self._find_parents_in_blocks()
415 @defer.inlineCallbacks
416 def _find_parents_in_blocks(self):
417 for tx_in in self.tx['tx_ins']:
419 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
422 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
423 #print raw_transaction
424 if not raw_transaction['parent_blocks']:
426 self.parents_all_in_blocks = True
429 if not self.parents_all_in_blocks:
436 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
437 if block_hash == self.seen_at_block:
439 for tx in block['txs']:
440 mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
441 if mentions & self.mentions:
445 @defer.inlineCallbacks
448 assert isinstance(tx_hash, (int, long))
449 #print "REQUESTING", tx_hash
450 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
452 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
455 print 'Error handling tx:'
458 # disable for now, for testing impact on stales
459 #factory.new_tx.watch(new_tx)
def new_block(block):
    # A new bitcoin block invalidates the current work; fire the
    # work-update event so the polling loop re-runs getwork promptly.
    work_updated.happened()
factory.new_block.watch(new_block)
465 print 'Started successfully!'
468 @defer.inlineCallbacks
471 flag = work_updated.get_deferred()
473 yield set_real_work1()
476 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
479 @defer.inlineCallbacks
482 flag = tracker_updated.get_deferred()
484 yield set_real_work2()
487 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
492 counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
495 yield deferral.sleep(random.expovariate(1/1))
497 if current_work.value['best_share_hash'] is not None:
498 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
500 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
501 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
502 count = counter(current_work.value['best_share_hash'], height, 2**100)
503 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale)' % (
506 weights.get(my_script, 0)/total_weight*100,
507 math.format(weights.get(my_script, 0)/total_weight*att_s),
509 len(my_shares) - count,
511 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
512 #for k, v in weights.iteritems():
513 # print k.encode('hex'), v/total_weight
# Command-line interface: general options, then grouped options for the
# p2pool network interface, the miner-facing worker interface, and the
# bitcoind RPC/p2p interfaces.
parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
parser.add_argument('--version', action='version', version=p2pool_init.__version__)
parser.add_argument('--testnet',
    help='use the testnet',
    action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
parser.add_argument('--debug',
    help='debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate to this address (defaults to requesting one from bitcoind)',
    type=str, action='store', default=None, dest='address')

# Options controlling the p2pool peer-to-peer node.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('-l', '--low-bandwidth',
    help='trade lower bandwidth usage for higher latency (reduced efficiency)',
    action='store_true', default=False, dest='low_bandwidth')

# Options for the JSON-RPC server miners connect to.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
    type=int, action='store', default=9332, dest='worker_port')

# Options for reaching the local bitcoind (RPC for getwork, p2p for block
# submission and checkorder). The last two are positional: username, password.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to a bitcoind at this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
    type=int, action='store', default=8332, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
    help='bitcoind RPC interface username',
    type=str, action='store', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')
570 args = parser.parse_args()
573 p2pool_init.DEBUG = True
class TimestampingPipe(object):
    """File-like wrapper that prefixes each complete line written through
    it with a HH:MM:SS timestamp before forwarding to the wrapped stream.

    Used in debug mode to timestamp stdout/stderr.
    """
    def __init__(self, inner_file):
        # inner_file: the underlying stream (e.g. the real sys.stdout).
        self.inner_file = inner_file
        # NOTE(review): write() reads self.buf, but its initialization
        # (presumably `self.buf = ''`) is not visible in this view — confirm.
    def write(self, data):
        # Accumulate data and emit only complete lines, each timestamped.
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write("%s %s\n" % (time.strftime("%H:%M:%S"), line))
        # NOTE(review): the trailing partial line (lines[-1]) is presumably
        # stored back into self.buf after this loop — not visible here; confirm.
584 sys.stdout = TimestampingPipe(sys.stdout)
585 sys.stderr = TimestampingPipe(sys.stderr)
587 if args.bitcoind_p2p_port is None:
588 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
590 if args.p2pool_port is None:
591 args.p2pool_port = args.net.P2P_PORT
593 if args.address is not None:
595 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
597 raise ValueError("error parsing address: " + repr(e))
599 args.pubkey_hash = None
601 reactor.callWhenRunning(main, args)