3 from __future__ import division
16 from twisted.internet import defer, reactor
17 from twisted.web import server, resource
18 from twisted.python import log
20 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
21 from util import db, expiring_dict, jsonrpc, variable, deferral, math
22 from . import p2p, worker_interface, skiplists
23 import p2pool.data as p2pool
24 import p2pool as p2pool_init
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work template and block height from bitcoind.

    Both RPC calls are issued before either result is awaited, so the two
    answers are sampled as close together as possible (a block could still
    arrive in between these two queries).

    Returns (via returnValue) a tuple of
    (bitcoin.getwork.BlockAttempt, height).
    """
    work_deferred = bitcoind.rpc_getwork()
    height_deferred = bitcoind.rpc_getblocknumber()
    attempt = bitcoin.getwork.BlockAttempt.from_getwork((yield work_deferred))
    block_height = yield height_deferred
    # Attach no-op errbacks so any residual failure on the already-consumed
    # deferreds is not reported as an unhandled error by Twisted.
    work_deferred.addErrback(lambda fail: None)
    height_deferred.addErrback(lambda fail: None)
    defer.returnValue((attempt, block_height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind over the bitcoin p2p protocol for a payout script.

    Sends a null checkorder request; returns the script bitcoind offers,
    None when the IP transaction is denied, and raises ValueError on any
    other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    # defer.returnValue raises, so each branch below terminates the generator.
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout script: pay to bitcoind's 'p2pool' account address.

    Asks bitcoind (RPC) for the account address, decodes it to a pubkey
    hash for the given network, and wraps it into a standard script.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
# NOTE(review): this is the start of main(); the `def main(args):` line itself
# (orig ~56) is missing from this excerpt, as are several interior lines.
55 @defer.inlineCallbacks
61 print 'p2pool (version %s)' % (p2pool_init.__version__,)
64 # connect to bitcoind over JSON-RPC and do initial getwork
65 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
66 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
67 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
# sanity-check the RPC connection by fetching work and height once
68 temp_work, temp_height = yield getwork(bitcoind)
70 print ' Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
73 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
74 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
75 factory = bitcoin.p2p.ClientFactory(args.net)
76 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
# payout script selection: checkorder result, falling back to the bitcoind
# account address, unless --address was given (args.pubkey_hash set).
# NOTE(review): the guard line between orig 78 and 80 (presumably
# `if my_script is None:`) is missing from this excerpt — confirm upstream.
77 my_script = yield get_payout_script(factory)
78 if args.pubkey_hash is None:
80 print 'IP transaction denied ... falling back to sending to address.'
81 my_script = yield get_payout_script2(bitcoind, args.net)
83 my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
85 print ' Payout script:', my_script.encode('hex')
# Shared mutable state for the pool node.
88 ht = bitcoin.p2p.HeightTracker(factory)
90 tracker = p2pool.OkayTracker(args.net)
91 chains = expiring_dict.ExpiringDict(300)
# NOTE(review): `Chain` is not defined anywhere in this excerpt; get_chain
# looks vestigial — confirm it is still used before relying on it.
92 def get_chain(chain_id_data):
93 return chains.setdefault(chain_id_data, Chain(chain_id_data))
95 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
97 # information affecting work that should trigger a long-polling update
98 current_work = variable.Variable(None)
99 # information affecting work that should not trigger a long-polling update
100 current_work2 = variable.Variable(None)
102 work_updated = variable.Event()
103 tracker_updated = variable.Event()
# share_hash -> (last_request_time, request_count), expiring after 300s
105 requested = expiring_dict.ExpiringDict(300)
# Refresh current_work/current_work2 from bitcoind (getwork + height).
# NOTE(review): several dict entries (orig 114-115, 117, 120-121) are missing
# from this excerpt, so the full contents of both dicts cannot be stated here.
107 @defer.inlineCallbacks
108 def set_real_work1():
109 work, height = yield getwork(bitcoind)
110 # XXX call tracker_updated
111 current_work.set(dict(
112 version=work.version,
113 previous_block=work.previous_block,
# best_share_hash is carried over, not recomputed here (set_real_work2 does that)
116 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
118 current_work2.set(dict(
# clock_offset: local clock minus bitcoind's work timestamp
119 clock_offset=time.time() - work.timestamp,
# Recompute the best share via tracker.think() and request any desired
# parent shares from peers. NOTE(review): multiple lines (orig 125, 128-129,
# 132, 135, 141-142, 144-146, 148-150, 153-154) are missing from this excerpt;
# loop-body boundaries and the send_getshares call are only partially visible.
122 @defer.inlineCallbacks
123 def set_real_work2():
124 best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
126 t = dict(current_work.value)
127 t['best_share_hash'] = best
130 for peer2, share_hash in desired:
131 if share_hash not in tracker.tails: # was received in the time tracker.think was running
# back-off: skip if requested recently (window grows 1.5x per attempt)
133 last_request_time, count = requested.get(share_hash, (None, 0))
134 if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
136 potential_peers = set()
137 for head in tracker.tails[share_hash]:
138 potential_peers.update(peer_heads.get(head, set()))
139 potential_peers = [peer for peer in potential_peers if peer.connected2]
140 if count == 0 and peer2 is not None and peer2.connected2:
# 80% of the time pick a random knowing peer, otherwise the original announcer
143 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
147 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
# stops: current heads plus a short parent of each head, to bound the reply
151 stops=list(set(tracker.heads) | set(
152 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
155 requested[share_hash] = time.time(), count + 1
# Prime both work variables once before anything else runs.
157 print 'Initializing work...'
158 yield set_real_work1()
159 yield set_real_work2()
# start_time in bitcoind-clock terms (local time corrected by clock_offset)
162 start_time = time.time() - current_work2.value['clock_offset']
164 # setup p2p logic and join p2pool network
# Broadcast a share to every connected peer except ignore_peer.
# NOTE(review): the line after the ignore_peer check (orig 169, presumably
# `continue`) is missing from this excerpt — confirm against upstream.
166 def share_share(share, ignore_peer=None):
167 for peer in p2p_node.peers.itervalues():
168 if peer is ignore_peer:
170 if p2pool_init.DEBUG:
171 print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
172 peer.send_shares([share])
# Handle shares received from the p2p network: add to tracker, submit any
# share that is also a valid bitcoin block to bitcoind, record which peer
# knows the head, and signal tracker_updated. NOTE(review): several lines
# (orig 176, 178-180, 183-185, 187-188, 192, 194, 196, 199, 201, 204-205,
# 207-208) are missing from this excerpt; the loop header over `shares` and
# the tracker.add call are not visible.
175 def p2p_shares(shares, peer=None):
177 print 'Processing %i shares...' % (len(shares),)
181 if share.hash in tracker.shares:
182 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
186 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
189 #for peer2, share_hash in desired:
190 # print 'Requesting parent share %x' % (share_hash,)
191 # peer2.send_getshares(hashes=[share_hash], parents=2000)
# a share whose hash beats the bitcoin target is a full block — submit it
193 if share.bitcoin_hash <= share.header['target']:
195 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
197 if factory.conn.value is not None:
198 factory.conn.value.send_block(block=share.as_block(tracker, args.net))
200 print 'No bitcoind connection! Erp!'
202 if shares and peer is not None:
203 peer_heads.setdefault(shares[0].hash, set()).add(peer)
206 tracker_updated.happened()
209 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
# Handle advertised share hashes: request the ones we don't have yet.
# NOTE(review): orig 212 (presumably `get_hashes = []`), 216 (`else:`) and
# 219/222 are missing from this excerpt.
211 def p2p_share_hashes(share_hashes, peer):
213 for share_hash in share_hashes:
214 if share_hash in tracker.shares:
215 pass # print 'Got share hash, already have, ignoring. Hash: %s' % (p2pool.format_hash(share_hash),)
217 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
218 get_hashes.append(share_hash)
220 if share_hashes and peer is not None:
221 peer_heads.setdefault(share_hashes[0], set()).add(peer)
# parents=0, stops=[]: fetch exactly the advertised shares
223 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
# Serve a peer's getshares request: walk each requested chain up to
# `parents` ancestors, stopping at any hash in `stops`; cap total work at
# ~1000 shares across all requested hashes. NOTE(review): orig 227-228
# (stops-set/shares-list setup) and 232-233 (break/append) are missing here.
225 def p2p_get_shares(share_hashes, parents, stops, peer):
226 parents = min(parents, 1000//len(share_hashes))
229 for share_hash in share_hashes:
230 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
231 if share.hash in stops:
234 peer.send_shares(shares, full=True)
# Build the p2pool p2p node: parse -n addresses, add hard-coded bootstrap
# IPs plus a DNS-resolved one (best-effort), then wire the handlers.
# NOTE(review): the `def parse(x):` header, the `nodes = [...]` opening, the
# try/except lines and the `p2p.Node(...)` constructor line are missing from
# this excerpt (orig 237-239, 241-242, 244-245, 248-249, 251, 253-254, 257, 261).
236 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
240 ip, port = x.split(':')
# bare address (no colon): default to the network's standard p2p port
243 return x, args.net.P2P_PORT
246 ('72.14.191.28', args.net.P2P_PORT),
247 ('62.204.197.159', args.net.P2P_PORT),
250 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
252 log.err(None, 'Error resolving bootstrap node IP:')
255 current_work=current_work,
256 port=args.p2pool_port,
# persistent peer address store, shared sqlite file next to the script
258 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
259 mode=0 if args.low_bandwidth else 1,
260 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
262 p2p_node.handle_shares = p2p_shares
263 p2p_node.handle_share_hashes = p2p_share_hashes
264 p2p_node.handle_get_shares = p2p_get_shares
268 # send share when the chain changes to their chain
# NOTE(review): orig 272-273 (the loop's stop condition and/or guard) are
# missing from this excerpt; as shown the loop would walk the whole chain.
269 def work_changed(new_work):
270 #print 'Work changed:', new_work
271 for share in tracker.get_chain_known(new_work['best_share_hash']):
274 share_share(share, share.peer)
275 current_work.changed.watch(work_changed)
280 # start listening for workers with a JSON-RPC server
282 print 'Listening for workers on port %i...' % (args.worker_port,)
# merkle_root -> transaction list for work we handed out (expires in 300s)
286 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
# random 8-byte tag embedded in our coinbase nonces to recognize our shares
287 run_identifier = struct.pack('<Q', random.randrange(2**64))
# Build a getwork BlockAttempt for a miner: select mempool txs, build the
# generate (coinbase) tx, compute the merkle root, and choose a timestamp.
# NOTE(review): several lines are missing from this excerpt (orig 296-297,
# 301-304, 307, 313-314, 321), including the extra_txs/size accumulator setup
# and part of the generate_transaction argument list.
289 def compute(state, payout_script):
290 if payout_script is None:
291 payout_script = my_script
292 if state['best_share_hash'] is None and args.net.PERSIST:
293 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
294 pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
295 pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
298 for tx in pre_extra_txs:
299 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
# keep the block comfortably under bitcoin's size limit
300 if size + this_size > 500000:
305 # XXX assuming generate_tx is smallish here..
306 generate_tx = p2pool.generate_transaction(
308 previous_share_hash=state['best_share_hash'],
309 new_script=payout_script,
# block subsidy (halves every 210000 blocks) plus collected tx fees
310 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
311 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
312 block_target=state['target'],
315 print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
316 #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
317 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
318 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
319 merkle_root = bitcoin.data.merkle_hash(transactions)
# NOTE(review): comment says 1000 seconds but merkle_root_to_transactions is
# ExpiringDict(300) above — one of the two is stale; confirm.
320 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
322 timestamp = int(time.time() - current_work2.value['clock_offset'])
# enforce median-time-past rule over the last 11 shares
323 if state['best_share_hash'] is not None:
324 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
325 if timestamp2 > timestamp:
326 print 'Toff', timestamp2 - timestamp
327 timestamp = timestamp2
328 target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
# remember send time per nonce so share age can be reported on return
329 times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
330 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
331 return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
# Handle a solved getwork result from a miner: rebuild the block from the
# stored transactions, submit to bitcoind if it meets the bitcoin target,
# validate against the share target, and feed the share into the pool.
# NOTE(review): the enclosing try (orig 337), early returns (orig 343, 357),
# the share-target check (orig 355), p2p_shares call and final return
# (orig 363, 365-366) are missing from this excerpt.
336 def got_response(data):
338 # match up with transactions
339 header = bitcoin.getwork.decode_data(data)
340 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
341 if transactions is None:
342 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
344 block = dict(header=header, txs=transactions)
345 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
# DEBUG mode submits every result to bitcoind, valid or not
346 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
348 print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
350 if factory.conn.value is not None:
351 factory.conn.value.send_block(block=block)
353 print 'No bitcoind connection! Erp!'
354 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
356 print 'Received invalid share from worker - %x/%x' % (hash_, target)
358 share = p2pool.Share.from_block(block)
359 my_shares.add(share.hash)
360 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
361 good = share.previous_hash == current_work.value['best_share_hash']
362 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
364 # eg. good = share.hash == current_work.value['best_share_hash'] here
367 log.err(None, 'Error processing data received from worker:')
370 web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
# JSON stats endpoints. NOTE(review): the `def get_rate():` and
# `def get_users():` headers (orig ~372 and ~379) and the `res = {}` line
# (orig 382) are missing from this excerpt.
# get_rate: pool attempts/sec over the last <=720 shares, or null
373 if current_work.value['best_share_hash'] is not None:
374 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
375 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
376 return json.dumps(att_s)
377 return json.dumps(None)
# get_users: payout-script -> fractional weight over the last <=720 shares
380 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
381 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
383 for script in sorted(weights, key=lambda s: weights[s]):
384 res[script.encode('hex')] = weights[script]/total_weight
385 return json.dumps(res)
# Minimal twisted.web resource wrapping a zero-arg callable.
# NOTE(review): the body line returning self.func(request)'s result
# (orig ~393) is missing from this excerpt.
387 class WebInterface(resource.Resource):
388 def __init__(self, func, mime_type):
389 self.func, self.mime_type = func, mime_type
391 def render_GET(self, request):
392 request.setHeader('Content-Type', self.mime_type)
395 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
396 web_root.putChild('users', WebInterface(get_users, 'application/json'))
# chain_img only registered when charts are enabled (guard missing from excerpt)
398 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
400 reactor.listenTCP(args.worker_port, server.Site(web_root))
# Local mempool mirror: wraps each seen transaction with fee accounting and
# a check that all its inputs are already confirmed in blocks.
# NOTE(review): the `class Tx(object):` header (orig ~410) and many interior
# lines (orig 413, 420, 422-423, 426, 430, 432-433, 437, 439-446) are missing
# from this excerpt, including value_in initialization and is_good().
407 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
# cached bitcoind getrawtransaction lookups, keyed by tx hash
408 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
411 def __init__(self, tx, seen_at_block):
412 self.hash = bitcoin.data.tx_type.hash256(tx)
414 self.seen_at_block = seen_at_block
# own hash plus every input's previous-output hash, for conflict detection
415 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
417 #print '%x %r' % (seen_at_block, tx)
418 #for mention in self.mentions:
419 # print '%x' % mention
421 self.parents_all_in_blocks = False
424 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
425 self._find_parents_in_blocks()
427 @defer.inlineCallbacks
# Resolve each input against bitcoind; accumulate value_in and only mark
# parents_all_in_blocks once every input is seen confirmed in a block.
428 def _find_parents_in_blocks(self):
429 for tx_in in self.tx['tx_ins']:
431 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
434 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
435 #print raw_transaction
436 if not raw_transaction['parent_blocks']:
438 self.parents_all_in_blocks = True
441 if not self.parents_all_in_blocks:
# new_tx: fetch an announced transaction over bitcoin-p2p and add it to
# tx_pool (handler currently disabled below). NOTE(review): the `def new_tx`
# header (orig ~448-449) and the try/except lines (orig 453, 455) are missing
# from this excerpt.
447 @defer.inlineCallbacks
450 assert isinstance(tx_hash, (int, long))
451 #print 'REQUESTING', tx_hash
452 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
453 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
455 log.err(None, 'Error handling tx:')
456 # disable for now, for testing impact on stales
457 #factory.new_tx.watch(new_tx)
# new bitcoin block seen on p2p -> trigger an immediate work refresh
460 def new_block(block_hash):
461 work_updated.happened()
462 factory.new_block.watch(new_block)
464 print 'Started successfully!'
# Three background loops: poll bitcoind work (set_real_work1), re-think the
# share tracker (set_real_work2), and periodically print pool/miner status.
# Each waits on its event OR an ~1s random (expovariate) sleep, whichever
# fires first. NOTE(review): the loop function headers and `while True:`
# lines (orig 468-469, 471, 473-474, 478-479, 481, 483-484, 486-492, 494,
# 497, 504-505, 509-511, 515-517) are missing from this excerpt, as are the
# first format arguments of the status print.
467 @defer.inlineCallbacks
470 flag = work_updated.get_deferred()
472 yield set_real_work1()
475 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
477 @defer.inlineCallbacks
480 flag = tracker_updated.get_deferred()
482 yield set_real_work2()
485 yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
# counts our shares (by run_identifier nonce tag) still in the chain
490 counter = skiplists.CountsSkipList(tracker, run_identifier)
493 yield deferral.sleep(random.expovariate(1/1))
495 if current_work.value['best_share_hash'] is not None:
496 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
498 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
499 weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
500 matching_in_chain = counter(current_work.value['best_share_hash'], height)
501 shares_in_chain = my_shares & matching_in_chain
502 stale_shares = my_shares - matching_in_chain
503 print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
506 weights.get(my_script, 0)/total_weight*100,
507 math.format(weights.get(my_script, 0)/total_weight*att_s),
508 len(shares_in_chain) + len(stale_shares),
512 #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
513 #for k, v in weights.iteritems():
514 # print k.encode('hex'), v/total_weight
518 log.err(None, 'Fatal error:')
# Command-line interface. NOTE(review): this appears to be inside a `run()`
# function or `if __name__ == '__main__':` block whose header (orig ~520-521)
# is missing from this excerpt.
522 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
523 parser.add_argument('--version', action='version', version=p2pool_init.__version__)
524 parser.add_argument('--testnet',
525 help='use the testnet',
526 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
527 parser.add_argument('--debug',
528 help='debugging mode',
529 action='store_const', const=True, default=False, dest='debug')
530 parser.add_argument('-a', '--address',
531 help='generate to this address (defaults to requesting one from bitcoind)',
532 type=str, action='store', default=None, dest='address')
533 parser.add_argument('--charts',
534 help='generate charts on the web interface (requires PIL and pygame)',
535 action='store_const', const=True, default=False, dest='charts')
537 p2pool_group = parser.add_argument_group('p2pool interface')
538 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
539 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
540 type=int, action='store', default=None, dest='p2pool_port')
541 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
542 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
543 type=str, action='append', default=[], dest='p2pool_nodes')
544 parser.add_argument('-l', '--low-bandwidth',
545 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
546 action='store_true', default=False, dest='low_bandwidth')
548 worker_group = parser.add_argument_group('worker interface')
549 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
550 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
551 type=int, action='store', default=9332, dest='worker_port')
553 bitcoind_group = parser.add_argument_group('bitcoind interface')
554 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
555 help='connect to a bitcoind at this address (default: 127.0.0.1)',
556 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
557 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
558 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
559 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
560 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
561 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
562 type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional (required) RPC credentials
564 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
565 help='bitcoind RPC interface username',
566 type=str, action='store', dest='bitcoind_rpc_username')
567 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
568 help='bitcoind RPC interface password',
569 type=str, action='store', dest='bitcoind_rpc_password')
571 args = parser.parse_args()
# Debug mode: tee all output to debug.log with per-line timestamps.
# NOTE(review): the `if args.debug:` guard, TeePipe's write body/flush, and
# TimestampingPipe's buf init/flush (orig 572-573, 580-581, 583, 587-588,
# 595-597) are missing from this excerpt.
574 p2pool_init.DEBUG = True
575 class TeePipe(object):
576 def __init__(self, outputs):
577 self.outputs = outputs
578 def write(self, data):
579 for output in self.outputs:
582 for output in self.outputs:
584 class TimestampingPipe(object):
585 def __init__(self, inner_file):
586 self.inner_file = inner_file
589 def write(self, data):
# buffer partial lines; only complete lines get a timestamp prefix
590 buf = self.buf + data
591 lines = buf.split('\n')
592 for line in lines[:-1]:
593 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
594 self.inner_file.flush()
598 sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
# fill in network-dependent port defaults after parsing
600 if args.bitcoind_p2p_port is None:
601 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
603 if args.p2pool_port is None:
604 args.p2pool_port = args.net.P2P_PORT
# pre-decode --address into a pubkey hash (try/except lines missing here)
606 if args.address is not None:
608 args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
610 raise ValueError('error parsing address: ' + repr(e))
612 args.pubkey_hash = None
614 reactor.callWhenRunning(main, args)