minor fix in recv loop
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23 """
24
25
26 import time, socket, operator, thread, ast, sys,re
27 import psycopg2, binascii
28 import bitcoinrpc
29
30 from Abe.abe import hash_to_address, decode_check_address
31 from Abe.DataStore import DataStore as Datastore_class
32 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
33
34 import ConfigParser
35
36 config = ConfigParser.ConfigParser()
37 # set some defaults, which will be overwritten by the config file
38 config.add_section('server')
39 config.set('server','banner', 'Welcome to Electrum!')
40 config.set('server', 'host', 'localhost')
41 config.set('server', 'port', 50000)
42 config.set('server', 'password', '')
43 config.set('server', 'irc', 'yes')
44 config.set('server', 'cache', 'no') 
45 config.set('server', 'ircname', 'Electrum server')
46 config.add_section('database')
47 config.set('database', 'type', 'psycopg2')
48 config.set('database', 'database', 'abe')
49
50 try:
51     f = open('/etc/electrum.conf','r')
52     config.readfp(f)
53     f.close()
54 except:
55     print "Could not read electrum.conf. I will use the default values."
56
57 stopping = False
58 block_number = -1
59 sessions = {}
60 sessions_last_time = {}
61 dblock = thread.allocate_lock()
62 peer_list = {}
63
64
65 class MyStore(Datastore_class):
66
67     def import_tx(self, tx, is_coinbase):
68         tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
69         if config.get('server', 'cache') == 'yes': self.update_tx_cache(tx_id)
70
71     def update_tx_cache(self, txid):
72         inrows = self.get_tx_inputs(txid, False)
73         for row in inrows:
74             _hash = store.binout(row[6])
75             address = hash_to_address(chr(0), _hash)
76             if self.tx_cache.has_key(address):
77                 print "cache: invalidating", address, self.ismempool
78                 self.tx_cache.pop(address)
79         outrows = self.get_tx_outputs(txid, False)
80         for row in outrows:
81             _hash = store.binout(row[6])
82             address = hash_to_address(chr(0), _hash)
83             if self.tx_cache.has_key(address):
84                 print "cache: invalidating", address, self.ismempool
85                 self.tx_cache.pop(address)
86
87     def safe_sql(self,sql, params=(), lock=True):
88         try:
89             if lock: dblock.acquire()
90             ret = self.selectall(sql,params)
91             if lock: dblock.release()
92             return ret
93         except:
94             print "sql error", sql
95             return []
96
97     def get_tx_outputs(self, tx_id, lock=True):
98         return self.safe_sql("""SELECT
99                 txout.txout_pos,
100                 txout.txout_scriptPubKey,
101                 txout.txout_value,
102                 nexttx.tx_hash,
103                 nexttx.tx_id,
104                 txin.txin_pos,
105                 pubkey.pubkey_hash
106               FROM txout
107               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
108               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
109               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
110              WHERE txout.tx_id = %d 
111              ORDER BY txout.txout_pos
112         """%(tx_id), (), lock)
113
114     def get_tx_inputs(self, tx_id, lock=True):
115         return self.safe_sql(""" SELECT
116                 txin.txin_pos,
117                 txin.txin_scriptSig,
118                 txout.txout_value,
119                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
120                 prevtx.tx_id,
121                 COALESCE(txout.txout_pos, u.txout_pos),
122                 pubkey.pubkey_hash
123               FROM txin
124               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
125               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
126               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
127               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
128              WHERE txin.tx_id = %d
129              ORDER BY txin.txin_pos
130              """%(tx_id,), (), lock)
131
132     def get_address_out_rows(self, dbhash):
133         return self.safe_sql(""" SELECT
134                 b.block_nTime,
135                 cc.chain_id,
136                 b.block_height,
137                 1,
138                 b.block_hash,
139                 tx.tx_hash,
140                 tx.tx_id,
141                 txin.txin_pos,
142                 -prevout.txout_value
143               FROM chain_candidate cc
144               JOIN block b ON (b.block_id = cc.block_id)
145               JOIN block_tx ON (block_tx.block_id = b.block_id)
146               JOIN tx ON (tx.tx_id = block_tx.tx_id)
147               JOIN txin ON (txin.tx_id = tx.tx_id)
148               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
149               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
150              WHERE pubkey.pubkey_hash = ?
151                AND cc.in_longest = 1""", (dbhash,))
152
153     def get_address_out_rows_memorypool(self, dbhash):
154         return self.safe_sql(""" SELECT
155                 1,
156                 tx.tx_hash,
157                 tx.tx_id,
158                 txin.txin_pos,
159                 -prevout.txout_value
160               FROM tx 
161               JOIN txin ON (txin.tx_id = tx.tx_id)
162               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
163               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
164              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
165
166     def get_address_in_rows(self, dbhash):
167         return self.safe_sql(""" SELECT
168                 b.block_nTime,
169                 cc.chain_id,
170                 b.block_height,
171                 0,
172                 b.block_hash,
173                 tx.tx_hash,
174                 tx.tx_id,
175                 txout.txout_pos,
176                 txout.txout_value
177               FROM chain_candidate cc
178               JOIN block b ON (b.block_id = cc.block_id)
179               JOIN block_tx ON (block_tx.block_id = b.block_id)
180               JOIN tx ON (tx.tx_id = block_tx.tx_id)
181               JOIN txout ON (txout.tx_id = tx.tx_id)
182               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
183              WHERE pubkey.pubkey_hash = ?
184                AND cc.in_longest = 1""", (dbhash,))
185
186     def get_address_in_rows_memorypool(self, dbhash):
187         return self.safe_sql( """ SELECT
188                 0,
189                 tx.tx_hash,
190                 tx.tx_id,
191                 txout.txout_pos,
192                 txout.txout_value
193               FROM tx
194               JOIN txout ON (txout.tx_id = tx.tx_id)
195               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
196              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
197
198     def get_txpoints(self, addr):
199         
200         if config.get('server','cache') == 'yes':
201             cached_version = self.tx_cache.get( addr ) 
202             if cached_version is not None: 
203                 return cached_version
204
205         version, binaddr = decode_check_address(addr)
206         if binaddr is None:
207             return "err"
208         dbhash = self.binin(binaddr)
209         rows = []
210         rows += self.get_address_out_rows( dbhash )
211         rows += self.get_address_in_rows( dbhash )
212
213         txpoints = []
214         known_tx = []
215
216         for row in rows:
217             try:
218                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
219             except:
220                 print "cannot unpack row", row
221                 break
222             tx_hash = self.hashout_hex(tx_hash)
223             txpoint = {
224                     "nTime":    int(nTime),
225                     #"chain_id": int(chain_id),
226                     "height":   int(height),
227                     "is_in":    int(is_in),
228                     "blk_hash": self.hashout_hex(blk_hash),
229                     "tx_hash":  tx_hash,
230                     "tx_id":    int(tx_id),
231                     "pos":      int(pos),
232                     "value":    int(value),
233                     }
234
235             txpoints.append(txpoint)
236             known_tx.append(self.hashout_hex(tx_hash))
237
238
239         # todo: sort them really...
240         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
241
242         # read memory pool
243         rows = []
244         rows += self.get_address_in_rows_memorypool( dbhash )
245         rows += self.get_address_out_rows_memorypool( dbhash )
246         address_has_mempool = False
247
248         for row in rows:
249             is_in, tx_hash, tx_id, pos, value = row
250             tx_hash = self.hashout_hex(tx_hash)
251             if tx_hash in known_tx:
252                 continue
253
254             address_has_mempool = True
255             #print "mempool", tx_hash
256             txpoint = {
257                     "nTime":    0,
258                     #"chain_id": 1,
259                     "height":   0,
260                     "is_in":    int(is_in),
261                     "blk_hash": 'mempool',
262                     "tx_hash":  tx_hash,
263                     "tx_id":    int(tx_id),
264                     "pos":      int(pos),
265                     "value":    int(value),
266                     }
267             txpoints.append(txpoint)
268
269
270         for txpoint in txpoints:
271             tx_id = txpoint['tx_id']
272             
273             txinputs = []
274             inrows = self.get_tx_inputs(tx_id)
275             for row in inrows:
276                 _hash = self.binout(row[6])
277                 address = hash_to_address(chr(0), _hash)
278                 txinputs.append(address)
279             txpoint['inputs'] = txinputs
280             txoutputs = []
281             outrows = self.get_tx_outputs(tx_id)
282             for row in outrows:
283                 _hash = self.binout(row[6])
284                 address = hash_to_address(chr(0), _hash)
285                 txoutputs.append(address)
286             txpoint['outputs'] = txoutputs
287
288             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
289             if not txpoint['is_in']:
290                 # detect if already redeemed...
291                 for row in outrows:
292                     if row[6] == dbhash: break
293                 else:
294                     raise
295                 #row = self.get_tx_output(tx_id,dbhash)
296                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
297                 # if not redeemed, we add the script
298                 if row:
299                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
300
301         # cache result
302         if config.get('server','cache') == 'yes' and not address_has_mempool:
303             self.tx_cache[addr] = txpoints
304         
305         return txpoints
306
307
308     def get_status(self, addr):
309         # last block for an address.
310         tx_points = self.get_txpoints(addr)
311         if not tx_points:
312             return None
313         else:
314             return tx_points[-1]['blk_hash']
315
316
317 def send_tx(tx):
318     import bitcoinrpc
319     conn = bitcoinrpc.connect_to_local()
320     try:
321         v = conn.importtransaction(tx)
322     except:
323         v = "error: transaction rejected by memorypool"
324     return v
325
326
327 def listen_thread(store):
328     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
329     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
330     s.bind((config.get('server','host'), config.getint('server','port')))
331     s.listen(1)
332     while not stopping:
333         conn, addr = s.accept()
334         thread.start_new_thread(client_thread, (addr, conn,))
335
336 def random_string(N):
337     import random, string
338     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
339
340 def client_thread(ipaddr,conn):
341     #print "client thread", ipaddr
342     try:
343         ipaddr = ipaddr[0]
344         msg = ''
345         while 1:
346             d = conn.recv(1024)
347             msg += d
348             if not d: 
349                 break
350             if d[-1]=='#':
351                 break
352
353         #print msg
354
355         try:
356             cmd, data = ast.literal_eval(msg[:-1])
357         except:
358             print "syntax error", repr(msg)
359             conn.close()
360             return
361
362         if cmd=='b':
363             out = "%d"%block_number
364         elif cmd=='session':
365             session_id = random_string(10)
366             try:
367                 addresses = ast.literal_eval(data)
368             except:
369                 print "error"
370                 conn.close()
371                 return
372
373             print time.asctime(), "session", ipaddr, session_id, addresses[0] if addresses else addresses, len(addresses)
374
375             sessions[session_id] = {}
376             for a in addresses:
377                 sessions[session_id][a] = ''
378             out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
379             sessions_last_time[session_id] = time.time()
380
381         elif cmd=='poll': 
382             session_id = data
383             addresses = sessions.get(session_id)
384             if addresses is None:
385                 print time.asctime(), " Session not found", session_id, ipaddr
386                 out = repr( (-1, {}))
387             else:
388                 t1 = time.time()
389                 sessions_last_time[session_id] = time.time()
390                 ret = {}
391                 k = 0
392                 for addr in addresses:
393                     if store.tx_cache.get( addr ) is not None: k += 1
394                     status = store.get_status( addr )
395                     last_status = addresses.get( addr )
396                     if last_status != status:
397                         addresses[addr] = status
398                         ret[addr] = status
399                 if ret:
400                     sessions[session_id] = addresses
401                 out = repr( (block_number, ret ) )
402                 t2 = time.time() - t1 
403                 if t2 > 10:
404                     print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
405
406         elif cmd == 'h': 
407             # history
408             address = data
409             out = repr( store.get_txpoints( address ) )
410
411         elif cmd == 'load': 
412             if config.get('server','password') == data:
413                 out = repr( len(sessions) )
414             else:
415                 out = 'wrong password'
416
417         elif cmd =='tx':        
418             out = send_tx(data)
419
420         elif cmd == 'stop':
421             global stopping
422             if config.get('server','password') == data:
423                 stopping = True
424                 out = 'ok'
425             else:
426                 out = 'wrong password'
427
428         elif cmd == 'peers':
429             out = repr(peer_list.values())
430
431         else:
432             out = None
433
434         if out:
435             #print ipaddr, cmd, len(out)
436             try:
437                 conn.send(out)
438             except:
439                 print "error, could not send"
440
441     finally:
442         conn.close()
443     
444
445 ds = BCDataStream.BCDataStream()
446
447
448
449
450 def memorypool_update(store):
451     conn = bitcoinrpc.connect_to_local()
452     try:
453         v = conn.getmemorypool()
454     except:
455         print "cannot contact bitcoin daemon"
456         return
457     v = v['transactions']
458     for hextx in v:
459         ds.clear()
460         ds.write(hextx.decode('hex'))
461         tx = deserialize.parse_Transaction(ds)
462         #print "new tx",tx
463
464         tx['hash'] = util.double_sha256(tx['tx'])
465             
466         if store.tx_find_id_and_value(tx):
467             pass
468         else:
469             store.import_tx(tx, False)
470
471     store.commit()
472
473
474
475
476 def clean_session_thread():
477     while not stopping:
478         time.sleep(30)
479         t = time.time()
480         for k,t0 in sessions_last_time.items():
481             if t - t0 > 5*60:
482                 print time.asctime(), "lost session",k
483                 sessions.pop(k)
484                 sessions_last_time.pop(k)
485             
486
487 def irc_thread():
488     global peer_list
489     NICK = 'E_'+random_string(10)
490     while not stopping:
491         try:
492             s = socket.socket()
493             s.connect(('irc.freenode.net', 6667))
494             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
495             s.send('NICK '+NICK+'\n')
496             s.send('JOIN #electrum\n')
497             sf = s.makefile('r', 0)
498             t = 0
499             while not stopping:
500                 line = sf.readline()
501                 line = line.rstrip('\r\n')
502                 line = line.split()
503                 if line[0]=='PING': 
504                     s.send('PONG '+line[1]+'\n')
505                 elif '353' in line: # answer to /names
506                     k = line.index('353')
507                     for item in line[k+1:]:
508                         if item[0:2] == 'E_':
509                             s.send('WHO %s\n'%item)
510                 elif '352' in line: # answer to /who
511                     # warning: this is a horrible hack which apparently works
512                     k = line.index('352')
513                     ip = line[k+4]
514                     ip = socket.gethostbyname(ip)
515                     name = line[k+6]
516                     host = line[k+9]
517                     peer_list[name] = (ip,host)
518                 if time.time() - t > 5*60:
519                     s.send('NAMES #electrum\n')
520                     t = time.time()
521                     peer_list = {}
522         except:
523             traceback.print_exc(file=sys.stdout)
524         finally:
525             sf.close()
526             s.close()
527
528
529 import traceback
530
531
532 if __name__ == '__main__':
533
534     if len(sys.argv)>1:
535         cmd = sys.argv[1]
536         if cmd == 'load':
537             request = "('load','%s')#"%config.get('server','password')
538         elif cmd == 'peers':
539             request = "('peers','')#"
540         elif cmd == 'stop':
541             request = "('stop','%s')#"%config.get('server','password')
542         elif cmd == 'h':
543             request = "('h','%s')#"%sys.argv[2]
544         elif cmd == 'b':
545             request = "('b','')#"
546
547         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
548         s.connect((config.get('server','host'), config.getint('server','port')))
549         s.send( request )
550         out = ''
551         while 1:
552             msg = s.recv(1024)
553             if msg: out += msg
554             else: break
555         s.close()
556         print out
557         sys.exit(0)
558
559
560     print "starting Electrum server"
561     print "cache:", config.get('server', 'cache')
562
563     conf = DataStore.CONFIG_DEFAULTS
564     args, argv = readconf.parse_argv( [], conf)
565     args.dbtype= config.get('database','type')
566     if args.dbtype == 'sqlite3':
567         args.connect_args = { 'database' : config.get('database','database') }
568     elif args.dbtype == 'MySQLdb':
569         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
570     elif args.dbtype == 'psycopg2':
571         args.connect_args = { 'database' : config.get('database','database') }
572     store = MyStore(args)
573     store.tx_cache = {}
574     store.ismempool = False
575
576     thread.start_new_thread(listen_thread, (store,))
577     thread.start_new_thread(clean_session_thread, ())
578     if (config.get('server','irc') == 'yes' ):
579         thread.start_new_thread(irc_thread, ())
580
581     while not stopping:
582         try:
583             dblock.acquire()
584             store.catch_up()
585             store.ismempool = True
586             memorypool_update(store)
587             store.ismempool = False
588             block_number = store.get_block_number(1)
589             dblock.release()
590         except:
591             traceback.print_exc(file=sys.stdout)
592         time.sleep(10)
593
594     print "server stopped"
595