Mysql share reporting fixed. Update config template.
authorkha0S <miguel@khore.org>
Thu, 20 Feb 2014 00:11:15 +0000 (00:11 +0000)
committerkha0S <miguel@khore.org>
Thu, 20 Feb 2014 00:11:15 +0000 (00:11 +0000)
conf/config_sample.py
lib/template_registry.py
lib/util.py
mining/DBInterface.py [new file with mode: 0644]
mining/DB_Mysql.py [new file with mode: 0644]
mining/DB_None.py [new file with mode: 0644]
mining/basic_share_limiter.py [new file with mode: 0644]
mining/interfaces.py
mining/service.py
mining/subscription.py

index b57346a..e45988b 100644 (file)
@@ -1,6 +1,6 @@
 '''
 This is example configuration for Stratum server.
-Please rename it to settings.py and fill correct values.
+Please rename it to config.py and fill correct values.
 '''
 
 # ******************** GENERAL SETTINGS ***************
@@ -58,17 +58,45 @@ ADMIN_PASSWORD_SHA256 = None
 
 IRC_NICK = None
 
-'''
-DATABASE_DRIVER = 'MySQLdb'
-DATABASE_HOST = 'localhost'
-DATABASE_DBNAME = 'pooldb'
-DATABASE_USER = 'pooldb'
-DATABASE_PASSWORD = '**empty**'
-'''
+
+DATABASE_DRIVER = 'mysql'
+DATABASE_EXTEND = False         # False = pushpool db layout, True = pushpool + extra columns
+DB_MYSQL_HOST = 'localhost'
+DB_MYSQL_DBNAME = 'pooldb'
+DB_MYSQL_USER = 'pooldb'
+DB_MYSQL_PASS = '**empty**'
+
 
 # Pool related settings
 INSTANCE_ID = 31
 CENTRAL_WALLET = '4WpFe4iTc8zC3UHAzdQX6w9BcRuXFxvPqm' # local novacoin address where money goes
 PREVHASH_REFRESH_INTERVAL = 5 # in sec
 MERKLE_REFRESH_INTERVAL = 60 # How often check memorypool
-COINBASE_EXTRAS = '/stratum/'
+COINBASE_EXTRAS = ''
+
+# ******************** Pool Difficulty Settings *********************
+#  Again, Don't change unless you know what this is for.
+
+# Pool Target (Base Difficulty)
+POOL_TARGET = 32                # Pool-wide difficulty target int >= 1
+
+# Variable Difficulty Enable
+VARIABLE_DIFF = False           # Master variable difficulty enable
+
+# Variable diff tuning variables
+VDIFF_TARGET = 15               # Target time per share (i.e. try to get 1 share per this many seconds)
+VDIFF_RETARGET = 120            # Check to see if we should retarget this often
+VDIFF_VARIANCE_PERCENT = 50     # Allow average time to very this % from target without retarget
+
+# ******************** Adv. DB Settings *********************
+#  Don't change these unless you know what you are doing
+
+DB_LOADER_CHECKTIME = 15        # How often we check to see if we should run the loader
+DB_LOADER_REC_MIN = 10          # Min Records before the bulk loader fires
+DB_LOADER_REC_MAX = 20          # Max Records the bulk loader will commit at a time
+
+DB_STATS_AVG_TIME = 30          # When using the DATABASE_EXTEND option, average speed over X sec
+                                #       Note: this is also how often it updates
+DB_USERCACHE_TIME = 600         # How long the usercache is good for before we refresh
+
+USERS_AUTOADD = False
index bd1bca7..e367d73 100644 (file)
@@ -16,56 +16,56 @@ class JobIdGenerator(object):
     '''Generate pseudo-unique job_id. It does not need to be absolutely unique,
     because pool sends "clean_jobs" flag to clients and they should drop all previous jobs.'''
     counter = 0
-
+    
     @classmethod
     def get_new_id(cls):
         cls.counter += 1
         if cls.counter % 0xffff == 0:
             cls.counter = 1
         return "%x" % cls.counter
-
+                
 class TemplateRegistry(object):
     '''Implements the main logic of the pool. Keep track
     on valid block templates, provide internal interface for stratum
     service and implements block validation and submits.'''
-
+    
     def __init__(self, block_template_class, coinbaser, bitcoin_rpc, instance_id,
                  on_template_callback, on_block_callback):
         self.prevhashes = {}
         self.jobs = weakref.WeakValueDictionary()
-
+        
         self.extranonce_counter = ExtranonceCounter(instance_id)
         self.extranonce2_size = block_template_class.coinbase_transaction_class.extranonce_size \
                 - self.extranonce_counter.get_size()
-
+                 
         self.coinbaser = coinbaser
         self.block_template_class = block_template_class
         self.bitcoin_rpc = bitcoin_rpc
         self.on_block_callback = on_block_callback
         self.on_template_callback = on_template_callback
-
+        
         self.last_block = None
         self.update_in_progress = False
         self.last_update = None
-
+        
         # Create first block template on startup
         self.update_block()
-
+        
     def get_new_extranonce1(self):
         '''Generates unique extranonce1 (e.g. for newly
         subscribed connection.'''
         return self.extranonce_counter.get_new_bin()
-
+    
     def get_last_broadcast_args(self):
         '''Returns arguments for mining.notify
         from last known template.'''
         return self.last_block.broadcast_args
-
-    def add_template(self, block):
+        
+    def add_template(self, block,block_height):
         '''Adds new template to the registry.
         It also clean up templates which should
         not be used anymore.'''
-
+        
         prevhash = block.prevhash_hex
 
         if prevhash in self.prevhashes.keys():
@@ -73,73 +73,73 @@ class TemplateRegistry(object):
         else:
             new_block = True
             self.prevhashes[prevhash] = []
-
+               
         # Blocks sorted by prevhash, so it's easy to drop
         # them on blockchain update
         self.prevhashes[prevhash].append(block)
-
+        
         # Weak reference for fast lookup using job_id
         self.jobs[block.job_id] = block
-
+        
         # Use this template for every new request
         self.last_block = block
-
+        
         # Drop templates of obsolete blocks
         for ph in self.prevhashes.keys():
             if ph != prevhash:
                 del self.prevhashes[ph]
-
+                
         log.info("New template for %s" % prevhash)
 
         if new_block:
             # Tell the system about new block
             # It is mostly important for share manager
-            self.on_block_callback(prevhash)
+            self.on_block_callback(prevhash, block_height)
 
         # Everything is ready, let's broadcast jobs!
         self.on_template_callback(new_block)
-
+        
 
         #from twisted.internet import reactor
-        #reactor.callLater(10, self.on_block_callback, new_block)
-
+        #reactor.callLater(10, self.on_block_callback, new_block) 
+              
     def update_block(self):
         '''Registry calls the getblocktemplate() RPC
         and build new block template.'''
-
+        
         if self.update_in_progress:
             # Block has been already detected
             return
-
+        
         self.update_in_progress = True
         self.last_update = Interfaces.timestamper.time()
-
+        
         d = self.bitcoin_rpc.getblocktemplate()
         d.addCallback(self._update_block)
         d.addErrback(self._update_block_failed)
-
+        
     def _update_block_failed(self, failure):
         log.error(str(failure))
         self.update_in_progress = False
-
+        
     def _update_block(self, data):
         start = Interfaces.timestamper.time()
-
+                
         template = self.block_template_class(Interfaces.timestamper, self.coinbaser, JobIdGenerator.get_new_id())
         template.fill_from_rpc(data)
-        self.add_template(template)
+        self.add_template(template,data['height'])
 
         log.info("Update finished, %.03f sec, %d txes" % \
                     (Interfaces.timestamper.time() - start, len(template.vtx)))
-
-        self.update_in_progress = False
+        
+        self.update_in_progress = False        
         return data
-
+    
     def diff_to_target(self, difficulty):
         '''Converts difficulty to target'''
         diff1 = 0x0000ffff00000000000000000000000000000000000000000000000000000000 
         return diff1 / difficulty
-
+    
     def get_job(self, job_id):
         '''For given job_id returns BlockTemplate instance or None'''
         try:
@@ -147,85 +147,87 @@ class TemplateRegistry(object):
         except:
             log.info("Job id '%s' not found" % job_id)
             return None
-
+        
         # Now we have to check if job is still valid.
         # Unfortunately weak references are not bulletproof and
         # old reference can be found until next run of garbage collector.
         if j.prevhash_hex not in self.prevhashes:
             log.info("Prevhash of job '%s' is unknown" % job_id)
             return None
-
+        
         if j not in self.prevhashes[j.prevhash_hex]:
             log.info("Job %s is unknown" % job_id)
             return None
-
+        
         return j
-
-    def submit_share(self, job_id, worker_name, extranonce1_bin, extranonce2, ntime, nonce,
+        
+    def submit_share(self, job_id, worker_name, session, extranonce1_bin, extranonce2, ntime, nonce,
                      difficulty):
         '''Check parameters and finalize block template. If it leads
            to valid block candidate, asynchronously submits the block
-           back to the novacoin network.
-
+           back to the bitcoin network.
+        
             - extranonce1_bin is binary. No checks performed, it should be from session data
             - job_id, extranonce2, ntime, nonce - in hex form sent by the client
             - difficulty - decimal number from session, again no checks performed
             - submitblock_callback - reference to method which receive result of submitblock()
         '''
-
+        
         # Check if extranonce2 looks correctly. extranonce2 is in hex form...
         if len(extranonce2) != self.extranonce2_size * 2:
             raise SubmitException("Incorrect size of extranonce2. Expected %d chars" % (self.extranonce2_size*2))
-
+        
         # Check for job
         job = self.get_job(job_id)
         if job == None:
             raise SubmitException("Job '%s' not found" % job_id)
-
+                
         # Check if ntime looks correct
         if len(ntime) != 8:
             raise SubmitException("Incorrect size of ntime. Expected 8 chars")
 
         if not job.check_ntime(int(ntime, 16)):
             raise SubmitException("Ntime out of range")
-
-        # Check nonce
+        
+        # Check nonce        
         if len(nonce) != 8:
             raise SubmitException("Incorrect size of nonce. Expected 8 chars")
-
+        
         # Check for duplicated submit
         if not job.register_submit(extranonce1_bin, extranonce2, ntime, nonce):
             log.info("Duplicate from %s, (%s %s %s %s)" % \
                     (worker_name, binascii.hexlify(extranonce1_bin), extranonce2, ntime, nonce))
             raise SubmitException("Duplicate share")
-
+        
         # Now let's do the hard work!
         # ---------------------------
-
+        
         # 0. Some sugar
         extranonce2_bin = binascii.unhexlify(extranonce2)
         ntime_bin = binascii.unhexlify(ntime)
         nonce_bin = binascii.unhexlify(nonce)
-
+                
         # 1. Build coinbase
         coinbase_bin = job.serialize_coinbase(extranonce1_bin, extranonce2_bin)
         coinbase_hash = util.doublesha(coinbase_bin)
-
+        
         # 2. Calculate merkle root
         merkle_root_bin = job.merkletree.withFirst(coinbase_hash)
         merkle_root_int = util.uint256_from_str(merkle_root_bin)
-
+                
         # 3. Serialize header with given merkle, ntime and nonce
         header_bin = job.serialize_header(merkle_root_int, ntime_bin, nonce_bin)
-
+    
         # 4. Reverse header and compare it with target of the user
         hash_bin = util.scrypt(''.join([ header_bin[i*4:i*4+4][::-1] for i in range(0, 20) ]))
         hash_int = util.uint256_from_str(hash_bin)
         block_hash_hex = "%064x" % hash_int
         header_hex = binascii.hexlify(header_bin)
-
-        target_user = self.diff_to_target(difficulty)
-        if hash_int > target_user:
+                 
+        target_user = self.diff_to_target(difficulty)        
+        if hash_int > target_user and \
+               ( 'prev_jobid' not in session or session['prev_jobid'] < job_id \
+               or 'prev_diff' not in session or hash_int > self.diff_to_target(session['prev_diff']) ):
             raise SubmitException("Share is above target")
 
         # Mostly for debugging purposes
@@ -233,22 +235,25 @@ class TemplateRegistry(object):
         if hash_int <= target_info:
             log.info("Yay, share with diff above 100000")
 
-        # 5. Compare hash with target of the network
+       # Algebra tells us the diff_to_target is the same as hash_to_diff
+       share_diff = int(self.diff_to_target(hash_int))
+
+        # 5. Compare hash with target of the network        
         if hash_int <= job.target:
-            # Yay! It is block candidate!
+            # Yay! It is block candidate! 
             log.info("We found a block candidate! %s" % block_hash_hex)
-
-            # 6. Finalize and serialize block object
+           
+            # 6. Finalize and serialize block object 
             job.finalize(merkle_root_int, extranonce1_bin, extranonce2_bin, int(ntime, 16), int(nonce, 16))
-
+            
             if not job.is_valid():
                 # Should not happen
                 log.error("Final job validation failed!")
-
+                            
             # 7. Submit block to the network
             serialized = binascii.hexlify(job.serialize())
             on_submit = self.bitcoin_rpc.submitblock(serialized)
-
-            return (header_hex, block_hash_hex, on_submit)
-
-        return (header_hex, block_hash_hex, None)
+            
+            return (header_hex, block_hash_hex, share_diff, on_submit)
+        
+        return (header_hex, block_hash_hex, share_diff, None)
index 12b4593..d9570c7 100644 (file)
@@ -143,6 +143,17 @@ def b58decode(v, length):
 
     return result
 
+def b58encode(value):
+    """ encode integer 'value' as a base58 string; returns string
+    """
+    encoded = ''
+    while value >= __b58base:
+        div, mod = divmod(value, __b58base)
+        encoded = __b58chars[mod] + encoded # add to left
+        value = div
+    encoded = __b58chars[value] + encoded # most significant remainder
+    return encoded
+
 def reverse_hash(h):
     # This only revert byte order, nothing more
     if len(h) != 64:
diff --git a/mining/DBInterface.py b/mining/DBInterface.py
new file mode 100644 (file)
index 0000000..4152e6e
--- /dev/null
@@ -0,0 +1,222 @@
+from twisted.internet import reactor, defer
+import time
+from datetime import datetime
+import Queue
+
+from stratum import settings
+
+import stratum.logger
+log = stratum.logger.get_logger('DBInterface')
+
+class DBInterface():
+    def __init__(self):
+       self.dbi = self.connectDB()
+
+    def init_main(self):
+       self.dbi.check_tables()
+       self.q = Queue.Queue()
+        self.queueclock = None
+
+       self.usercache = {}
+        self.clearusercache()
+
+       self.nextStatsUpdate = 0
+
+        self.scheduleImport()
+
+    def set_bitcoinrpc(self,bitcoinrpc):
+       self.bitcoinrpc=bitcoinrpc
+
+    def connectDB(self):
+       # Choose our database driver and put it in self.dbi
+       if settings.DATABASE_DRIVER == "sqlite":
+               log.debug('DB_Sqlite INIT')
+               import DB_Sqlite
+               return DB_Sqlite.DB_Sqlite()
+       elif settings.DATABASE_DRIVER == "mysql":
+               log.debug('DB_Mysql INIT')
+               import DB_Mysql
+               return DB_Mysql.DB_Mysql()
+       elif settings.DATABASE_DRIVER == "postgresql":
+               log.debug('DB_Postgresql INIT')
+               import DB_Postgresql
+               return DB_Postgresql.DB_Postgresql()
+       elif settings.DATABASE_DRIVER == "none":
+               log.debug('DB_None INIT')
+               import DB_None
+               return DB_None.DB_None()
+       else:
+               log.error('Invalid DATABASE_DRIVER -- using NONE')
+               log.debug('DB_None INIT')
+               import DB_None
+               return DB_None.DB_None()
+
+    def clearusercache(self):
+       self.usercache = {}
+        self.usercacheclock = reactor.callLater( settings.DB_USERCACHE_TIME , self.clearusercache)
+
+    def scheduleImport(self):
+       # This schedule's the Import
+       use_thread = True
+       if settings.DATABASE_DRIVER == "sqlite":
+           use_thread = False
+       
+       if use_thread:
+            self.queueclock = reactor.callLater( settings.DB_LOADER_CHECKTIME , self.run_import_thread)
+       else:
+            self.queueclock = reactor.callLater( settings.DB_LOADER_CHECKTIME , self.run_import)
+    
+    def run_import_thread(self):
+       if self.q.qsize() >= settings.DB_LOADER_REC_MIN:        # Don't incur thread overhead if we're not going to run
+               reactor.callInThread(self.import_thread)
+       self.scheduleImport()
+
+    def run_import(self):
+       self.do_import(self.dbi,False)
+       if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate :
+           self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
+           self.dbi.updateStats(settings.DB_STATS_AVG_TIME)
+            d = self.bitcoinrpc.getinfo()
+            d.addCallback(self._update_pool_info)
+           if settings.ARCHIVE_SHARES :
+               self.archive_shares(self.dbi)
+       self.scheduleImport()
+
+    def import_thread(self):
+       # Here we are in the thread.
+       dbi = self.connectDB()  
+       self.do_import(dbi,False)
+       if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate :
+           self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
+           dbi.updateStats(settings.DB_STATS_AVG_TIME)
+            d = self.bitcoinrpc.getinfo()
+            d.addCallback(self._update_pool_info)
+           if settings.ARCHIVE_SHARES :
+               self.archive_shares(dbi)
+       dbi.close()
+
+    def _update_pool_info(self,data):
+       self.dbi.update_pool_info({ 'blocks' : data['blocks'], 'balance' : data['balance'], 
+               'connections' : data['connections'], 'difficulty' : data['difficulty'] })
+
+    def do_import(self,dbi,force):
+       # Only run if we have data
+       while force == True or self.q.qsize() >= settings.DB_LOADER_REC_MIN:
+           force = False
+           # Put together the data we want to import
+           sqldata = []
+           datacnt = 0
+           while self.q.empty() == False and datacnt < settings.DB_LOADER_REC_MAX :
+               datacnt += 1
+               data = self.q.get()
+               sqldata.append(data)
+               self.q.task_done()
+           # try to do the import, if we fail, log the error and put the data back in the queue
+           try:
+               log.info("Inserting %s Share Records",datacnt)
+               dbi.import_shares(sqldata)
+           except Exception as e:
+               log.error("Insert Share Records Failed: %s", e.args[0])
+               for k,v in enumerate(sqldata):
+                   self.q.put(v)
+               break           # Allows us to sleep a little
+
+    def archive_shares(self,dbi):
+       found_time = dbi.archive_check()
+       if found_time == 0:
+           return False
+       log.info("Archiving shares newer than timestamp %f " % found_time)
+       dbi.archive_found(found_time)
+       if settings.ARCHIVE_MODE == 'db':
+           dbi.archive_to_db(found_time)
+           dbi.archive_cleanup(found_time)
+       elif settings.ARCHIVE_MODE == 'file':
+           shares = dbi.archive_get_shares(found_time)
+
+           filename = settings.ARCHIVE_FILE
+           if settings.ARCHIVE_FILE_APPEND_TIME :
+               filename = filename + "-" + datetime.fromtimestamp(found_time).strftime("%Y-%m-%d-%H-%M-%S")
+           filename = filename + ".csv"
+
+           if settings.ARCHIVE_FILE_COMPRESS == 'gzip' :
+               import gzip
+               filename = filename + ".gz"
+               filehandle = gzip.open(filename, 'a')   
+           elif settings.ARCHIVE_FILE_COMPRESS == 'bzip2' and settings.ARCHIVE_FILE_APPEND_TIME :
+               import bz2
+               filename = filename + ".bz2"
+               filehandle = bz2.BZFile(filename, mode='wb', buffering=4096 )
+           else:
+               filehandle = open(filename, "a")
+
+           while True: 
+               row = shares.fetchone()
+               if row == None:
+                   break
+               str1 = '","'.join([str(x) for x in row])
+               filehandle.write('"%s"\n' % str1)
+           filehandle.close()
+
+           clean = False
+           while not clean:
+               try:
+                   dbi.archive_cleanup(found_time)
+                   clean = True
+               except Exception as e:
+                   clean = False
+                   log.error("Archive Cleanup Failed... will retry to cleanup in 30 seconds")
+                   sleep(30)
+               
+       return True
+
+    def queue_share(self,data):
+       self.q.put( data )
+
+    def found_block(self,data):
+       try:
+           log.info("Updating Found Block Share Record")
+           self.do_import(self.dbi,True)       # We can't Update if the record is not there.
+           self.dbi.found_block(data)
+       except Exception as e:
+           log.error("Update Found Block Share Record Failed: %s", e.args[0])
+
+    def check_password(self,username,password):
+       if username == "":
+           log.info("Rejected worker for blank username")
+           return False
+       wid = username+":-:"+password
+       if wid in self.usercache :
+           return True
+       elif self.dbi.check_password(username,password) :
+           self.usercache[wid] = 1
+           return True
+       elif settings.USERS_AUTOADD == True :
+           self.insert_user(username,password)
+           self.usercache[wid] = 1
+           return True
+       return False
+
+    def insert_user(self,username,password):   
+       return self.dbi.insert_user(username,password)
+
+    def delete_user(self,username):
+       self.usercache = {}
+       return self.dbi.delete_user(username)
+       
+    def update_user(self,username,password):
+       self.usercache = {}
+       return self.dbi.update_user(username,password)
+
+    def update_worker_diff(self,username,diff):
+       return self.dbi.update_worker_diff(username,diff)
+
+    def get_pool_stats(self):
+       return self.dbi.get_pool_stats()
+    
+    def get_workers_stats(self):
+       return self.dbi.get_workers_stats()
+
+    def clear_worker_diff(self):
+       return self.dbi.clear_worker_diff()
+
diff --git a/mining/DB_Mysql.py b/mining/DB_Mysql.py
new file mode 100644 (file)
index 0000000..146d22a
--- /dev/null
@@ -0,0 +1,246 @@
+import time
+from stratum import settings
+import stratum.logger
+log = stratum.logger.get_logger('DB_Mysql')
+
+import MySQLdb
+                
+class DB_Mysql():
+    def __init__(self):
+       log.debug("Connecting to DB")
+       self.dbh = MySQLdb.connect(settings.DB_MYSQL_HOST,settings.DB_MYSQL_USER,settings.DB_MYSQL_PASS,settings.DB_MYSQL_DBNAME)
+       self.dbc = self.dbh.cursor()
+
+    def updateStats(self,averageOverTime):
+       log.debug("Updating Stats")
+       # Note: we are using transactions... so we can set the speed = 0 and it doesn't take affect until we are commited.
+       #self.dbc.execute("update pool_worker set speed = 0, alive = 0");
+       #stime = '%.0f' % ( time.time() - averageOverTime );
+       #self.dbc.execute("select username,SUM(difficulty) from shares where time > FROM_UNIXTIME(%s) group by username", (stime,))
+       #total_speed = 0
+       #for name,shares in self.dbc.fetchall():
+       #    speed = int(int(shares) * pow(2,32)) / ( int(averageOverTime) * 1000 * 1000)
+       #    total_speed += speed
+       #    self.dbc.execute("update pool_worker set speed = %s, alive = 1 where username = %s", (speed,name))
+       #self.dbc.execute("update pool set value = %s where parameter = 'pool_speed'",[total_speed])
+       self.dbh.commit()
+    
+    def archive_check(self):
+       # Check for found shares to archive
+       #self.dbc.execute("select time from shares where upstream_result = 1 order by time limit 1")
+       #data = self.dbc.fetchone()
+       #if data is None or (data[0] + settings.ARCHIVE_DELAY) > time.time() :
+       #    return False
+       return data[0]
+
+    def archive_found(self,found_time):
+       self.dbc.execute("insert into shares_archive_found select * from shares where upstream_result = 'Y' and time <= FROM_UNIXTIME(%s)", (found_time,))
+       self.dbh.commit()
+
+    def archive_to_db(self,found_time):
+       self.dbc.execute("insert into shares_archive select * from shares where time <= FROM_UNIXTIME(%s)",(found_time,))       
+       self.dbh.commit()
+
+    def archive_cleanup(self,found_time):
+       self.dbc.execute("delete from shares where time <= FROM_UNIXTIME(%s)",(found_time,))
+       self.dbh.commit()
+
+    def archive_get_shares(self,found_time):
+       #self.dbc.execute("select * from shares where time <= FROM_UNIXTIME(%s)",(found_time,))
+       return self.dbc
+
+    def import_shares(self,data):
+       log.debug("Importing Shares")
+#             0           1            2          3          4         5        6  7            8         9              10
+#      data: [worker_name,block_header,block_hash,difficulty,timestamp,is_valid,ip,block_height,prev_hash,invalid_reason,best_diff]
+       checkin_times = {}
+       total_shares = 0
+       best_diff = 0
+       for k,v in enumerate(data):
+               self.dbc.execute("insert into shares (rem_host,username,our_result,upstream_result,reason,share_diff) VALUES " +\
+                       "(%s,%s,%s,%s,%s,%s)",
+                       (v[6],v[0],v[5],'N',v[9],v[3]) )
+        
+       self.dbh.commit()
+
+
+    def found_block(self,data):
+       # Note: difficulty = -1 here
+       self.dbc.execute("update shares set upstream_result = %s, solution = %s where time = FROM_UNIXTIME(%s) and username = %s limit 1",
+               (data[5],data[2],data[4],data[0]))
+       self.dbh.commit()
+
+    def delete_user(self,username):
+       log.debug("Deleting Username")
+       self.dbc.execute("delete from pool_worker where username = %s",
+               (username ))
+       self.dbh.commit()
+
+    def insert_user(self,username,password):
+       log.debug("Adding Username/Password")
+       self.dbc.execute("insert into pool_worker (username,password) VALUES (%s,%s)",
+               (username, password ))
+       self.dbh.commit()
+
+    def update_user(self,username,password):
+       log.debug("Updating Username/Password")
+       self.dbc.execute("update pool_worker set password = %(pass)s where username = %(uname)s",
+               (username, password ))
+       self.dbh.commit()
+
+    def update_worker_diff(self,username,diff):
+        if settings.DATABASE_EXTEND == True :
+           self.dbc.execute("update pool_worker set difficulty = %s where username = %s",(diff,username))
+           self.dbh.commit()
+    
+    def clear_worker_diff(self):
+       if settings.DATABASE_EXTEND == True :
+           self.dbc.execute("update pool_worker set difficulty = 0")
+           self.dbh.commit()
+
+    def check_password(self,username,password):
+       log.debug("Checking Username/Password")
+       self.dbc.execute("select COUNT(*) from pool_worker where username = %s and password = %s",
+               (username, password ))
+       data = self.dbc.fetchone()
+       if data[0] > 0 :
+           return True
+       return False
+
+    def update_pool_info(self,pi):
+       self.dbc.executemany("update pool set value = %s where parameter = %s",[(pi['blocks'],"bitcoin_blocks"),
+               (pi['balance'],"bitcoin_balance"),
+               (pi['connections'],"bitcoin_connections"),
+               (pi['difficulty'],"bitcoin_difficulty"),
+               (time.time(),"bitcoin_infotime")
+               ])
+       self.dbh.commit()
+
+    def get_pool_stats(self):
+       self.dbc.execute("select * from pool")
+       ret = {}
+       for data in self.dbc.fetchall():
+           ret[data[0]] = data[1]
+       return ret
+
+    def get_workers_stats(self):
+       self.dbc.execute("select username,speed,last_checkin,total_shares,total_rejects,total_found,alive,difficulty from pool_worker")
+       ret = {}
+       for data in self.dbc.fetchall():
+           ret[data[0]] = { "username" : data[0],
+               "speed" : data[1],
+               "last_checkin" : time.mktime(data[2].timetuple()),
+               "total_shares" : data[3],
+               "total_rejects" : data[4],
+               "total_found" : data[5],
+               "alive" : data[6],
+               "difficulty" : data[7] }
+       return ret
+
+    def close(self):
+       self.dbh.close()
+
+    def check_tables(self):
+       log.debug("Checking Tables")
+
+       # Do we have our tables?
+       shares_exist = False
+       self.dbc.execute("select COUNT(*) from INFORMATION_SCHEMA.STATISTICS " +\
+               "where table_schema = %(schema)s and table_name = 'shares' and index_name = 'shares_username'",
+               {"schema": settings.DB_MYSQL_DBNAME })
+       data = self.dbc.fetchone()
+       if data[0] <= 0 :
+           self.update_version_1()     # no, we don't, so create them
+           
+       if settings.DATABASE_EXTEND == True :
+           self.update_tables()
+       
+    def update_tables(self):
+       version = 0
+       current_version = 6
+       while version < current_version :
+           self.dbc.execute("select value from pool where parameter = 'DB Version'")
+           data = self.dbc.fetchone()
+           version = int(data[0])
+           if version < current_version :
+               log.info("Updating Database from %i to %i" % (version, version +1))
+               getattr(self, 'update_version_' + str(version) )()
+
+    def update_version_1(self):
+       if settings.DATABASE_EXTEND == True :
+           self.dbc.execute("create table if not exists shares " +\
+               "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\
+               "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER) ENGINE = MYISAM;")
+           self.dbc.execute("create index shares_username ON shares(username(10))")
+
+           self.dbc.execute("create table if not exists pool_worker" +\
+               "(id serial primary key,username TEXT, password TEXT, speed INTEGER, last_checkin timestamp" +\
+               ") ENGINE = MYISAM")
+           self.dbc.execute("create index pool_worker_username ON pool_worker(username(10))")
+       
+           self.dbc.execute("create table if not exists pool(parameter TEXT, value TEXT)")
+           self.dbc.execute("alter table pool_worker add total_shares INTEGER default 0")
+           self.dbc.execute("alter table pool_worker add total_rejects INTEGER default 0")
+           self.dbc.execute("alter table pool_worker add total_found INTEGER default 0")
+           self.dbc.execute("insert into pool (parameter,value) VALUES ('DB Version',2)")
+
+       else :
+           self.dbc.execute("create table if not exists shares" + \
+               "(id serial,time timestamp,rem_host TEXT, username TEXT, our_result INTEGER, upstream_result INTEGER, reason TEXT, solution TEXT) ENGINE = MYISAM")
+           self.dbc.execute("create index shares_username ON shares(username(10))")
+           self.dbc.execute("create table if not exists pool_worker(id serial,username TEXT, password TEXT) ENGINE = MYISAM")
+           self.dbc.execute("create index pool_worker_username ON pool_worker(username(10))")
+       self.dbh.commit()
+                   
+
+    def update_version_2(self):
+       log.info("running update 2")
+       self.dbc.executemany("insert into pool (parameter,value) VALUES (%s,%s)",[('bitcoin_blocks',0),
+               ('bitcoin_balance',0),
+               ('bitcoin_connections',0),
+               ('bitcoin_difficulty',0),
+               ('pool_speed',0),
+               ('pool_total_found',0),
+               ('round_shares',0),
+               ('round_progress',0),
+               ('round_start',time.time())
+               ])
+       self.dbc.execute("update pool set value = 3 where parameter = 'DB Version'")
+       self.dbh.commit()
+       
+    def update_version_3(self):
+       log.info("running update 3")
+       self.dbc.executemany("insert into pool (parameter,value) VALUES (%s,%s)",[
+               ('round_best_share',0),
+               ('bitcoin_infotime',0)
+               ])
+       self.dbc.execute("alter table pool_worker add alive BOOLEAN")
+       self.dbc.execute("update pool set value = 4 where parameter = 'DB Version'")
+       self.dbh.commit()
+       
+    def update_version_4(self):
+       log.info("running update 4")
+       self.dbc.execute("alter table pool_worker add difficulty INTEGER default 0")
+       self.dbc.execute("create table if not exists shares_archive " +\
+               "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\
+               "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER) ENGINE = MYISAM;")
+       self.dbc.execute("create table if not exists shares_archive_found " +\
+               "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\
+               "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER) ENGINE = MYISAM;")
+       self.dbc.execute("update pool set value = 5 where parameter = 'DB Version'")
+       self.dbh.commit()
+
+    def update_version_5(self):
+       log.info("running update 5")
+       # Adding Primary key to table: pool
+       self.dbc.execute("alter table pool add primary key (parameter(100))")
+       self.dbh.commit()
+       # Adjusting indicies on table: shares
+       self.dbc.execute("DROP INDEX shares_username ON shares")
+       self.dbc.execute("CREATE INDEX shares_time_username ON shares(time,username(10))")
+       self.dbc.execute("CREATE INDEX shares_upstreamresult ON shares(upstream_result)")
+       self.dbh.commit()
+       
+       self.dbc.execute("update pool set value = 6 where parameter = 'DB Version'")
+       self.dbh.commit()
+
diff --git a/mining/DB_None.py b/mining/DB_None.py
new file mode 100644 (file)
index 0000000..eda5121
--- /dev/null
@@ -0,0 +1,48 @@
+import stratum.logger
+log = stratum.logger.get_logger('None')
+                
+class DB_None():
+    def __init__(self):
+       log.debug("Connecting to DB")
+
+    def updateStats(self,averageOverTime):
+       log.debug("Updating Stats")
+
+    def import_shares(self,data):
+       log.debug("Importing Shares")
+
+    def found_block(self,data):
+       log.debug("Found Block")
+
+    def delete_user(self,username):
+       log.debug("Deleting Username")
+
+    def insert_user(self,username,password):
+       log.debug("Adding Username/Password")
+
+    def update_user(self,username,password):
+       log.debug("Updating Username/Password")
+
+    def check_password(self,username,password):
+       log.debug("Checking Username/Password")
+       return True
+    
+    def update_pool_info(self,pi):
+       log.debug("Update Pool Info")
+
+    def get_pool_stats(self):
+       log.debug("Get Pool Stats")
+       ret = {}
+       return ret
+
+    def get_workers_stats(self):
+       log.debug("Get Workers Stats")
+       ret = {}
+       return ret
+
+    def check_tables(self):
+       log.debug("Checking Tables")
+
+    def close(self):
+       log.debug("Close Connection")
+       
diff --git a/mining/basic_share_limiter.py b/mining/basic_share_limiter.py
new file mode 100644 (file)
index 0000000..6893a12
--- /dev/null
@@ -0,0 +1,117 @@
+from stratum import settings
+
+import stratum.logger
+log = stratum.logger.get_logger('BasicShareLimiter')
+
+import DBInterface
+dbi = DBInterface.DBInterface()
+dbi.clear_worker_diff()
+
+from twisted.internet import defer
+from mining.interfaces import Interfaces
+import time
+
+''' This is just a customized ring buffer '''
+class SpeedBuffer:
+       def __init__(self, size_max):
+               self.max = size_max
+               self.data = []
+               self.cur = 0
+       def append(self, x):
+               self.data.append(x)
+               self.cur += 1
+               if len(self.data) == self.max:
+                       self.cur = 0
+                       self.__class__ = SpeedBufferFull
+       def avg(self):
+               return sum(self.data) / self.cur
+       def pos(self):
+               return self.cur
+       def clear(self):
+               self.data = []
+               self.cur = 0
+       def size(self):
+               return self.cur
+
+class SpeedBufferFull:
+       def __init__(self, n):
+               raise "you should use SpeedBuffer"
+       def append(self, x):            
+               self.data[self.cur] = x
+               self.cur = (self.cur + 1) % self.max
+       def avg(self):
+               return sum(self.data) / self.max
+       def pos(self):
+               return self.cur
+       def clear(self):
+               self.data = []
+               self.cur = 0
+               self.__class__ = SpeedBuffer
+       def size(self):
+               return self.max
+
+class BasicShareLimiter(object):
+    def __init__(self):
+       self.worker_stats = {}
+       self.target = settings.VDIFF_TARGET
+       self.retarget = settings.VDIFF_RETARGET
+       self.variance = self.target * (float(settings.VDIFF_VARIANCE_PERCENT) / float(100))
+       self.tmin = self.target - self.variance
+       self.tmax = self.target + self.variance
+       self.buffersize = self.retarget / self.target * 4
+       # TODO: trim the hash of inactive workers
+
+    def submit(self, connection_ref, job_id, current_difficulty, timestamp, worker_name):
+       ts = int(timestamp)
+       # Init the stats for this worker if it isn't set.       
+        if worker_name not in self.worker_stats or self.worker_stats[worker_name]['last_ts'] < ts - settings.DB_USERCACHE_TIME :
+            self.worker_stats[worker_name] = {'last_rtc': (ts - self.retarget / 2), 'last_ts': ts, 'buffer': SpeedBuffer(self.buffersize) }
+            dbi.update_worker_diff(worker_name, settings.POOL_TARGET)
+            return
+       
+       # Standard share update of data
+       self.worker_stats[worker_name]['buffer'].append(ts - self.worker_stats[worker_name]['last_ts'])
+       self.worker_stats[worker_name]['last_ts'] = ts
+       # Do We retarget? If not, we're done.
+       if ts - self.worker_stats[worker_name]['last_rtc'] < self.retarget and self.worker_stats[worker_name]['buffer'].size() > 0:
+           return
+
+       # Set up and log our check
+       self.worker_stats[worker_name]['last_rtc'] = ts
+       avg = self.worker_stats[worker_name]['buffer'].avg()
+       log.info("Checking Retarget for %s (%i) avg. %i target %i+-%i" % (worker_name, current_difficulty, avg,
+               self.target, self.variance))
+       if avg < 1:
+           log.info("Reseting avg = 1 since it's SOOO low")
+           avg = 1
+
+       # Figure out our Delta-Diff
+       ddiff = int((float(current_difficulty) * (float(self.target) / float(avg))) - current_difficulty)
+       if (avg > self.tmax and current_difficulty > settings.POOL_TARGET):
+           # For fractional -0.1 ddiff's just drop by 1
+           if ddiff > -1:
+               ddiff = -1
+           # Don't drop below POOL_TARGET
+           if (ddiff + current_difficulty) < settings.POOL_TARGET:
+               ddiff = settings.POOL_TARGET - current_difficulty
+       elif avg < self.tmin:
+           # For fractional 0.1 ddiff's just up by 1
+           if ddiff < 1:
+               ddiff = 1
+           # Don't go above BITCOIN_DIFF
+           # TODO
+       else:  # If we are here, then we should not be retargeting.
+           return
+
+       # At this point we are retargeting this worker
+       new_diff = current_difficulty + ddiff
+       log.info("Retarget for %s %i old: %i new: %i" % (worker_name, ddiff, current_difficulty, new_diff))
+
+       self.worker_stats[worker_name]['buffer'].clear()
+        session = connection_ref().get_session()
+       session['prev_diff'] = session['difficulty']
+       session['prev_jobid'] = job_id
+       session['difficulty'] = new_diff
+       connection_ref().rpc('mining.set_difficulty', [new_diff, ], is_notification=True)
+       dbi.update_worker_diff(worker_name, new_diff)
+
index 40f1d2f..5bbe68a 100644 (file)
@@ -2,51 +2,65 @@
    Default implementation do almost nothing, you probably want to override these classes
    and customize references to interface instances in your launcher.
    (see launcher_demo.tac for an example).
-'''
-
+''' 
+from stratum import settings
 import time
 from twisted.internet import reactor, defer
+from lib.util import b58encode
 
 import stratum.logger
 log = stratum.logger.get_logger('interfaces')
 
+import DBInterface
+dbi = DBInterface.DBInterface()
+dbi.init_main()
+
 class WorkerManagerInterface(object):
     def __init__(self):
         # Fire deferred when manager is ready
         self.on_load = defer.Deferred()
         self.on_load.callback(True)
-
+        
     def authorize(self, worker_name, worker_password):
-        return True
+       # Important NOTE: This is called on EVERY submitted share. So you'll need caching!!!
+       return dbi.check_password(worker_name,worker_password)
+
 
 class ShareLimiterInterface(object):
     '''Implement difficulty adjustments here'''
-
-    def submit(self, connection_ref, current_difficulty, timestamp):
+    
+    def submit(self, connection_ref, job_id, current_difficulty, timestamp, worker_name):
         '''connection - weak reference to Protocol instance
            current_difficulty - difficulty of the connection
            timestamp - submission time of current share
-
+           
            - raise SubmitException for stop processing this request
            - call mining.set_difficulty on connection to adjust the difficulty'''
-        pass
-
+       return dbi.update_worker_diff(worker_name,settings.POOL_TARGET)
 class ShareManagerInterface(object):
     def __init__(self):
         # Fire deferred when manager is ready
         self.on_load = defer.Deferred()
         self.on_load.callback(True)
+       self.block_height = 0
+       self.prev_hash = 0
 
-    def on_network_block(self, prevhash):
+    def on_network_block(self, prevhash, block_height):
         '''Prints when there's new block coming from the network (possibly new round)'''
+        self.block_height = block_height
+       self.prev_hash = b58encode(int(prevhash,16))
         pass
-
-    def on_submit_share(self, worker_name, block_header, block_hash, shares, timestamp, is_valid):
-        log.info("%s %s %s" % (block_hash, 'valid' if is_valid else 'INVALID', worker_name))
-
-    def on_submit_block(self, is_accepted, worker_name, block_header, block_hash, timestamp):
+    
+    def on_submit_share(self, worker_name, block_header, block_hash, difficulty, timestamp, is_valid, ip, invalid_reason, share_diff ):
+        log.info("%s (%s) %s %s" % (block_hash, share_diff, 'valid' if is_valid else 'INVALID', worker_name))
+       dbi.queue_share([worker_name,block_header,block_hash,difficulty,timestamp,is_valid, ip, self.block_height, self.prev_hash, 
+               invalid_reason, share_diff ])
+    def on_submit_block(self, is_accepted, worker_name, block_header, block_hash, timestamp, ip, share_diff ):
         log.info("Block %s %s" % (block_hash, 'ACCEPTED' if is_accepted else 'REJECTED'))
-
+       dbi.found_block([worker_name,block_header,block_hash,-1,timestamp,is_accepted,ip,self.block_height, self.prev_hash, share_diff ])
+    
 class TimestamperInterface(object):
     '''This is the only source for current time in the application.
     Override this for generating unix timestamp in different way.'''
@@ -57,34 +71,35 @@ class PredictableTimestamperInterface(TimestamperInterface):
     '''Predictable timestamper may be useful for unit testing.'''
     start_time = 1345678900 # Some day in year 2012
     delta = 0
-
+    
     def time(self):
         self.delta += 1
         return self.start_time + self.delta
-
+        
 class Interfaces(object):
     worker_manager = None
     share_manager = None
     share_limiter = None
     timestamper = None
     template_registry = None
-
+    
     @classmethod
     def set_worker_manager(cls, manager):
-        cls.worker_manager = manager
-
-    @classmethod
+        cls.worker_manager = manager    
+    
+    @classmethod        
     def set_share_manager(cls, manager):
         cls.share_manager = manager
 
-    @classmethod
+    @classmethod        
     def set_share_limiter(cls, limiter):
         cls.share_limiter = limiter
-
+    
     @classmethod
     def set_timestamper(cls, manager):
         cls.timestamper = manager
-
+        
     @classmethod
     def set_template_registry(cls, registry):
+       dbi.set_bitcoinrpc(registry.bitcoin_rpc)
         cls.template_registry = registry
index 4242542..0a368f4 100644 (file)
@@ -1,6 +1,7 @@
 import binascii
 from twisted.internet import defer
 
+from stratum import settings
 from stratum.services import GenericService, admin
 from stratum.pubsub import Pubsub
 from interfaces import Interfaces
@@ -9,78 +10,78 @@ from lib.exceptions import SubmitException
 
 import stratum.logger
 log = stratum.logger.get_logger('mining')
-
+                
 class MiningService(GenericService):
     '''This service provides public API for Stratum mining proxy
     or any Stratum-compatible miner software.
-
+    
     Warning - any callable argument of this class will be propagated
     over Stratum protocol for public audience!'''
-
+    
     service_type = 'mining'
     service_vendor = 'stratum'
     is_default = True
-
+    
     @admin
     def update_block(self):
-        '''Connect this RPC call to 'novacoind -blocknotify' for
+        '''Connect this RPC call to 'bitcoind -blocknotify' for 
         instant notification about new block on the network.
         See blocknotify.sh in /scripts/ for more info.'''
-
+        
         log.info("New block notification received")
         Interfaces.template_registry.update_block()
-        return True
-
+        return True 
+    
     def authorize(self, worker_name, worker_password):
         '''Let authorize worker on this connection.'''
-
+        
         session = self.connection_ref().get_session()
         session.setdefault('authorized', {})
-
+        
         if Interfaces.worker_manager.authorize(worker_name, worker_password):
             session['authorized'][worker_name] = worker_password
             return True
-
+        
         else:
             if worker_name in session['authorized']:
                 del session['authorized'][worker_name]
             return False
-
-    def subscribe(self, *args):
+        
+    def subscribe(self,*arg):
         '''Subscribe for receiving mining jobs. This will
         return subscription details, extranonce1_hex and extranonce2_size'''
-
+        
         extranonce1 = Interfaces.template_registry.get_new_extranonce1()
         extranonce2_size = Interfaces.template_registry.extranonce2_size
         extranonce1_hex = binascii.hexlify(extranonce1)
-
+        
         session = self.connection_ref().get_session()
         session['extranonce1'] = extranonce1
-        session['difficulty'] = 1 # Following protocol specs, default diff is 1
+        session['difficulty'] = settings.POOL_TARGET # Following protocol specs, default diff is 1
 
         return Pubsub.subscribe(self.connection_ref(), MiningSubscription()) + (extranonce1_hex, extranonce2_size)
-
-    '''
+    
+    '''    
     def submit(self, worker_name, job_id, extranonce2, ntime, nonce):
         import time
         start = time.time()
-
+        
         for x in range(100):
             try:
                 ret = self.submit2(worker_name, job_id, extranonce2, ntime, nonce)
             except:
                 pass
-
+            
         log.info("LEN %.03f" % (time.time() - start))
         return ret
     '''
-
+        
     def submit(self, worker_name, job_id, extranonce2, ntime, nonce):
         '''Try to solve block candidate using given parameters.'''
-
+        
         session = self.connection_ref().get_session()
         session.setdefault('authorized', {})
-
+        
         # Check if worker is authorized to submit shares
         if not Interfaces.worker_manager.authorize(worker_name,
                         session['authorized'].get(worker_name)):
@@ -90,46 +91,47 @@ class MiningService(GenericService):
         extranonce1_bin = session.get('extranonce1', None)
         if not extranonce1_bin:
             raise SubmitException("Connection is not subscribed for mining")
-
+        
         difficulty = session['difficulty']
         submit_time = Interfaces.timestamper.time()
-
-        Interfaces.share_limiter.submit(self.connection_ref, difficulty, submit_time)
-
+        ip = self.connection_ref()._get_ip()
+    
+        Interfaces.share_limiter.submit(self.connection_ref, job_id, difficulty, submit_time, worker_name)
+            
         # This checks if submitted share meet all requirements
         # and it is valid proof of work.
         try:
-            (block_header, block_hash, on_submit) = Interfaces.template_registry.submit_share(job_id,
-                                                worker_name, extranonce1_bin, extranonce2, ntime, nonce, difficulty)
-        except SubmitException:
+            (block_header, block_hash, share_diff, on_submit) = Interfaces.template_registry.submit_share(job_id,
+                                                worker_name, session, extranonce1_bin, extranonce2, ntime, nonce, difficulty)
+        except SubmitException as e:
             # block_header and block_hash are None when submitted data are corrupted
             Interfaces.share_manager.on_submit_share(worker_name, None, None, difficulty,
-                                                 submit_time, False)
+                                                 submit_time, False, ip, e[0], 0)    
             raise
-
-
+            
+             
         Interfaces.share_manager.on_submit_share(worker_name, block_header, block_hash, difficulty,
-                                                 submit_time, True)
-
+                                                 submit_time, True, ip, '', share_diff)
+        
         if on_submit != None:
-            # Pool performs submitblock() to novacoind. Let's hook
+            # Pool performs submitblock() to bitcoind. Let's hook
             # to result and report it to share manager
             on_submit.addCallback(Interfaces.share_manager.on_submit_block,
-                        worker_name, block_header, block_hash, submit_time)
+                        worker_name, block_header, block_hash, submit_time,ip,share_diff)
 
         return True
-
+            
     # Service documentation for remote discovery
     update_block.help_text = "Notify Stratum server about new block on the network."
     update_block.params = [('password', 'string', 'Administrator password'),]
-
+    
     authorize.help_text = "Authorize worker for submitting shares on this connection."
     authorize.params = [('worker_name', 'string', 'Name of the worker, usually in the form of user_login.worker_id.'),
                         ('worker_password', 'string', 'Worker password'),]
-
+    
     subscribe.help_text = "Subscribes current connection for receiving new mining jobs."
     subscribe.params = []
-
+    
     submit.help_text = "Submit solved share back to the server. Excessive sending of invalid shares "\
                        "or shares above indicated target (see Stratum mining docs for set_target()) may lead "\
                        "to temporary or permanent ban of user,worker or IP address."
@@ -138,4 +140,4 @@ class MiningService(GenericService):
                      ('extranonce2', 'string', 'hex-encoded big-endian extranonce2, length depends on extranonce2_size from mining.notify.'),
                      ('ntime', 'string', 'UNIX timestamp (32bit integer, big-endian, hex-encoded), must be >= ntime provided by mining,notify and <= current time'),
                      ('nonce', 'string', '32bit integer, hex-encoded, big-endian'),]
-
+        
index 3eea5cc..41c77d0 100644 (file)
@@ -1,54 +1,55 @@
 from stratum.pubsub import Pubsub, Subscription
 from mining.interfaces import Interfaces
 
+from stratum import settings
 import stratum.logger
 log = stratum.logger.get_logger('subscription')
 
 class MiningSubscription(Subscription):
     '''This subscription object implements
     logic for broadcasting new jobs to the clients.'''
-
+    
     event = 'mining.notify'
-
+    
     @classmethod
     def on_template(cls, is_new_block):
         '''This is called when TemplateRegistry registers
            new block which we have to broadcast clients.'''
-
+        
         start = Interfaces.timestamper.time()
-
+        
         clean_jobs = is_new_block
         (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \
                         Interfaces.template_registry.get_last_broadcast_args()
-
+        
         # Push new job to subscribed clients
         cls.emit(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs)
-
+        
         cnt = Pubsub.get_subscription_count(cls.event)
         log.info("BROADCASTED to %d connections in %.03f sec" % (cnt, (Interfaces.timestamper.time() - start)))
-
+        
     def _finish_after_subscribe(self, result):
         '''Send new job to newly subscribed client'''
-        try:
+        try:        
             (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \
                         Interfaces.template_registry.get_last_broadcast_args()
         except Exception:
             log.error("Template not ready yet")
             return result
-
+        
         # Force set higher difficulty
-        # TODO
-        #self.connection_ref().rpc('mining.set_difficulty', [2,], is_notification=True)
+        self.connection_ref().rpc('mining.set_difficulty', [settings.POOL_TARGET,], is_notification=True)
         #self.connection_ref().rpc('client.get_version', [])
-
+        
         # Force client to remove previous jobs if any (eg. from previous connection)
         clean_jobs = True
         self.emit_single(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, True)
-
+        
         return result
-
+                
     def after_subscribe(self, *args):
         '''This will send new job to the client *after* he receive subscription details.
         on_finish callback solve the issue that job is broadcasted *during*
         the subscription request and client receive messages in wrong order.'''
         self.connection_ref().on_finish.addCallback(self._finish_after_subscribe)
+