read message from file
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24 f = open('/etc/electrum.conf','r')
25 SERVER_MESSAGE = f.read()
26 f.close()
27
28
29
30 import time, socket, operator, thread, ast, sys
31
32 import psycopg2, binascii
33 import bitcoinrpc
34
35 from Abe.abe import hash_to_address, decode_check_address
36 from Abe.DataStore import DataStore as Datastore_class
37 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
38
39
40 sessions = {}
41 dblock = thread.allocate_lock()
42
43
44 class MyStore(Datastore_class):
45
46     def safe_sql(self,sql):
47         try:
48             dblock.acquire()
49             ret = self.selectall(sql)
50             dblock.release()
51             return ret
52         except:
53             print "sql error", sql
54             return []
55
56     def get_tx_outputs(self, tx_id):
57         return self.safe_sql("""SELECT
58                 txout.txout_pos,
59                 txout.txout_scriptPubKey,
60                 txout.txout_value,
61                 nexttx.tx_hash,
62                 nexttx.tx_id,
63                 txin.txin_pos,
64                 pubkey.pubkey_hash
65               FROM txout
66               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
67               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
68               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
69              WHERE txout.tx_id = %d 
70              ORDER BY txout.txout_pos
71         """%(tx_id))
72
73     def get_tx_inputs(self, tx_id):
74         return self.safe_sql(""" SELECT
75                 txin.txin_pos,
76                 txin.txin_scriptSig,
77                 txout.txout_value,
78                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
79                 prevtx.tx_id,
80                 COALESCE(txout.txout_pos, u.txout_pos),
81                 pubkey.pubkey_hash
82               FROM txin
83               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
84               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
85               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
86               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
87              WHERE txin.tx_id = %d
88              ORDER BY txin.txin_pos
89              """%(tx_id,))
90
91     def get_address_out_rows(self, dbhash):
92         return self.safe_sql(""" SELECT
93                 b.block_nTime,
94                 cc.chain_id,
95                 b.block_height,
96                 1,
97                 b.block_hash,
98                 tx.tx_hash,
99                 tx.tx_id,
100                 txin.txin_pos,
101                 -prevout.txout_value
102               FROM chain_candidate cc
103               JOIN block b ON (b.block_id = cc.block_id)
104               JOIN block_tx ON (block_tx.block_id = b.block_id)
105               JOIN tx ON (tx.tx_id = block_tx.tx_id)
106               JOIN txin ON (txin.tx_id = tx.tx_id)
107               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
108               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
109              WHERE pubkey.pubkey_hash = '%s'
110                AND cc.in_longest = 1"""%dbhash)
111
112     def get_address_out_rows_memorypool(self, dbhash):
113         return self.safe_sql(""" SELECT
114                 1,
115                 tx.tx_hash,
116                 tx.tx_id,
117                 txin.txin_pos,
118                 -prevout.txout_value
119               FROM tx 
120               JOIN txin ON (txin.tx_id = tx.tx_id)
121               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
122               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
123              WHERE pubkey.pubkey_hash ='%s' """%(dbhash))
124
125     def get_address_in_rows(self, dbhash):
126         return self.safe_sql(""" SELECT
127                 b.block_nTime,
128                 cc.chain_id,
129                 b.block_height,
130                 0,
131                 b.block_hash,
132                 tx.tx_hash,
133                 tx.tx_id,
134                 txout.txout_pos,
135                 txout.txout_value
136               FROM chain_candidate cc
137               JOIN block b ON (b.block_id = cc.block_id)
138               JOIN block_tx ON (block_tx.block_id = b.block_id)
139               JOIN tx ON (tx.tx_id = block_tx.tx_id)
140               JOIN txout ON (txout.tx_id = tx.tx_id)
141               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
142              WHERE pubkey.pubkey_hash = '%s'
143                AND cc.in_longest = 1"""%(dbhash))
144
145     def get_address_in_rows_memorypool(self, dbhash):
146         return self.safe_sql( """ SELECT
147                 0,
148                 tx.tx_hash,
149                 tx.tx_id,
150                 txout.txout_pos,
151                 txout.txout_value
152               FROM tx
153               JOIN txout ON (txout.tx_id = tx.tx_id)
154               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
155              WHERE pubkey.pubkey_hash = '%s' """%(dbhash))
156
157     def get_txpoints(self, addr):
158         version, binaddr = decode_check_address(addr)
159         if binaddr is None:
160             return "err"
161         dbhash = self.binin(binaddr)
162         rows = []
163         rows += self.get_address_out_rows( dbhash )
164         rows += self.get_address_in_rows( dbhash )
165
166         txpoints = []
167         known_tx = []
168
169         for row in rows:
170             try:
171                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
172             except:
173                 print "cannot unpack row", row
174                 break
175             tx_hash = self.hashout_hex(tx_hash)
176             txpoint = {
177                     "nTime":    int(nTime),
178                     #"chain_id": int(chain_id),
179                     "height":   int(height),
180                     "is_in":    int(is_in),
181                     "blk_hash": self.hashout_hex(blk_hash),
182                     "tx_hash":  tx_hash,
183                     "tx_id":    int(tx_id),
184                     "pos":      int(pos),
185                     "value":    int(value),
186                     }
187
188             txpoints.append(txpoint)
189             known_tx.append(self.hashout_hex(tx_hash))
190
191
192         # todo: sort them really...
193         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
194
195         # read memory pool
196         rows = []
197         rows += self.get_address_in_rows_memorypool( dbhash )
198         rows += self.get_address_out_rows_memorypool( dbhash )
199         for row in rows:
200             is_in, tx_hash, tx_id, pos, value = row
201             tx_hash = self.hashout_hex(tx_hash)
202             if tx_hash in known_tx:
203                 continue
204             #print "mempool", tx_hash
205             txpoint = {
206                     "nTime":    0,
207                     "chain_id": 1,
208                     "height":   0,
209                     "is_in":    int(is_in),
210                     "blk_hash": 'mempool',
211                     "tx_hash":  tx_hash,
212                     "tx_id":    int(tx_id),
213                     "pos":      int(pos),
214                     "value":    int(value),
215                     }
216             txpoints.append(txpoint)
217
218
219         for txpoint in txpoints:
220             tx_id = txpoint['tx_id']
221             
222             txinputs = []
223             inrows = self.get_tx_inputs(tx_id)
224             for row in inrows:
225                 _hash = self.binout(row[6])
226                 address = hash_to_address(chr(0), _hash)
227                 txinputs.append(address)
228             txpoint['inputs'] = txinputs
229             txoutputs = []
230             outrows = self.get_tx_outputs(tx_id)
231             for row in outrows:
232                 _hash = self.binout(row[6])
233                 address = hash_to_address(chr(0), _hash)
234                 txoutputs.append(address)
235             txpoint['outputs'] = txoutputs
236
237             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
238             if not txpoint['is_in']:
239                 # detect if already redeemed...
240                 for row in outrows:
241                     if row[6] == dbhash: break
242                 else:
243                     raise
244                 #row = self.get_tx_output(tx_id,dbhash)
245                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
246                 # if not redeemed, we add the script
247                 if row:
248                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
249
250
251         return txpoints
252
253
254     def get_status(self, addr):
255         # last block for an address
256         tx_points = self.get_txpoints(addr)
257         if not tx_points:
258             return None
259         else:
260             return tx_points[-1]['blk_hash']
261
262
263 def send_tx(tx):
264     import bitcoinrpc
265     conn = bitcoinrpc.connect_to_local()
266     try:
267         v = conn.importtransaction(tx)
268     except:
269         v = "error: transaction rejected by memorypool"
270     return v
271
272
273 def listen_thread(store):
274
275     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
276     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
277     s.bind(('ecdsa.org', 50000))
278     s.listen(1)
279     while True:
280         conn, addr = s.accept()
281         thread.start_new_thread(client_thread, (addr, conn,))
282
283
284 def client_thread(ipaddr,conn):
285     #print "client thread", ipaddr
286     try:
287         ipaddr = ipaddr[0]
288         msg = ''
289         while 1:
290             d = conn.recv(1024)
291             msg += d
292             if d[-1]=='#':
293                 break
294
295         #print msg
296
297         try:
298             cmd, data = ast.literal_eval(msg[:-1])
299         except:
300             print "syntax error", repr(msg)
301             conn.close()
302             return
303
304         if cmd=='b':
305             out = "%d"%store.get_block_number(1)
306
307         elif cmd=='session':
308             import random, string
309             session_id = ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(10))
310             print "new session", ipaddr, session_id, data
311
312             addresses = ast.literal_eval(data)
313             sessions[session_id] = {}
314             for a in addresses:
315                 sessions[session_id][a] = ''
316             out = repr( (session_id, SERVER_MESSAGE) )
317
318         elif cmd=='poll': 
319             session_id = data
320             addresses = sessions.get(session_id)
321             if not addresses:
322                 print "session not found", ipaddr
323                 out = repr( (-1, {}))
324             else:
325                 ret = {}
326                 for addr in addresses:
327                     status = store.get_status( addr )
328                     last_status = sessions[session_id].get( addr )
329                     if last_status != status:
330                         sessions[session_id][addr] = status
331                         ret[addr] = status
332                 out = repr( (store.get_block_number(1), ret ) )
333
334         elif cmd == 'h': 
335             # history
336             addr = data
337             h = store.get_txpoints( addr )
338             out = repr(h)
339
340         elif cmd =='tx':        
341             # transaction
342             out = send_tx(data)
343         else:
344             out = None
345
346         if out:
347             #print ipaddr, cmd, len(out)
348             try:
349                 conn.send(out)
350             except:
351                 print "error, could not send"
352
353     finally:
354         conn.close()
355     
356
357 ds = BCDataStream.BCDataStream()
358
359
360 def memorypool_update(store):
361
362     conn = bitcoinrpc.connect_to_local()
363     try:
364         v = conn.getmemorypool()
365     except:
366         print "cannot contact bitcoin daemmon"
367         return
368     v = v['transactions']
369     for hextx in v:
370         ds.clear()
371         ds.write(hextx.decode('hex'))
372         tx = deserialize.parse_Transaction(ds)
373         tx['hash'] = util.double_sha256(tx['tx'])
374             
375         if store.tx_find_id_and_value(tx):
376             pass
377         else:
378             store.import_tx(tx, False)
379             #print tx['hash'][::-1].encode('hex')
380     store.commit()
381
382 import traceback
383
384
385 if __name__ == '__main__':
386
387     conf = DataStore.CONFIG_DEFAULTS
388     args, argv = readconf.parse_argv( [], conf)
389     args.dbtype='psycopg2'
390     args.connect_args = {"database":"abe"}
391     store = MyStore(args)
392
393     thread.start_new_thread(listen_thread, (store,))
394     #listen_thread(store)
395     #exit(0)
396
397     while True:
398         try:
399             dblock.acquire()
400             store.catch_up()
401             memorypool_update(store)
402             dblock.release()
403         except:
404             traceback.print_exc(file=sys.stdout)
405         time.sleep(10)
406