add warning to web interface
[p2pool.git] / p2pool / main.py
index 04214d9..ac74588 100644 (file)
@@ -4,6 +4,7 @@ import ConfigParser
 import StringIO
 import argparse
 import base64
+import json
 import os
 import random
 import sys
@@ -12,6 +13,13 @@ import signal
 import traceback
 import urlparse
 
+try:
+    from twisted.internet import iocpreactor
+    iocpreactor.install()
+except:
+    pass
+else:
+    print 'Using IOCP reactor!'
 from twisted.internet import defer, reactor, protocol, task
 from twisted.web import server
 from twisted.python import log
@@ -116,7 +124,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         shared_share_hashes = set()
         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
         known_verified = set()
-        recent_blocks = []
         print "Loading shares..."
         for i, (mode, contents) in enumerate(ss.get_shares()):
             if mode == 'share':
@@ -294,14 +301,25 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                 return shares
         
+        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
+        def submit_block_p2p(block):
+            if factory.conn.value is None:
+                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
+                raise deferral.RetrySilentlyException()
+            factory.conn.value.send_block(block=block)
+        
         @deferral.retry('Error submitting block: (will retry)', 10, 10)
         @defer.inlineCallbacks
-        def submit_block(block, ignore_failure):
+        def submit_block_rpc(block, ignore_failure):
             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
         
+        def submit_block(block, ignore_failure):
+            submit_block_p2p(block)
+            submit_block_rpc(block, ignore_failure)
+        
         @tracker.verified.added.watch
         def _(share):
             if share.pow_hash <= share.header['bits'].target:
@@ -309,7 +327,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 print
                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                 print
-                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
+                if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
+                    broadcast_share(share.hash)
         
         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
         
@@ -322,11 +341,17 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
         
         addrs = {}
-        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
+        if os.path.exists(os.path.join(datadir_path, 'addrs')):
+            try:
+                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
+                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
+            except:
+                print >>sys.stderr, 'error parsing addrs'
+        elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
             try:
                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
             except:
-                print >>sys.stderr, "error reading addrs"
+                print >>sys.stderr, "error reading addrs.txt"
         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
             try:
                 addr = yield addr_df
@@ -352,13 +377,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         )
         p2p_node.start()
         
-        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
+        def save_addrs():
+            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
+                f.write(json.dumps(p2p_node.addr_store.items()))
+        task.LoopingCall(save_addrs).start(60)
         
-        # send share when the chain changes to their chain
-        def work_changed(new_work):
-            #print 'Work changed:', new_work
+        def broadcast_share(share_hash):
             shares = []
-            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
+            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                 if share.hash in shared_share_hashes:
                     break
                 shared_share_hashes.add(share.hash)
@@ -367,7 +393,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             for peer in p2p_node.peers.itervalues():
                 peer.sendShares([share for share in shares if share.peer is not peer])
         
-        current_work.changed.watch(work_changed)
+        # send share when the chain changes to their chain
+        current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
         
         def save_shares():
             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
@@ -400,14 +427,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
         
-        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
-            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
-                vip_pass = f.read().strip('\r\n')
-        else:
-            vip_pass = '%016x' % (random.randrange(2**64),)
-            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
-                f.write(vip_pass)
-        
         # setup worker logic
         
         removed_unstales_var = variable.Variable((0, 0, 0))
@@ -428,7 +447,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
             my_shares = len(my_share_hashes)
             my_doa_shares = len(my_doa_share_hashes)
-            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
+            delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
             my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
             my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
             orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
@@ -450,7 +469,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 self.new_work_event = current_work.changed
                 self.recent_shares_ts_work = []
             
-            def preprocess_request(self, request):
+            def get_user_details(self, request):
                 user = request.getUser() if request.getUser() is not None else ''
                 
                 desired_pseudoshare_target = None
@@ -477,6 +496,10 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     except: # XXX blah
                         pubkey_hash = my_pubkey_hash
                 
+                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
+            
+            def preprocess_request(self, request):
+                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
             
             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
@@ -561,6 +584,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 received_header_hashes = set()
                 
                 def got_response(header, request):
+                    user, _, _, _ = self.get_user_details(request)
                     assert header['merkle_root'] == merkle_root
                     
                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
@@ -574,7 +598,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                                 print
                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                 print
-                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                     except:
                         log.err(None, 'Error while processing potential block:')
                     
@@ -648,11 +671,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     else:
                         received_header_hashes.add(header_hash)
                         
-                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser(), request.getPassword() == vip_pass)
+                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                         while len(self.recent_shares_ts_work) > 50:
                             self.recent_shares_ts_work.pop(0)
-                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
+                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                     
                     return on_time
                 
@@ -660,7 +683,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
         
-        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received, share_received)
+        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
         
         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
@@ -786,13 +809,9 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                         )
                         
-                        desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
-                        majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
-                        if majority_desired_version not in [0, 1]:
+                        for warning in p2pool_data.get_warnings(tracker, current_work):
                             print >>sys.stderr, '#'*40
-                            print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
-                                majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
-                            print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
+                            print >>sys.stderr, '>>> Warning: ' + warning
                             print >>sys.stderr, '#'*40
                     
                     if this_str != last_str or time.time() > last_time + 15: