fix p2pool fatal error messages being hidden by messages printed when reactor stops
[p2pool.git] / p2pool / main.py
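
The fix is in the `except:` handler at the bottom of main(): reactor.stop() is now invoked before log.err(), so most of the output Twisted produces as shutdown begins (stopping factories and connections) is printed first and the 'Fatal error:' traceback lands at the end of the log, where it stays visible. A minimal sketch of the ordering, with start_everything as a stand-in for main()'s body:

    from twisted.internet import reactor
    from twisted.python import log

    try:
        start_everything()  # stand-in for main()'s setup code, which may raise
    except:
        reactor.stop()                  # shutdown messages are emitted first...
        log.err(None, 'Fatal error:')   # ...then the traceback, last and readable
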
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import base64
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
19
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface, height_tracker
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
25
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
29     try:
30         work = yield bitcoind.rpc_getmemorypool()
31     except jsonrpc.Error, e:
32         if e.code == -32601: # Method not found
33             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34             raise deferral.RetrySilentlyException()
35         raise
36     packed_transactions = [x.decode('hex') for x in work['transactions']]
37     defer.returnValue(dict(
38         version=work['version'],
39         previous_block_hash=int(work['previousblockhash'], 16),
40         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41         merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
42         subsidy=work['coinbasevalue'],
43         time=work['time'],
44         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
46     ))
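    # note: the 0 prepended to the hash list above is a placeholder leaf for the
    # generate (coinbase) tx, which doesn't exist yet; the merkle branch for
    # index 0 consists only of sibling hashes, so the placeholder's value never
    # enters the branch itself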
47
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
50     try:
51         print 'p2pool (version %s)' % (p2pool.__version__,)
52         print
53         
54         # connect to bitcoind over JSON-RPC and do initial getmemorypool
55         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
58         @deferral.retry('Error while checking Bitcoin connection:', 1)
59         @defer.inlineCallbacks
60         def check():
61             if not (yield net.PARENT.RPC_CHECK(bitcoind)):

62                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
63                 raise deferral.RetrySilentlyException()
64             v = (yield bitcoind.rpc_getinfo())['version']
65             temp_work = yield getwork(bitcoind)
66             major, minor, patch = v//10000, v//100%100, v%100
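            # e.g. a getinfo version of 60003 decodes as major=6, minor=0, patch=3
            # (v0.6.0.3), and 50400 as major=5, minor=4, patch=0 (v0.5.4)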
67             if not (major >= 7 or (major == 6 and patch >= 3) or (major == 5 and minor >= 4) or '/P2SH/' in temp_work['coinbaseflags']):
68                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
69                 raise deferral.RetrySilentlyException()
70             defer.returnValue(temp_work)
71         temp_work = yield check()
72         print '    ...success!'
73         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
74         print
75         
76         # connect to bitcoind over bitcoin-p2p
77         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
78         factory = bitcoin_p2p.ClientFactory(net.PARENT)
79         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
80         yield factory.getProtocol() # waits until handshake is successful
81         print '    ...success!'
82         print
83         
84         print 'Determining payout address...'
85         if args.pubkey_hash is None:
86             address_path = os.path.join(datadir_path, 'cached_payout_address')
87             
88             if os.path.exists(address_path):
89                 with open(address_path, 'rb') as f:
90                     address = f.read().strip('\r\n')
91                 print '    Loaded cached address: %s...' % (address,)
92             else:
93                 address = None
94             
95             if address is not None:
96                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
97                 if not res['isvalid'] or not res['ismine']:
98                     print '    Cached address is either invalid or not controlled by local bitcoind!'
99                     address = None
100             
101             if address is None:
102                 print '    Getting payout address from bitcoind...'
103                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
104             
105             with open(address_path, 'wb') as f:
106                 f.write(address)
107             
108             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
109         else:
110             my_pubkey_hash = args.pubkey_hash
111         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
112         print
113         
114         my_share_hashes = set()
115         my_doa_share_hashes = set()
116         
117         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
118         shared_share_hashes = set()
119         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
120         known_verified = set()
121         recent_blocks = []
122         print "Loading shares..."
123         for i, (mode, contents) in enumerate(ss.get_shares()):
124             if mode == 'share':
125                 if contents.hash in tracker.shares:
126                     continue
127                 shared_share_hashes.add(contents.hash)
128                 contents.time_seen = 0
129                 tracker.add(contents)
130                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
131                     print "    %i" % (len(tracker.shares),)
132             elif mode == 'verified_hash':
133                 known_verified.add(contents)
134             else:
135                 raise AssertionError()
136         print "    ...inserting %i verified shares..." % (len(known_verified),)
137         for h in known_verified:
138             if h not in tracker.shares:
139                 ss.forget_verified_share(h)
140                 continue
141             tracker.verified.add(tracker.shares[h])
142         print "    ...done loading %i shares!" % (len(tracker.shares),)
143         print
144         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
145         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
146         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
147         
148         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
149         
150         pre_current_work = variable.Variable(None)
151         pre_merged_work = variable.Variable({})
152         # information affecting work that should trigger a long-polling update
153         current_work = variable.Variable(None)
154         # information affecting work that should not trigger a long-polling update
155         current_work2 = variable.Variable(None)
156         
157         requested = expiring_dict.ExpiringDict(300)
158         
159         print 'Initializing work...'
160         @defer.inlineCallbacks
161         def set_real_work1():
162             work = yield getwork(bitcoind)
163             current_work2.set(dict(
164                 time=work['time'],
165                 transactions=work['transactions'],
166                 merkle_branch=work['merkle_branch'],
167                 subsidy=work['subsidy'],
168                 clock_offset=time.time() - work['time'],
169                 last_update=time.time(),
170             )) # current_work2 (the second variable) is set first, because watchers hook onto pre_current_work and expect current_work2 to already be updated
171             pre_current_work.set(dict(
172                 version=work['version'],
173                 previous_block=work['previous_block_hash'],
174                 bits=work['bits'],
175                 coinbaseflags=work['coinbaseflags'],
176             ))
177         yield set_real_work1()
178         
179         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
180         
181         def set_real_work2():
182             best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
183             
184             t = dict(pre_current_work.value)
185             t['best_share_hash'] = best
186             t['mm_chains'] = pre_merged_work.value
187             current_work.set(t)
188             
189             t = time.time()
190             for peer2, share_hash in desired:
191                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
192                     continue
193                 last_request_time, count = requested.get(share_hash, (None, 0))
194                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
195                     continue
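                # (the check above is an exponential backoff: once a share has
                # been requested `count` times, it isn't re-requested until
                # roughly 10 * 1.5**count seconds after the last request)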
196                 potential_peers = set()
197                 for head in tracker.tails[share_hash]:
198                     potential_peers.update(peer_heads.get(head, set()))
199                 potential_peers = [peer for peer in potential_peers if peer.connected2]
200                 if count == 0 and peer2 is not None and peer2.connected2:
201                     peer = peer2
202                 else:
203                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
204                     if peer is None:
205                         continue
206                 
207                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
208                 peer.send_getshares(
209                     hashes=[share_hash],
210                     parents=2000,
211                     stops=list(set(tracker.heads) | set(
212                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
213                     ))[:100],
214                 )
215                 requested[share_hash] = t, count + 1
216         pre_current_work.changed.watch(lambda _: set_real_work2())
217         pre_merged_work.changed.watch(lambda _: set_real_work2())
218         set_real_work2()
219         print '    ...success!'
220         print
221         
222         
223         @defer.inlineCallbacks
224         def set_merged_work(merged_url, merged_userpass):
225             merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
226             while True:
227                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
228                 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
229                     hash=int(auxblock['hash'], 16),
230                     target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
231                     merged_proxy=merged_proxy,
232                 )}))
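                # dict(old, **{key: value}) builds an updated copy rather than
                # mutating in place, so pre_merged_work registers a new value
                # and its watchers fire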
233                 yield deferral.sleep(1)
234         for merged_url, merged_userpass in merged_urls:
235             set_merged_work(merged_url, merged_userpass)
236         
237         @pre_merged_work.changed.watch
238         def _(new_merged_work):
239             print 'Got new merged mining work!'
240         
241         # setup p2p logic and join p2pool network
242         
243         class Node(p2p.Node):
244             def handle_shares(self, shares, peer):
245                 if len(shares) > 5:
246                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
247                 
248                 new_count = 0
249                 for share in shares:
250                     if share.hash in tracker.shares:
251                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
252                         continue
253                     
254                     new_count += 1
255                     
256                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
257                     
258                     tracker.add(share)
259                 
260                 if shares and peer is not None:
261                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
262                 
263                 if new_count:
264                     set_real_work2()
265                 
266                 if len(shares) > 5:
267                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
268             
269             def handle_share_hashes(self, hashes, peer):
270                 t = time.time()
271                 get_hashes = []
272                 for share_hash in hashes:
273                     if share_hash in tracker.shares:
274                         continue
275                     last_request_time, count = requested.get(share_hash, (None, 0))
276                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
277                         continue
278                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
279                     get_hashes.append(share_hash)
280                     requested[share_hash] = t, count + 1
281                 
282                 if hashes and peer is not None:
283                     peer_heads.setdefault(hashes[0], set()).add(peer)
284                 if get_hashes:
285                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
286             
287             def handle_get_shares(self, hashes, parents, stops, peer):
288                 parents = min(parents, 1000//len(hashes))
289                 stops = set(stops)
290                 shares = []
291                 for share_hash in hashes:
292                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
293                         if share.hash in stops:
294                             break
295                         shares.append(share)
296                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
297                 peer.sendShares(shares)
298         
299         @deferral.retry('Error submitting block: (will retry)', 10, 10)
300         @defer.inlineCallbacks
301         def submit_block(block, ignore_failure):
302             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
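            # (getmemorypool, when given a serialized block, acts as a block
            # submission call here and is expected to return a truth value)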
303             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
304             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
305                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
306         
307         @tracker.verified.added.watch
308         def _(share):
309             if share.pow_hash <= share.header['bits'].target:
310                 submit_block(share.as_block(tracker), ignore_failure=True)
311                 print
312                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
313                 print
314                 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
315         
316         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
317         
318         @defer.inlineCallbacks
319         def parse(x):
320             if ':' in x:
321                 ip, port = x.split(':')
322                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
323             else:
324                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
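        # parse accepts 'host' or 'host:port'; when the port is omitted it
        # falls back to the network's default P2P port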
325         
326         addrs = {}
327         if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
328             try:
329                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
330             except:
331                 print >>sys.stderr, "error reading addrs"
332         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
333             try:
334                 addr = yield addr_df
335                 if addr not in addrs:
336                     addrs[addr] = (0, time.time(), time.time())
337             except:
338                 log.err()
339         
340         connect_addrs = set()
341         for addr_df in map(parse, args.p2pool_nodes):
342             try:
343                 connect_addrs.add((yield addr_df))
344             except:
345                 log.err()
346         
347         p2p_node = Node(
348             best_share_hash_func=lambda: current_work.value['best_share_hash'],
349             port=args.p2pool_port,
350             net=net,
351             addr_store=addrs,
352             connect_addrs=connect_addrs,
353             max_incoming_conns=args.p2pool_conns,
354         )
355         p2p_node.start()
356         
357         task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
358         
359         # send share when the chain changes to their chain
360         def work_changed(new_work):
361             #print 'Work changed:', new_work
362             shares = []
363             for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
364                 if share.hash in shared_share_hashes:
365                     break
366                 shared_share_hashes.add(share.hash)
367                 shares.append(share)
368             
369             for peer in p2p_node.peers.itervalues():
370                 peer.sendShares([share for share in shares if share.peer is not peer])
371         
372         current_work.changed.watch(work_changed)
373         
374         def save_shares():
375             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
376                 ss.add_share(share)
377                 if share.hash in tracker.verified.shares:
378                     ss.add_verified_hash(share.hash)
379         task.LoopingCall(save_shares).start(60)
380         
381         print '    ...success!'
382         print
383         
384         if args.upnp:
385             @defer.inlineCallbacks
386             def upnp_thread():
387                 while True:
388                     try:
389                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
390                         if is_lan:
391                             pm = yield portmapper.get_port_mapper()
392                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
393                     except defer.TimeoutError:
394                         pass
395                     except:
396                         if p2pool.DEBUG:
397                             log.err(None, 'UPnP error:')
398                     yield deferral.sleep(random.expovariate(1/120))
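                    # (expovariate(1/120) gives exponentially distributed delays
                    # averaging 120 seconds between UPnP refresh attempts)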
399             upnp_thread()
400         
401         # start listening for workers with a JSON-RPC server
402         
403         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
404         
405         if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
406             with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
407                 vip_pass = f.read().strip('\r\n')
408         else:
409             vip_pass = '%016x' % (random.randrange(2**64),)
410             with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
411                 f.write(vip_pass)
412         print '    Worker password:', vip_pass, '(only required for generating graphs)'
413         
414         # setup worker logic
415         
416         removed_unstales_var = variable.Variable((0, 0, 0))
417         removed_doa_unstales_var = variable.Variable(0)
418         @tracker.verified.removed.watch
419         def _(share):
420             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
421                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
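                # stale_info markers: 253 announces an orphan, 254 a share that
                # was dead on arrival, 0 that there is nothing to announce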
422                 removed_unstales_var.set((
423                     removed_unstales_var.value[0] + 1,
424                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
425                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
426                 ))
427             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
428                 removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
429         
430         def get_stale_counts():
431             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
432             my_shares = len(my_share_hashes)
433             my_doa_shares = len(my_doa_share_hashes)
434             delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
435             my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
436             my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
437             orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
438             doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
439             
440             my_shares_not_in_chain = my_shares - my_shares_in_chain
441             my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
442             
443             return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
444         
445         
446         pseudoshare_received = variable.Event()
447         local_rate_monitor = math.RateMonitor(10*60)
448         
449         class WorkerBridge(worker_interface.WorkerBridge):
450             def __init__(self):
451                 worker_interface.WorkerBridge.__init__(self)
452                 self.new_work_event = current_work.changed
453                 self.recent_shares_ts_work = []
454             
455             def preprocess_request(self, request):
456                 user = request.getUser() if request.getUser() is not None else ''
457                 
458                 desired_pseudoshare_target = None
459                 if '+' in user:
460                     user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
461                     try:
462                         desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
463                     except:
464                         pass
465                 
466                 desired_share_target = 2**256 - 1
467                 if '/' in user:
468                     user, min_diff_str = user.rsplit('/', 1)
469                     try:
470                         desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
471                     except:
472                         pass
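                # taken together, the parsing above accepts usernames of the form
                # ADDRESS, ADDRESS+PSEUDOSHARE_DIFF, ADDRESS/SHARE_DIFF, or
                # ADDRESS/SHARE_DIFF+PSEUDOSHARE_DIFF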
473                 
474                 if random.uniform(0, 100) < args.worker_fee:
475                     pubkey_hash = my_pubkey_hash
476                 else:
477                     try:
478                         pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
479                     except: # username is not a valid address; fall back to the operator's payout address
480                         pubkey_hash = my_pubkey_hash
481                 
482                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
483             
484             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
485                 if len(p2p_node.peers) == 0 and net.PERSIST:
486                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
487                 if current_work.value['best_share_hash'] is None and net.PERSIST:
488                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
489                 if time.time() > current_work2.value['last_update'] + 60:
490                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
491                 
492                 if current_work.value['mm_chains']:
493                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
494                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
495                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
496                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
497                         size=size,
498                         nonce=0,
499                     ))
500                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
501                 else:
502                     mm_data = ''
503                     mm_later = []
504                 
505                 share_info, generate_tx = p2pool_data.Share.generate_transaction(
506                     tracker=tracker,
507                     share_data=dict(
508                         previous_share_hash=current_work.value['best_share_hash'],
509                         coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
510                         nonce=random.randrange(2**32),
511                         pubkey_hash=pubkey_hash,
512                         subsidy=current_work2.value['subsidy'],
513                         donation=math.perfect_round(65535*args.donation_percentage/100),
514                         stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
515                             253 if orphans > orphans_recorded_in_chain else
516                             254 if doas > doas_recorded_in_chain else
517                             0
518                         )(*get_stale_counts()),
519                     ),
520                     block_target=current_work.value['bits'].target,
521                     desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
522                     desired_target=desired_share_target,
523                     net=net,
524                 )
525                 
526                 target = net.PARENT.SANE_MAX_TARGET
527                 if desired_pseudoshare_target is None:
528                     if len(self.recent_shares_ts_work) == 50:
529                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
530                         target = min(target, 2**256//hash_rate)
531                 else:
532                     target = min(target, desired_pseudoshare_target)
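                # when no explicit pseudoshare target is given and a full window
                # of 50 recent pseudoshares exists, the branch above tunes the
                # target so pseudoshares arrive about once a second on average
                # (2**256//hash_rate attempts is one second of estimated work)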
533                 target = max(target, share_info['bits'].target)
534                 for aux_work in current_work.value['mm_chains'].itervalues():
535                     target = max(target, aux_work['target'])
536                 
537                 transactions = [generate_tx] + list(current_work2.value['transactions'])
538                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
539                 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
540                 
541                 getwork_time = time.time()
542                 merkle_branch = current_work2.value['merkle_branch']
543                 
544                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
545                     bitcoin_data.target_to_difficulty(target),
546                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
547                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
548                     len(current_work2.value['transactions']),
549                 )
550                 
551                 ba = bitcoin_getwork.BlockAttempt(
552                     version=current_work.value['version'],
553                     previous_block=current_work.value['previous_block'],
554                     merkle_root=merkle_root,
555                     timestamp=current_work2.value['time'],
556                     bits=current_work.value['bits'],
557                     share_target=target,
558                 )
559                 
560                 received_header_hashes = set()
561                 
562                 def got_response(header, request):
563                     assert header['merkle_root'] == merkle_root
564                     
565                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
566                     pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
567                     on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
568                     
569                     try:
570                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
571                             submit_block(dict(header=header, txs=transactions), ignore_failure=False)
572                             if pow_hash <= header['bits'].target:
573                                 print
574                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
575                                 print
576                                 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
577                     except:
578                         log.err(None, 'Error while processing potential block:')
579                     
580                     for aux_work, index, hashes in mm_later:
581                         try:
582                             if pow_hash <= aux_work['target'] or p2pool.DEBUG:
583                                 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
584                                     pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
585                                     bitcoin_data.aux_pow_type.pack(dict(
586                                         merkle_tx=dict(
587                                             tx=transactions[0],
588                                             block_hash=header_hash,
589                                             merkle_branch=merkle_branch,
590                                             index=0,
591                                         ),
592                                         merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
593                                         index=index,
594                                         parent_block_header=header,
595                                     )).encode('hex'),
596                                 )
597                                 @df.addCallback
598                                 def _(result):
599                                     if result != (pow_hash <= aux_work['target']):
600                                         print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
601                                     else:
602                                         print 'Merged block submittal result: %s' % (result,)
603                                 @df.addErrback
604                                 def _(err):
605                                     log.err(err, 'Error submitting merged block:')
606                         except:
607                             log.err(None, 'Error while processing merged mining POW:')
608                     
609                     if pow_hash <= share_info['bits'].target:
610                         min_header = dict(header); del min_header['merkle_root']
611                         hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
612                         share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
613                         
614                         print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
615                             request.getUser(),
616                             p2pool_data.format_hash(share.hash),
617                             p2pool_data.format_hash(share.previous_hash),
618                             time.time() - getwork_time,
619                             ' DEAD ON ARRIVAL' if not on_time else '',
620                         )
621                         my_share_hashes.add(share.hash)
622                         if not on_time:
623                             my_doa_share_hashes.add(share.hash)
624                         
625                         tracker.add(share)
626                         if not p2pool.DEBUG:
627                             tracker.verified.add(share)
628                         set_real_work2()
629                         
630                         try:
631                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
632                                 for peer in p2p_node.peers.itervalues():
633                                     peer.sendShares([share])
634                                 shared_share_hashes.add(share.hash)
635                         except:
636                             log.err(None, 'Error forwarding block solution:')
637                     
638                     if pow_hash > target:
639                         print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
640                         print '    Hash:   %56x' % (pow_hash,)
641                         print '    Target: %56x' % (target,)
642                     elif header_hash in received_header_hashes:
643                         print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
644                     else:
645                         received_header_hashes.add(header_hash)
646                         
647                         pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
648                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
649                         while len(self.recent_shares_ts_work) > 50:
650                             self.recent_shares_ts_work.pop(0)
651                         local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
652                     
653                     return on_time
654                 
655                 return ba, got_response
656         
657         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
658         
659         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
660         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
661         
662         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
663         
664         with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
665             pass
666         
667         print '    ...success!'
668         print
669         
670         
671         @defer.inlineCallbacks
672         def work_poller():
673             while True:
674                 flag = factory.new_block.get_deferred()
675                 try:
676                     yield set_real_work1()
677                 except:
678                     log.err()
679                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
680         work_poller()
681         
682         
683         # done!
684         print 'Started successfully!'
685         print
686         
687         
688         if hasattr(signal, 'SIGALRM'):
689             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
690                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
691             ))
692             signal.siginterrupt(signal.SIGALRM, False)
693             task.LoopingCall(signal.alarm, 30).start(1)
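            # watchdog: a 30-second alarm is re-armed every second, so SIGALRM
            # only fires (and dumps a stack trace) if the reactor stalls for 30+
            # seconds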
694         
695         if args.irc_announce:
696             from twisted.words.protocols import irc
697             class IRCClient(irc.IRCClient):
698                 nickname = 'p2pool%02i' % (random.randrange(100),)
699                 channel = net.ANNOUNCE_CHANNEL
700                 def lineReceived(self, line):
701                     print repr(line)
702                     irc.IRCClient.lineReceived(self, line)
703                 def signedOn(self):
704                     irc.IRCClient.signedOn(self)
705                     self.factory.resetDelay()
706                     self.join(self.channel)
707                     @defer.inlineCallbacks
708                     def new_share(share):
709                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
710                             yield deferral.sleep(random.expovariate(1/60))
711                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
712                             if message not in self.recent_messages:
713                                 self.say(self.channel, message)
714                                 self._remember_message(message)
715                     self.watch_id = tracker.verified.added.watch(new_share)
716                     self.recent_messages = []
717                 def _remember_message(self, message):
718                     self.recent_messages.append(message)
719                     while len(self.recent_messages) > 100:
720                         self.recent_messages.pop(0)
721                 def privmsg(self, user, channel, message):
722                     if channel == self.channel:
723                         self._remember_message(message)
724                 def connectionLost(self, reason):
725                     tracker.verified.added.unwatch(self.watch_id)
726                     print 'IRC connection lost:', reason.getErrorMessage()
727             class IRCClientFactory(protocol.ReconnectingClientFactory):
728                 protocol = IRCClient
729             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
730         
731         @defer.inlineCallbacks
732         def status_thread():
733             last_str = None
734             last_time = 0
735             while True:
736                 yield deferral.sleep(3)
737                 try:
738                     if time.time() > current_work2.value['last_update'] + 60:
739                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
740                     
741                     height = tracker.get_height(current_work.value['best_share_hash'])
742                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
743                         height,
744                         len(tracker.verified.shares),
745                         len(tracker.shares),
746                         len(p2p_node.peers),
747                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
748                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
749                     
750                     datums, dt = local_rate_monitor.get_datums_in_last()
751                     my_att_s = sum(datum['work']/dt for datum in datums)
752                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
753                         math.format(int(my_att_s)),
754                         math.format_dt(dt),
755                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
756                         math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
757                     )
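                    # expected time to share = average attempts a share requires
                    # (2**256/max_target) divided by the local attempt rate my_att_s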
758                     
759                     if height > 2:
760                         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
761                         stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
762                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
763                         
764                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
765                             shares, stale_orphan_shares, stale_doa_shares,
766                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
767                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
768                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
769                         )
770                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
771                             math.format(int(real_att_s)),
772                             100*stale_prop,
773                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
774                         )
775                     
776                     if this_str != last_str or time.time() > last_time + 15:
777                         print this_str
778                         last_str = this_str
779                         last_time = time.time()
780                 except:
781                     log.err()
782         status_thread()
783     except:
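        # stop the reactor *before* logging the error: the messages printed as
        # shutdown begins then come first, leaving the fatal error visible at
        # the end of the output instead of buried above it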
784         reactor.stop()
785         log.err(None, 'Fatal error:')
786
787 def run():
788     class FixedArgumentParser(argparse.ArgumentParser):
789         def _read_args_from_files(self, arg_strings):
790             # expand arguments referencing files
791             new_arg_strings = []
792             for arg_string in arg_strings:
793                 
794                 # for regular arguments, just add them back into the list
795                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
796                     new_arg_strings.append(arg_string)
797                 
798                 # replace arguments referencing files with the file content
799                 else:
800                     try:
801                         args_file = open(arg_string[1:])
802                         try:
803                             arg_strings = []
804                             for arg_line in args_file.read().splitlines():
805                                 for arg in self.convert_arg_line_to_args(arg_line):
806                                     arg_strings.append(arg)
807                             arg_strings = self._read_args_from_files(arg_strings)
808                             new_arg_strings.extend(arg_strings)
809                         finally:
810                             args_file.close()
811                     except IOError:
812                         err = sys.exc_info()[1]
813                         self.error(str(err))
814             
815             # return the modified argument list
816             return new_arg_strings
817         
818         def convert_arg_line_to_args(self, arg_line):
819             return [arg for arg in arg_line.split() if arg.strip()]
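    # with fromfile_prefix_chars='@' below, an invocation like
    #   python run_p2pool.py @p2pool.conf
    # splices in the whitespace-separated tokens from p2pool.conf, recursively
    # expanding any further @file references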
820     
821     
822     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
823     
824     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
825     parser.add_argument('--version', action='version', version=p2pool.__version__)
826     parser.add_argument('--net',
827         help='use specified network (default: bitcoin)',
828         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
829     parser.add_argument('--testnet',
830         help='''use the network's testnet''',
831         action='store_const', const=True, default=False, dest='testnet')
832     parser.add_argument('--debug',
833         help='enable debugging mode',
834         action='store_const', const=True, default=False, dest='debug')
835     parser.add_argument('-a', '--address',
836         help='generate payouts to this address (default: <address requested from bitcoind>)',
837         type=str, action='store', default=None, dest='address')
838     parser.add_argument('--datadir',
839         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
840         type=str, action='store', default=None, dest='datadir')
841     parser.add_argument('--logfile',
842         help='''log to this file (default: data/<NET>/log)''',
843         type=str, action='store', default=None, dest='logfile')
844     parser.add_argument('--merged',
845         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
846         type=str, action='append', default=[], dest='merged_urls')
847     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
848         help='donate this percentage of work to author of p2pool (default: 0.5)',
849         type=float, action='store', default=0.5, dest='donation_percentage')
850     parser.add_argument('--irc-announce',
851         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
852         action='store_true', default=False, dest='irc_announce')
853     
854     p2pool_group = parser.add_argument_group('p2pool interface')
855     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
856         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
857         type=int, action='store', default=None, dest='p2pool_port')
858     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
859         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
860         type=str, action='append', default=[], dest='p2pool_nodes')
861     parser.add_argument('--disable-upnp',
862         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
863         action='store_false', default=True, dest='upnp')
864     p2pool_group.add_argument('--max-conns', metavar='CONNS',
865         help='maximum incoming connections (default: 40)',
866         type=int, action='store', default=40, dest='p2pool_conns')
867     
868     worker_group = parser.add_argument_group('worker interface')
869     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
870         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
871         type=str, action='store', default=None, dest='worker_endpoint')
872     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
873         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
874         type=float, action='store', default=0, dest='worker_fee')
875     
876     bitcoind_group = parser.add_argument_group('bitcoind interface')
877     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
878         help='connect to this address (default: 127.0.0.1)',
879         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
880     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
881         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
882         type=int, action='store', default=None, dest='bitcoind_rpc_port')
883     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
884         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
885         type=int, action='store', default=None, dest='bitcoind_p2p_port')
886     
887     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
888         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
889         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
890     
891     args = parser.parse_args()
892     
893     if args.debug:
894         p2pool.DEBUG = True
895     
896     net_name = args.net_name + ('_testnet' if args.testnet else '')
897     net = networks.nets[net_name]
898     
899     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
900     if not os.path.exists(datadir_path):
901         os.makedirs(datadir_path)
902     
903     if len(args.bitcoind_rpc_userpass) > 2:
904         parser.error('a maximum of two arguments is allowed')
905     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
906     
907     if args.bitcoind_rpc_password is None:
908         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
909             parser.error('This network has no configuration file function. Manually enter your RPC password.')
910         conf_path = net.PARENT.CONF_FILE_FUNC()
911         if not os.path.exists(conf_path):
912             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
913                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
914                 '''\r\n'''
915                 '''server=1\r\n'''
916                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
917         with open(conf_path, 'rb') as f:
918             cp = ConfigParser.RawConfigParser()
919             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
920             for conf_name, var_name, var_type in [
921                 ('rpcuser', 'bitcoind_rpc_username', str),
922                 ('rpcpassword', 'bitcoind_rpc_password', str),
923                 ('rpcport', 'bitcoind_rpc_port', int),
924                 ('port', 'bitcoind_p2p_port', int),
925             ]:
926                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
927                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
928         if args.bitcoind_rpc_password is None:
929             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
930     
931     if args.bitcoind_rpc_username is None:
932         args.bitcoind_rpc_username = ''
933     
934     if args.bitcoind_rpc_port is None:
935         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
936     
937     if args.bitcoind_p2p_port is None:
938         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
939     
940     if args.p2pool_port is None:
941         args.p2pool_port = net.P2P_PORT
942     
943     if args.worker_endpoint is None:
944         worker_endpoint = '', net.WORKER_PORT
945     elif ':' not in args.worker_endpoint:
946         worker_endpoint = '', int(args.worker_endpoint)
947     else:
948         addr, port = args.worker_endpoint.rsplit(':', 1)
949         worker_endpoint = addr, int(port)
950     
951     if args.address is not None:
952         try:
953             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
954         except Exception, e:
955             parser.error('error parsing address: ' + repr(e))
956     else:
957         args.pubkey_hash = None
958     
959     def separate_url(url):
960         s = urlparse.urlsplit(url)
961         if '@' not in s.netloc:
962             parser.error('merged url netloc must contain an "@"')
963         userpass, new_netloc = s.netloc.rsplit('@', 1)
964         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
965     merged_urls = map(separate_url, args.merged_urls)
966     
967     if args.logfile is None:
968         args.logfile = os.path.join(datadir_path, 'log')
969     
970     logfile = logging.LogFile(args.logfile)
971     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
972     sys.stdout = logging.AbortPipe(pipe)
973     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
974     if hasattr(signal, "SIGUSR1"):
975         def sigusr1(signum, frame):
976             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
977             logfile.reopen()
978             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
979         signal.signal(signal.SIGUSR1, sigusr1)
980     task.LoopingCall(logfile.reopen).start(5)
981     
982     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
983     reactor.run()