X-Git-Url: https://git.novaco.in/?p=stratum-mining.git;a=blobdiff_plain;f=mining%2FDBInterface.py;fp=mining%2FDBInterface.py;h=4152e6e598c110ee528ecbecd275b0fa6980c7bc;hp=0000000000000000000000000000000000000000;hb=1e5ed48cb8611adccaf854c4ba2c83c51cde832c;hpb=f91c94efe54a29de9af37412349df5ec7415089c diff --git a/mining/DBInterface.py b/mining/DBInterface.py new file mode 100644 index 0000000..4152e6e --- /dev/null +++ b/mining/DBInterface.py @@ -0,0 +1,222 @@ +from twisted.internet import reactor, defer +import time +from datetime import datetime +import Queue + +from stratum import settings + +import stratum.logger +log = stratum.logger.get_logger('DBInterface') + +class DBInterface(): + def __init__(self): + self.dbi = self.connectDB() + + def init_main(self): + self.dbi.check_tables() + + self.q = Queue.Queue() + self.queueclock = None + + self.usercache = {} + self.clearusercache() + + self.nextStatsUpdate = 0 + + self.scheduleImport() + + def set_bitcoinrpc(self,bitcoinrpc): + self.bitcoinrpc=bitcoinrpc + + def connectDB(self): + # Choose our database driver and put it in self.dbi + if settings.DATABASE_DRIVER == "sqlite": + log.debug('DB_Sqlite INIT') + import DB_Sqlite + return DB_Sqlite.DB_Sqlite() + elif settings.DATABASE_DRIVER == "mysql": + log.debug('DB_Mysql INIT') + import DB_Mysql + return DB_Mysql.DB_Mysql() + elif settings.DATABASE_DRIVER == "postgresql": + log.debug('DB_Postgresql INIT') + import DB_Postgresql + return DB_Postgresql.DB_Postgresql() + elif settings.DATABASE_DRIVER == "none": + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + else: + log.error('Invalid DATABASE_DRIVER -- using NONE') + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + + def clearusercache(self): + self.usercache = {} + self.usercacheclock = reactor.callLater( settings.DB_USERCACHE_TIME , self.clearusercache) + + def scheduleImport(self): + # This schedule's the Import + use_thread = True + if settings.DATABASE_DRIVER == "sqlite": + use_thread = False + + if use_thread: + self.queueclock = reactor.callLater( settings.DB_LOADER_CHECKTIME , self.run_import_thread) + else: + self.queueclock = reactor.callLater( settings.DB_LOADER_CHECKTIME , self.run_import) + + def run_import_thread(self): + if self.q.qsize() >= settings.DB_LOADER_REC_MIN: # Don't incur thread overhead if we're not going to run + reactor.callInThread(self.import_thread) + self.scheduleImport() + + def run_import(self): + self.do_import(self.dbi,False) + if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate : + self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME + self.dbi.updateStats(settings.DB_STATS_AVG_TIME) + d = self.bitcoinrpc.getinfo() + d.addCallback(self._update_pool_info) + if settings.ARCHIVE_SHARES : + self.archive_shares(self.dbi) + self.scheduleImport() + + def import_thread(self): + # Here we are in the thread. + dbi = self.connectDB() + self.do_import(dbi,False) + if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate : + self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME + dbi.updateStats(settings.DB_STATS_AVG_TIME) + d = self.bitcoinrpc.getinfo() + d.addCallback(self._update_pool_info) + if settings.ARCHIVE_SHARES : + self.archive_shares(dbi) + dbi.close() + + def _update_pool_info(self,data): + self.dbi.update_pool_info({ 'blocks' : data['blocks'], 'balance' : data['balance'], + 'connections' : data['connections'], 'difficulty' : data['difficulty'] }) + + def do_import(self,dbi,force): + # Only run if we have data + while force == True or self.q.qsize() >= settings.DB_LOADER_REC_MIN: + force = False + # Put together the data we want to import + sqldata = [] + datacnt = 0 + while self.q.empty() == False and datacnt < settings.DB_LOADER_REC_MAX : + datacnt += 1 + data = self.q.get() + sqldata.append(data) + self.q.task_done() + # try to do the import, if we fail, log the error and put the data back in the queue + try: + log.info("Inserting %s Share Records",datacnt) + dbi.import_shares(sqldata) + except Exception as e: + log.error("Insert Share Records Failed: %s", e.args[0]) + for k,v in enumerate(sqldata): + self.q.put(v) + break # Allows us to sleep a little + + def archive_shares(self,dbi): + found_time = dbi.archive_check() + if found_time == 0: + return False + log.info("Archiving shares newer than timestamp %f " % found_time) + dbi.archive_found(found_time) + if settings.ARCHIVE_MODE == 'db': + dbi.archive_to_db(found_time) + dbi.archive_cleanup(found_time) + elif settings.ARCHIVE_MODE == 'file': + shares = dbi.archive_get_shares(found_time) + + filename = settings.ARCHIVE_FILE + if settings.ARCHIVE_FILE_APPEND_TIME : + filename = filename + "-" + datetime.fromtimestamp(found_time).strftime("%Y-%m-%d-%H-%M-%S") + filename = filename + ".csv" + + if settings.ARCHIVE_FILE_COMPRESS == 'gzip' : + import gzip + filename = filename + ".gz" + filehandle = gzip.open(filename, 'a') + elif settings.ARCHIVE_FILE_COMPRESS == 'bzip2' and settings.ARCHIVE_FILE_APPEND_TIME : + import bz2 + filename = filename + ".bz2" + filehandle = bz2.BZFile(filename, mode='wb', buffering=4096 ) + else: + filehandle = open(filename, "a") + + while True: + row = shares.fetchone() + if row == None: + break + str1 = '","'.join([str(x) for x in row]) + filehandle.write('"%s"\n' % str1) + filehandle.close() + + clean = False + while not clean: + try: + dbi.archive_cleanup(found_time) + clean = True + except Exception as e: + clean = False + log.error("Archive Cleanup Failed... will retry to cleanup in 30 seconds") + sleep(30) + + return True + + def queue_share(self,data): + self.q.put( data ) + + def found_block(self,data): + try: + log.info("Updating Found Block Share Record") + self.do_import(self.dbi,True) # We can't Update if the record is not there. + self.dbi.found_block(data) + except Exception as e: + log.error("Update Found Block Share Record Failed: %s", e.args[0]) + + def check_password(self,username,password): + if username == "": + log.info("Rejected worker for blank username") + return False + wid = username+":-:"+password + if wid in self.usercache : + return True + elif self.dbi.check_password(username,password) : + self.usercache[wid] = 1 + return True + elif settings.USERS_AUTOADD == True : + self.insert_user(username,password) + self.usercache[wid] = 1 + return True + return False + + def insert_user(self,username,password): + return self.dbi.insert_user(username,password) + + def delete_user(self,username): + self.usercache = {} + return self.dbi.delete_user(username) + + def update_user(self,username,password): + self.usercache = {} + return self.dbi.update_user(username,password) + + def update_worker_diff(self,username,diff): + return self.dbi.update_worker_diff(username,diff) + + def get_pool_stats(self): + return self.dbi.get_pool_stats() + + def get_workers_stats(self): + return self.dbi.get_workers_stats() + + def clear_worker_diff(self): + return self.dbi.clear_worker_diff() +