update session
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23 """
24
25
26 import time, socket, operator, thread, ast, sys,re
27 import psycopg2, binascii
28 import bitcoinrpc
29
30 from Abe.abe import hash_to_address, decode_check_address
31 from Abe.DataStore import DataStore as Datastore_class
32 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
33
34 import ConfigParser
35
36 config = ConfigParser.ConfigParser()
37 # set some defaults, which will be overwritten by the config file
38 config.add_section('server')
39 config.set('server','banner', 'Welcome to Electrum!')
40 config.set('server', 'host', 'localhost')
41 config.set('server', 'port', 50000)
42 config.set('server', 'password', '')
43 config.set('server', 'irc', 'yes')
44 config.set('server', 'cache', 'no') 
45 config.set('server', 'ircname', 'Electrum server')
46 config.add_section('database')
47 config.set('database', 'type', 'psycopg2')
48 config.set('database', 'database', 'abe')
49
50 try:
51     f = open('/etc/electrum.conf','r')
52     config.readfp(f)
53     f.close()
54 except:
55     print "Could not read electrum.conf. I will use the default values."
56
57 stopping = False
58 block_number = -1
59 sessions = {}
60 sessions_last_time = {}
61 dblock = thread.allocate_lock()
62 peer_list = {}
63
64
65 class MyStore(Datastore_class):
66
67     def import_tx(self, tx, is_coinbase):
68         tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
69         if config.get('server', 'cache') == 'yes': self.update_tx_cache(tx_id)
70
71     def update_tx_cache(self, txid):
72         inrows = self.get_tx_inputs(txid, False)
73         for row in inrows:
74             _hash = store.binout(row[6])
75             address = hash_to_address(chr(0), _hash)
76             if self.tx_cache.has_key(address):
77                 print "cache: invalidating", address, self.ismempool
78                 self.tx_cache.pop(address)
79         outrows = self.get_tx_outputs(txid, False)
80         for row in outrows:
81             _hash = store.binout(row[6])
82             address = hash_to_address(chr(0), _hash)
83             if self.tx_cache.has_key(address):
84                 print "cache: invalidating", address, self.ismempool
85                 self.tx_cache.pop(address)
86
87     def safe_sql(self,sql, params=(), lock=True):
88         try:
89             if lock: dblock.acquire()
90             ret = self.selectall(sql,params)
91             if lock: dblock.release()
92             return ret
93         except:
94             print "sql error", sql
95             return []
96
97     def get_tx_outputs(self, tx_id, lock=True):
98         return self.safe_sql("""SELECT
99                 txout.txout_pos,
100                 txout.txout_scriptPubKey,
101                 txout.txout_value,
102                 nexttx.tx_hash,
103                 nexttx.tx_id,
104                 txin.txin_pos,
105                 pubkey.pubkey_hash
106               FROM txout
107               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
108               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
109               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
110              WHERE txout.tx_id = %d 
111              ORDER BY txout.txout_pos
112         """%(tx_id), (), lock)
113
114     def get_tx_inputs(self, tx_id, lock=True):
115         return self.safe_sql(""" SELECT
116                 txin.txin_pos,
117                 txin.txin_scriptSig,
118                 txout.txout_value,
119                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
120                 prevtx.tx_id,
121                 COALESCE(txout.txout_pos, u.txout_pos),
122                 pubkey.pubkey_hash
123               FROM txin
124               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
125               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
126               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
127               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
128              WHERE txin.tx_id = %d
129              ORDER BY txin.txin_pos
130              """%(tx_id,), (), lock)
131
132     def get_address_out_rows(self, dbhash):
133         return self.safe_sql(""" SELECT
134                 b.block_nTime,
135                 cc.chain_id,
136                 b.block_height,
137                 1,
138                 b.block_hash,
139                 tx.tx_hash,
140                 tx.tx_id,
141                 txin.txin_pos,
142                 -prevout.txout_value
143               FROM chain_candidate cc
144               JOIN block b ON (b.block_id = cc.block_id)
145               JOIN block_tx ON (block_tx.block_id = b.block_id)
146               JOIN tx ON (tx.tx_id = block_tx.tx_id)
147               JOIN txin ON (txin.tx_id = tx.tx_id)
148               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
149               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
150              WHERE pubkey.pubkey_hash = ?
151                AND cc.in_longest = 1""", (dbhash,))
152
153     def get_address_out_rows_memorypool(self, dbhash):
154         return self.safe_sql(""" SELECT
155                 1,
156                 tx.tx_hash,
157                 tx.tx_id,
158                 txin.txin_pos,
159                 -prevout.txout_value
160               FROM tx 
161               JOIN txin ON (txin.tx_id = tx.tx_id)
162               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
163               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
164              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
165
166     def get_address_in_rows(self, dbhash):
167         return self.safe_sql(""" SELECT
168                 b.block_nTime,
169                 cc.chain_id,
170                 b.block_height,
171                 0,
172                 b.block_hash,
173                 tx.tx_hash,
174                 tx.tx_id,
175                 txout.txout_pos,
176                 txout.txout_value
177               FROM chain_candidate cc
178               JOIN block b ON (b.block_id = cc.block_id)
179               JOIN block_tx ON (block_tx.block_id = b.block_id)
180               JOIN tx ON (tx.tx_id = block_tx.tx_id)
181               JOIN txout ON (txout.tx_id = tx.tx_id)
182               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
183              WHERE pubkey.pubkey_hash = ?
184                AND cc.in_longest = 1""", (dbhash,))
185
186     def get_address_in_rows_memorypool(self, dbhash):
187         return self.safe_sql( """ SELECT
188                 0,
189                 tx.tx_hash,
190                 tx.tx_id,
191                 txout.txout_pos,
192                 txout.txout_value
193               FROM tx
194               JOIN txout ON (txout.tx_id = tx.tx_id)
195               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
196              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
197
198     def get_txpoints(self, addr):
199         
200         if config.get('server','cache') == 'yes':
201             cached_version = self.tx_cache.get( addr ) 
202             if cached_version is not None: 
203                 return cached_version
204
205         version, binaddr = decode_check_address(addr)
206         if binaddr is None:
207             return "err"
208         dbhash = self.binin(binaddr)
209         rows = []
210         rows += self.get_address_out_rows( dbhash )
211         rows += self.get_address_in_rows( dbhash )
212
213         txpoints = []
214         known_tx = []
215
216         for row in rows:
217             try:
218                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
219             except:
220                 print "cannot unpack row", row
221                 break
222             tx_hash = self.hashout_hex(tx_hash)
223             txpoint = {
224                     "nTime":    int(nTime),
225                     #"chain_id": int(chain_id),
226                     "height":   int(height),
227                     "is_in":    int(is_in),
228                     "blk_hash": self.hashout_hex(blk_hash),
229                     "tx_hash":  tx_hash,
230                     "tx_id":    int(tx_id),
231                     "pos":      int(pos),
232                     "value":    int(value),
233                     }
234
235             txpoints.append(txpoint)
236             known_tx.append(self.hashout_hex(tx_hash))
237
238
239         # todo: sort them really...
240         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
241
242         # read memory pool
243         rows = []
244         rows += self.get_address_in_rows_memorypool( dbhash )
245         rows += self.get_address_out_rows_memorypool( dbhash )
246         address_has_mempool = False
247
248         for row in rows:
249             is_in, tx_hash, tx_id, pos, value = row
250             tx_hash = self.hashout_hex(tx_hash)
251             if tx_hash in known_tx:
252                 continue
253
254             address_has_mempool = True
255             #print "mempool", tx_hash
256             txpoint = {
257                     "nTime":    0,
258                     #"chain_id": 1,
259                     "height":   0,
260                     "is_in":    int(is_in),
261                     "blk_hash": 'mempool',
262                     "tx_hash":  tx_hash,
263                     "tx_id":    int(tx_id),
264                     "pos":      int(pos),
265                     "value":    int(value),
266                     }
267             txpoints.append(txpoint)
268
269
270         for txpoint in txpoints:
271             tx_id = txpoint['tx_id']
272             
273             txinputs = []
274             inrows = self.get_tx_inputs(tx_id)
275             for row in inrows:
276                 _hash = self.binout(row[6])
277                 address = hash_to_address(chr(0), _hash)
278                 txinputs.append(address)
279             txpoint['inputs'] = txinputs
280             txoutputs = []
281             outrows = self.get_tx_outputs(tx_id)
282             for row in outrows:
283                 _hash = self.binout(row[6])
284                 address = hash_to_address(chr(0), _hash)
285                 txoutputs.append(address)
286             txpoint['outputs'] = txoutputs
287
288             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
289             if not txpoint['is_in']:
290                 # detect if already redeemed...
291                 for row in outrows:
292                     if row[6] == dbhash: break
293                 else:
294                     raise
295                 #row = self.get_tx_output(tx_id,dbhash)
296                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
297                 # if not redeemed, we add the script
298                 if row:
299                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
300
301         # cache result
302         if config.get('server','cache') == 'yes' and not address_has_mempool:
303             self.tx_cache[addr] = txpoints
304         
305         return txpoints
306
307
308     def get_status(self, addr):
309         # last block for an address.
310         tx_points = self.get_txpoints(addr)
311         if not tx_points:
312             return None
313         else:
314             return tx_points[-1]['blk_hash']
315
316
317 def send_tx(tx):
318     import bitcoinrpc
319     conn = bitcoinrpc.connect_to_local()
320     try:
321         v = conn.importtransaction(tx)
322     except:
323         v = "error: transaction rejected by memorypool"
324     return v
325
326
327 def listen_thread(store):
328     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
329     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
330     s.bind((config.get('server','host'), config.getint('server','port')))
331     s.listen(1)
332     while not stopping:
333         conn, addr = s.accept()
334         thread.start_new_thread(client_thread, (addr, conn,))
335
336 def random_string(N):
337     import random, string
338     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
339
340 def client_thread(ipaddr,conn):
341     #print "client thread", ipaddr
342     try:
343         ipaddr = ipaddr[0]
344         msg = ''
345         while 1:
346             d = conn.recv(1024)
347             msg += d
348             if not d: 
349                 break
350             if d[-1]=='#':
351                 break
352
353         #print msg
354
355         try:
356             cmd, data = ast.literal_eval(msg[:-1])
357         except:
358             print "syntax error", repr(msg)
359             conn.close()
360             return
361
362         if cmd=='b':
363             out = "%d"%block_number
364
365         elif cmd=='session':
366             session_id = random_string(10)
367             try:
368                 addresses = ast.literal_eval(data)
369             except:
370                 print "error"
371                 conn.close()
372                 return
373
374             print time.asctime(), "new session", ipaddr, session_id, addresses[0] if addresses else addresses, len(addresses)
375
376             sessions[session_id] = {}
377             for a in addresses:
378                 sessions[session_id][a] = ''
379             out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
380             sessions_last_time[session_id] = time.time()
381
382         elif cmd=='update_session':
383             try:
384                 session_id, addresses = ast.literal_eval(data)
385             except:
386                 print "error"
387                 conn.close()
388                 return
389
390             print time.asctime(), "update session", ipaddr, session_id, addresses[0] if addresses else addresses, len(addresses)
391
392             sessions[session_id] = {}
393             for a in addresses:
394                 sessions[session_id][a] = ''
395             out = 'ok'
396             sessions_last_time[session_id] = time.time()
397
398         elif cmd=='poll': 
399             session_id = data
400             addresses = sessions.get(session_id)
401             if addresses is None:
402                 print time.asctime(), "session not found", session_id, ipaddr
403                 out = repr( (-1, {}))
404             else:
405                 t1 = time.time()
406                 sessions_last_time[session_id] = time.time()
407                 ret = {}
408                 k = 0
409                 for addr in addresses:
410                     if store.tx_cache.get( addr ) is not None: k += 1
411                     status = store.get_status( addr )
412                     last_status = addresses.get( addr )
413                     if last_status != status:
414                         addresses[addr] = status
415                         ret[addr] = status
416                 if ret:
417                     sessions[session_id] = addresses
418                 out = repr( (block_number, ret ) )
419                 t2 = time.time() - t1 
420                 if t2 > 10:
421                     print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
422
423         elif cmd == 'h': 
424             # history
425             address = data
426             out = repr( store.get_txpoints( address ) )
427
428         elif cmd == 'load': 
429             if config.get('server','password') == data:
430                 out = repr( len(sessions) )
431             else:
432                 out = 'wrong password'
433
434         elif cmd =='tx':        
435             out = send_tx(data)
436
437         elif cmd =='clear_cache':
438             if config.get('server','password') == data:
439                 self.tx_cache = {}
440                 out = 'ok'
441             else:
442                 out = 'wrong password'
443
444         elif cmd =='get_cache':
445             try:
446                 pw, addr = data
447             except:
448                 addr = None
449             if addr:
450                 if config.get('server','password') == pw:
451                     out = store.tx_cache.get(addr)
452                     out = repr(out)
453                 else:
454                     out = 'wrong password'
455             else:
456                 out = "error: "+ repr(data)
457
458         elif cmd == 'stop':
459             global stopping
460             if config.get('server','password') == data:
461                 stopping = True
462                 out = 'ok'
463             else:
464                 out = 'wrong password'
465
466         elif cmd == 'peers':
467             out = repr(peer_list.values())
468
469         else:
470             out = None
471
472         if out:
473             #print ipaddr, cmd, len(out)
474             try:
475                 conn.send(out)
476             except:
477                 print "error, could not send"
478
479     finally:
480         conn.close()
481     
482
483 ds = BCDataStream.BCDataStream()
484
485
486
487
488 def memorypool_update(store):
489     conn = bitcoinrpc.connect_to_local()
490     try:
491         v = conn.getmemorypool()
492     except:
493         print "cannot contact bitcoin daemon"
494         return
495     v = v['transactions']
496     for hextx in v:
497         ds.clear()
498         ds.write(hextx.decode('hex'))
499         tx = deserialize.parse_Transaction(ds)
500         #print "new tx",tx
501
502         tx['hash'] = util.double_sha256(tx['tx'])
503             
504         if store.tx_find_id_and_value(tx):
505             pass
506         else:
507             store.import_tx(tx, False)
508
509     store.commit()
510
511
512
513
514 def clean_session_thread():
515     while not stopping:
516         time.sleep(30)
517         t = time.time()
518         for k,t0 in sessions_last_time.items():
519             if t - t0 > 5*60:
520                 print time.asctime(), "lost session",k
521                 sessions.pop(k)
522                 sessions_last_time.pop(k)
523             
524
525 def irc_thread():
526     global peer_list
527     NICK = 'E_'+random_string(10)
528     while not stopping:
529         try:
530             s = socket.socket()
531             s.connect(('irc.freenode.net', 6667))
532             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
533             s.send('NICK '+NICK+'\n')
534             s.send('JOIN #electrum\n')
535             sf = s.makefile('r', 0)
536             t = 0
537             while not stopping:
538                 line = sf.readline()
539                 line = line.rstrip('\r\n')
540                 line = line.split()
541                 if line[0]=='PING': 
542                     s.send('PONG '+line[1]+'\n')
543                 elif '353' in line: # answer to /names
544                     k = line.index('353')
545                     for item in line[k+1:]:
546                         if item[0:2] == 'E_':
547                             s.send('WHO %s\n'%item)
548                 elif '352' in line: # answer to /who
549                     # warning: this is a horrible hack which apparently works
550                     k = line.index('352')
551                     ip = line[k+4]
552                     ip = socket.gethostbyname(ip)
553                     name = line[k+6]
554                     host = line[k+9]
555                     peer_list[name] = (ip,host)
556                 if time.time() - t > 5*60:
557                     s.send('NAMES #electrum\n')
558                     t = time.time()
559                     peer_list = {}
560         except:
561             traceback.print_exc(file=sys.stdout)
562         finally:
563             sf.close()
564             s.close()
565
566
567 import traceback
568
569
570 if __name__ == '__main__':
571
572     if len(sys.argv)>1:
573         cmd = sys.argv[1]
574         if cmd == 'load':
575             request = "('load','%s')#"%config.get('server','password')
576         elif cmd == 'peers':
577             request = "('peers','')#"
578         elif cmd == 'stop':
579             request = "('stop','%s')#"%config.get('server','password')
580         elif cmd == 'clear_cache':
581             request = "('clear_cache','%s')#"%config.get('server','password')
582         elif cmd == 'get_cache':
583             request = "('get_cache',('%s','%s'))#"%(config.get('server','password'),sys.argv[2])
584         elif cmd == 'h':
585             request = "('h','%s')#"%sys.argv[2]
586         elif cmd == 'b':
587             request = "('b','')#"
588
589         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
590         s.connect((config.get('server','host'), config.getint('server','port')))
591         s.send( request )
592         out = ''
593         while 1:
594             msg = s.recv(1024)
595             if msg: out += msg
596             else: break
597         s.close()
598         print out
599         sys.exit(0)
600
601
602     print "starting Electrum server"
603     print "cache:", config.get('server', 'cache')
604
605     conf = DataStore.CONFIG_DEFAULTS
606     args, argv = readconf.parse_argv( [], conf)
607     args.dbtype= config.get('database','type')
608     if args.dbtype == 'sqlite3':
609         args.connect_args = { 'database' : config.get('database','database') }
610     elif args.dbtype == 'MySQLdb':
611         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
612     elif args.dbtype == 'psycopg2':
613         args.connect_args = { 'database' : config.get('database','database') }
614     store = MyStore(args)
615     store.tx_cache = {}
616     store.ismempool = False
617
618     thread.start_new_thread(listen_thread, (store,))
619     thread.start_new_thread(clean_session_thread, ())
620     if (config.get('server','irc') == 'yes' ):
621         thread.start_new_thread(irc_thread, ())
622
623     while not stopping:
624         try:
625             dblock.acquire()
626             store.catch_up()
627             store.ismempool = True
628             memorypool_update(store)
629             store.ismempool = False
630             block_number = store.get_block_number(1)
631             dblock.release()
632         except:
633             traceback.print_exc(file=sys.stdout)
634         time.sleep(10)
635
636     print "server stopped"
637