licence
[electrum-server.git] / db.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 from Abe.abe import hash_to_address, decode_check_address
19 from Abe.DataStore import DataStore as Datastore_class
20 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
21
22 import psycopg2, binascii
23
24 import thread, traceback, sys, urllib, operator
25 from json import dumps, loads
26
27 dblock = thread.allocate_lock()
28
29 class MyStore(Datastore_class):
30
31     def __init__(self,config, address_queue):
32         conf = DataStore.CONFIG_DEFAULTS
33         args, argv = readconf.parse_argv( [], conf)
34         args.dbtype = config.get('database','type')
35         if args.dbtype == 'sqlite3':
36             args.connect_args = { 'database' : config.get('database','database') }
37         elif args.dbtype == 'MySQLdb':
38             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
39         elif args.dbtype == 'psycopg2':
40             args.connect_args = { 'database' : config.get('database','database') }
41
42         Datastore_class.__init__(self,args)
43
44         self.tx_cache = {}
45         self.mempool_keys = {}
46         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
47
48         self.address_queue = address_queue
49
50
51
52     def import_block(self, b, chain_ids=frozenset()):
53         block_id = super(MyStore, self).import_block(b, chain_ids)
54         #print "block", block_id
55         for pos in xrange(len(b['transactions'])):
56             tx = b['transactions'][pos]
57             if 'hash' not in tx:
58                 tx['hash'] = util.double_sha256(tx['tx'])
59             tx_id = store.tx_find_id_and_value(tx)
60             if tx_id:
61                 self.update_tx_cache(tx_id)
62             else:
63                 print "error: import_block: no tx_id"
64         return block_id
65
66
67     def update_tx_cache(self, txid):
68         inrows = self.get_tx_inputs(txid, False)
69         for row in inrows:
70             _hash = self.binout(row[6])
71             address = hash_to_address(chr(0), _hash)
72             if self.tx_cache.has_key(address):
73                 print "cache: invalidating", address
74                 self.tx_cache.pop(address)
75             self.address_queue.put(address)
76
77         outrows = self.get_tx_outputs(txid, False)
78         for row in outrows:
79             _hash = self.binout(row[6])
80             address = hash_to_address(chr(0), _hash)
81             if self.tx_cache.has_key(address):
82                 print "cache: invalidating", address
83                 self.tx_cache.pop(address)
84             self.address_queue.put(address)
85
86     def safe_sql(self,sql, params=(), lock=True):
87         try:
88             if lock: dblock.acquire()
89             ret = self.selectall(sql,params)
90             if lock: dblock.release()
91             return ret
92         except:
93             print "sql error", sql
94             return []
95
96     def get_tx_outputs(self, tx_id, lock=True):
97         return self.safe_sql("""SELECT
98                 txout.txout_pos,
99                 txout.txout_scriptPubKey,
100                 txout.txout_value,
101                 nexttx.tx_hash,
102                 nexttx.tx_id,
103                 txin.txin_pos,
104                 pubkey.pubkey_hash
105               FROM txout
106               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
107               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
108               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
109              WHERE txout.tx_id = %d 
110              ORDER BY txout.txout_pos
111         """%(tx_id), (), lock)
112
113     def get_tx_inputs(self, tx_id, lock=True):
114         return self.safe_sql(""" SELECT
115                 txin.txin_pos,
116                 txin.txin_scriptSig,
117                 txout.txout_value,
118                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
119                 prevtx.tx_id,
120                 COALESCE(txout.txout_pos, u.txout_pos),
121                 pubkey.pubkey_hash
122               FROM txin
123               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
124               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
125               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
126               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
127              WHERE txin.tx_id = %d
128              ORDER BY txin.txin_pos
129              """%(tx_id,), (), lock)
130
131     def get_address_out_rows(self, dbhash):
132         return self.safe_sql(""" SELECT
133                 b.block_nTime,
134                 cc.chain_id,
135                 b.block_height,
136                 1,
137                 b.block_hash,
138                 tx.tx_hash,
139                 tx.tx_id,
140                 txin.txin_pos,
141                 -prevout.txout_value
142               FROM chain_candidate cc
143               JOIN block b ON (b.block_id = cc.block_id)
144               JOIN block_tx ON (block_tx.block_id = b.block_id)
145               JOIN tx ON (tx.tx_id = block_tx.tx_id)
146               JOIN txin ON (txin.tx_id = tx.tx_id)
147               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
148               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
149              WHERE pubkey.pubkey_hash = ?
150                AND cc.in_longest = 1""", (dbhash,))
151
152     def get_address_out_rows_memorypool(self, dbhash):
153         return self.safe_sql(""" SELECT
154                 1,
155                 tx.tx_hash,
156                 tx.tx_id,
157                 txin.txin_pos,
158                 -prevout.txout_value
159               FROM tx 
160               JOIN txin ON (txin.tx_id = tx.tx_id)
161               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
162               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
163              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
164
165     def get_address_in_rows(self, dbhash):
166         return self.safe_sql(""" SELECT
167                 b.block_nTime,
168                 cc.chain_id,
169                 b.block_height,
170                 0,
171                 b.block_hash,
172                 tx.tx_hash,
173                 tx.tx_id,
174                 txout.txout_pos,
175                 txout.txout_value
176               FROM chain_candidate cc
177               JOIN block b ON (b.block_id = cc.block_id)
178               JOIN block_tx ON (block_tx.block_id = b.block_id)
179               JOIN tx ON (tx.tx_id = block_tx.tx_id)
180               JOIN txout ON (txout.tx_id = tx.tx_id)
181               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
182              WHERE pubkey.pubkey_hash = ?
183                AND cc.in_longest = 1""", (dbhash,))
184
185     def get_address_in_rows_memorypool(self, dbhash):
186         return self.safe_sql( """ SELECT
187                 0,
188                 tx.tx_hash,
189                 tx.tx_id,
190                 txout.txout_pos,
191                 txout.txout_value
192               FROM tx
193               JOIN txout ON (txout.tx_id = tx.tx_id)
194               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
195              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
196
197     def get_history(self, addr):
198         
199         cached_version = self.tx_cache.get( addr )
200         if cached_version is not None:
201             return cached_version
202
203         version, binaddr = decode_check_address(addr)
204         if binaddr is None:
205             return None
206
207         dbhash = self.binin(binaddr)
208         rows = []
209         rows += self.get_address_out_rows( dbhash )
210         rows += self.get_address_in_rows( dbhash )
211
212         txpoints = []
213         known_tx = []
214
215         for row in rows:
216             try:
217                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
218             except:
219                 print "cannot unpack row", row
220                 break
221             tx_hash = self.hashout_hex(tx_hash)
222             txpoint = {
223                     "nTime":    int(nTime),
224                     "height":   int(height),
225                     "is_in":    int(is_in),
226                     "blk_hash": self.hashout_hex(blk_hash),
227                     "tx_hash":  tx_hash,
228                     "tx_id":    int(tx_id),
229                     "pos":      int(pos),
230                     "value":    int(value),
231                     }
232
233             txpoints.append(txpoint)
234             known_tx.append(self.hashout_hex(tx_hash))
235
236
237         # todo: sort them really...
238         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
239
240         # read memory pool
241         rows = []
242         rows += self.get_address_in_rows_memorypool( dbhash )
243         rows += self.get_address_out_rows_memorypool( dbhash )
244         address_has_mempool = False
245
246         for row in rows:
247             is_in, tx_hash, tx_id, pos, value = row
248             tx_hash = self.hashout_hex(tx_hash)
249             if tx_hash in known_tx:
250                 continue
251
252             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
253             address_has_mempool = True
254
255             # this means pending transactions are returned by getmemorypool
256             if tx_hash not in self.mempool_keys:
257                 continue
258
259             #print "mempool", tx_hash
260             txpoint = {
261                     "nTime":    0,
262                     "height":   0,
263                     "is_in":    int(is_in),
264                     "blk_hash": 'mempool', 
265                     "tx_hash":  tx_hash,
266                     "tx_id":    int(tx_id),
267                     "pos":      int(pos),
268                     "value":    int(value),
269                     }
270             txpoints.append(txpoint)
271
272
273         for txpoint in txpoints:
274             tx_id = txpoint['tx_id']
275             
276             txinputs = []
277             inrows = self.get_tx_inputs(tx_id)
278             for row in inrows:
279                 _hash = self.binout(row[6])
280                 address = hash_to_address(chr(0), _hash)
281                 txinputs.append(address)
282             txpoint['inputs'] = txinputs
283             txoutputs = []
284             outrows = self.get_tx_outputs(tx_id)
285             for row in outrows:
286                 _hash = self.binout(row[6])
287                 address = hash_to_address(chr(0), _hash)
288                 txoutputs.append(address)
289             txpoint['outputs'] = txoutputs
290
291             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
292             if not txpoint['is_in']:
293                 # detect if already redeemed...
294                 for row in outrows:
295                     if row[6] == dbhash: break
296                 else:
297                     raise
298                 #row = self.get_tx_output(tx_id,dbhash)
299                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
300                 # if not redeemed, we add the script
301                 if row:
302                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
303
304         # cache result
305         if not address_has_mempool:
306             self.tx_cache[addr] = txpoints
307         
308         return txpoints
309
310
311
312     def memorypool_update(store):
313
314         ds = BCDataStream.BCDataStream()
315         previous_transactions = store.mempool_keys
316         store.mempool_keys = []
317
318         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
319
320         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
321         r = loads(respdata)
322         if r['error'] != None:
323             return
324
325         v = r['result'].get('transactions')
326         for hextx in v:
327             ds.clear()
328             ds.write(hextx.decode('hex'))
329             tx = deserialize.parse_Transaction(ds)
330             tx['hash'] = util.double_sha256(tx['tx'])
331             tx_hash = store.hashin(tx['hash'])
332
333             store.mempool_keys.append(tx_hash)
334             if store.tx_find_id_and_value(tx):
335                 pass
336             else:
337                 tx_id = store.import_tx(tx, False)
338                 store.update_tx_cache(tx_id)
339
340         store.commit()
341
342
343     def main_iteration(store):
344         try:
345             dblock.acquire()
346             store.catch_up()
347             store.memorypool_update()
348
349             block_number = store.get_block_number(1)
350
351         except IOError:
352             print "IOError: cannot reach bitcoind"
353             block_number = 0
354         except:
355             traceback.print_exc(file=sys.stdout)
356             block_number = 0
357         finally:
358             dblock.release()
359
360         return block_number
361
362