create cleaning thread, add commad line socket requests
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24
25 import time, socket, operator, thread, ast, sys
26 import psycopg2, binascii
27 import bitcoinrpc
28
29 from Abe.abe import hash_to_address, decode_check_address
30 from Abe.DataStore import DataStore as Datastore_class
31 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
32
33 try:
34     f = open('/etc/electrum.conf','r')
35     data = f.read()
36     f.close()
37     HOST, PORT, SERVER_MESSAGE = ast.literal_eval(data)
38 except:
39     print "could not read /etc/electrum.conf"
40     SERVER_MESSAGE = "Welcome to Electrum"
41     HOST = 'ecdsa.org'
42     PORT = 50000
43
44
45
46 sessions = {}
47 sessions_last_time = {}
48 dblock = thread.allocate_lock()
49
50
51 class MyStore(Datastore_class):
52
53     def safe_sql(self,sql):
54         try:
55             dblock.acquire()
56             ret = self.selectall(sql)
57             dblock.release()
58             return ret
59         except:
60             print "sql error", sql
61             return []
62
63     def get_tx_outputs(self, tx_id):
64         return self.safe_sql("""SELECT
65                 txout.txout_pos,
66                 txout.txout_scriptPubKey,
67                 txout.txout_value,
68                 nexttx.tx_hash,
69                 nexttx.tx_id,
70                 txin.txin_pos,
71                 pubkey.pubkey_hash
72               FROM txout
73               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
74               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
75               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
76              WHERE txout.tx_id = %d 
77              ORDER BY txout.txout_pos
78         """%(tx_id))
79
80     def get_tx_inputs(self, tx_id):
81         return self.safe_sql(""" SELECT
82                 txin.txin_pos,
83                 txin.txin_scriptSig,
84                 txout.txout_value,
85                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
86                 prevtx.tx_id,
87                 COALESCE(txout.txout_pos, u.txout_pos),
88                 pubkey.pubkey_hash
89               FROM txin
90               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
91               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
92               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
93               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
94              WHERE txin.tx_id = %d
95              ORDER BY txin.txin_pos
96              """%(tx_id,))
97
98     def get_address_out_rows(self, dbhash):
99         return self.safe_sql(""" SELECT
100                 b.block_nTime,
101                 cc.chain_id,
102                 b.block_height,
103                 1,
104                 b.block_hash,
105                 tx.tx_hash,
106                 tx.tx_id,
107                 txin.txin_pos,
108                 -prevout.txout_value
109               FROM chain_candidate cc
110               JOIN block b ON (b.block_id = cc.block_id)
111               JOIN block_tx ON (block_tx.block_id = b.block_id)
112               JOIN tx ON (tx.tx_id = block_tx.tx_id)
113               JOIN txin ON (txin.tx_id = tx.tx_id)
114               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
115               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
116              WHERE pubkey.pubkey_hash = '%s'
117                AND cc.in_longest = 1"""%dbhash)
118
119     def get_address_out_rows_memorypool(self, dbhash):
120         return self.safe_sql(""" SELECT
121                 1,
122                 tx.tx_hash,
123                 tx.tx_id,
124                 txin.txin_pos,
125                 -prevout.txout_value
126               FROM tx 
127               JOIN txin ON (txin.tx_id = tx.tx_id)
128               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
129               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
130              WHERE pubkey.pubkey_hash ='%s' """%(dbhash))
131
132     def get_address_in_rows(self, dbhash):
133         return self.safe_sql(""" SELECT
134                 b.block_nTime,
135                 cc.chain_id,
136                 b.block_height,
137                 0,
138                 b.block_hash,
139                 tx.tx_hash,
140                 tx.tx_id,
141                 txout.txout_pos,
142                 txout.txout_value
143               FROM chain_candidate cc
144               JOIN block b ON (b.block_id = cc.block_id)
145               JOIN block_tx ON (block_tx.block_id = b.block_id)
146               JOIN tx ON (tx.tx_id = block_tx.tx_id)
147               JOIN txout ON (txout.tx_id = tx.tx_id)
148               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
149              WHERE pubkey.pubkey_hash = '%s'
150                AND cc.in_longest = 1"""%(dbhash))
151
152     def get_address_in_rows_memorypool(self, dbhash):
153         return self.safe_sql( """ SELECT
154                 0,
155                 tx.tx_hash,
156                 tx.tx_id,
157                 txout.txout_pos,
158                 txout.txout_value
159               FROM tx
160               JOIN txout ON (txout.tx_id = tx.tx_id)
161               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
162              WHERE pubkey.pubkey_hash = '%s' """%(dbhash))
163
164     def get_txpoints(self, addr):
165         version, binaddr = decode_check_address(addr)
166         if binaddr is None:
167             return "err"
168         dbhash = self.binin(binaddr)
169         rows = []
170         rows += self.get_address_out_rows( dbhash )
171         rows += self.get_address_in_rows( dbhash )
172
173         txpoints = []
174         known_tx = []
175
176         for row in rows:
177             try:
178                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
179             except:
180                 print "cannot unpack row", row
181                 break
182             tx_hash = self.hashout_hex(tx_hash)
183             txpoint = {
184                     "nTime":    int(nTime),
185                     #"chain_id": int(chain_id),
186                     "height":   int(height),
187                     "is_in":    int(is_in),
188                     "blk_hash": self.hashout_hex(blk_hash),
189                     "tx_hash":  tx_hash,
190                     "tx_id":    int(tx_id),
191                     "pos":      int(pos),
192                     "value":    int(value),
193                     }
194
195             txpoints.append(txpoint)
196             known_tx.append(self.hashout_hex(tx_hash))
197
198
199         # todo: sort them really...
200         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
201
202         # read memory pool
203         rows = []
204         rows += self.get_address_in_rows_memorypool( dbhash )
205         rows += self.get_address_out_rows_memorypool( dbhash )
206         for row in rows:
207             is_in, tx_hash, tx_id, pos, value = row
208             tx_hash = self.hashout_hex(tx_hash)
209             if tx_hash in known_tx:
210                 continue
211             #print "mempool", tx_hash
212             txpoint = {
213                     "nTime":    0,
214                     "chain_id": 1,
215                     "height":   0,
216                     "is_in":    int(is_in),
217                     "blk_hash": 'mempool',
218                     "tx_hash":  tx_hash,
219                     "tx_id":    int(tx_id),
220                     "pos":      int(pos),
221                     "value":    int(value),
222                     }
223             txpoints.append(txpoint)
224
225
226         for txpoint in txpoints:
227             tx_id = txpoint['tx_id']
228             
229             txinputs = []
230             inrows = self.get_tx_inputs(tx_id)
231             for row in inrows:
232                 _hash = self.binout(row[6])
233                 address = hash_to_address(chr(0), _hash)
234                 txinputs.append(address)
235             txpoint['inputs'] = txinputs
236             txoutputs = []
237             outrows = self.get_tx_outputs(tx_id)
238             for row in outrows:
239                 _hash = self.binout(row[6])
240                 address = hash_to_address(chr(0), _hash)
241                 txoutputs.append(address)
242             txpoint['outputs'] = txoutputs
243
244             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
245             if not txpoint['is_in']:
246                 # detect if already redeemed...
247                 for row in outrows:
248                     if row[6] == dbhash: break
249                 else:
250                     raise
251                 #row = self.get_tx_output(tx_id,dbhash)
252                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
253                 # if not redeemed, we add the script
254                 if row:
255                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
256
257
258         return txpoints
259
260
261     def get_status(self, addr):
262         # last block for an address
263         tx_points = self.get_txpoints(addr)
264         if not tx_points:
265             return None
266         else:
267             return tx_points[-1]['blk_hash']
268
269
270 def send_tx(tx):
271     import bitcoinrpc
272     conn = bitcoinrpc.connect_to_local()
273     try:
274         v = conn.importtransaction(tx)
275     except:
276         v = "error: transaction rejected by memorypool"
277     return v
278
279
280 def listen_thread(store):
281
282     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
283     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
284     s.bind((HOST, PORT))
285     s.listen(1)
286     while True:
287         conn, addr = s.accept()
288         thread.start_new_thread(client_thread, (addr, conn,))
289
290
291 def client_thread(ipaddr,conn):
292     #print "client thread", ipaddr
293     try:
294         ipaddr = ipaddr[0]
295         msg = ''
296         while 1:
297             d = conn.recv(1024)
298             msg += d
299             if d[-1]=='#':
300                 break
301
302         #print msg
303
304         try:
305             cmd, data = ast.literal_eval(msg[:-1])
306         except:
307             print "syntax error", repr(msg)
308             conn.close()
309             return
310
311         if cmd=='b':
312             out = "%d"%store.get_block_number(1)
313
314         elif cmd=='session':
315             import random, string
316             session_id = ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(10))
317             try:
318                 addresses = ast.literal_eval(data)
319             except:
320                 print "error"
321                 conn.close()
322                 return
323
324             print time.asctime(), "session", ipaddr, session_id, addresses[0], len(addresses)
325
326             sessions[session_id] = {}
327             for a in addresses:
328                 sessions[session_id][a] = ''
329             out = repr( (session_id, SERVER_MESSAGE) )
330             sessions_last_time[session_id] = time.time()
331
332         elif cmd=='poll': 
333             session_id = data
334             addresses = sessions.get(session_id)
335             if not addresses:
336                 print "session not found", ipaddr
337                 out = repr( (-1, {}))
338             else:
339                 sessions_last_time[session_id] = time.time()
340                 ret = {}
341                 for addr in addresses:
342                     status = store.get_status( addr )
343                     last_status = sessions[session_id].get( addr )
344                     if last_status != status:
345                         sessions[session_id][addr] = status
346                         ret[addr] = status
347                 out = repr( (store.get_block_number(1), ret ) )
348
349         elif cmd == 'h': 
350             # history
351             addr = data
352             h = store.get_txpoints( addr )
353             out = repr(h)
354
355         elif cmd == 'load': 
356             out = repr( len(sessions) )
357
358         elif cmd =='tx':        
359             # transaction
360             out = send_tx(data)
361         else:
362             out = None
363
364         if out:
365             #print ipaddr, cmd, len(out)
366             try:
367                 conn.send(out)
368             except:
369                 print "error, could not send"
370
371     finally:
372         conn.close()
373     
374
375 ds = BCDataStream.BCDataStream()
376
377
378 def memorypool_update(store):
379
380     conn = bitcoinrpc.connect_to_local()
381     try:
382         v = conn.getmemorypool()
383     except:
384         print "cannot contact bitcoin daemmon"
385         return
386     v = v['transactions']
387     for hextx in v:
388         ds.clear()
389         ds.write(hextx.decode('hex'))
390         tx = deserialize.parse_Transaction(ds)
391         tx['hash'] = util.double_sha256(tx['tx'])
392             
393         if store.tx_find_id_and_value(tx):
394             pass
395         else:
396             store.import_tx(tx, False)
397             #print tx['hash'][::-1].encode('hex')
398     store.commit()
399
400
401
402
403 def clean_session_thread():
404     while 1:
405         time.sleep(30)
406         t = time.time()
407         for k,t0 in sessions_last_time.items():
408             if t - t0 > 60:
409                 print "lost session",k
410                 sessions.pop(k)
411                 sessions_last_time.pop(k)
412             
413
414
415 import traceback
416
417
418 if __name__ == '__main__':
419
420     if len(sys.argv)>1:
421         request = sys.argv[1]
422         request += "#"
423         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
424         s.connect(( HOST, PORT))
425         s.send( request )
426         out = ''
427         while 1:
428             msg = s.recv(1024)
429             if msg: out += msg
430             else: break
431         s.close()
432         print out
433         sys.exit(0)
434
435
436     print "starting Electrum server"
437     conf = DataStore.CONFIG_DEFAULTS
438     args, argv = readconf.parse_argv( [], conf)
439     args.dbtype='psycopg2'
440     args.connect_args = {"database":"abe"}
441     store = MyStore(args)
442
443     thread.start_new_thread(listen_thread, (store,))
444     thread.start_new_thread(clean_session_thread, ())
445
446     while True:
447         try:
448             dblock.acquire()
449             store.catch_up()
450             memorypool_update(store)
451             dblock.release()
452         except:
453             traceback.print_exc(file=sys.stdout)
454             print "continuing..."
455         time.sleep(10)
456