--- /dev/null
+from twisted.internet import reactor, defer
+import time
+from datetime import datetime
+import Queue
+
+from stratum import settings
+
+import stratum.logger
+log = stratum.logger.get_logger('DBInterface')
+
+class DBInterface(object):
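+    """Buffers share records in an in-memory queue and periodically flushes
+    them to the database backend selected by settings.DATABASE_DRIVER."""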
+    def __init__(self):
+        self.dbi = self.connectDB()
+
+    def init_main(self):
+        self.dbi.check_tables()
+
+        self.q = Queue.Queue()
+        self.queueclock = None
+
+        self.usercache = {}
+        self.clearusercache()
+
+        self.nextStatsUpdate = 0
+
+        self.scheduleImport()
+
+    def set_bitcoinrpc(self, bitcoinrpc):
+        self.bitcoinrpc = bitcoinrpc
+
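+    # Note: the DB_* driver modules are assumed to live alongside this file;
+    # the bare "import DB_Sqlite" style below relies on Python 2's implicit
+    # relative imports.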
+    def connectDB(self):
+        # Pick the database driver named in settings and return an instance of it
+        if settings.DATABASE_DRIVER == "sqlite":
+            log.debug('DB_Sqlite INIT')
+            import DB_Sqlite
+            return DB_Sqlite.DB_Sqlite()
+        elif settings.DATABASE_DRIVER == "mysql":
+            log.debug('DB_Mysql INIT')
+            import DB_Mysql
+            return DB_Mysql.DB_Mysql()
+        elif settings.DATABASE_DRIVER == "postgresql":
+            log.debug('DB_Postgresql INIT')
+            import DB_Postgresql
+            return DB_Postgresql.DB_Postgresql()
+        elif settings.DATABASE_DRIVER == "none":
+            log.debug('DB_None INIT')
+            import DB_None
+            return DB_None.DB_None()
+        else:
+            log.error('Invalid DATABASE_DRIVER -- using NONE')
+            log.debug('DB_None INIT')
+            import DB_None
+            return DB_None.DB_None()
+
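+    # The credential cache below is emptied by a self-rescheduling
+    # reactor.callLater timer, so cached worker logins are re-validated
+    # against the database at most DB_USERCACHE_TIME seconds after a change.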
+    def clearusercache(self):
+        self.usercache = {}
+        self.usercacheclock = reactor.callLater(settings.DB_USERCACHE_TIME, self.clearusercache)
+
+    def scheduleImport(self):
+        # Schedule the next import run. SQLite connections cannot be shared
+        # between threads, so that driver runs the import on the reactor thread.
+        use_thread = True
+        if settings.DATABASE_DRIVER == "sqlite":
+            use_thread = False
+
+        if use_thread:
+            self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME, self.run_import_thread)
+        else:
+            self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME, self.run_import)
+
+    def run_import_thread(self):
+        if self.q.qsize() >= settings.DB_LOADER_REC_MIN:  # Don't incur thread overhead if we're not going to run
+            reactor.callInThread(self.import_thread)
+        self.scheduleImport()
+
+    def run_import(self):
+        # Non-threaded import path, run directly on the reactor thread
+        self.do_import(self.dbi, False)
+        if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate:
+            self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
+            self.dbi.updateStats(settings.DB_STATS_AVG_TIME)
+            d = self.bitcoinrpc.getinfo()
+            d.addCallback(self._update_pool_info)
+            if settings.ARCHIVE_SHARES:
+                self.archive_shares(self.dbi)
+        self.scheduleImport()
+
+    def import_thread(self):
+        # We were started with reactor.callInThread, so we are off the main
+        # reactor thread and must use a private database connection.
+        dbi = self.connectDB()
+        self.do_import(dbi, False)
+        if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate:
+            self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
+            dbi.updateStats(settings.DB_STATS_AVG_TIME)
+            d = self.bitcoinrpc.getinfo()
+            d.addCallback(self._update_pool_info)
+            if settings.ARCHIVE_SHARES:
+                self.archive_shares(dbi)
+        dbi.close()
+
+    def _update_pool_info(self, data):
+        # Record the coin daemon's current state (from getinfo) in the pool_info table
+        self.dbi.update_pool_info({'blocks': data['blocks'], 'balance': data['balance'],
+                                   'connections': data['connections'], 'difficulty': data['difficulty']})
+
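+    # Shares are imported in batches: nothing is written until at least
+    # DB_LOADER_REC_MIN records are queued, and each pass inserts at most
+    # DB_LOADER_REC_MAX of them. force=True (used by found_block) flushes
+    # whatever is queued regardless of the minimum.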
+    def do_import(self, dbi, force):
+        # Only run if we have data (or are forced to flush)
+        while force or self.q.qsize() >= settings.DB_LOADER_REC_MIN:
+            force = False
+            # Pull together the batch of records we want to import
+            sqldata = []
+            datacnt = 0
+            while not self.q.empty() and datacnt < settings.DB_LOADER_REC_MAX:
+                datacnt += 1
+                data = self.q.get()
+                sqldata.append(data)
+                self.q.task_done()
+            # Try the insert; if it fails, log the error and requeue the batch
+            try:
+                log.info("Inserting %s Share Records", datacnt)
+                dbi.import_shares(sqldata)
+            except Exception as e:
+                log.error("Insert Share Records Failed: %s", e.args[0])
+                for record in sqldata:
+                    self.q.put(record)
+                break  # Drop out so the failed batch waits for the next scheduled run
+
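+    # Archive flow, going by the dbi method names: archive_found() flags the
+    # shares behind a found block, 'db' mode then copies them to an archive
+    # table while 'file' mode dumps them to CSV, and archive_cleanup() removes
+    # them from the live shares table.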
+    def archive_shares(self, dbi):
+        found_time = dbi.archive_check()
+        if found_time == 0:
+            return False
+        log.info("Archiving shares newer than timestamp %f", found_time)
+        dbi.archive_found(found_time)
+        if settings.ARCHIVE_MODE == 'db':
+            dbi.archive_to_db(found_time)
+            dbi.archive_cleanup(found_time)
+        elif settings.ARCHIVE_MODE == 'file':
+            shares = dbi.archive_get_shares(found_time)
+
+            filename = settings.ARCHIVE_FILE
+            if settings.ARCHIVE_FILE_APPEND_TIME:
+                filename = filename + "-" + datetime.fromtimestamp(found_time).strftime("%Y-%m-%d-%H-%M-%S")
+            filename = filename + ".csv"
+
+            if settings.ARCHIVE_FILE_COMPRESS == 'gzip':
+                # gzip supports append: each append starts a new gzip member
+                import gzip
+                filename = filename + ".gz"
+                filehandle = gzip.open(filename, 'a')
+            elif settings.ARCHIVE_FILE_COMPRESS == 'bzip2' and settings.ARCHIVE_FILE_APPEND_TIME:
+                # bz2 files cannot be appended to, so bzip2 is only used when
+                # each archive gets its own timestamped filename
+                import bz2
+                filename = filename + ".bz2"
+                filehandle = bz2.BZ2File(filename, mode='w', buffering=4096)
+            else:
+                filehandle = open(filename, "a")
+
+            # Write each archived share out as a quoted CSV row
+            while True:
+                row = shares.fetchone()
+                if row is None:
+                    break
+                csvline = '","'.join([str(x) for x in row])
+                filehandle.write('"%s"\n' % csvline)
+            filehandle.close()
+
+            # Retry the cleanup until it succeeds
+            clean = False
+            while not clean:
+                try:
+                    dbi.archive_cleanup(found_time)
+                    clean = True
+                except Exception as e:
+                    clean = False
+                    log.error("Archive Cleanup Failed: %s -- retrying in 30 seconds", e.args[0])
+                    time.sleep(30)
+
+        return True
+
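+    # Hot path: queue_share is called for every submitted share. Queue.Queue
+    # is thread-safe, so shares can keep arriving while an import is draining.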
+    def queue_share(self, data):
+        self.q.put(data)
+
+    def found_block(self, data):
+        try:
+            log.info("Updating Found Block Share Record")
+            # Force a flush first: we can't update a share record that has not
+            # been inserted yet
+            self.do_import(self.dbi, True)
+            self.dbi.found_block(data)
+        except Exception as e:
+            log.error("Update Found Block Share Record Failed: %s", e.args[0])
+
+    def check_password(self, username, password):
+        if username == "":
+            log.info("Rejected worker for blank username")
+            return False
+        # Cache each authenticated username/password pair so repeat logins skip the database
+        wid = username + ":-:" + password
+        if wid in self.usercache:
+            return True
+        elif self.dbi.check_password(username, password):
+            self.usercache[wid] = 1
+            return True
+        elif settings.USERS_AUTOADD:
+            self.insert_user(username, password)
+            self.usercache[wid] = 1
+            return True
+        return False
+
+    def insert_user(self, username, password):
+        return self.dbi.insert_user(username, password)
+
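+    # Deleting or updating a user clears the whole credential cache, since
+    # cache entries are keyed by "username:-:password" strings.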
+    def delete_user(self, username):
+        self.usercache = {}
+        return self.dbi.delete_user(username)
+
+    def update_user(self, username, password):
+        self.usercache = {}
+        return self.dbi.update_user(username, password)
+
+    def update_worker_diff(self, username, diff):
+        return self.dbi.update_worker_diff(username, diff)
+
+    def get_pool_stats(self):
+        return self.dbi.get_pool_stats()
+
+    def get_workers_stats(self):
+        return self.dbi.get_workers_stats()
+
+    def clear_worker_diff(self):
+        return self.dbi.clear_worker_diff()
+
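+# Typical wiring (a sketch; the real call sites live elsewhere in the pool code):
+#   dbi = DBInterface()
+#   dbi.set_bitcoinrpc(bitcoinrpc)  # needed before the first stats update calls getinfo()
+#   dbi.init_main()                 # checks tables, starts the cache and import timers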