3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
20 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
21 from util import db, expiring_dict, jsonrpc, variable, deferral, math
22 from . import p2p, worker_interface, skiplists
23 import p2pool.data as p2pool
24 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch current work and block height from bitcoind over JSON-RPC.

    Fires with a (bitcoin.getwork.BlockAttempt, height) tuple.  On failure
    the deferral.retry decorator logs the message and retries (the `3` is
    presumably the retry delay in seconds - TODO confirm against deferral).
    """
    # Issue both RPCs before yielding so they run concurrently and their
    # results are as close in time as possible.
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    # get rid of residual errors
    # NOTE(review): both deferreds have already fired successfully by this
    # point; these errbacks look like belt-and-braces against stray
    # "Unhandled error in Deferred" log noise - confirm.
    getwork_df.addErrback(lambda fail: None)
    height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind over the bitcoin P2P protocol for a payout script.

    Sends a checkorder message with the null order and fires with the
    returned script on 'success', None on 'denied' (caller falls back to
    get_payout_script2), and raises ValueError on any other reply.
    """
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        defer.returnValue(res['script'])
    elif res['reply'] == 'denied':
        defer.returnValue(None)
    # defer.returnValue raises, so control only reaches here when neither
    # branch above matched, i.e. the reply was unexpected.
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Derive a payout script from bitcoind's 'p2pool' account address.

    Fallback for when the checkorder/IP-transaction path is denied: asks
    bitcoind for an address via JSON-RPC, decodes it to a pubkey hash for
    the given network, and fires with the corresponding output script.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
55 @defer.inlineCallbacks
61 print 'p2pool (version %s)' % (p2pool_init.__version__,)
64 # connect to bitcoind over JSON-RPC and do initial getwork
65 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
66 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
67 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
68 temp_work, temp_height = yield getwork(bitcoind)
70 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
73 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
74 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
75 factory = bitcoin.p2p.ClientFactory(args.net)
76 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
77 my_script = yield get_payout_script(factory)
78 if args.pubkey_hash is None:
80 print 'IP transaction denied ... falling back to sending to address.'
81 my_script = yield get_payout_script2(bitcoind, args.net)
83 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
85 print ' Payout script:', my_script.encode('hex')
88 ht = bitcoin.p2p.HeightTracker(factory)
90 tracker = p2pool.OkayTracker(args.net)
91 chains = expiring_dict.ExpiringDict(300)
    def get_chain(chain_id_data):
        # Memoized lookup: at most one Chain object per chain id, with the
        # entry expiring together with the `chains` ExpiringDict (300 s).
        # NOTE(review): `Chain` is not defined anywhere in this view of the
        # file - this looks like leftover code from an older version; confirm
        # it is still reachable before relying on it.
        return chains.setdefault(chain_id_data, Chain(chain_id_data))
95 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
97 # information affecting work that should trigger a long-polling update
98 current_work = variable.Variable(None)
99 # information affecting work that should not trigger a long-polling update
100 current_work2 = variable.Variable(None)
102 work_updated = variable.Event()
103 tracker_updated = variable.Event()
105 requested = expiring_dict.ExpiringDict(300)
107 @defer.inlineCallbacks
108 def set_real_work1():
109 work, height = yield getwork(bitcoind)
110 # XXX call tracker_updated
111 current_work.set(dict(
112 version=work.version,
113 previous_block=work.previous_block,
116 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
118 current_work2.set(dict(
119 clock_offset=time.time() - work.timestamp,
122 def set_real_work2():
123 best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
125 t = dict(current_work.value)
126 t['best_share_hash'] = best
130 for peer2, share_hash in desired:
131 #if share_hash not in tracker.tails: # was received in the time tracker.think was running
133 last_request_time, count = requested.get(share_hash, (None, 0))
134 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
136 potential_peers = set()
137 for head in tracker.tails[share_hash]:
138 potential_peers.update(peer_heads.get(head, set()))
139 potential_peers = [peer for peer in potential_peers if peer.connected2]
140 if count == 0 and peer2 is not None and peer2.connected2:
143 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
147 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
151 stops=list(set(tracker.heads) | set(
152 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
155 requested[share_hash] = t, count + 1
157 print 'Initializing work...'
158 yield set_real_work1()
162 start_time = time.time() - current_work2.value['clock_offset']
164 # setup p2p logic and join p2pool network
166 def share_share(share, ignore_peer=None):
167 for peer in p2p_node.peers.itervalues():
168 if peer is ignore_peer:
170 #if p2pool_init.DEBUG:
171 # print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
172 peer.send_shares([share])
175 def p2p_shares(shares, peer=None):
177 print 'Processing %i shares...' % (len(shares),)
181 if share.hash in tracker.shares:
182 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
186 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
189 #for peer2, share_hash in desired:
190 # print 'Requesting parent share %x' % (share_hash,)
191 # peer2.send_getshares(hashes=[share_hash], parents=2000)
193 if share.bitcoin_hash <= share.header['target']:
195 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
197 if factory.conn.value is not None:
198 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
200 print 'No bitcoind connection! Erp!'
202 if shares and peer is not None:
203 peer_heads.setdefault(shares[0].hash, set()).add(peer)
206 tracker_updated.happened()
209 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
211 def p2p_share_hashes(share_hashes, peer):
214 for share_hash in share_hashes:
215 if share_hash in tracker.shares:
217 last_request_time, count = requested.get(share_hash, (None, 0))
218 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
220 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
221 get_hashes.append(share_hash)
222 requested[share_hash] = t, count + 1
224 if share_hashes and peer is not None:
225 peer_heads.setdefault(share_hashes[0], set()).add(peer)
227 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
229 def p2p_get_shares(share_hashes, parents, stops, peer):
230 parents = min(parents, 1000//len(share_hashes))
233 for share_hash in share_hashes:
234 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
235 if share.hash in stops:
238 peer.send_shares(shares, full=True)
240 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
244 ip, port = x.split(':')
247 return x, args.net.P2P_PORT
250 ('72.14.191.28', args.net.P2P_PORT),
251 ('62.204.197.159', args.net.P2P_PORT),
252 ('142.58.248.28', args.net.P2P_PORT),
253 ('94.23.34.145', args.net.P2P_PORT),
257 'dabuttonfactory.com',
260 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
262 log.err(None, 'Error resolving bootstrap node IP:')
265 current_work=current_work,
266 port=args.p2pool_port,
268 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
269 mode=0 if args.low_bandwidth else 1,
270 preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
272 p2p_node.handle_shares = p2p_shares
273 p2p_node.handle_share_hashes = p2p_share_hashes
274 p2p_node.handle_get_shares = p2p_get_shares
278 # send share when the chain changes to their chain
279 def work_changed(new_work):
280 #print 'Work changed:', new_work
281 for share in tracker.get_chain_known(new_work['best_share_hash']):
284 share_share(share, share.peer)
285 current_work.changed.watch(work_changed)
290 # start listening for workers with a JSON-RPC server
292 print 'Listening for workers on port %i...' % (args.worker_port,)
296 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
297 run_identifier = struct.pack('<Q', random.randrange(2**64))
299 def compute(state, payout_script):
300 if payout_script is None:
301 payout_script = my_script
302 if state['best_share_hash'] is None and args.net.PERSIST:
303 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
304 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
305 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
308 for tx in pre_extra_txs:
309 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
310 if size + this_size > 500000:
315 # XXX assuming generate_tx is smallish here..
316 generate_tx = p2pool.generate_transaction(
318 previous_share_hash=state['best_share_hash'],
319 new_script=payout_script,
320 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
321 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
322 block_target=state['target'],
325 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
326 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
327 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
328 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
329 merkle_root = bitcoin.data.merkle_hash(transactions)
330 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
332 timestamp = int(time.time() - current_work2.value['clock_offset'])
333 if state['best_share_hash'] is not None:
334 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
335 if timestamp2 > timestamp:
336 print 'Toff', timestamp2 - timestamp
337 timestamp = timestamp2
338 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
339 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
340 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
341 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
346 def got_response(data):
348 # match up with transactions
349 header = bitcoin.getwork.decode_data(data)
350 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
351 if transactions is None:
352 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
354 block = dict(header=header, txs=transactions)
355 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
356 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
358 print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
360 if factory.conn.value is not None:
361 factory.conn.value.send_block(block=block)
363 print 'No bitcoind connection! Erp!'
364 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
366 print 'Received invalid share from worker - %x/%x' % (hash_, target)
368 share = p2pool.Share.from_block(block)
369 my_shares.add(share.hash)
370 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
371 good = share.previous_hash == current_work.value['best_share_hash']
372 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
374 # eg. good = share.hash == current_work.value['best_share_hash'] here
377 log.err(None, 'Error processing data received from worker:')
380 web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
383 if current_work.value['best_share_hash'] is not None:
384 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
385 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
386 return json.dumps(att_s)
387 return json.dumps(None)
390 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
391 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
393 for script in sorted(weights, key=lambda s: weights[s]):
394 res[script.encode('hex')] = weights[script]/total_weight
395 return json.dumps(res)
    class WebInterface(resource.Resource):
        """Minimal twisted.web resource serving a callable's result with a fixed MIME type."""
        def __init__(self, func, mime_type):
            # func: zero-argument callable producing the response body
            # (call sites pass e.g. get_rate / get_users for the JSON endpoints)
            self.func, self.mime_type = func, mime_type

        def render_GET(self, request):
            request.setHeader('Content-Type', self.mime_type)
            # NOTE(review): the line returning the response body (presumably
            # `return self.func()`) appears to be elided in this view of the
            # file - confirm against the full source.
405 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
406 web_root.putChild('users', WebInterface(get_users, 'application/json'))
408 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
410 reactor.listenTCP(args.worker_port, server.Site(web_root))
417 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
418 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
421 def __init__(self, tx, seen_at_block):
422 self.hash = bitcoin.data.tx_type.hash256(tx)
424 self.seen_at_block = seen_at_block
425 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
427 #print '%x %r' % (seen_at_block, tx)
428 #for mention in self.mentions:
429 # print '%x' % mention
431 self.parents_all_in_blocks = False
434 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
435 self._find_parents_in_blocks()
437 @defer.inlineCallbacks
438 def _find_parents_in_blocks(self):
439 for tx_in in self.tx['tx_ins']:
441 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
444 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
445 #print raw_transaction
446 if not raw_transaction['parent_blocks']:
448 self.parents_all_in_blocks = True
451 if not self.parents_all_in_blocks:
457 @defer.inlineCallbacks
460 assert isinstance(tx_hash, (int, long))
461 #print 'REQUESTING', tx_hash
462 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
464 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
466 log.err(None, 'Error handling tx:')
467 # disable for now, for testing impact on stales
468 #factory.new_tx.watch(new_tx)
    def new_block(block_hash):
        # A new bitcoin block invalidates the current work; poke the poller
        # so getwork is re-fetched promptly.  block_hash itself is unused.
        work_updated.happened()
472 factory.new_block.watch(new_block)
474 print 'Started successfully!'
477 @defer.inlineCallbacks
480 flag = work_updated.get_deferred()
482 yield set_real_work1()
485 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
487 @defer.inlineCallbacks
490 flag = tracker_updated.get_deferred()
495 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
500 counter = skiplists.CountsSkipList(tracker, run_identifier)
503 yield deferral.sleep(random.expovariate(1/1))
505 if current_work.value['best_share_hash'] is not None:
506 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
508 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
509 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
510 matching_in_chain = counter(current_work.value['best_share_hash'], height)
511 shares_in_chain = my_shares & matching_in_chain
512 stale_shares = my_shares - matching_in_chain
513 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
516 weights.get(my_script, 0)/total_weight*100,
517 math.format(weights.get(my_script, 0)/total_weight*att_s),
518 len(shares_in_chain) + len(stale_shares),
522 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
523 #for k, v in weights.iteritems():
524 # print k.encode('hex'), v/total_weight
528 log.err(None, 'Fatal error:')
532 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
533 parser.add_argument('--version', action='version', version=p2pool_init.__version__)
534 parser.add_argument('--testnet',
535 help='use the testnet',
536 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
537 parser.add_argument('--debug',
538 help='debugging mode',
539 action='store_const', const=True, default=False, dest='debug')
540 parser.add_argument('-a', '--address',
541 help='generate to this address (defaults to requesting one from bitcoind)',
542 type=str, action='store', default=None, dest='address')
543 parser.add_argument('--charts',
544 help='generate charts on the web interface (requires PIL and pygame)',
545 action='store_const', const=True, default=False, dest='charts')
547 p2pool_group = parser.add_argument_group('p2pool interface')
548 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
549 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
550 type=int, action='store', default=None, dest='p2pool_port')
551 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
552 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
553 type=str, action='append', default=[], dest='p2pool_nodes')
554 parser.add_argument('-l', '--low-bandwidth',
555 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
556 action='store_true', default=False, dest='low_bandwidth')
558 worker_group = parser.add_argument_group('worker interface')
559 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
560 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
561 type=int, action='store', default=9332, dest='worker_port')
563 bitcoind_group = parser.add_argument_group('bitcoind interface')
564 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
565 help='connect to a bitcoind at this address (default: 127.0.0.1)',
566 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
567 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
568 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
569 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
570 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
571 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
572 type=int, action='store', default=None, dest='bitcoind_p2p_port')
574 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
575 help='bitcoind RPC interface username',
576 type=str, action='store', dest='bitcoind_rpc_username')
577 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
578 help='bitcoind RPC interface password',
579 type=str, action='store', dest='bitcoind_rpc_password')
581 args = parser.parse_args()
584 p2pool_init.DEBUG = True
585 class TeePipe(object):
586 def __init__(self, outputs):
587 self.outputs = outputs
588 def write(self, data):
589 for output in self.outputs:
592 for output in self.outputs:
594 class TimestampingPipe(object):
595 def __init__(self, inner_file):
596 self.inner_file = inner_file
599 def write(self, data):
600 buf = self.buf + data
601 lines = buf.split('\n')
602 for line in lines[:-1]:
603 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
604 self.inner_file.flush()
608 sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
610 if args.bitcoind_p2p_port is None:
611 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
613 if args.p2pool_port is None:
614 args.p2pool_port = args.net.P2P_PORT
616 if args.address is not None:
618 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
620 raise ValueError('error parsing address: ' + repr(e))
622 args.pubkey_hash = None
624 reactor.callWhenRunning(main, args)