server code
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24 SERVER_MESSAGE = """
25 Welcome to ecdsa.org.
26
27 This service is free. Support this node: 
28 19mP9FKrXqL46Si58pHdhGKow88SUPy1V8
29
30 The server code is free software; you may 
31 download it and operate your own Electrum 
32 node. See http://ecdsa.org/electrum
33 """
34
35
36
37 import time, socket, operator, thread, ast, sys
38
39 import psycopg2, binascii
40 import bitcoinrpc
41
42 from Abe.abe import hash_to_address, decode_check_address
43 from Abe.DataStore import DataStore as Datastore_class
44 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
45
46
47 sessions = {}
48 dblock = thread.allocate_lock()
49
50
51 class MyStore(Datastore_class):
52
53     def safe_sql(self,sql):
54         try:
55             dblock.acquire()
56             ret = self.selectall(sql)
57             dblock.release()
58             return ret
59         except:
60             print "sql error", sql
61             return []
62
63     def get_tx_outputs(self, tx_id):
64         return self.safe_sql("""SELECT
65                 txout.txout_pos,
66                 txout.txout_scriptPubKey,
67                 txout.txout_value,
68                 nexttx.tx_hash,
69                 nexttx.tx_id,
70                 txin.txin_pos,
71                 pubkey.pubkey_hash
72               FROM txout
73               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
74               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
75               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
76              WHERE txout.tx_id = %d 
77              ORDER BY txout.txout_pos
78         """%(tx_id))
79
80     def get_tx_inputs(self, tx_id):
81         return self.safe_sql(""" SELECT
82                 txin.txin_pos,
83                 txin.txin_scriptSig,
84                 txout.txout_value,
85                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
86                 prevtx.tx_id,
87                 COALESCE(txout.txout_pos, u.txout_pos),
88                 pubkey.pubkey_hash
89               FROM txin
90               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
91               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
92               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
93               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
94              WHERE txin.tx_id = %d
95              ORDER BY txin.txin_pos
96              """%(tx_id,))
97
98     def get_address_out_rows(self, dbhash):
99         return self.safe_sql(""" SELECT
100                 b.block_nTime,
101                 cc.chain_id,
102                 b.block_height,
103                 1,
104                 b.block_hash,
105                 tx.tx_hash,
106                 tx.tx_id,
107                 txin.txin_pos,
108                 -prevout.txout_value
109               FROM chain_candidate cc
110               JOIN block b ON (b.block_id = cc.block_id)
111               JOIN block_tx ON (block_tx.block_id = b.block_id)
112               JOIN tx ON (tx.tx_id = block_tx.tx_id)
113               JOIN txin ON (txin.tx_id = tx.tx_id)
114               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
115               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
116              WHERE pubkey.pubkey_hash = '%s'
117                AND cc.in_longest = 1"""%dbhash)
118
119     def get_address_out_rows_memorypool(self, dbhash):
120         return self.safe_sql(""" SELECT
121                 1,
122                 tx.tx_hash,
123                 tx.tx_id,
124                 txin.txin_pos,
125                 -prevout.txout_value
126               FROM tx 
127               JOIN txin ON (txin.tx_id = tx.tx_id)
128               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
129               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
130              WHERE pubkey.pubkey_hash ='%s' """%(dbhash))
131
132     def get_address_in_rows(self, dbhash):
133         return self.safe_sql(""" SELECT
134                 b.block_nTime,
135                 cc.chain_id,
136                 b.block_height,
137                 0,
138                 b.block_hash,
139                 tx.tx_hash,
140                 tx.tx_id,
141                 txout.txout_pos,
142                 txout.txout_value
143               FROM chain_candidate cc
144               JOIN block b ON (b.block_id = cc.block_id)
145               JOIN block_tx ON (block_tx.block_id = b.block_id)
146               JOIN tx ON (tx.tx_id = block_tx.tx_id)
147               JOIN txout ON (txout.tx_id = tx.tx_id)
148               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
149              WHERE pubkey.pubkey_hash = '%s'
150                AND cc.in_longest = 1"""%(dbhash))
151
152     def get_address_in_rows_memorypool(self, dbhash):
153         return self.safe_sql( """ SELECT
154                 0,
155                 tx.tx_hash,
156                 tx.tx_id,
157                 txout.txout_pos,
158                 txout.txout_value
159               FROM tx
160               JOIN txout ON (txout.tx_id = tx.tx_id)
161               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
162              WHERE pubkey.pubkey_hash = '%s' """%(dbhash))
163
164     def get_txpoints(self, addr):
165         version, binaddr = decode_check_address(addr)
166         if binaddr is None:
167             return "err"
168         dbhash = self.binin(binaddr)
169         rows = []
170         rows += self.get_address_out_rows( dbhash )
171         rows += self.get_address_in_rows( dbhash )
172
173         txpoints = []
174         known_tx = []
175
176         for row in rows:
177             try:
178                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
179             except:
180                 print "cannot unpack row", row
181                 break
182             tx_hash = self.hashout_hex(tx_hash)
183             txpoint = {
184                     "nTime":    int(nTime),
185                     #"chain_id": int(chain_id),
186                     "height":   int(height),
187                     "is_in":    int(is_in),
188                     "blk_hash": self.hashout_hex(blk_hash),
189                     "tx_hash":  tx_hash,
190                     "tx_id":    int(tx_id),
191                     "pos":      int(pos),
192                     "value":    int(value),
193                     }
194
195             txpoints.append(txpoint)
196             known_tx.append(self.hashout_hex(tx_hash))
197
198
199         # todo: sort them really...
200         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
201
202         # read memory pool
203         rows = []
204         rows += self.get_address_in_rows_memorypool( dbhash )
205         rows += self.get_address_out_rows_memorypool( dbhash )
206         for row in rows:
207             is_in, tx_hash, tx_id, pos, value = row
208             tx_hash = self.hashout_hex(tx_hash)
209             if tx_hash in known_tx:
210                 continue
211             #print "mempool", tx_hash
212             txpoint = {
213                     "nTime":    0,
214                     "chain_id": 1,
215                     "height":   0,
216                     "is_in":    int(is_in),
217                     "blk_hash": 'mempool',
218                     "tx_hash":  tx_hash,
219                     "tx_id":    int(tx_id),
220                     "pos":      int(pos),
221                     "value":    int(value),
222                     }
223             txpoints.append(txpoint)
224
225
226         for txpoint in txpoints:
227             tx_id = txpoint['tx_id']
228             
229             txinputs = []
230             inrows = self.get_tx_inputs(tx_id)
231             for row in inrows:
232                 _hash = self.binout(row[6])
233                 address = hash_to_address(chr(0), _hash)
234                 txinputs.append(address)
235             txpoint['inputs'] = txinputs
236             txoutputs = []
237             outrows = self.get_tx_outputs(tx_id)
238             for row in outrows:
239                 _hash = self.binout(row[6])
240                 address = hash_to_address(chr(0), _hash)
241                 txoutputs.append(address)
242             txpoint['outputs'] = txoutputs
243
244             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
245             if not txpoint['is_in']:
246                 # detect if already redeemed...
247                 for row in outrows:
248                     if row[6] == dbhash: break
249                 else:
250                     raise
251                 #row = self.get_tx_output(tx_id,dbhash)
252                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
253                 # if not redeemed, we add the script
254                 if row:
255                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
256
257
258         return txpoints
259
260
261     def get_status(self, addr):
262         # last block for an address
263         tx_points = self.get_txpoints(addr)
264         if not tx_points:
265             return None
266         else:
267             return tx_points[-1]['blk_hash']
268
269
270 def send_tx(tx):
271     import bitcoinrpc
272     conn = bitcoinrpc.connect_to_local()
273     try:
274         v = conn.importtransaction(tx)
275     except:
276         v = "error: transaction rejected by memorypool"
277     return v
278
279
280 def listen_thread(store):
281
282     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
283     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
284     s.bind(('ecdsa.org', 50000))
285     s.listen(1)
286     while True:
287         conn, addr = s.accept()
288         thread.start_new_thread(client_thread, (addr, conn,))
289
290
291 def client_thread(ipaddr,conn):
292     try:
293         ipaddr = ipaddr[0]
294         msg = ''
295         while 1:
296             d = conn.recv(1024)
297             if d: msg+=d
298             try:
299                 cmd, data = ast.literal_eval(msg)
300                 break
301             except:
302                 continue
303         
304
305         if cmd=='b':
306             out = "%d"%store.get_block_number(1)
307
308         elif cmd=='watch':
309             addresses = ast.literal_eval(data)
310             sessions[ipaddr] = {}
311             for a in addresses:
312                 sessions[ipaddr][a] = ''
313             out = SERVER_MESSAGE
314
315         elif cmd=='poll': 
316             addresses = sessions.get(ipaddr)
317             if not addresses:
318                 print "session not found", ipaddr
319                 out = repr( (-1, {}))
320             else:
321                 ret = {}
322                 for addr in addresses:
323                     status = store.get_status( addr )
324                     last_status = sessions[ipaddr].get( addr )
325                     if last_status != status:
326                         sessions[ipaddr][addr] = status
327                         ret[addr] = status
328                 out = repr( (store.get_block_number(1), ret ) )
329
330         elif cmd == 'h': 
331             # history
332             addr = data
333             h = store.get_txpoints( addr )
334             out = repr(h)
335
336         elif cmd =='tx':        
337             # transaction
338             out = send_tx(data)
339         else:
340             out = None
341
342         if out:
343             #print ipaddr, cmd, len(out)
344             try:
345                 conn.send(out)
346             except:
347                 print "error, could not send"
348
349     finally:
350         conn.close()
351     
352
353 ds = BCDataStream.BCDataStream()
354
355
356 def memorypool_update(store):
357
358     conn = bitcoinrpc.connect_to_local()
359     try:
360         v = conn.getmemorypool()
361     except:
362         print "cannot contact bitcoin daemmon"
363         return
364     v = v['transactions']
365     for hextx in v:
366         ds.clear()
367         ds.write(hextx.decode('hex'))
368         tx = deserialize.parse_Transaction(ds)
369         tx['hash'] = util.double_sha256(tx['tx'])
370             
371         if store.tx_find_id_and_value(tx):
372             pass
373         else:
374             store.import_tx(tx, False)
375             print tx['hash'][::-1].encode('hex')
376     store.commit()
377
378 import traceback
379
380
381 if __name__ == '__main__':
382
383     conf = DataStore.CONFIG_DEFAULTS
384     args, argv = readconf.parse_argv( [], conf)
385     args.dbtype='psycopg2'
386     args.connect_args = {"database":"abe"}
387     store = MyStore(args)
388
389     thread.start_new_thread(listen_thread, (store,))
390
391     while True:
392         try:
393             dblock.acquire()
394             store.catch_up()
395             memorypool_update(store)
396             dblock.release()
397         except:
398             traceback.print_exc(file=sys.stdout)
399         time.sleep(10)
400