2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
18 from Abe.abe import hash_to_address, decode_check_address
19 from Abe.DataStore import DataStore as Datastore_class
20 from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
22 import psycopg2, binascii
24 import thread, traceback, sys, urllib, operator
25 from json import dumps, loads
class MyStore(Datastore_class):
    """Abe DataStore extended with an address-history cache and bitcoind access."""

    def __init__(self, config, address_queue):
        """Build the store from an electrum config.

        config: ConfigParser with [database] and [bitcoind] sections.
        address_queue: queue on which addresses are announced whenever
            their cached history is invalidated.
        """
        conf = DataStore.CONFIG_DEFAULTS
        args, argv = readconf.parse_argv([], conf)
        # Map the electrum config onto Abe's per-driver connect_args.
        args.dbtype = config.get('database','type')
        if args.dbtype == 'sqlite3':
            args.connect_args = { 'database' : config.get('database','database') }
        elif args.dbtype == 'MySQLdb':
            args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
        elif args.dbtype == 'psycopg2':
            args.connect_args = { 'database' : config.get('database','database') }
        # NOTE(review): any other dbtype leaves args.connect_args unset and
        # will fail inside Datastore_class.__init__.
        Datastore_class.__init__(self, args)

        # Fix: tx_cache is read by get_history()/update_tx_cache() but was
        # not initialised in this constructor, so the very first cache
        # lookup raised AttributeError.
        self.tx_cache = {}
        # Hashes of transactions currently believed to be in the mempool.
        self.mempool_keys = {}
        self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))

        self.address_queue = address_queue
        # Serialises all SQL access issued through safe_sql().
        self.dblock = thread.allocate_lock()
52 def import_block(self, b, chain_ids=frozenset()):
53 block_id = super(MyStore, self).import_block(b, chain_ids)
54 print "import block", block_id
55 for pos in xrange(len(b['transactions'])):
56 tx = b['transactions'][pos]
58 tx['hash'] = util.double_sha256(tx['tx'])
59 tx_id = store.tx_find_id_and_value(tx)
61 self.update_tx_cache(tx_id)
63 print "error: import_block: no tx_id"
67 def update_tx_cache(self, txid):
68 inrows = self.get_tx_inputs(txid, False)
70 _hash = self.binout(row[6])
71 address = hash_to_address(chr(0), _hash)
72 if self.tx_cache.has_key(address):
73 print "cache: invalidating", address
74 self.tx_cache.pop(address)
75 self.address_queue.put(address)
77 outrows = self.get_tx_outputs(txid, False)
79 _hash = self.binout(row[6])
80 address = hash_to_address(chr(0), _hash)
81 if self.tx_cache.has_key(address):
82 print "cache: invalidating", address
83 self.tx_cache.pop(address)
84 self.address_queue.put(address)
86 def safe_sql(self,sql, params=(), lock=True):
88 if lock: self.dblock.acquire()
89 ret = self.selectall(sql,params)
90 if lock: self.dblock.release()
93 print "sql error", sql
def get_tx_outputs(self, tx_id, lock=True):
    """Return the output rows of transaction `tx_id`, ordered by position.

    The LEFT JOINs attach the pubkey owning each output and, via txin,
    the follow-up transaction (nexttx) that spends it, if any.  `lock`
    is forwarded to safe_sql.
    """
    # NOTE(review): tx_id is spliced in with %d string formatting rather
    # than a bound parameter; only safe because tx_id is an internal
    # integer row id, never user-supplied text.
    return self.safe_sql("""SELECT
            txout.txout_scriptPubKey,
          LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
          LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
          LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
         WHERE txout.tx_id = %d
         ORDER BY txout.txout_pos
         """%(tx_id), (), lock)
def get_tx_inputs(self, tx_id, lock=True):
    """Return the input rows of transaction `tx_id`, ordered by position.

    COALESCE falls back to the unlinked_txin row (alias u) when the
    previous output is not yet linked in the database, so the previous
    tx hash / output position are reported either way.  `lock` is
    forwarded to safe_sql.
    """
    return self.safe_sql(""" SELECT
            COALESCE(prevtx.tx_hash, u.txout_tx_hash),
            COALESCE(txout.txout_pos, u.txout_pos),
          LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
          LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
          LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
          LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
         WHERE txin.tx_id = %d
         ORDER BY txin.txin_pos
         """%(tx_id,), (), lock)
def get_address_out_rows(self, dbhash):
    """Rows where the address spends coins, confirmed chain only.

    dbhash is the binin()-encoded pubkey hash.  Walks from
    chain_candidate through block/tx down to the previous output
    (prevout) redeemed by each txin, restricted to the longest chain
    (cc.in_longest = 1).  Uses a bound `?` parameter.
    """
    return self.safe_sql(""" SELECT
          FROM chain_candidate cc
          JOIN block b ON (b.block_id = cc.block_id)
          JOIN block_tx ON (block_tx.block_id = b.block_id)
          JOIN tx ON (tx.tx_id = block_tx.tx_id)
          JOIN txin ON (txin.tx_id = tx.tx_id)
          JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
          JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
         WHERE pubkey.pubkey_hash = ?
           AND cc.in_longest = 1""", (dbhash,))
def get_address_out_rows_memorypool(self, dbhash):
    """Like get_address_out_rows but without the chain/block joins:
    spends by this address found in not-yet-confirmed transactions."""
    return self.safe_sql(""" SELECT
          JOIN txin ON (txin.tx_id = tx.tx_id)
          JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
          JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
         WHERE pubkey.pubkey_hash = ? """, (dbhash,))
def get_address_in_rows(self, dbhash):
    """Rows where the address receives coins, confirmed chain only.

    Mirrors get_address_out_rows but joins txout (outputs paying the
    address) instead of txin, again restricted to the longest chain.
    """
    return self.safe_sql(""" SELECT
          FROM chain_candidate cc
          JOIN block b ON (b.block_id = cc.block_id)
          JOIN block_tx ON (block_tx.block_id = b.block_id)
          JOIN tx ON (tx.tx_id = block_tx.tx_id)
          JOIN txout ON (txout.tx_id = tx.tx_id)
          JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
         WHERE pubkey.pubkey_hash = ?
           AND cc.in_longest = 1""", (dbhash,))
def get_address_in_rows_memorypool(self, dbhash):
    """Like get_address_in_rows but without the chain/block joins:
    payments to this address found in not-yet-confirmed transactions."""
    return self.safe_sql( """ SELECT
          JOIN txout ON (txout.tx_id = tx.tx_id)
          JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
         WHERE pubkey.pubkey_hash = ? """, (dbhash,))
def get_history(self, addr):
    """Return the transaction history of base58 address `addr`.

    Serves from tx_cache when possible; otherwise collects confirmed
    rows, then memory-pool rows, resolves each txpoint's input/output
    addresses, and caches the result -- unless mempool rows were seen in
    the database that our getmemorypool snapshot does not know about.
    """
    # 1. Serve a cached copy if we have one.
    cached_version = self.tx_cache.get( addr )
    if cached_version is not None:
        return cached_version

    # 2. Decode the address and fetch its confirmed in/out rows.
    version, binaddr = decode_check_address(addr)
    dbhash = self.binin(binaddr)
    rows += self.get_address_out_rows( dbhash )
    rows += self.get_address_in_rows( dbhash )

    # Confirmed rows: unpack and turn each into a txpoint dict.
    nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
    print "cannot unpack row", row
    tx_hash = self.hashout_hex(tx_hash)
        "height": int(height),
        "blk_hash": self.hashout_hex(blk_hash),
    txpoints.append(txpoint)
    # NOTE(review): tx_hash was already hex-encoded a few lines above;
    # hashout_hex is applied to it a second time here -- verify this is
    # intentional before relying on known_tx contents.
    known_tx.append(self.hashout_hex(tx_hash))

    # todo: sort them really...
    txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))

    # 3. Append memory-pool rows for the same address.
    rows += self.get_address_in_rows_memorypool( dbhash )
    rows += self.get_address_out_rows_memorypool( dbhash )
    address_has_mempool = False

    is_in, tx_hash, tx_id, pos, value = row
    tx_hash = self.hashout_hex(tx_hash)
    if tx_hash in known_tx:
    # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
    address_has_mempool = True
    # this means pending transactions are returned by getmemorypool
    if tx_hash not in self.mempool_keys:
    #print "mempool", tx_hash
        "blk_hash": 'mempool',
    txpoints.append(txpoint)

    # 4. Resolve the input and output addresses of every txpoint.
    for txpoint in txpoints:
        tx_id = txpoint['tx_id']
        inrows = self.get_tx_inputs(tx_id)
        # Column 6 of each row is the binary pubkey hash.
        _hash = self.binout(row[6])
        address = hash_to_address(chr(0), _hash)
        txinputs.append(address)
        txpoint['inputs'] = txinputs
        outrows = self.get_tx_outputs(tx_id)
        _hash = self.binout(row[6])
        address = hash_to_address(chr(0), _hash)
        txoutputs.append(address)
        txpoint['outputs'] = txoutputs
        # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
        if not txpoint['is_in']:
            # detect if already redeemed...
            if row[6] == dbhash: break
            #row = self.get_tx_output(tx_id,dbhash)
            # pos, script, value, o_hash, o_id, o_pos, binaddr = row
            # if not redeemed, we add the script
            if not row[4]: txpoint['raw_scriptPubKey'] = row[1]

    # 5. Cache only when the db and getmemorypool views agree; otherwise
    # the cache entry would go stale as soon as the mempool changes.
    if not address_has_mempool:
        self.tx_cache[addr] = txpoints
def memorypool_update(store):
    """Refresh the store's view of bitcoind's memory pool.

    Calls bitcoind's getmemorypool over JSON-RPC, then parses and hashes
    each returned raw transaction.  `store` is the MyStore instance
    (this method uses `store` as its receiver name instead of `self`).
    """
    ds = BCDataStream.BCDataStream()
    previous_transactions = store.mempool_keys
    # NOTE(review): mempool_keys starts life as a dict in __init__ but is
    # reset to a list here -- the two uses should agree on one container
    # type (membership tests work on both, so behaviour is unchanged).
    store.mempool_keys = []

    postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
    respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
    # `r` is presumably loads(respdata) -- assignment not shown in this view.
    if r['error'] != None:

    v = r['result'].get('transactions')
    # Each entry is a hex-encoded raw transaction: decode it into the
    # data stream, parse it, and compute its double-SHA256 hash.
    ds.write(hextx.decode('hex'))
    tx = deserialize.parse_Transaction(ds)
    tx['hash'] = util.double_sha256(tx['tx'])
    tx_hash = store.hashin(tx['hash'])
def send_tx(self,tx):
    """Submit raw transaction `tx` to bitcoind via importtransaction.

    Builds an error string when bitcoind's memory pool rejects the
    transaction; the success branch is not visible here.
    """
    postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
    respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
    # `r` is presumably loads(respdata) -- assignment not shown in this view.
    if r['error'] != None:
        out = "error: transaction rejected by memorypool\n"+tx
def main_iteration(store):
    """One polling cycle: refresh the mempool and read the chain tip.

    Runs with store.dblock held; errors are printed (not raised) so the
    caller's polling loop keeps going.  `store` is the MyStore instance.
    """
    store.dblock.acquire()
    store.memorypool_update()
    # Height of the best block on chain id 1.
    block_number = store.get_block_number(1)
    # Failure path: bitcoind unreachable over HTTP.
    print "IOError: cannot reach bitcoind"
    # Any other error: dump the traceback for diagnosis.
    traceback.print_exc(file=sys.stdout)
    store.dblock.release()
if __name__ == '__main__':
    import Queue, ConfigParser

    # Built-in defaults; /etc/electrum.conf overrides them when readable.
    config = ConfigParser.ConfigParser()
    config.add_section('database')
    config.set('database', 'type', 'psycopg2')
    config.set('database', 'database', 'abe')

    # NOTE(review): the handle is opened here but no close() is visible;
    # confirm the config file is closed after reading.
    f = open('/etc/electrum.conf','r')
    print "Could not read electrum.conf. I will use the default values."

    # Queue shared with the store for address-invalidation notifications.
    address_queue = Queue.Queue()
    store = MyStore(config,address_queue)
    store.main_iteration()