3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
20 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
21 from util import db, expiring_dict, jsonrpc, variable, deferral, math
22 from . import p2p, worker_interface, skiplists
23 import p2pool.data as p2pool
24 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current block template and chain height from bitcoind.

    Fires rpc_getwork and rpc_getblocknumber concurrently and returns a
    (BlockAttempt, height) tuple.

    NOTE(review): a block could arrive in between the two queries, so the
    returned work and height are not guaranteed to be mutually consistent.
    """
    # start both RPCs at once so they run concurrently
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # get rid of residual errors: if the first yield raises, the other
        # deferred's eventual failure would otherwise be reported as an
        # unhandled error. The original code attached these errbacks only
        # after both yields had succeeded, so they never ran on the failure
        # path; the finally block guarantees they are always attached.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (over the bitcoin p2p protocol) for a payout script.

    Sends a null checkorder request and interprets the reply:
    returns the script on 'success', None on 'denied', and raises
    ValueError for any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout-script source.

    Requests the 'p2pool' account address from bitcoind over JSON-RPC and
    converts it to a pay-to-pubkey-hash output script for the given net.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
55 @defer.inlineCallbacks
61 print 'p2pool (version %s)' % (p2pool_init.__version__,)
64 # connect to bitcoind over JSON-RPC and do initial getwork
65 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
66 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
67 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
68 temp_work, temp_height = yield getwork(bitcoind)
70 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
73 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
74 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
75 factory = bitcoin.p2p.ClientFactory(args.net)
76 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
77 my_script = yield get_payout_script(factory)
78 if args.pubkey_hash is None:
80 print 'IP transaction denied ... falling back to sending to address.'
81 my_script = yield get_payout_script2(bitcoind, args.net)
83 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
85 print ' Payout script:', my_script.encode('hex')
88 ht = bitcoin.p2p.HeightTracker(factory)
90 tracker = p2pool.OkayTracker(args.net)
91 chains = expiring_dict.ExpiringDict(300)
92 def get_chain(chain_id_data):
93 return chains.setdefault(chain_id_data, Chain(chain_id_data))
95 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
97 # information affecting work that should trigger a long-polling update
98 current_work = variable.Variable(None)
99 # information affecting work that should not trigger a long-polling update
100 current_work2 = variable.Variable(None)
102 work_updated = variable.Event()
103 tracker_updated = variable.Event()
105 requested = expiring_dict.ExpiringDict(300)
107 @defer.inlineCallbacks
108 def set_real_work1():
109 work, height = yield getwork(bitcoind)
110 # XXX call tracker_updated
111 current_work.set(dict(
112 version=work.version,
113 previous_block=work.previous_block,
116 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
118 current_work2.set(dict(
119 clock_offset=time.time() - work.timestamp,
122 def set_real_work2():
123 best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
125 t = dict(current_work.value)
126 t['best_share_hash'] = best
130 for peer2, share_hash in desired:
131 #if share_hash not in tracker.tails: # was received in the time tracker.think was running
133 last_request_time, count = requested.get(share_hash, (None, 0))
134 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
136 potential_peers = set()
137 for head in tracker.tails[share_hash]:
138 potential_peers.update(peer_heads.get(head, set()))
139 potential_peers = [peer for peer in potential_peers if peer.connected2]
140 if count == 0 and peer2 is not None and peer2.connected2:
143 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
147 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
151 stops=list(set(tracker.heads) | set(
152 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
155 requested[share_hash] = t, count + 1
157 print 'Initializing work...'
158 yield set_real_work1()
162 start_time = time.time() - current_work2.value['clock_offset']
164 # setup p2p logic and join p2pool network
166 def share_share(share, ignore_peer=None):
167 for peer in p2p_node.peers.itervalues():
168 if peer is ignore_peer:
170 if p2pool_init.DEBUG:
171 print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
172 peer.send_shares([share])
175 def p2p_shares(shares, peer=None):
177 print 'Processing %i shares...' % (len(shares),)
181 if share.hash in tracker.shares:
182 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
186 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
189 #for peer2, share_hash in desired:
190 # print 'Requesting parent share %x' % (share_hash,)
191 # peer2.send_getshares(hashes=[share_hash], parents=2000)
193 if share.bitcoin_hash <= share.header['target']:
195 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
197 if factory.conn.value is not None:
198 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
200 print 'No bitcoind connection! Erp!'
202 if shares and peer is not None:
203 peer_heads.setdefault(shares[0].hash, set()).add(peer)
206 tracker_updated.happened()
209 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
211 def p2p_share_hashes(share_hashes, peer):
214 for share_hash in share_hashes:
215 if share_hash in tracker.shares:
217 last_request_time, count = requested.get(share_hash, (None, 0))
218 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
220 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
221 get_hashes.append(share_hash)
222 requested[share_hash] = t, count + 1
224 if share_hashes and peer is not None:
225 peer_heads.setdefault(share_hashes[0], set()).add(peer)
227 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
229 def p2p_get_shares(share_hashes, parents, stops, peer):
230 parents = min(parents, 1000//len(share_hashes))
233 for share_hash in share_hashes:
234 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
235 if share.hash in stops:
238 peer.send_shares(shares, full=True)
240 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
244 ip, port = x.split(':')
247 return x, args.net.P2P_PORT
250 ('72.14.191.28', args.net.P2P_PORT),
251 ('62.204.197.159', args.net.P2P_PORT),
254 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
256 log.err(None, 'Error resolving bootstrap node IP:')
259 current_work=current_work,
260 port=args.p2pool_port,
262 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
263 mode=0 if args.low_bandwidth else 1,
264 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
266 p2p_node.handle_shares = p2p_shares
267 p2p_node.handle_share_hashes = p2p_share_hashes
268 p2p_node.handle_get_shares = p2p_get_shares
272 # send share when the chain changes to their chain
273 def work_changed(new_work):
274 #print 'Work changed:', new_work
275 for share in tracker.get_chain_known(new_work['best_share_hash']):
278 share_share(share, share.peer)
279 current_work.changed.watch(work_changed)
284 # start listening for workers with a JSON-RPC server
286 print 'Listening for workers on port %i...' % (args.worker_port,)
290 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
291 run_identifier = struct.pack('<Q', random.randrange(2**64))
293 def compute(state, payout_script):
294 if payout_script is None:
295 payout_script = my_script
296 if state['best_share_hash'] is None and args.net.PERSIST:
297 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
298 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
299 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
302 for tx in pre_extra_txs:
303 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
304 if size + this_size > 500000:
309 # XXX assuming generate_tx is smallish here..
310 generate_tx = p2pool.generate_transaction(
312 previous_share_hash=state['best_share_hash'],
313 new_script=payout_script,
314 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
315 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
316 block_target=state['target'],
319 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
320 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
321 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
322 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
323 merkle_root = bitcoin.data.merkle_hash(transactions)
324 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
326 timestamp = int(time.time() - current_work2.value['clock_offset'])
327 if state['best_share_hash'] is not None:
328 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
329 if timestamp2 > timestamp:
330 print 'Toff', timestamp2 - timestamp
331 timestamp = timestamp2
332 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
333 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
334 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
335 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
340 def got_response(data):
342 # match up with transactions
343 header = bitcoin.getwork.decode_data(data)
344 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
345 if transactions is None:
346 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
348 block = dict(header=header, txs=transactions)
349 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
350 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
352 print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
354 if factory.conn.value is not None:
355 factory.conn.value.send_block(block=block)
357 print 'No bitcoind connection! Erp!'
358 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
360 print 'Received invalid share from worker - %x/%x' % (hash_, target)
362 share = p2pool.Share.from_block(block)
363 my_shares.add(share.hash)
364 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
365 good = share.previous_hash == current_work.value['best_share_hash']
366 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
368 # eg. good = share.hash == current_work.value['best_share_hash'] here
371 log.err(None, 'Error processing data received from worker:')
374 web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
377 if current_work.value['best_share_hash'] is not None:
378 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
379 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
380 return json.dumps(att_s)
381 return json.dumps(None)
384 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
385 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
387 for script in sorted(weights, key=lambda s: weights[s]):
388 res[script.encode('hex')] = weights[script]/total_weight
389 return json.dumps(res)
391 class WebInterface(resource.Resource):
392 def __init__(self, func, mime_type):
393 self.func, self.mime_type = func, mime_type
395 def render_GET(self, request):
396 request.setHeader('Content-Type', self.mime_type)
399 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
400 web_root.putChild('users', WebInterface(get_users, 'application/json'))
402 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
404 reactor.listenTCP(args.worker_port, server.Site(web_root))
411 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
412 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
415 def __init__(self, tx, seen_at_block):
416 self.hash = bitcoin.data.tx_type.hash256(tx)
418 self.seen_at_block = seen_at_block
419 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
421 #print '%x %r' % (seen_at_block, tx)
422 #for mention in self.mentions:
423 # print '%x' % mention
425 self.parents_all_in_blocks = False
428 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
429 self._find_parents_in_blocks()
431 @defer.inlineCallbacks
432 def _find_parents_in_blocks(self):
433 for tx_in in self.tx['tx_ins']:
435 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
438 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
439 #print raw_transaction
440 if not raw_transaction['parent_blocks']:
442 self.parents_all_in_blocks = True
445 if not self.parents_all_in_blocks:
451 @defer.inlineCallbacks
454 assert isinstance(tx_hash, (int, long))
455 #print 'REQUESTING', tx_hash
456 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
458 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
460 log.err(None, 'Error handling tx:')
461 # disable for now, for testing impact on stales
462 #factory.new_tx.watch(new_tx)
464 def new_block(block_hash):
465 work_updated.happened()
466 factory.new_block.watch(new_block)
468 print 'Started successfully!'
471 @defer.inlineCallbacks
474 flag = work_updated.get_deferred()
476 yield set_real_work1()
479 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
481 @defer.inlineCallbacks
484 flag = tracker_updated.get_deferred()
489 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
494 counter = skiplists.CountsSkipList(tracker, run_identifier)
497 yield deferral.sleep(random.expovariate(1/1))
499 if current_work.value['best_share_hash'] is not None:
500 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
502 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
503 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
504 matching_in_chain = counter(current_work.value['best_share_hash'], height)
505 shares_in_chain = my_shares & matching_in_chain
506 stale_shares = my_shares - matching_in_chain
507 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
510 weights.get(my_script, 0)/total_weight*100,
511 math.format(weights.get(my_script, 0)/total_weight*att_s),
512 len(shares_in_chain) + len(stale_shares),
516 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
517 #for k, v in weights.iteritems():
518 # print k.encode('hex'), v/total_weight
522 log.err(None, 'Fatal error:')
526 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
527 parser.add_argument('--version', action='version', version=p2pool_init.__version__)
528 parser.add_argument('--testnet',
529 help='use the testnet',
530 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
531 parser.add_argument('--debug',
532 help='debugging mode',
533 action='store_const', const=True, default=False, dest='debug')
534 parser.add_argument('-a', '--address',
535 help='generate to this address (defaults to requesting one from bitcoind)',
536 type=str, action='store', default=None, dest='address')
537 parser.add_argument('--charts',
538 help='generate charts on the web interface (requires PIL and pygame)',
539 action='store_const', const=True, default=False, dest='charts')
541 p2pool_group = parser.add_argument_group('p2pool interface')
542 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
543 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
544 type=int, action='store', default=None, dest='p2pool_port')
545 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
546 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
547 type=str, action='append', default=[], dest='p2pool_nodes')
548 parser.add_argument('-l', '--low-bandwidth',
549 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
550 action='store_true', default=False, dest='low_bandwidth')
552 worker_group = parser.add_argument_group('worker interface')
553 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
554 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
555 type=int, action='store', default=9332, dest='worker_port')
557 bitcoind_group = parser.add_argument_group('bitcoind interface')
558 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
559 help='connect to a bitcoind at this address (default: 127.0.0.1)',
560 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
561 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
562 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
563 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
564 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
565 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
566 type=int, action='store', default=None, dest='bitcoind_p2p_port')
568 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
569 help='bitcoind RPC interface username',
570 type=str, action='store', dest='bitcoind_rpc_username')
571 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
572 help='bitcoind RPC interface password',
573 type=str, action='store', dest='bitcoind_rpc_password')
575 args = parser.parse_args()
578 p2pool_init.DEBUG = True
579 class TeePipe(object):
580 def __init__(self, outputs):
581 self.outputs = outputs
582 def write(self, data):
583 for output in self.outputs:
586 for output in self.outputs:
588 class TimestampingPipe(object):
589 def __init__(self, inner_file):
590 self.inner_file = inner_file
593 def write(self, data):
594 buf = self.buf + data
595 lines = buf.split('\n')
596 for line in lines[:-1]:
597 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
598 self.inner_file.flush()
602 sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
604 if args.bitcoind_p2p_port is None:
605 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
607 if args.p2pool_port is None:
608 args.p2pool_port = args.net.P2P_PORT
610 if args.address is not None:
612 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
614 raise ValueError('error parsing address: ' + repr(e))
616 args.pubkey_hash = None
618 reactor.callWhenRunning(main, args)