3 from __future__ import division
14 from twisted.internet import defer, reactor
15 from twisted.web import server
16 from twisted.python import log
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
# Derive the version string by running `svnversion` on the directory that
# contains the launched script.  The literal 'unknown' is only a fallback for
# when that command cannot be run; it must not overwrite a successful result.
try:
    # communicate() reads all of stdout and also waits for the child to exit,
    # so no unreaped process or open pipe is left behind.
    __version__ = subprocess.Popen(
        ['svnversion', os.path.dirname(sys.argv[0])],
        stdout=subprocess.PIPE,
    ).communicate()[0].strip()
except Exception:
    # e.g. OSError raised by Popen when the svnversion executable is missing
    __version__ = 'unknown'
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work unit and block number from bitcoind.

    bitcoind -- RPC proxy providing rpc_getwork() and rpc_getblocknumber(),
                each of which returns a Deferred.

    Fires with a (work, height) tuple, where work is the value built by
    bitcoin.getwork.BlockAttempt.from_getwork() from the getwork reply and
    height is the rpc_getblocknumber() result.  Any failure propagates to the
    deferral.retry wrapper (defined elsewhere; presumably it retries the call).
    """
    # Both requests are issued up front, before either one is awaited.
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        work = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df))
        height = yield height_df
    finally:
        # get rid of residual errors
        # This has to run even when one of the yields above raised: otherwise
        # the failure held by the other Deferred would never be consumed.
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, over its P2P connection, for a script to pay out to.

    factory -- bitcoin P2P client factory; getProtocol() returns a Deferred
               that fires with the connected protocol instance.

    Sends a check_order request carrying bitcoin.p2p.Protocol.null_order and
    fires with:
      * res['script'] when the reply is 'success';
      * None when the reply is 'denied', so the caller can fall back to
        another way of obtaining a payout script.

    Raises ValueError for any other reply value.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        my_script = res['script']
    elif reply == 'denied':
        # A denial is an expected outcome, not an error.
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(my_script)
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from a new address requested from bitcoind.

    bitcoind -- RPC proxy providing rpc_getnewaddress(), which returns a
                Deferred.
    net      -- network object handed to bitcoin.data.address_to_pubkey_hash().

    Fires with the result of bitcoin.data.pubkey_hash_to_script2() applied to
    the pubkey hash of the new address.
    """
    address = yield bitcoind.rpc_getnewaddress()
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    script = bitcoin.data.pubkey_hash_to_script2(pubkey_hash)
    defer.returnValue(script)
57 @defer.inlineCallbacks
60 print 'p2pool (version %s)' % (__version__,)
63 # connect to bitcoind over JSON-RPC and do initial getwork
64 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
65 print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
66 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
67 work, height = yield getwork(bitcoind)
69 print ' Current block hash: %x height: %i' % (work.previous_block, height)
72 # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
73 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
74 factory = bitcoin.p2p.ClientFactory(args.net)
75 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
76 my_script = yield get_payout_script(factory)
78 print 'IP transaction denied ... falling back to sending to address. Enable IP transactions on your bitcoind!'
79 my_script = yield get_payout_script2(bitcoind, args.net)
81 print ' Payout script:', my_script.encode('hex')
84 @defer.inlineCallbacks
85 def real_get_block(block_hash):
86 block = yield (yield factory.getProtocol()).get_block(block_hash)
87 print 'Got block %x' % (block_hash,)
88 defer.returnValue(block)
89 get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
91 get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
93 ht = bitcoin.p2p.HeightTracker(factory)
95 tracker = p2pool.OkayTracker(args.net)
96 chains = expiring_dict.ExpiringDict(300)
def get_chain(chain_id_data):
    """Return the entry of `chains` stored under chain_id_data.

    A new Chain(chain_id_data) is built on every call and handed to
    chains.setdefault() as the default value; whatever setdefault() returns
    is passed back to the caller.
    """
    candidate = Chain(chain_id_data)
    return chains.setdefault(chain_id_data, candidate)
100 # information affecting work that should trigger a long-polling update
101 current_work = variable.Variable(None)
102 # information affecting work that should not trigger a long-polling update
103 current_work2 = variable.Variable(None)
107 @defer.inlineCallbacks
109 work, height = yield getwork(bitcoind)
110 current_work2.set(dict(
113 best, desired = tracker.think(ht, current_work2.value['time'])
114 for peer2, share_hash in desired:
117 if (peer2.nonce, share_hash) in requested:
119 print 'Requesting parent share %x' % (share_hash,)
120 peer2.send_getshares(
123 stops=list(set(tracker.heads) | set(
124 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
127 requested.add((peer2.nonce, share_hash))
128 current_work.set(dict(
129 version=work.version,
130 previous_block=work.previous_block,
133 best_share_hash=best,
136 print 'Initializing work...'
137 yield set_real_work()
140 # setup p2p logic and join p2pool network
def share_share(share, ignore_peer=None):
    """Send `share` to every peer of p2p_node except `ignore_peer`.

    share       -- the share to forward; each peer receives it as a
                   one-element list via peer.send_shares().
    ignore_peer -- a peer object to leave out, or None (the default) to send
                   to all peers.  Peers are compared by identity.
    """
    for peer in p2p_node.peers.itervalues():
        if peer is ignore_peer:
            # Skip the excluded peer and carry on with the rest.
            continue
        peer.send_shares([share])
149 def p2p_share(share, peer=None):
150 if share.hash in tracker.shares:
151 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
154 #print 'Received share %x' % (share.hash,)
157 best, desired = tracker.think(ht, current_work2.value['time'])
158 #for peer2, share_hash in desired:
159 # print 'Requesting parent share %x' % (share_hash,)
160 # peer2.send_getshares(hashes=[share_hash], parents=2000)
162 if share.gentx is not None:
163 if share.hash <= share.header['target']:
165 print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
167 if factory.conn.value is not None:
168 factory.conn.value.send_block(block=share.as_block())
170 print 'No bitcoind connection! Erp!'
172 w = dict(current_work.value)
173 w['best_share_hash'] = best
176 if best == share.hash:
177 print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
179 print 'Accepted share, not highest. Hash: %x' % (share.hash,)
181 def p2p_share_hash(share_hash, peer):
182 if share_hash in tracker.shares:
183 print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
185 print 'Got share hash, requesting! Hash: %x' % (share_hash,)
186 peer.send_getshares(hashes=[share_hash], parents=0, stops=[])
188 def p2p_get_shares(share_hashes, parents, stops, peer):
189 parents = min(parents, 1000//len(share_hashes))
192 for share_hash in share_hashes:
193 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
194 if share.hash in stops:
197 peer.send_shares(shares, full=True)
199 print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
203 ip, port = x.split(':')
206 return x, args.net.P2P_PORT
209 ('72.14.191.28', args.net.P2P_PORT),
210 ('62.204.197.159', args.net.P2P_PORT),
213 nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
216 print 'Error resolving bootstrap node IP:'
221 current_work=current_work,
222 port=args.p2pool_port,
224 addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
225 mode=0 if args.low_bandwidth else 1,
226 preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
228 p2p_node.handle_share = p2p_share
229 p2p_node.handle_share_hash = p2p_share_hash
230 p2p_node.handle_get_shares = p2p_get_shares
234 # send share when the chain changes to their chain
235 def work_changed(new_work):
236 #print 'Work changed:', new_work
237 for share in tracker.get_chain_known(new_work['best_share_hash']):
240 share_share(share, share.peer)
241 current_work.changed.watch(work_changed)
246 # start listening for workers with a JSON-RPC server
248 print 'Listening for workers on port %i...' % (args.worker_port,)
252 merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
254 def compute(state, all_targets):
255 extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
256 # XXX limit to merkle_branch and block max size - 1000000 byte
258 generate_tx = p2pool.generate_transaction(
260 previous_share_hash=state['best_share_hash'],
261 new_script=my_script,
262 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
263 nonce=struct.pack('<Q', random.randrange(2**64)),
264 block_target=state['target'],
267 print 'Generating!', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']//1000000
268 print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
269 #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
270 transactions = [generate_tx] + [tx.tx for tx in extra_txs]
271 merkle_root = bitcoin.data.merkle_hash(transactions)
272 merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
274 timestamp = current_work2.value['time']
275 if state['best_share_hash'] is not None:
276 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
277 if timestamp2 > timestamp:
278 print 'Toff', timestamp2 - timestamp
279 timestamp = timestamp2
280 ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
281 #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
282 target = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
284 target = min(2**256//2**32 - 1, target)
285 return ba.getwork(target)
287 def got_response(data):
289 # match up with transactions
290 header = bitcoin.getwork.decode_data(data)
291 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
292 if transactions is None:
293 print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
295 block = dict(header=header, txs=transactions)
296 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
297 if hash_ <= block['header']['target']:
299 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
301 if factory.conn.value is not None:
302 factory.conn.value.send_block(block=block)
304 print 'No bitcoind connection! Erp!'
305 share = p2pool.Share.from_block(block)
306 print 'GOT SHARE! %x' % (share.hash,)
310 print 'Error processing data received from worker:'
317 reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
324 def get_blocks(start_hash):
327 block = get_block.call_now(start_hash)
328 except deferral.NotNowError:
330 yield start_hash, block
331 start_hash = block['header']['previous_block']
333 tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
336 def __init__(self, tx, seen_at_block):
337 self.hash = bitcoin.data.tx_type.hash256(tx)
339 self.seen_at_block = seen_at_block
340 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
342 #print '%x %r' % (seen_at_block, tx)
343 #for mention in self.mentions:
344 # print '%x' % mention
346 self.parents_all_in_blocks = False
349 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
350 self._find_parents_in_blocks()
352 @defer.inlineCallbacks
353 def _find_parents_in_blocks(self):
354 for tx_in in self.tx['tx_ins']:
356 raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
359 self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
360 #print raw_transaction
361 if not raw_transaction['parent_blocks']:
363 self.parents_all_in_blocks = True
366 if not self.parents_all_in_blocks:
373 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
374 if block_hash == self.seen_at_block:
376 for tx in block['txs']:
377 mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
378 if mentions & self.mentions:
382 @defer.inlineCallbacks
385 assert isinstance(tx_hash, (int, long))
386 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
387 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
390 print 'Error handling tx:'
393 factory.new_tx.watch(new_tx)
395 def new_block(block):
397 factory.new_block.watch(new_block)
399 print 'Started successfully!'
403 yield deferral.sleep(1)
404 yield set_real_work()
413 parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
414 parser.add_argument('--version', action='version', version=__version__)
415 parser.add_argument('--testnet',
416 help='use the testnet',
417 action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
419 p2pool_group = parser.add_argument_group('p2pool interface')
420 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
421 help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
422 type=int, action='store', default=None, dest='p2pool_port')
423 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
424 help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
425 type=str, action='append', default=[], dest='p2pool_nodes')
426 parser.add_argument('-l', '--low-bandwidth',
427 help='trade lower bandwidth usage for higher latency (reduced efficiency)',
428 action='store_true', default=False, dest='low_bandwidth')
430 worker_group = parser.add_argument_group('worker interface')
431 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
432 help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
433 type=int, action='store', default=9332, dest='worker_port')
435 bitcoind_group = parser.add_argument_group('bitcoind interface')
436 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
437 help='connect to a bitcoind at this address (default: 127.0.0.1)',
438 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
439 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
440 help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
441 type=int, action='store', default=8332, dest='bitcoind_rpc_port')
442 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
443 help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
444 type=int, action='store', default=None, dest='bitcoind_p2p_port')
446 bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
447 help='bitcoind RPC interface username',
448 type=str, action='store', dest='bitcoind_rpc_username')
449 bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
450 help='bitcoind RPC interface password',
451 type=str, action='store', dest='bitcoind_rpc_password')
453 args = parser.parse_args()
455 if args.bitcoind_p2p_port is None:
456 args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
458 if args.p2pool_port is None:
459 args.p2pool_port = args.net.P2P_PORT
461 reactor.callWhenRunning(main, args)