From: kha0S Date: Thu, 20 Feb 2014 00:11:15 +0000 (+0000) Subject: Mysql share reporting fixed. Update config template. X-Git-Url: https://git.novaco.in/?p=stratum-mining.git;a=commitdiff_plain;h=adec2324c5f5a612a864453b7cd2cb5f17c5cfa8 Mysql share reporting fixed. Update config template. --- diff --git a/conf/config_sample.py b/conf/config_sample.py index b57346a..e45988b 100644 --- a/conf/config_sample.py +++ b/conf/config_sample.py @@ -1,6 +1,6 @@ ''' This is example configuration for Stratum server. -Please rename it to settings.py and fill correct values. +Please rename it to config.py and fill correct values. ''' # ******************** GENERAL SETTINGS *************** @@ -58,17 +58,45 @@ ADMIN_PASSWORD_SHA256 = None IRC_NICK = None -''' -DATABASE_DRIVER = 'MySQLdb' -DATABASE_HOST = 'localhost' -DATABASE_DBNAME = 'pooldb' -DATABASE_USER = 'pooldb' -DATABASE_PASSWORD = '**empty**' -''' + +DATABASE_DRIVER = 'mysql' +DATABASE_EXTEND = False # False = pushpool db layout, True = pushpool + extra columns +DB_MYSQL_HOST = 'localhost' +DB_MYSQL_DBNAME = 'pooldb' +DB_MYSQL_USER = 'pooldb' +DB_MYSQL_PASS = '**empty**' + # Pool related settings INSTANCE_ID = 31 CENTRAL_WALLET = '4WpFe4iTc8zC3UHAzdQX6w9BcRuXFxvPqm' # local novacoin address where money goes PREVHASH_REFRESH_INTERVAL = 5 # in sec MERKLE_REFRESH_INTERVAL = 60 # How often check memorypool -COINBASE_EXTRAS = '/stratum/' +COINBASE_EXTRAS = '' + +# ******************** Pool Difficulty Settings ********************* +# Again, Don't change unless you know what this is for. + +# Pool Target (Base Difficulty) +POOL_TARGET = 32 # Pool-wide difficulty target int >= 1 + +# Variable Difficulty Enable +VARIABLE_DIFF = False # Master variable difficulty enable + +# Variable diff tuning variables +VDIFF_TARGET = 15 # Target time per share (i.e. try to get 1 share per this many seconds) +VDIFF_RETARGET = 120 # Check to see if we should retarget this often +VDIFF_VARIANCE_PERCENT = 50 # Allow average time to very this % from target without retarget + +# ******************** Adv. DB Settings ********************* +# Don't change these unless you know what you are doing + +DB_LOADER_CHECKTIME = 15 # How often we check to see if we should run the loader +DB_LOADER_REC_MIN = 10 # Min Records before the bulk loader fires +DB_LOADER_REC_MAX = 20 # Max Records the bulk loader will commit at a time + +DB_STATS_AVG_TIME = 30 # When using the DATABASE_EXTEND option, average speed over X sec + # Note: this is also how often it updates +DB_USERCACHE_TIME = 600 # How long the usercache is good for before we refresh + +USERS_AUTOADD = False diff --git a/lib/template_registry.py b/lib/template_registry.py index bd1bca7..e367d73 100644 --- a/lib/template_registry.py +++ b/lib/template_registry.py @@ -16,56 +16,56 @@ class JobIdGenerator(object): '''Generate pseudo-unique job_id. It does not need to be absolutely unique, because pool sends "clean_jobs" flag to clients and they should drop all previous jobs.''' counter = 0 - + @classmethod def get_new_id(cls): cls.counter += 1 if cls.counter % 0xffff == 0: cls.counter = 1 return "%x" % cls.counter - + class TemplateRegistry(object): '''Implements the main logic of the pool. Keep track on valid block templates, provide internal interface for stratum service and implements block validation and submits.''' - + def __init__(self, block_template_class, coinbaser, bitcoin_rpc, instance_id, on_template_callback, on_block_callback): self.prevhashes = {} self.jobs = weakref.WeakValueDictionary() - + self.extranonce_counter = ExtranonceCounter(instance_id) self.extranonce2_size = block_template_class.coinbase_transaction_class.extranonce_size \ - self.extranonce_counter.get_size() - + self.coinbaser = coinbaser self.block_template_class = block_template_class self.bitcoin_rpc = bitcoin_rpc self.on_block_callback = on_block_callback self.on_template_callback = on_template_callback - + self.last_block = None self.update_in_progress = False self.last_update = None - + # Create first block template on startup self.update_block() - + def get_new_extranonce1(self): '''Generates unique extranonce1 (e.g. for newly subscribed connection.''' return self.extranonce_counter.get_new_bin() - + def get_last_broadcast_args(self): '''Returns arguments for mining.notify from last known template.''' return self.last_block.broadcast_args - - def add_template(self, block): + + def add_template(self, block,block_height): '''Adds new template to the registry. It also clean up templates which should not be used anymore.''' - + prevhash = block.prevhash_hex if prevhash in self.prevhashes.keys(): @@ -73,73 +73,73 @@ class TemplateRegistry(object): else: new_block = True self.prevhashes[prevhash] = [] - + # Blocks sorted by prevhash, so it's easy to drop # them on blockchain update self.prevhashes[prevhash].append(block) - + # Weak reference for fast lookup using job_id self.jobs[block.job_id] = block - + # Use this template for every new request self.last_block = block - + # Drop templates of obsolete blocks for ph in self.prevhashes.keys(): if ph != prevhash: del self.prevhashes[ph] - + log.info("New template for %s" % prevhash) if new_block: # Tell the system about new block # It is mostly important for share manager - self.on_block_callback(prevhash) + self.on_block_callback(prevhash, block_height) # Everything is ready, let's broadcast jobs! self.on_template_callback(new_block) - + #from twisted.internet import reactor - #reactor.callLater(10, self.on_block_callback, new_block) - + #reactor.callLater(10, self.on_block_callback, new_block) + def update_block(self): '''Registry calls the getblocktemplate() RPC and build new block template.''' - + if self.update_in_progress: # Block has been already detected return - + self.update_in_progress = True self.last_update = Interfaces.timestamper.time() - + d = self.bitcoin_rpc.getblocktemplate() d.addCallback(self._update_block) d.addErrback(self._update_block_failed) - + def _update_block_failed(self, failure): log.error(str(failure)) self.update_in_progress = False - + def _update_block(self, data): start = Interfaces.timestamper.time() - + template = self.block_template_class(Interfaces.timestamper, self.coinbaser, JobIdGenerator.get_new_id()) template.fill_from_rpc(data) - self.add_template(template) + self.add_template(template,data['height']) log.info("Update finished, %.03f sec, %d txes" % \ (Interfaces.timestamper.time() - start, len(template.vtx))) - - self.update_in_progress = False + + self.update_in_progress = False return data - + def diff_to_target(self, difficulty): '''Converts difficulty to target''' diff1 = 0x0000ffff00000000000000000000000000000000000000000000000000000000 return diff1 / difficulty - + def get_job(self, job_id): '''For given job_id returns BlockTemplate instance or None''' try: @@ -147,85 +147,87 @@ class TemplateRegistry(object): except: log.info("Job id '%s' not found" % job_id) return None - + # Now we have to check if job is still valid. # Unfortunately weak references are not bulletproof and # old reference can be found until next run of garbage collector. if j.prevhash_hex not in self.prevhashes: log.info("Prevhash of job '%s' is unknown" % job_id) return None - + if j not in self.prevhashes[j.prevhash_hex]: log.info("Job %s is unknown" % job_id) return None - + return j - - def submit_share(self, job_id, worker_name, extranonce1_bin, extranonce2, ntime, nonce, + + def submit_share(self, job_id, worker_name, session, extranonce1_bin, extranonce2, ntime, nonce, difficulty): '''Check parameters and finalize block template. If it leads to valid block candidate, asynchronously submits the block - back to the novacoin network. - + back to the bitcoin network. + - extranonce1_bin is binary. No checks performed, it should be from session data - job_id, extranonce2, ntime, nonce - in hex form sent by the client - difficulty - decimal number from session, again no checks performed - submitblock_callback - reference to method which receive result of submitblock() ''' - + # Check if extranonce2 looks correctly. extranonce2 is in hex form... if len(extranonce2) != self.extranonce2_size * 2: raise SubmitException("Incorrect size of extranonce2. Expected %d chars" % (self.extranonce2_size*2)) - + # Check for job job = self.get_job(job_id) if job == None: raise SubmitException("Job '%s' not found" % job_id) - + # Check if ntime looks correct if len(ntime) != 8: raise SubmitException("Incorrect size of ntime. Expected 8 chars") if not job.check_ntime(int(ntime, 16)): raise SubmitException("Ntime out of range") - - # Check nonce + + # Check nonce if len(nonce) != 8: raise SubmitException("Incorrect size of nonce. Expected 8 chars") - + # Check for duplicated submit if not job.register_submit(extranonce1_bin, extranonce2, ntime, nonce): log.info("Duplicate from %s, (%s %s %s %s)" % \ (worker_name, binascii.hexlify(extranonce1_bin), extranonce2, ntime, nonce)) raise SubmitException("Duplicate share") - + # Now let's do the hard work! # --------------------------- - + # 0. Some sugar extranonce2_bin = binascii.unhexlify(extranonce2) ntime_bin = binascii.unhexlify(ntime) nonce_bin = binascii.unhexlify(nonce) - + # 1. Build coinbase coinbase_bin = job.serialize_coinbase(extranonce1_bin, extranonce2_bin) coinbase_hash = util.doublesha(coinbase_bin) - + # 2. Calculate merkle root merkle_root_bin = job.merkletree.withFirst(coinbase_hash) merkle_root_int = util.uint256_from_str(merkle_root_bin) - + # 3. Serialize header with given merkle, ntime and nonce header_bin = job.serialize_header(merkle_root_int, ntime_bin, nonce_bin) - + # 4. Reverse header and compare it with target of the user hash_bin = util.scrypt(''.join([ header_bin[i*4:i*4+4][::-1] for i in range(0, 20) ])) hash_int = util.uint256_from_str(hash_bin) block_hash_hex = "%064x" % hash_int header_hex = binascii.hexlify(header_bin) - - target_user = self.diff_to_target(difficulty) - if hash_int > target_user: + + target_user = self.diff_to_target(difficulty) + if hash_int > target_user and \ + ( 'prev_jobid' not in session or session['prev_jobid'] < job_id \ + or 'prev_diff' not in session or hash_int > self.diff_to_target(session['prev_diff']) ): raise SubmitException("Share is above target") # Mostly for debugging purposes @@ -233,22 +235,25 @@ class TemplateRegistry(object): if hash_int <= target_info: log.info("Yay, share with diff above 100000") - # 5. Compare hash with target of the network + # Algebra tells us the diff_to_target is the same as hash_to_diff + share_diff = int(self.diff_to_target(hash_int)) + + # 5. Compare hash with target of the network if hash_int <= job.target: - # Yay! It is block candidate! + # Yay! It is block candidate! log.info("We found a block candidate! %s" % block_hash_hex) - - # 6. Finalize and serialize block object + + # 6. Finalize and serialize block object job.finalize(merkle_root_int, extranonce1_bin, extranonce2_bin, int(ntime, 16), int(nonce, 16)) - + if not job.is_valid(): # Should not happen log.error("Final job validation failed!") - + # 7. Submit block to the network serialized = binascii.hexlify(job.serialize()) on_submit = self.bitcoin_rpc.submitblock(serialized) - - return (header_hex, block_hash_hex, on_submit) - - return (header_hex, block_hash_hex, None) + + return (header_hex, block_hash_hex, share_diff, on_submit) + + return (header_hex, block_hash_hex, share_diff, None) diff --git a/lib/util.py b/lib/util.py index 12b4593..d9570c7 100644 --- a/lib/util.py +++ b/lib/util.py @@ -143,6 +143,17 @@ def b58decode(v, length): return result +def b58encode(value): + """ encode integer 'value' as a base58 string; returns string + """ + encoded = '' + while value >= __b58base: + div, mod = divmod(value, __b58base) + encoded = __b58chars[mod] + encoded # add to left + value = div + encoded = __b58chars[value] + encoded # most significant remainder + return encoded + def reverse_hash(h): # This only revert byte order, nothing more if len(h) != 64: diff --git a/mining/DBInterface.py b/mining/DBInterface.py new file mode 100644 index 0000000..4152e6e --- /dev/null +++ b/mining/DBInterface.py @@ -0,0 +1,222 @@ +from twisted.internet import reactor, defer +import time +from datetime import datetime +import Queue + +from stratum import settings + +import stratum.logger +log = stratum.logger.get_logger('DBInterface') + +class DBInterface(): + def __init__(self): + self.dbi = self.connectDB() + + def init_main(self): + self.dbi.check_tables() + + self.q = Queue.Queue() + self.queueclock = None + + self.usercache = {} + self.clearusercache() + + self.nextStatsUpdate = 0 + + self.scheduleImport() + + def set_bitcoinrpc(self,bitcoinrpc): + self.bitcoinrpc=bitcoinrpc + + def connectDB(self): + # Choose our database driver and put it in self.dbi + if settings.DATABASE_DRIVER == "sqlite": + log.debug('DB_Sqlite INIT') + import DB_Sqlite + return DB_Sqlite.DB_Sqlite() + elif settings.DATABASE_DRIVER == "mysql": + log.debug('DB_Mysql INIT') + import DB_Mysql + return DB_Mysql.DB_Mysql() + elif settings.DATABASE_DRIVER == "postgresql": + log.debug('DB_Postgresql INIT') + import DB_Postgresql + return DB_Postgresql.DB_Postgresql() + elif settings.DATABASE_DRIVER == "none": + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + else: + log.error('Invalid DATABASE_DRIVER -- using NONE') + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + + def clearusercache(self): + self.usercache = {} + self.usercacheclock = reactor.callLater( settings.DB_USERCACHE_TIME , self.clearusercache) + + def scheduleImport(self): + # This schedule's the Import + use_thread = True + if settings.DATABASE_DRIVER == "sqlite": + use_thread = False + + if use_thread: + self.queueclock = reactor.callLater( settings.DB_LOADER_CHECKTIME , self.run_import_thread) + else: + self.queueclock = reactor.callLater( settings.DB_LOADER_CHECKTIME , self.run_import) + + def run_import_thread(self): + if self.q.qsize() >= settings.DB_LOADER_REC_MIN: # Don't incur thread overhead if we're not going to run + reactor.callInThread(self.import_thread) + self.scheduleImport() + + def run_import(self): + self.do_import(self.dbi,False) + if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate : + self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME + self.dbi.updateStats(settings.DB_STATS_AVG_TIME) + d = self.bitcoinrpc.getinfo() + d.addCallback(self._update_pool_info) + if settings.ARCHIVE_SHARES : + self.archive_shares(self.dbi) + self.scheduleImport() + + def import_thread(self): + # Here we are in the thread. + dbi = self.connectDB() + self.do_import(dbi,False) + if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate : + self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME + dbi.updateStats(settings.DB_STATS_AVG_TIME) + d = self.bitcoinrpc.getinfo() + d.addCallback(self._update_pool_info) + if settings.ARCHIVE_SHARES : + self.archive_shares(dbi) + dbi.close() + + def _update_pool_info(self,data): + self.dbi.update_pool_info({ 'blocks' : data['blocks'], 'balance' : data['balance'], + 'connections' : data['connections'], 'difficulty' : data['difficulty'] }) + + def do_import(self,dbi,force): + # Only run if we have data + while force == True or self.q.qsize() >= settings.DB_LOADER_REC_MIN: + force = False + # Put together the data we want to import + sqldata = [] + datacnt = 0 + while self.q.empty() == False and datacnt < settings.DB_LOADER_REC_MAX : + datacnt += 1 + data = self.q.get() + sqldata.append(data) + self.q.task_done() + # try to do the import, if we fail, log the error and put the data back in the queue + try: + log.info("Inserting %s Share Records",datacnt) + dbi.import_shares(sqldata) + except Exception as e: + log.error("Insert Share Records Failed: %s", e.args[0]) + for k,v in enumerate(sqldata): + self.q.put(v) + break # Allows us to sleep a little + + def archive_shares(self,dbi): + found_time = dbi.archive_check() + if found_time == 0: + return False + log.info("Archiving shares newer than timestamp %f " % found_time) + dbi.archive_found(found_time) + if settings.ARCHIVE_MODE == 'db': + dbi.archive_to_db(found_time) + dbi.archive_cleanup(found_time) + elif settings.ARCHIVE_MODE == 'file': + shares = dbi.archive_get_shares(found_time) + + filename = settings.ARCHIVE_FILE + if settings.ARCHIVE_FILE_APPEND_TIME : + filename = filename + "-" + datetime.fromtimestamp(found_time).strftime("%Y-%m-%d-%H-%M-%S") + filename = filename + ".csv" + + if settings.ARCHIVE_FILE_COMPRESS == 'gzip' : + import gzip + filename = filename + ".gz" + filehandle = gzip.open(filename, 'a') + elif settings.ARCHIVE_FILE_COMPRESS == 'bzip2' and settings.ARCHIVE_FILE_APPEND_TIME : + import bz2 + filename = filename + ".bz2" + filehandle = bz2.BZFile(filename, mode='wb', buffering=4096 ) + else: + filehandle = open(filename, "a") + + while True: + row = shares.fetchone() + if row == None: + break + str1 = '","'.join([str(x) for x in row]) + filehandle.write('"%s"\n' % str1) + filehandle.close() + + clean = False + while not clean: + try: + dbi.archive_cleanup(found_time) + clean = True + except Exception as e: + clean = False + log.error("Archive Cleanup Failed... will retry to cleanup in 30 seconds") + sleep(30) + + return True + + def queue_share(self,data): + self.q.put( data ) + + def found_block(self,data): + try: + log.info("Updating Found Block Share Record") + self.do_import(self.dbi,True) # We can't Update if the record is not there. + self.dbi.found_block(data) + except Exception as e: + log.error("Update Found Block Share Record Failed: %s", e.args[0]) + + def check_password(self,username,password): + if username == "": + log.info("Rejected worker for blank username") + return False + wid = username+":-:"+password + if wid in self.usercache : + return True + elif self.dbi.check_password(username,password) : + self.usercache[wid] = 1 + return True + elif settings.USERS_AUTOADD == True : + self.insert_user(username,password) + self.usercache[wid] = 1 + return True + return False + + def insert_user(self,username,password): + return self.dbi.insert_user(username,password) + + def delete_user(self,username): + self.usercache = {} + return self.dbi.delete_user(username) + + def update_user(self,username,password): + self.usercache = {} + return self.dbi.update_user(username,password) + + def update_worker_diff(self,username,diff): + return self.dbi.update_worker_diff(username,diff) + + def get_pool_stats(self): + return self.dbi.get_pool_stats() + + def get_workers_stats(self): + return self.dbi.get_workers_stats() + + def clear_worker_diff(self): + return self.dbi.clear_worker_diff() + diff --git a/mining/DB_Mysql.py b/mining/DB_Mysql.py new file mode 100644 index 0000000..146d22a --- /dev/null +++ b/mining/DB_Mysql.py @@ -0,0 +1,246 @@ +import time +from stratum import settings +import stratum.logger +log = stratum.logger.get_logger('DB_Mysql') + +import MySQLdb + +class DB_Mysql(): + def __init__(self): + log.debug("Connecting to DB") + self.dbh = MySQLdb.connect(settings.DB_MYSQL_HOST,settings.DB_MYSQL_USER,settings.DB_MYSQL_PASS,settings.DB_MYSQL_DBNAME) + self.dbc = self.dbh.cursor() + + def updateStats(self,averageOverTime): + log.debug("Updating Stats") + # Note: we are using transactions... so we can set the speed = 0 and it doesn't take affect until we are commited. + #self.dbc.execute("update pool_worker set speed = 0, alive = 0"); + #stime = '%.0f' % ( time.time() - averageOverTime ); + #self.dbc.execute("select username,SUM(difficulty) from shares where time > FROM_UNIXTIME(%s) group by username", (stime,)) + #total_speed = 0 + #for name,shares in self.dbc.fetchall(): + # speed = int(int(shares) * pow(2,32)) / ( int(averageOverTime) * 1000 * 1000) + # total_speed += speed + # self.dbc.execute("update pool_worker set speed = %s, alive = 1 where username = %s", (speed,name)) + #self.dbc.execute("update pool set value = %s where parameter = 'pool_speed'",[total_speed]) + self.dbh.commit() + + def archive_check(self): + # Check for found shares to archive + #self.dbc.execute("select time from shares where upstream_result = 1 order by time limit 1") + #data = self.dbc.fetchone() + #if data is None or (data[0] + settings.ARCHIVE_DELAY) > time.time() : + # return False + return data[0] + + def archive_found(self,found_time): + self.dbc.execute("insert into shares_archive_found select * from shares where upstream_result = 'Y' and time <= FROM_UNIXTIME(%s)", (found_time,)) + self.dbh.commit() + + def archive_to_db(self,found_time): + self.dbc.execute("insert into shares_archive select * from shares where time <= FROM_UNIXTIME(%s)",(found_time,)) + self.dbh.commit() + + def archive_cleanup(self,found_time): + self.dbc.execute("delete from shares where time <= FROM_UNIXTIME(%s)",(found_time,)) + self.dbh.commit() + + def archive_get_shares(self,found_time): + #self.dbc.execute("select * from shares where time <= FROM_UNIXTIME(%s)",(found_time,)) + return self.dbc + + def import_shares(self,data): + log.debug("Importing Shares") +# 0 1 2 3 4 5 6 7 8 9 10 +# data: [worker_name,block_header,block_hash,difficulty,timestamp,is_valid,ip,block_height,prev_hash,invalid_reason,best_diff] + checkin_times = {} + total_shares = 0 + best_diff = 0 + for k,v in enumerate(data): + self.dbc.execute("insert into shares (rem_host,username,our_result,upstream_result,reason,share_diff) VALUES " +\ + "(%s,%s,%s,%s,%s,%s)", + (v[6],v[0],v[5],'N',v[9],v[3]) ) + + self.dbh.commit() + + + def found_block(self,data): + # Note: difficulty = -1 here + self.dbc.execute("update shares set upstream_result = %s, solution = %s where time = FROM_UNIXTIME(%s) and username = %s limit 1", + (data[5],data[2],data[4],data[0])) + self.dbh.commit() + + def delete_user(self,username): + log.debug("Deleting Username") + self.dbc.execute("delete from pool_worker where username = %s", + (username )) + self.dbh.commit() + + def insert_user(self,username,password): + log.debug("Adding Username/Password") + self.dbc.execute("insert into pool_worker (username,password) VALUES (%s,%s)", + (username, password )) + self.dbh.commit() + + def update_user(self,username,password): + log.debug("Updating Username/Password") + self.dbc.execute("update pool_worker set password = %(pass)s where username = %(uname)s", + (username, password )) + self.dbh.commit() + + def update_worker_diff(self,username,diff): + if settings.DATABASE_EXTEND == True : + self.dbc.execute("update pool_worker set difficulty = %s where username = %s",(diff,username)) + self.dbh.commit() + + def clear_worker_diff(self): + if settings.DATABASE_EXTEND == True : + self.dbc.execute("update pool_worker set difficulty = 0") + self.dbh.commit() + + def check_password(self,username,password): + log.debug("Checking Username/Password") + self.dbc.execute("select COUNT(*) from pool_worker where username = %s and password = %s", + (username, password )) + data = self.dbc.fetchone() + if data[0] > 0 : + return True + return False + + def update_pool_info(self,pi): + self.dbc.executemany("update pool set value = %s where parameter = %s",[(pi['blocks'],"bitcoin_blocks"), + (pi['balance'],"bitcoin_balance"), + (pi['connections'],"bitcoin_connections"), + (pi['difficulty'],"bitcoin_difficulty"), + (time.time(),"bitcoin_infotime") + ]) + self.dbh.commit() + + def get_pool_stats(self): + self.dbc.execute("select * from pool") + ret = {} + for data in self.dbc.fetchall(): + ret[data[0]] = data[1] + return ret + + def get_workers_stats(self): + self.dbc.execute("select username,speed,last_checkin,total_shares,total_rejects,total_found,alive,difficulty from pool_worker") + ret = {} + for data in self.dbc.fetchall(): + ret[data[0]] = { "username" : data[0], + "speed" : data[1], + "last_checkin" : time.mktime(data[2].timetuple()), + "total_shares" : data[3], + "total_rejects" : data[4], + "total_found" : data[5], + "alive" : data[6], + "difficulty" : data[7] } + return ret + + def close(self): + self.dbh.close() + + def check_tables(self): + log.debug("Checking Tables") + + # Do we have our tables? + shares_exist = False + self.dbc.execute("select COUNT(*) from INFORMATION_SCHEMA.STATISTICS " +\ + "where table_schema = %(schema)s and table_name = 'shares' and index_name = 'shares_username'", + {"schema": settings.DB_MYSQL_DBNAME }) + data = self.dbc.fetchone() + if data[0] <= 0 : + self.update_version_1() # no, we don't, so create them + + if settings.DATABASE_EXTEND == True : + self.update_tables() + + def update_tables(self): + version = 0 + current_version = 6 + while version < current_version : + self.dbc.execute("select value from pool where parameter = 'DB Version'") + data = self.dbc.fetchone() + version = int(data[0]) + if version < current_version : + log.info("Updating Database from %i to %i" % (version, version +1)) + getattr(self, 'update_version_' + str(version) )() + + def update_version_1(self): + if settings.DATABASE_EXTEND == True : + self.dbc.execute("create table if not exists shares " +\ + "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\ + "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER) ENGINE = MYISAM;") + self.dbc.execute("create index shares_username ON shares(username(10))") + + self.dbc.execute("create table if not exists pool_worker" +\ + "(id serial primary key,username TEXT, password TEXT, speed INTEGER, last_checkin timestamp" +\ + ") ENGINE = MYISAM") + self.dbc.execute("create index pool_worker_username ON pool_worker(username(10))") + + self.dbc.execute("create table if not exists pool(parameter TEXT, value TEXT)") + self.dbc.execute("alter table pool_worker add total_shares INTEGER default 0") + self.dbc.execute("alter table pool_worker add total_rejects INTEGER default 0") + self.dbc.execute("alter table pool_worker add total_found INTEGER default 0") + self.dbc.execute("insert into pool (parameter,value) VALUES ('DB Version',2)") + + else : + self.dbc.execute("create table if not exists shares" + \ + "(id serial,time timestamp,rem_host TEXT, username TEXT, our_result INTEGER, upstream_result INTEGER, reason TEXT, solution TEXT) ENGINE = MYISAM") + self.dbc.execute("create index shares_username ON shares(username(10))") + self.dbc.execute("create table if not exists pool_worker(id serial,username TEXT, password TEXT) ENGINE = MYISAM") + self.dbc.execute("create index pool_worker_username ON pool_worker(username(10))") + self.dbh.commit() + + + def update_version_2(self): + log.info("running update 2") + self.dbc.executemany("insert into pool (parameter,value) VALUES (%s,%s)",[('bitcoin_blocks',0), + ('bitcoin_balance',0), + ('bitcoin_connections',0), + ('bitcoin_difficulty',0), + ('pool_speed',0), + ('pool_total_found',0), + ('round_shares',0), + ('round_progress',0), + ('round_start',time.time()) + ]) + self.dbc.execute("update pool set value = 3 where parameter = 'DB Version'") + self.dbh.commit() + + def update_version_3(self): + log.info("running update 3") + self.dbc.executemany("insert into pool (parameter,value) VALUES (%s,%s)",[ + ('round_best_share',0), + ('bitcoin_infotime',0) + ]) + self.dbc.execute("alter table pool_worker add alive BOOLEAN") + self.dbc.execute("update pool set value = 4 where parameter = 'DB Version'") + self.dbh.commit() + + def update_version_4(self): + log.info("running update 4") + self.dbc.execute("alter table pool_worker add difficulty INTEGER default 0") + self.dbc.execute("create table if not exists shares_archive " +\ + "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\ + "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER) ENGINE = MYISAM;") + self.dbc.execute("create table if not exists shares_archive_found " +\ + "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\ + "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER) ENGINE = MYISAM;") + self.dbc.execute("update pool set value = 5 where parameter = 'DB Version'") + self.dbh.commit() + + def update_version_5(self): + log.info("running update 5") + # Adding Primary key to table: pool + self.dbc.execute("alter table pool add primary key (parameter(100))") + self.dbh.commit() + # Adjusting indicies on table: shares + self.dbc.execute("DROP INDEX shares_username ON shares") + self.dbc.execute("CREATE INDEX shares_time_username ON shares(time,username(10))") + self.dbc.execute("CREATE INDEX shares_upstreamresult ON shares(upstream_result)") + self.dbh.commit() + + self.dbc.execute("update pool set value = 6 where parameter = 'DB Version'") + self.dbh.commit() + diff --git a/mining/DB_None.py b/mining/DB_None.py new file mode 100644 index 0000000..eda5121 --- /dev/null +++ b/mining/DB_None.py @@ -0,0 +1,48 @@ +import stratum.logger +log = stratum.logger.get_logger('None') + +class DB_None(): + def __init__(self): + log.debug("Connecting to DB") + + def updateStats(self,averageOverTime): + log.debug("Updating Stats") + + def import_shares(self,data): + log.debug("Importing Shares") + + def found_block(self,data): + log.debug("Found Block") + + def delete_user(self,username): + log.debug("Deleting Username") + + def insert_user(self,username,password): + log.debug("Adding Username/Password") + + def update_user(self,username,password): + log.debug("Updating Username/Password") + + def check_password(self,username,password): + log.debug("Checking Username/Password") + return True + + def update_pool_info(self,pi): + log.debug("Update Pool Info") + + def get_pool_stats(self): + log.debug("Get Pool Stats") + ret = {} + return ret + + def get_workers_stats(self): + log.debug("Get Workers Stats") + ret = {} + return ret + + def check_tables(self): + log.debug("Checking Tables") + + def close(self): + log.debug("Close Connection") + diff --git a/mining/basic_share_limiter.py b/mining/basic_share_limiter.py new file mode 100644 index 0000000..6893a12 --- /dev/null +++ b/mining/basic_share_limiter.py @@ -0,0 +1,117 @@ +from stratum import settings + +import stratum.logger +log = stratum.logger.get_logger('BasicShareLimiter') + +import DBInterface +dbi = DBInterface.DBInterface() +dbi.clear_worker_diff() + +from twisted.internet import defer +from mining.interfaces import Interfaces +import time + +''' This is just a customized ring buffer ''' +class SpeedBuffer: + def __init__(self, size_max): + self.max = size_max + self.data = [] + self.cur = 0 + def append(self, x): + self.data.append(x) + self.cur += 1 + if len(self.data) == self.max: + self.cur = 0 + self.__class__ = SpeedBufferFull + def avg(self): + return sum(self.data) / self.cur + def pos(self): + return self.cur + def clear(self): + self.data = [] + self.cur = 0 + def size(self): + return self.cur + +class SpeedBufferFull: + def __init__(self, n): + raise "you should use SpeedBuffer" + def append(self, x): + self.data[self.cur] = x + self.cur = (self.cur + 1) % self.max + def avg(self): + return sum(self.data) / self.max + def pos(self): + return self.cur + def clear(self): + self.data = [] + self.cur = 0 + self.__class__ = SpeedBuffer + def size(self): + return self.max + +class BasicShareLimiter(object): + def __init__(self): + self.worker_stats = {} + self.target = settings.VDIFF_TARGET + self.retarget = settings.VDIFF_RETARGET + self.variance = self.target * (float(settings.VDIFF_VARIANCE_PERCENT) / float(100)) + self.tmin = self.target - self.variance + self.tmax = self.target + self.variance + self.buffersize = self.retarget / self.target * 4 + # TODO: trim the hash of inactive workers + + def submit(self, connection_ref, job_id, current_difficulty, timestamp, worker_name): + ts = int(timestamp) + # Init the stats for this worker if it isn't set. + if worker_name not in self.worker_stats or self.worker_stats[worker_name]['last_ts'] < ts - settings.DB_USERCACHE_TIME : + self.worker_stats[worker_name] = {'last_rtc': (ts - self.retarget / 2), 'last_ts': ts, 'buffer': SpeedBuffer(self.buffersize) } + dbi.update_worker_diff(worker_name, settings.POOL_TARGET) + return + + # Standard share update of data + self.worker_stats[worker_name]['buffer'].append(ts - self.worker_stats[worker_name]['last_ts']) + self.worker_stats[worker_name]['last_ts'] = ts + # Do We retarget? If not, we're done. + if ts - self.worker_stats[worker_name]['last_rtc'] < self.retarget and self.worker_stats[worker_name]['buffer'].size() > 0: + return + + # Set up and log our check + self.worker_stats[worker_name]['last_rtc'] = ts + avg = self.worker_stats[worker_name]['buffer'].avg() + log.info("Checking Retarget for %s (%i) avg. %i target %i+-%i" % (worker_name, current_difficulty, avg, + self.target, self.variance)) + if avg < 1: + log.info("Reseting avg = 1 since it's SOOO low") + avg = 1 + + # Figure out our Delta-Diff + ddiff = int((float(current_difficulty) * (float(self.target) / float(avg))) - current_difficulty) + if (avg > self.tmax and current_difficulty > settings.POOL_TARGET): + # For fractional -0.1 ddiff's just drop by 1 + if ddiff > -1: + ddiff = -1 + # Don't drop below POOL_TARGET + if (ddiff + current_difficulty) < settings.POOL_TARGET: + ddiff = settings.POOL_TARGET - current_difficulty + elif avg < self.tmin: + # For fractional 0.1 ddiff's just up by 1 + if ddiff < 1: + ddiff = 1 + # Don't go above BITCOIN_DIFF + # TODO + else: # If we are here, then we should not be retargeting. + return + + # At this point we are retargeting this worker + new_diff = current_difficulty + ddiff + log.info("Retarget for %s %i old: %i new: %i" % (worker_name, ddiff, current_difficulty, new_diff)) + + self.worker_stats[worker_name]['buffer'].clear() + session = connection_ref().get_session() + session['prev_diff'] = session['difficulty'] + session['prev_jobid'] = job_id + session['difficulty'] = new_diff + connection_ref().rpc('mining.set_difficulty', [new_diff, ], is_notification=True) + dbi.update_worker_diff(worker_name, new_diff) + diff --git a/mining/interfaces.py b/mining/interfaces.py index 40f1d2f..5bbe68a 100644 --- a/mining/interfaces.py +++ b/mining/interfaces.py @@ -2,51 +2,65 @@ Default implementation do almost nothing, you probably want to override these classes and customize references to interface instances in your launcher. (see launcher_demo.tac for an example). -''' - +''' +from stratum import settings import time from twisted.internet import reactor, defer +from lib.util import b58encode import stratum.logger log = stratum.logger.get_logger('interfaces') +import DBInterface +dbi = DBInterface.DBInterface() +dbi.init_main() + class WorkerManagerInterface(object): def __init__(self): # Fire deferred when manager is ready self.on_load = defer.Deferred() self.on_load.callback(True) - + def authorize(self, worker_name, worker_password): - return True + # Important NOTE: This is called on EVERY submitted share. So you'll need caching!!! + return dbi.check_password(worker_name,worker_password) + class ShareLimiterInterface(object): '''Implement difficulty adjustments here''' - - def submit(self, connection_ref, current_difficulty, timestamp): + + def submit(self, connection_ref, job_id, current_difficulty, timestamp, worker_name): '''connection - weak reference to Protocol instance current_difficulty - difficulty of the connection timestamp - submission time of current share - + - raise SubmitException for stop processing this request - call mining.set_difficulty on connection to adjust the difficulty''' - pass - + return dbi.update_worker_diff(worker_name,settings.POOL_TARGET) + class ShareManagerInterface(object): def __init__(self): # Fire deferred when manager is ready self.on_load = defer.Deferred() self.on_load.callback(True) + self.block_height = 0 + self.prev_hash = 0 - def on_network_block(self, prevhash): + def on_network_block(self, prevhash, block_height): '''Prints when there's new block coming from the network (possibly new round)''' + self.block_height = block_height + self.prev_hash = b58encode(int(prevhash,16)) pass - - def on_submit_share(self, worker_name, block_header, block_hash, shares, timestamp, is_valid): - log.info("%s %s %s" % (block_hash, 'valid' if is_valid else 'INVALID', worker_name)) - - def on_submit_block(self, is_accepted, worker_name, block_header, block_hash, timestamp): + + def on_submit_share(self, worker_name, block_header, block_hash, difficulty, timestamp, is_valid, ip, invalid_reason, share_diff ): + log.info("%s (%s) %s %s" % (block_hash, share_diff, 'valid' if is_valid else 'INVALID', worker_name)) + dbi.queue_share([worker_name,block_header,block_hash,difficulty,timestamp,is_valid, ip, self.block_height, self.prev_hash, + invalid_reason, share_diff ]) + + def on_submit_block(self, is_accepted, worker_name, block_header, block_hash, timestamp, ip, share_diff ): log.info("Block %s %s" % (block_hash, 'ACCEPTED' if is_accepted else 'REJECTED')) - + dbi.found_block([worker_name,block_header,block_hash,-1,timestamp,is_accepted,ip,self.block_height, self.prev_hash, share_diff ]) + class TimestamperInterface(object): '''This is the only source for current time in the application. Override this for generating unix timestamp in different way.''' @@ -57,34 +71,35 @@ class PredictableTimestamperInterface(TimestamperInterface): '''Predictable timestamper may be useful for unit testing.''' start_time = 1345678900 # Some day in year 2012 delta = 0 - + def time(self): self.delta += 1 return self.start_time + self.delta - + class Interfaces(object): worker_manager = None share_manager = None share_limiter = None timestamper = None template_registry = None - + @classmethod def set_worker_manager(cls, manager): - cls.worker_manager = manager - - @classmethod + cls.worker_manager = manager + + @classmethod def set_share_manager(cls, manager): cls.share_manager = manager - @classmethod + @classmethod def set_share_limiter(cls, limiter): cls.share_limiter = limiter - + @classmethod def set_timestamper(cls, manager): cls.timestamper = manager - + @classmethod def set_template_registry(cls, registry): + dbi.set_bitcoinrpc(registry.bitcoin_rpc) cls.template_registry = registry diff --git a/mining/service.py b/mining/service.py index 4242542..0a368f4 100644 --- a/mining/service.py +++ b/mining/service.py @@ -1,6 +1,7 @@ import binascii from twisted.internet import defer +from stratum import settings from stratum.services import GenericService, admin from stratum.pubsub import Pubsub from interfaces import Interfaces @@ -9,78 +10,78 @@ from lib.exceptions import SubmitException import stratum.logger log = stratum.logger.get_logger('mining') - + class MiningService(GenericService): '''This service provides public API for Stratum mining proxy or any Stratum-compatible miner software. - + Warning - any callable argument of this class will be propagated over Stratum protocol for public audience!''' - + service_type = 'mining' service_vendor = 'stratum' is_default = True - + @admin def update_block(self): - '''Connect this RPC call to 'novacoind -blocknotify' for + '''Connect this RPC call to 'bitcoind -blocknotify' for instant notification about new block on the network. See blocknotify.sh in /scripts/ for more info.''' - + log.info("New block notification received") Interfaces.template_registry.update_block() - return True - + return True + def authorize(self, worker_name, worker_password): '''Let authorize worker on this connection.''' - + session = self.connection_ref().get_session() session.setdefault('authorized', {}) - + if Interfaces.worker_manager.authorize(worker_name, worker_password): session['authorized'][worker_name] = worker_password return True - + else: if worker_name in session['authorized']: del session['authorized'][worker_name] return False - - def subscribe(self, *args): + + def subscribe(self,*arg): '''Subscribe for receiving mining jobs. This will return subscription details, extranonce1_hex and extranonce2_size''' - + extranonce1 = Interfaces.template_registry.get_new_extranonce1() extranonce2_size = Interfaces.template_registry.extranonce2_size extranonce1_hex = binascii.hexlify(extranonce1) - + session = self.connection_ref().get_session() session['extranonce1'] = extranonce1 - session['difficulty'] = 1 # Following protocol specs, default diff is 1 + session['difficulty'] = settings.POOL_TARGET # Following protocol specs, default diff is 1 return Pubsub.subscribe(self.connection_ref(), MiningSubscription()) + (extranonce1_hex, extranonce2_size) - - ''' + + ''' def submit(self, worker_name, job_id, extranonce2, ntime, nonce): import time start = time.time() - + for x in range(100): try: ret = self.submit2(worker_name, job_id, extranonce2, ntime, nonce) except: pass - + log.info("LEN %.03f" % (time.time() - start)) return ret ''' - + def submit(self, worker_name, job_id, extranonce2, ntime, nonce): '''Try to solve block candidate using given parameters.''' - + session = self.connection_ref().get_session() session.setdefault('authorized', {}) - + # Check if worker is authorized to submit shares if not Interfaces.worker_manager.authorize(worker_name, session['authorized'].get(worker_name)): @@ -90,46 +91,47 @@ class MiningService(GenericService): extranonce1_bin = session.get('extranonce1', None) if not extranonce1_bin: raise SubmitException("Connection is not subscribed for mining") - + difficulty = session['difficulty'] submit_time = Interfaces.timestamper.time() - - Interfaces.share_limiter.submit(self.connection_ref, difficulty, submit_time) - + ip = self.connection_ref()._get_ip() + + Interfaces.share_limiter.submit(self.connection_ref, job_id, difficulty, submit_time, worker_name) + # This checks if submitted share meet all requirements # and it is valid proof of work. try: - (block_header, block_hash, on_submit) = Interfaces.template_registry.submit_share(job_id, - worker_name, extranonce1_bin, extranonce2, ntime, nonce, difficulty) - except SubmitException: + (block_header, block_hash, share_diff, on_submit) = Interfaces.template_registry.submit_share(job_id, + worker_name, session, extranonce1_bin, extranonce2, ntime, nonce, difficulty) + except SubmitException as e: # block_header and block_hash are None when submitted data are corrupted Interfaces.share_manager.on_submit_share(worker_name, None, None, difficulty, - submit_time, False) + submit_time, False, ip, e[0], 0) raise - - + + Interfaces.share_manager.on_submit_share(worker_name, block_header, block_hash, difficulty, - submit_time, True) - + submit_time, True, ip, '', share_diff) + if on_submit != None: - # Pool performs submitblock() to novacoind. Let's hook + # Pool performs submitblock() to bitcoind. Let's hook # to result and report it to share manager on_submit.addCallback(Interfaces.share_manager.on_submit_block, - worker_name, block_header, block_hash, submit_time) + worker_name, block_header, block_hash, submit_time,ip,share_diff) return True - + # Service documentation for remote discovery update_block.help_text = "Notify Stratum server about new block on the network." update_block.params = [('password', 'string', 'Administrator password'),] - + authorize.help_text = "Authorize worker for submitting shares on this connection." authorize.params = [('worker_name', 'string', 'Name of the worker, usually in the form of user_login.worker_id.'), ('worker_password', 'string', 'Worker password'),] - + subscribe.help_text = "Subscribes current connection for receiving new mining jobs." subscribe.params = [] - + submit.help_text = "Submit solved share back to the server. Excessive sending of invalid shares "\ "or shares above indicated target (see Stratum mining docs for set_target()) may lead "\ "to temporary or permanent ban of user,worker or IP address." @@ -138,4 +140,4 @@ class MiningService(GenericService): ('extranonce2', 'string', 'hex-encoded big-endian extranonce2, length depends on extranonce2_size from mining.notify.'), ('ntime', 'string', 'UNIX timestamp (32bit integer, big-endian, hex-encoded), must be >= ntime provided by mining,notify and <= current time'), ('nonce', 'string', '32bit integer, hex-encoded, big-endian'),] - + diff --git a/mining/subscription.py b/mining/subscription.py index 3eea5cc..41c77d0 100644 --- a/mining/subscription.py +++ b/mining/subscription.py @@ -1,54 +1,55 @@ from stratum.pubsub import Pubsub, Subscription from mining.interfaces import Interfaces +from stratum import settings import stratum.logger log = stratum.logger.get_logger('subscription') class MiningSubscription(Subscription): '''This subscription object implements logic for broadcasting new jobs to the clients.''' - + event = 'mining.notify' - + @classmethod def on_template(cls, is_new_block): '''This is called when TemplateRegistry registers new block which we have to broadcast clients.''' - + start = Interfaces.timestamper.time() - + clean_jobs = is_new_block (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \ Interfaces.template_registry.get_last_broadcast_args() - + # Push new job to subscribed clients cls.emit(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs) - + cnt = Pubsub.get_subscription_count(cls.event) log.info("BROADCASTED to %d connections in %.03f sec" % (cnt, (Interfaces.timestamper.time() - start))) - + def _finish_after_subscribe(self, result): '''Send new job to newly subscribed client''' - try: + try: (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \ Interfaces.template_registry.get_last_broadcast_args() except Exception: log.error("Template not ready yet") return result - + # Force set higher difficulty - # TODO - #self.connection_ref().rpc('mining.set_difficulty', [2,], is_notification=True) + self.connection_ref().rpc('mining.set_difficulty', [settings.POOL_TARGET,], is_notification=True) #self.connection_ref().rpc('client.get_version', []) - + # Force client to remove previous jobs if any (eg. from previous connection) clean_jobs = True self.emit_single(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, True) - + return result - + def after_subscribe(self, *args): '''This will send new job to the client *after* he receive subscription details. on_finish callback solve the issue that job is broadcasted *during* the subscription request and client receive messages in wrong order.''' self.connection_ref().on_finish.addCallback(self._finish_after_subscribe) +