from twisted.internet import reactor, defer

import time
from datetime import datetime
import Queue

from stratum import settings

import stratum.logger
log = stratum.logger.get_logger('DBInterface')

class DBInterface():
    def __init__(self):
        self.dbi = self.connectDB()

    def init_main(self):
        self.dbi.check_tables()

        self.q = Queue.Queue()
        self.queueclock = None

        self.clearusercache()

        self.nextStatsUpdate = 0

        self.scheduleImport()

    def set_bitcoinrpc(self, bitcoinrpc):
        self.bitcoinrpc = bitcoinrpc

    def connectDB(self):
        # Choose our database driver and return an instance of it
        if settings.DATABASE_DRIVER == "sqlite":
            log.debug('DB_Sqlite INIT')
            import DB_Sqlite
            return DB_Sqlite.DB_Sqlite()
        elif settings.DATABASE_DRIVER == "mysql":
            log.debug('DB_Mysql INIT')
            import DB_Mysql
            return DB_Mysql.DB_Mysql()
        elif settings.DATABASE_DRIVER == "postgresql":
            log.debug('DB_Postgresql INIT')
            import DB_Postgresql
            return DB_Postgresql.DB_Postgresql()
        elif settings.DATABASE_DRIVER == "none":
            log.debug('DB_None INIT')
            import DB_None
            return DB_None.DB_None()
        else:
            log.error('Invalid DATABASE_DRIVER -- using NONE')
            log.debug('DB_None INIT')
            import DB_None
            return DB_None.DB_None()
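
    # Note: each driver module is imported lazily inside its own branch, so
    # only the backend named in settings.DATABASE_DRIVER (and that backend's
    # client library) needs to be installed for the server to start.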

    def clearusercache(self):
        # Drop the cached worker credentials and reschedule the next flush
        self.usercache = {}
        self.usercacheclock = reactor.callLater(settings.DB_USERCACHE_TIME, self.clearusercache)

    def scheduleImport(self):
        # This schedules the next import pass
        use_thread = True
        if settings.DATABASE_DRIVER == "sqlite":
            use_thread = False

        if use_thread:
            self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME, self.run_import_thread)
        else:
            self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME, self.run_import)
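
    # The two entry points below differ only in where the work runs: the
    # threaded path hands the batch to one of the reactor's pool threads,
    # while the inline path (used for sqlite, whose connections can't be
    # shared across threads) blocks the reactor briefly instead.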

    def run_import_thread(self):
        # Don't incur thread overhead if we're not going to run
        if self.q.qsize() >= settings.DB_LOADER_REC_MIN:
            reactor.callInThread(self.import_thread)
        self.scheduleImport()

    def run_import(self):
        self.do_import(self.dbi, False)
        if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate:
            self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
            self.dbi.updateStats(settings.DB_STATS_AVG_TIME)
            d = self.bitcoinrpc.getinfo()
            d.addCallback(self._update_pool_info)
            if settings.ARCHIVE_SHARES:
                self.archive_shares(self.dbi)
        self.scheduleImport()

    def import_thread(self):
        # Here we are in the thread; open a connection of our own, since the
        # main-thread connection can't be used from here.
        dbi = self.connectDB()
        self.do_import(dbi, False)
        if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate:
            self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
            dbi.updateStats(settings.DB_STATS_AVG_TIME)
            d = self.bitcoinrpc.getinfo()
            d.addCallback(self._update_pool_info)
            if settings.ARCHIVE_SHARES:
                self.archive_shares(dbi)
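
    # Stats, pool info, and archiving piggyback on the import pass above, so
    # they run at most once per DB_STATS_AVG_TIME seconds regardless of how
    # often the loader wakes up.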

    def _update_pool_info(self, data):
        self.dbi.update_pool_info({'blocks': data['blocks'], 'balance': data['balance'],
                                   'connections': data['connections'], 'difficulty': data['difficulty']})

    def do_import(self, dbi, force):
        # Only run if we have data
        while force or self.q.qsize() >= settings.DB_LOADER_REC_MIN:
            force = False

            # Put together the data we want to import
            sqldata = []
            datacnt = 0
            while not self.q.empty() and datacnt < settings.DB_LOADER_REC_MAX:
                datacnt += 1
                sqldata.append(self.q.get())

            # Try to do the import; if we fail, log the error and put the
            # data back in the queue
            try:
                log.info("Inserting %s Share Records", datacnt)
                dbi.import_shares(sqldata)
            except Exception as e:
                log.error("Insert Share Records Failed: %s", e.args[0])
                for v in sqldata:
                    self.q.put(v)
                break  # Allows us to sleep a little
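
    # Batching sketch: with, say, DB_LOADER_REC_MIN = 10 and
    # DB_LOADER_REC_MAX = 50, nothing is written until ten shares are queued,
    # and each pass then drains at most fifty per INSERT, looping until the
    # queue falls back below the minimum (the thresholds here are
    # illustrative, not the project's defaults).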

    def archive_shares(self, dbi):
        found_time = dbi.archive_check()
        if found_time == 0:
            return  # Nothing to archive yet

        log.info("Archiving shares newer than timestamp %f" % found_time)
        dbi.archive_found(found_time)
        if settings.ARCHIVE_MODE == 'db':
            dbi.archive_to_db(found_time)
            dbi.archive_cleanup(found_time)
        elif settings.ARCHIVE_MODE == 'file':
            shares = dbi.archive_get_shares(found_time)

            filename = settings.ARCHIVE_FILE
            if settings.ARCHIVE_FILE_APPEND_TIME:
                filename = filename + "-" + datetime.fromtimestamp(found_time).strftime("%Y-%m-%d-%H-%M-%S")
            filename = filename + ".csv"

            if settings.ARCHIVE_FILE_COMPRESS == 'gzip':
                import gzip
                filename = filename + ".gz"
                filehandle = gzip.open(filename, 'a')
            elif settings.ARCHIVE_FILE_COMPRESS == 'bzip2' and settings.ARCHIVE_FILE_APPEND_TIME:
                # bzip2 streams can't be appended to, so this mode is only
                # safe with time-stamped (unique) filenames
                import bz2
                filename = filename + ".bz2"
                filehandle = bz2.BZ2File(filename, 'w', 4096)
            else:
                filehandle = open(filename, "a")

            # Dump each archived share as one quoted CSV row
            while True:
                row = shares.fetchone()
                if row is None:
                    break
                str1 = '","'.join([str(x) for x in row])
                filehandle.write('"%s"\n' % str1)
            filehandle.close()

            try:
                dbi.archive_cleanup(found_time)
            except Exception as e:
                log.error("Archive Cleanup Failed... will retry to cleanup in 30 seconds")
                reactor.callLater(30, dbi.archive_cleanup, found_time)
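
    # Note: the CSV writer above quotes fields naively ('","'.join), so a
    # field containing a double quote would produce a malformed row; that is
    # fine for numeric share data, but worth knowing if worker names can
    # contain quotes.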

    def queue_share(self, data):
        self.q.put(data)

    def found_block(self, data):
        try:
            log.info("Updating Found Block Share Record")
            self.do_import(self.dbi, True)  # We can't update if the record isn't there yet
            self.dbi.found_block(data)
        except Exception as e:
            log.error("Update Found Block Share Record Failed: %s", e.args[0])

    def check_password(self, username, password):
        if username == "":
            log.info("Rejected worker for blank username")
            return False

        wid = username + ":-:" + password
        if wid in self.usercache:
            return True
        elif self.dbi.check_password(username, password):
            self.usercache[wid] = 1
            return True
        elif settings.USERS_AUTOADD:
            self.insert_user(username, password)
            self.usercache[wid] = 1
            return True

        return False
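
    # The usercache is a plain dict keyed "username:-:password"; it exists
    # only to short-circuit repeated DB lookups and is emptied every
    # DB_USERCACHE_TIME seconds (and on update_user/delete_user below).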

    def insert_user(self, username, password):
        return self.dbi.insert_user(username, password)

    def delete_user(self, username):
        self.usercache = {}  # Invalidate cached credentials
        return self.dbi.delete_user(username)

    def update_user(self, username, password):
        self.usercache = {}  # Invalidate cached credentials
        return self.dbi.update_user(username, password)

    def update_worker_diff(self, username, diff):
        return self.dbi.update_worker_diff(username, diff)

    def get_pool_stats(self):
        return self.dbi.get_pool_stats()

    def get_workers_stats(self):
        return self.dbi.get_workers_stats()

    def clear_worker_diff(self):
        return self.dbi.clear_worker_diff()
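
# A minimal wiring sketch, assuming a bitcoind RPC proxy whose getinfo()
# returns a Deferred (as the callbacks above expect); the `bitcoinrpc`
# object and share payload here are illustrative placeholders, not the
# project's actual startup code:
#
#   dbi = DBInterface()
#   dbi.set_bitcoinrpc(bitcoinrpc)
#   dbi.init_main()               # creates the queue, starts the import clock
#   dbi.queue_share(share_data)   # one record per accepted share
#   reactor.run()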