8f6edf339a19f9b0899a574bfe5401f507c2a853
[electrum-server.git] / abe_backend.py
1 from Abe.abe import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import psycopg2, binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10
11 class AbeStore(Datastore_class):
12
13     def __init__(self, config):
14         conf = DataStore.CONFIG_DEFAULTS
15         args, argv = readconf.parse_argv( [], conf)
16         args.dbtype = config.get('database','type')
17         if args.dbtype == 'sqlite3':
18             args.connect_args = { 'database' : config.get('database','database') }
19         elif args.dbtype == 'MySQLdb':
20             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
21         elif args.dbtype == 'psycopg2':
22             args.connect_args = { 'database' : config.get('database','database') }
23
24         Datastore_class.__init__(self,args)
25
26         self.tx_cache = {}
27         self.mempool_keys = {}
28         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
29
30         self.address_queue = Queue()
31
32         self.dblock = thread.allocate_lock()
33
34
35
36     def import_block(self, b, chain_ids=frozenset()):
37         block_id = super(AbeStore, self).import_block(b, chain_ids)
38         for pos in xrange(len(b['transactions'])):
39             tx = b['transactions'][pos]
40             if 'hash' not in tx:
41                 tx['hash'] = util.double_sha256(tx['tx'])
42             tx_id = self.tx_find_id_and_value(tx)
43             if tx_id:
44                 self.update_tx_cache(tx_id)
45             else:
46                 print "error: import_block: no tx_id"
47         return block_id
48
49
50     def update_tx_cache(self, txid):
51         inrows = self.get_tx_inputs(txid, False)
52         for row in inrows:
53             _hash = self.binout(row[6])
54             address = hash_to_address(chr(0), _hash)
55             if self.tx_cache.has_key(address):
56                 print "cache: invalidating", address
57                 self.tx_cache.pop(address)
58             self.address_queue.put(address)
59
60         outrows = self.get_tx_outputs(txid, False)
61         for row in outrows:
62             _hash = self.binout(row[6])
63             address = hash_to_address(chr(0), _hash)
64             if self.tx_cache.has_key(address):
65                 print "cache: invalidating", address
66                 self.tx_cache.pop(address)
67             self.address_queue.put(address)
68
69     def safe_sql(self,sql, params=(), lock=True):
70         try:
71             if lock: self.dblock.acquire()
72             ret = self.selectall(sql,params)
73             if lock: self.dblock.release()
74             return ret
75         except:
76             print "sql error", sql
77             return []
78
79     def get_tx_outputs(self, tx_id, lock=True):
80         return self.safe_sql("""SELECT
81                 txout.txout_pos,
82                 txout.txout_scriptPubKey,
83                 txout.txout_value,
84                 nexttx.tx_hash,
85                 nexttx.tx_id,
86                 txin.txin_pos,
87                 pubkey.pubkey_hash
88               FROM txout
89               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
90               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
91               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
92              WHERE txout.tx_id = %d 
93              ORDER BY txout.txout_pos
94         """%(tx_id), (), lock)
95
96     def get_tx_inputs(self, tx_id, lock=True):
97         return self.safe_sql(""" SELECT
98                 txin.txin_pos,
99                 txin.txin_scriptSig,
100                 txout.txout_value,
101                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
102                 prevtx.tx_id,
103                 COALESCE(txout.txout_pos, u.txout_pos),
104                 pubkey.pubkey_hash
105               FROM txin
106               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
107               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
108               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
109               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
110              WHERE txin.tx_id = %d
111              ORDER BY txin.txin_pos
112              """%(tx_id,), (), lock)
113
114     def get_address_out_rows(self, dbhash):
115         return self.safe_sql(""" SELECT
116                 b.block_nTime,
117                 cc.chain_id,
118                 b.block_height,
119                 1,
120                 b.block_hash,
121                 tx.tx_hash,
122                 tx.tx_id,
123                 txin.txin_pos,
124                 -prevout.txout_value
125               FROM chain_candidate cc
126               JOIN block b ON (b.block_id = cc.block_id)
127               JOIN block_tx ON (block_tx.block_id = b.block_id)
128               JOIN tx ON (tx.tx_id = block_tx.tx_id)
129               JOIN txin ON (txin.tx_id = tx.tx_id)
130               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
131               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
132              WHERE pubkey.pubkey_hash = ?
133                AND cc.in_longest = 1""", (dbhash,))
134
135     def get_address_out_rows_memorypool(self, dbhash):
136         return self.safe_sql(""" SELECT
137                 1,
138                 tx.tx_hash,
139                 tx.tx_id,
140                 txin.txin_pos,
141                 -prevout.txout_value
142               FROM tx 
143               JOIN txin ON (txin.tx_id = tx.tx_id)
144               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
145               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
146              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
147
148     def get_address_in_rows(self, dbhash):
149         return self.safe_sql(""" SELECT
150                 b.block_nTime,
151                 cc.chain_id,
152                 b.block_height,
153                 0,
154                 b.block_hash,
155                 tx.tx_hash,
156                 tx.tx_id,
157                 txout.txout_pos,
158                 txout.txout_value
159               FROM chain_candidate cc
160               JOIN block b ON (b.block_id = cc.block_id)
161               JOIN block_tx ON (block_tx.block_id = b.block_id)
162               JOIN tx ON (tx.tx_id = block_tx.tx_id)
163               JOIN txout ON (txout.tx_id = tx.tx_id)
164               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
165              WHERE pubkey.pubkey_hash = ?
166                AND cc.in_longest = 1""", (dbhash,))
167
168     def get_address_in_rows_memorypool(self, dbhash):
169         return self.safe_sql( """ SELECT
170                 0,
171                 tx.tx_hash,
172                 tx.tx_id,
173                 txout.txout_pos,
174                 txout.txout_value
175               FROM tx
176               JOIN txout ON (txout.tx_id = tx.tx_id)
177               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
178              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
179
180     def get_history(self, addr):
181         
182         cached_version = self.tx_cache.get( addr )
183         if cached_version is not None:
184             return cached_version
185
186         version, binaddr = decode_check_address(addr)
187         if binaddr is None:
188             return None
189
190         dbhash = self.binin(binaddr)
191         rows = []
192         rows += self.get_address_out_rows( dbhash )
193         rows += self.get_address_in_rows( dbhash )
194
195         txpoints = []
196         known_tx = []
197
198         for row in rows:
199             try:
200                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
201             except:
202                 print "cannot unpack row", row
203                 break
204             tx_hash = self.hashout_hex(tx_hash)
205             txpoint = {
206                     "nTime":    int(nTime),
207                     "height":   int(height),
208                     "is_in":    int(is_in),
209                     "blk_hash": self.hashout_hex(blk_hash),
210                     "tx_hash":  tx_hash,
211                     "tx_id":    int(tx_id),
212                     "pos":      int(pos),
213                     "value":    int(value),
214                     }
215
216             txpoints.append(txpoint)
217             known_tx.append(self.hashout_hex(tx_hash))
218
219
220         # todo: sort them really...
221         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
222
223         # read memory pool
224         rows = []
225         rows += self.get_address_in_rows_memorypool( dbhash )
226         rows += self.get_address_out_rows_memorypool( dbhash )
227         address_has_mempool = False
228
229         for row in rows:
230             is_in, tx_hash, tx_id, pos, value = row
231             tx_hash = self.hashout_hex(tx_hash)
232             if tx_hash in known_tx:
233                 continue
234
235             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
236             address_has_mempool = True
237
238             # this means pending transactions are returned by getmemorypool
239             if tx_hash not in self.mempool_keys:
240                 continue
241
242             #print "mempool", tx_hash
243             txpoint = {
244                     "nTime":    0,
245                     "height":   0,
246                     "is_in":    int(is_in),
247                     "blk_hash": 'mempool', 
248                     "tx_hash":  tx_hash,
249                     "tx_id":    int(tx_id),
250                     "pos":      int(pos),
251                     "value":    int(value),
252                     }
253             txpoints.append(txpoint)
254
255
256         for txpoint in txpoints:
257             tx_id = txpoint['tx_id']
258             
259             txinputs = []
260             inrows = self.get_tx_inputs(tx_id)
261             for row in inrows:
262                 _hash = self.binout(row[6])
263                 address = hash_to_address(chr(0), _hash)
264                 txinputs.append(address)
265             txpoint['inputs'] = txinputs
266             txoutputs = []
267             outrows = self.get_tx_outputs(tx_id)
268             for row in outrows:
269                 _hash = self.binout(row[6])
270                 address = hash_to_address(chr(0), _hash)
271                 txoutputs.append(address)
272             txpoint['outputs'] = txoutputs
273
274             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
275             if not txpoint['is_in']:
276                 # detect if already redeemed...
277                 for row in outrows:
278                     if row[6] == dbhash: break
279                 else:
280                     raise
281                 #row = self.get_tx_output(tx_id,dbhash)
282                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
283                 # if not redeemed, we add the script
284                 if row:
285                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
286
287         # cache result
288         if not address_has_mempool:
289             self.tx_cache[addr] = txpoints
290         
291         return txpoints
292
293
294     def get_status(self,addr):
295         # get address status, i.e. the last block for that address.
296         tx_points = self.get_history(addr)
297         if not tx_points:
298             status = None
299         else:
300             lastpoint = tx_points[-1]
301             status = lastpoint['blk_hash']
302             # this is a temporary hack; move it up once old clients have disappeared
303             if status == 'mempool': # and session['version'] != "old":
304                 status = status + ':%d'% len(tx_points)
305         return status
306
307
308
309     def memorypool_update(store):
310
311         ds = BCDataStream.BCDataStream()
312         previous_transactions = store.mempool_keys
313         store.mempool_keys = []
314
315         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
316
317         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
318         r = loads(respdata)
319         if r['error'] != None:
320             return
321
322         v = r['result'].get('transactions')
323         for hextx in v:
324             ds.clear()
325             ds.write(hextx.decode('hex'))
326             tx = deserialize.parse_Transaction(ds)
327             tx['hash'] = util.double_sha256(tx['tx'])
328             tx_hash = store.hashin(tx['hash'])
329
330             store.mempool_keys.append(tx_hash)
331             if store.tx_find_id_and_value(tx):
332                 pass
333             else:
334                 tx_id = store.import_tx(tx, False)
335                 store.update_tx_cache(tx_id)
336     
337         store.commit()
338
339
340     def send_tx(self,tx):
341         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
342         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
343         r = loads(respdata)
344         if r['error'] != None:
345             out = "error: transaction rejected by memorypool\n"+tx
346         else:
347             out = r['result']
348         return out
349
350
351     def main_iteration(store):
352         try:
353             store.dblock.acquire()
354             store.catch_up()
355             store.memorypool_update()
356             block_number = store.get_block_number(1)
357
358         except IOError:
359             print "IOError: cannot reach bitcoind"
360             block_number = 0
361         except:
362             traceback.print_exc(file=sys.stdout)
363             block_number = 0
364         finally:
365             store.dblock.release()
366
367         return block_number