MySQL share reporting fixed. Update config template.
[stratum-mining.git] / mining / DBInterface.py
from twisted.internet import reactor, defer
import time
from datetime import datetime
import Queue

from stratum import settings

import stratum.logger
log = stratum.logger.get_logger('DBInterface')
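
# Settings read by this module. They normally come from the pool's config
# template; the values below are illustrative assumptions only, not the
# shipped defaults:
#
#   DATABASE_DRIVER = "mysql"          # "sqlite", "mysql", "postgresql" or "none"
#   DATABASE_EXTEND = True             # maintain extended stats tables
#   DB_LOADER_CHECKTIME = 15           # seconds between import passes
#   DB_LOADER_REC_MIN = 10             # queued shares needed before an import runs
#   DB_LOADER_REC_MAX = 50             # maximum shares inserted per batch
#   DB_STATS_AVG_TIME = 300            # seconds between pool stats updates
#   DB_USERCACHE_TIME = 600            # seconds before the worker cache is cleared
#   USERS_AUTOADD = False              # auto-register unknown workers
#   ARCHIVE_SHARES = False             # archive old share records
#   ARCHIVE_MODE = "file"              # "db" or "file"
#   ARCHIVE_FILE = "share-archive"     # base name for archive files
#   ARCHIVE_FILE_APPEND_TIME = True    # append a timestamp to the file name
#   ARCHIVE_FILE_COMPRESS = "gzip"     # "gzip", "bzip2" or anything else for plain text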

class DBInterface():
    def __init__(self):
        self.dbi = self.connectDB()

    def init_main(self):
        self.dbi.check_tables()

        self.q = Queue.Queue()
        self.queueclock = None

        self.usercache = {}
        self.clearusercache()

        self.nextStatsUpdate = 0

        self.scheduleImport()

    def set_bitcoinrpc(self, bitcoinrpc):
        self.bitcoinrpc = bitcoinrpc

    def connectDB(self):
        # Choose our database driver and put it in self.dbi
        if settings.DATABASE_DRIVER == "sqlite":
            log.debug('DB_Sqlite INIT')
            import DB_Sqlite
            return DB_Sqlite.DB_Sqlite()
        elif settings.DATABASE_DRIVER == "mysql":
            log.debug('DB_Mysql INIT')
            import DB_Mysql
            return DB_Mysql.DB_Mysql()
        elif settings.DATABASE_DRIVER == "postgresql":
            log.debug('DB_Postgresql INIT')
            import DB_Postgresql
            return DB_Postgresql.DB_Postgresql()
        elif settings.DATABASE_DRIVER == "none":
            log.debug('DB_None INIT')
            import DB_None
            return DB_None.DB_None()
        else:
            log.error('Invalid DATABASE_DRIVER -- using NONE')
            log.debug('DB_None INIT')
            import DB_None
            return DB_None.DB_None()
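
    # Note: whichever DB_* driver is returned above is expected to expose the
    # methods this class calls on it: check_tables(), import_shares(),
    # updateStats(), update_pool_info(), found_block(), check_password(),
    # insert_user(), delete_user(), update_user(), update_worker_diff(),
    # clear_worker_diff(), get_pool_stats(), get_workers_stats(), the
    # archive_* helpers and close().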

    def clearusercache(self):
        self.usercache = {}
        self.usercacheclock = reactor.callLater(settings.DB_USERCACHE_TIME, self.clearusercache)

    def scheduleImport(self):
        # This schedules the next import run. Imports run inline for SQLite
        # (its connections don't play well with threads); otherwise they run
        # in a thread from the reactor's pool.
        use_thread = True
        if settings.DATABASE_DRIVER == "sqlite":
            use_thread = False

        if use_thread:
            self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME, self.run_import_thread)
        else:
            self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME, self.run_import)

    def run_import_thread(self):
        # Don't incur thread overhead if we're not going to run
        if self.q.qsize() >= settings.DB_LOADER_REC_MIN:
            reactor.callInThread(self.import_thread)
        self.scheduleImport()

    def run_import(self):
        self.do_import(self.dbi, False)
        if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate:
            self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
            self.dbi.updateStats(settings.DB_STATS_AVG_TIME)
            d = self.bitcoinrpc.getinfo()
            d.addCallback(self._update_pool_info)
            if settings.ARCHIVE_SHARES:
                self.archive_shares(self.dbi)
        self.scheduleImport()

    def import_thread(self):
        # We are running in a worker thread here, so open a dedicated
        # database connection rather than using the shared self.dbi.
        dbi = self.connectDB()
        self.do_import(dbi, False)
        if settings.DATABASE_EXTEND and time.time() > self.nextStatsUpdate:
            self.nextStatsUpdate = time.time() + settings.DB_STATS_AVG_TIME
            dbi.updateStats(settings.DB_STATS_AVG_TIME)
            d = self.bitcoinrpc.getinfo()
            d.addCallback(self._update_pool_info)
            if settings.ARCHIVE_SHARES:
                self.archive_shares(dbi)
        dbi.close()

    def _update_pool_info(self, data):
        self.dbi.update_pool_info({'blocks': data['blocks'], 'balance': data['balance'],
                'connections': data['connections'], 'difficulty': data['difficulty']})

    def do_import(self, dbi, force):
        # Only run if we have data
        while force or self.q.qsize() >= settings.DB_LOADER_REC_MIN:
            force = False
            # Put together the data we want to import
            sqldata = []
            datacnt = 0
            while not self.q.empty() and datacnt < settings.DB_LOADER_REC_MAX:
                datacnt += 1
                data = self.q.get()
                sqldata.append(data)
                self.q.task_done()
            # Try the import; if it fails, log the error and put the data back in the queue
            try:
                log.info("Inserting %s Share Records", datacnt)
                dbi.import_shares(sqldata)
            except Exception as e:
                log.error("Insert Share Records Failed: %s", e.args[0])
                for record in sqldata:
                    self.q.put(record)
                break           # Allows us to sleep a little

    def archive_shares(self, dbi):
        found_time = dbi.archive_check()
        if found_time == 0:
            return False
        log.info("Archiving shares newer than timestamp %f " % found_time)
        dbi.archive_found(found_time)
        if settings.ARCHIVE_MODE == 'db':
            dbi.archive_to_db(found_time)
            dbi.archive_cleanup(found_time)
        elif settings.ARCHIVE_MODE == 'file':
            shares = dbi.archive_get_shares(found_time)

            filename = settings.ARCHIVE_FILE
            if settings.ARCHIVE_FILE_APPEND_TIME:
                filename = filename + "-" + datetime.fromtimestamp(found_time).strftime("%Y-%m-%d-%H-%M-%S")
            filename = filename + ".csv"

            if settings.ARCHIVE_FILE_COMPRESS == 'gzip':
                import gzip
                filename = filename + ".gz"
                filehandle = gzip.open(filename, 'a')
            elif settings.ARCHIVE_FILE_COMPRESS == 'bzip2' and settings.ARCHIVE_FILE_APPEND_TIME:
                # bzip2 files can't be appended to, so this branch is only
                # taken when timestamped (unique) file names are in use.
                import bz2
                filename = filename + ".bz2"
                filehandle = bz2.BZ2File(filename, mode='wb', buffering=4096)
            else:
                filehandle = open(filename, "a")

            # Write each share as one double-quoted, comma-separated line
            while True:
                row = shares.fetchone()
                if row is None:
                    break
                str1 = '","'.join([str(x) for x in row])
                filehandle.write('"%s"\n' % str1)
            filehandle.close()

            clean = False
            while not clean:
                try:
                    dbi.archive_cleanup(found_time)
                    clean = True
                except Exception as e:
                    clean = False
                    log.error("Archive Cleanup Failed... will retry to cleanup in 30 seconds")
                    time.sleep(30)

        return True
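
    # For example (illustrative only): with ARCHIVE_FILE = "share-archive",
    # ARCHIVE_FILE_APPEND_TIME enabled and gzip compression, the file mode
    # above writes a file named like "share-archive-2013-06-01-12-00-00.csv.gz",
    # containing one '"field","field",...' line per archived share.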

    def queue_share(self, data):
        self.q.put(data)

    def found_block(self, data):
        try:
            log.info("Updating Found Block Share Record")
            self.do_import(self.dbi, True)      # We can't update if the record is not there.
            self.dbi.found_block(data)
        except Exception as e:
            log.error("Update Found Block Share Record Failed: %s", e.args[0])

    def check_password(self, username, password):
        if username == "":
            log.info("Rejected worker for blank username")
            return False
        # Cache successful logins so we don't hit the database on every share
        wid = username + ":-:" + password
        if wid in self.usercache:
            return True
        elif self.dbi.check_password(username, password):
            self.usercache[wid] = 1
            return True
        elif settings.USERS_AUTOADD:
            self.insert_user(username, password)
            self.usercache[wid] = 1
            return True
        return False

    def insert_user(self, username, password):
        return self.dbi.insert_user(username, password)

    def delete_user(self, username):
        self.usercache = {}
        return self.dbi.delete_user(username)

    def update_user(self, username, password):
        self.usercache = {}
        return self.dbi.update_user(username, password)

    def update_worker_diff(self, username, diff):
        return self.dbi.update_worker_diff(username, diff)

    def get_pool_stats(self):
        return self.dbi.get_pool_stats()

    def get_workers_stats(self):
        return self.dbi.get_workers_stats()

    def clear_worker_diff(self):
        return self.dbi.clear_worker_diff()
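
# A rough usage sketch (the real wiring lives elsewhere in stratum-mining and
# may differ; the variable names below are placeholders, used only to
# illustrate the methods defined above):
#
#   dbi = DBInterface()
#   dbi.set_bitcoinrpc(bitcoinrpc)
#   dbi.init_main()
#   ...
#   if dbi.check_password(worker_name, worker_password):
#       dbi.queue_share(share_data)          # picked up by the scheduled import
#   ...
#   dbi.found_block(share_data)              # when a worker submits a block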