fix for memcache
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23 """
24
25
26 import time, socket, operator, thread, ast, sys,re
27 import psycopg2, binascii
28 import bitcoinrpc
29
30 from Abe.abe import hash_to_address, decode_check_address
31 from Abe.DataStore import DataStore as Datastore_class
32 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
33
34 import ConfigParser
35
36 config = ConfigParser.ConfigParser()
37 # set some defaults, which will be overwritten by the config file
38 config.add_section('server')
39 config.set('server','banner', 'Welcome to Electrum!')
40 config.set('server', 'host', 'localhost')
41 config.set('server', 'port', 50000)
42 config.set('server', 'password', '')
43 config.set('server', 'irc', 'yes')
44 config.set('server', 'cache', 'no') 
45 config.set('server', 'ircname', 'Electrum server')
46 config.add_section('database')
47 config.set('database', 'type', 'psycopg2')
48 config.set('database', 'database', 'abe')
49
50 try:
51     f = open('/etc/electrum.conf','r')
52     config.readfp(f)
53     f.close()
54 except:
55     print "Could not read electrum.conf. I will use the default values."
56
57 stopping = False
58 block_number = -1
59 sessions = {}
60 sessions_last_time = {}
61 dblock = thread.allocate_lock()
62 peer_list = {}
63
64
65 class MyStore(Datastore_class):
66
67     def import_tx(self, tx, is_coinbase):
68         tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
69         if config.get('server', 'cache') == 'yes': self.update_tx_cache(tx_id)
70
71     def update_tx_cache(self, txid):
72         inrows = self.get_tx_inputs(txid, False)
73         for row in inrows:
74             _hash = store.binout(row[6])
75             address = hash_to_address(chr(0), _hash)
76             if self.tx_cache.has_key(address):
77                 print "cache: invalidating", address, self.ismempool
78                 self.tx_cache.pop(address)
79         outrows = self.get_tx_outputs(txid, False)
80         for row in outrows:
81             _hash = store.binout(row[6])
82             address = hash_to_address(chr(0), _hash)
83             if self.tx_cache.has_key(address):
84                 print "cache: invalidating", address, self.ismempool
85                 self.tx_cache.pop(address)
86
87     def safe_sql(self,sql, params=(), lock=True):
88         try:
89             if lock: dblock.acquire()
90             ret = self.selectall(sql,params)
91             if lock: dblock.release()
92             return ret
93         except:
94             print "sql error", sql
95             return []
96
97     def get_tx_outputs(self, tx_id, lock=True):
98         return self.safe_sql("""SELECT
99                 txout.txout_pos,
100                 txout.txout_scriptPubKey,
101                 txout.txout_value,
102                 nexttx.tx_hash,
103                 nexttx.tx_id,
104                 txin.txin_pos,
105                 pubkey.pubkey_hash
106               FROM txout
107               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
108               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
109               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
110              WHERE txout.tx_id = %d 
111              ORDER BY txout.txout_pos
112         """%(tx_id), (), lock)
113
114     def get_tx_inputs(self, tx_id, lock=True):
115         return self.safe_sql(""" SELECT
116                 txin.txin_pos,
117                 txin.txin_scriptSig,
118                 txout.txout_value,
119                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
120                 prevtx.tx_id,
121                 COALESCE(txout.txout_pos, u.txout_pos),
122                 pubkey.pubkey_hash
123               FROM txin
124               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
125               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
126               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
127               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
128              WHERE txin.tx_id = %d
129              ORDER BY txin.txin_pos
130              """%(tx_id,), (), lock)
131
132     def get_address_out_rows(self, dbhash):
133         return self.safe_sql(""" SELECT
134                 b.block_nTime,
135                 cc.chain_id,
136                 b.block_height,
137                 1,
138                 b.block_hash,
139                 tx.tx_hash,
140                 tx.tx_id,
141                 txin.txin_pos,
142                 -prevout.txout_value
143               FROM chain_candidate cc
144               JOIN block b ON (b.block_id = cc.block_id)
145               JOIN block_tx ON (block_tx.block_id = b.block_id)
146               JOIN tx ON (tx.tx_id = block_tx.tx_id)
147               JOIN txin ON (txin.tx_id = tx.tx_id)
148               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
149               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
150              WHERE pubkey.pubkey_hash = ?
151                AND cc.in_longest = 1""", (dbhash,))
152
153     def get_address_out_rows_memorypool(self, dbhash):
154         return self.safe_sql(""" SELECT
155                 1,
156                 tx.tx_hash,
157                 tx.tx_id,
158                 txin.txin_pos,
159                 -prevout.txout_value
160               FROM tx 
161               JOIN txin ON (txin.tx_id = tx.tx_id)
162               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
163               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
164              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
165
166     def get_address_in_rows(self, dbhash):
167         return self.safe_sql(""" SELECT
168                 b.block_nTime,
169                 cc.chain_id,
170                 b.block_height,
171                 0,
172                 b.block_hash,
173                 tx.tx_hash,
174                 tx.tx_id,
175                 txout.txout_pos,
176                 txout.txout_value
177               FROM chain_candidate cc
178               JOIN block b ON (b.block_id = cc.block_id)
179               JOIN block_tx ON (block_tx.block_id = b.block_id)
180               JOIN tx ON (tx.tx_id = block_tx.tx_id)
181               JOIN txout ON (txout.tx_id = tx.tx_id)
182               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
183              WHERE pubkey.pubkey_hash = ?
184                AND cc.in_longest = 1""", (dbhash,))
185
186     def get_address_in_rows_memorypool(self, dbhash):
187         return self.safe_sql( """ SELECT
188                 0,
189                 tx.tx_hash,
190                 tx.tx_id,
191                 txout.txout_pos,
192                 txout.txout_value
193               FROM tx
194               JOIN txout ON (txout.tx_id = tx.tx_id)
195               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
196              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
197
198     def get_txpoints(self, addr):
199         
200         if config.get('server','cache') == 'yes':
201             cached_version = self.tx_cache.get( addr ) 
202             if cached_version is not None: 
203                 return cached_version
204
205         version, binaddr = decode_check_address(addr)
206         if binaddr is None:
207             return "err"
208         dbhash = self.binin(binaddr)
209         rows = []
210         rows += self.get_address_out_rows( dbhash )
211         rows += self.get_address_in_rows( dbhash )
212
213         txpoints = []
214         known_tx = []
215
216         for row in rows:
217             try:
218                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
219             except:
220                 print "cannot unpack row", row
221                 break
222             tx_hash = self.hashout_hex(tx_hash)
223             txpoint = {
224                     "nTime":    int(nTime),
225                     #"chain_id": int(chain_id),
226                     "height":   int(height),
227                     "is_in":    int(is_in),
228                     "blk_hash": self.hashout_hex(blk_hash),
229                     "tx_hash":  tx_hash,
230                     "tx_id":    int(tx_id),
231                     "pos":      int(pos),
232                     "value":    int(value),
233                     }
234
235             txpoints.append(txpoint)
236             known_tx.append(self.hashout_hex(tx_hash))
237
238
239         # todo: sort them really...
240         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
241
242         # read memory pool
243         rows = []
244         rows += self.get_address_in_rows_memorypool( dbhash )
245         rows += self.get_address_out_rows_memorypool( dbhash )
246         address_has_mempool = False
247
248         for row in rows:
249             is_in, tx_hash, tx_id, pos, value = row
250             tx_hash = self.hashout_hex(tx_hash)
251             if tx_hash in known_tx:
252                 continue
253
254             address_has_mempool = True
255             #print "mempool", tx_hash
256             txpoint = {
257                     "nTime":    0,
258                     #"chain_id": 1,
259                     "height":   0,
260                     "is_in":    int(is_in),
261                     "blk_hash": 'mempool',
262                     "tx_hash":  tx_hash,
263                     "tx_id":    int(tx_id),
264                     "pos":      int(pos),
265                     "value":    int(value),
266                     }
267             txpoints.append(txpoint)
268
269
270         for txpoint in txpoints:
271             tx_id = txpoint['tx_id']
272             
273             txinputs = []
274             inrows = self.get_tx_inputs(tx_id)
275             for row in inrows:
276                 _hash = self.binout(row[6])
277                 address = hash_to_address(chr(0), _hash)
278                 txinputs.append(address)
279             txpoint['inputs'] = txinputs
280             txoutputs = []
281             outrows = self.get_tx_outputs(tx_id)
282             for row in outrows:
283                 _hash = self.binout(row[6])
284                 address = hash_to_address(chr(0), _hash)
285                 txoutputs.append(address)
286             txpoint['outputs'] = txoutputs
287
288             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
289             if not txpoint['is_in']:
290                 # detect if already redeemed...
291                 for row in outrows:
292                     if row[6] == dbhash: break
293                 else:
294                     raise
295                 #row = self.get_tx_output(tx_id,dbhash)
296                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
297                 # if not redeemed, we add the script
298                 if row:
299                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
300
301         # cache result
302         if config.get('server','cache') == 'yes' and not address_has_mempool:
303             self.tx_cache[addr] = txpoints
304         
305         return txpoints
306
307
308     def get_status(self, addr):
309         # last block for an address.
310         tx_points = self.get_txpoints(addr)
311         if not tx_points:
312             return None
313         else:
314             return tx_points[-1]['blk_hash']
315
316
317 def send_tx(tx):
318     import bitcoinrpc
319     conn = bitcoinrpc.connect_to_local()
320     try:
321         v = conn.importtransaction(tx)
322     except:
323         v = "error: transaction rejected by memorypool"
324     return v
325
326
327 def listen_thread(store):
328     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
329     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
330     s.bind((config.get('server','host'), config.getint('server','port')))
331     s.listen(1)
332     while not stopping:
333         conn, addr = s.accept()
334         thread.start_new_thread(client_thread, (addr, conn,))
335
336 def random_string(N):
337     import random, string
338     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
339
340 def client_thread(ipaddr,conn):
341     #print "client thread", ipaddr
342     try:
343         ipaddr = ipaddr[0]
344         msg = ''
345         while 1:
346             d = conn.recv(1024)
347             msg += d
348             if d[-1]=='#':
349                 break
350
351         #print msg
352
353         try:
354             cmd, data = ast.literal_eval(msg[:-1])
355         except:
356             print "syntax error", repr(msg)
357             conn.close()
358             return
359
360         if cmd=='b':
361             out = "%d"%block_number
362         elif cmd=='session':
363             session_id = random_string(10)
364             try:
365                 addresses = ast.literal_eval(data)
366             except:
367                 print "error"
368                 conn.close()
369                 return
370
371             print time.asctime(), "session", ipaddr, session_id, addresses[0] if addresses else addresses, len(addresses)
372
373             sessions[session_id] = {}
374             for a in addresses:
375                 sessions[session_id][a] = ''
376             out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
377             sessions_last_time[session_id] = time.time()
378
379         elif cmd=='poll': 
380             session_id = data
381             addresses = sessions.get(session_id)
382             if addresses is None:
383                 print time.asctime(), " Session not found", session_id, ipaddr
384                 out = repr( (-1, {}))
385             else:
386                 t1 = time.time()
387                 sessions_last_time[session_id] = time.time()
388                 ret = {}
389                 k = 0
390                 for addr in addresses:
391                     if store.tx_cache.get( addr ) is not None: k += 1
392                     status = store.get_status( addr )
393                     last_status = addresses.get( addr )
394                     if last_status != status:
395                         addresses[addr] = status
396                         ret[addr] = status
397                 if ret:
398                     sessions[session_id] = addresses
399                 out = repr( (block_number, ret ) )
400                 t2 = time.time() - t1 
401                 if t2 > 10:
402                     print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
403
404         elif cmd == 'h': 
405             # history
406             address = data
407             out = repr( store.get_txpoints( address ) )
408
409         elif cmd == 'load': 
410             if config.get('server','password') == data:
411                 out = repr( len(sessions) )
412             else:
413                 out = 'wrong password'
414
415         elif cmd =='tx':        
416             out = send_tx(data)
417
418         elif cmd == 'stop':
419             global stopping
420             if config.get('server','password') == data:
421                 stopping = True
422                 out = 'ok'
423             else:
424                 out = 'wrong password'
425
426         elif cmd == 'peers':
427             out = repr(peer_list.values())
428
429         else:
430             out = None
431
432         if out:
433             #print ipaddr, cmd, len(out)
434             try:
435                 conn.send(out)
436             except:
437                 print "error, could not send"
438
439     finally:
440         conn.close()
441     
442
443 ds = BCDataStream.BCDataStream()
444
445
446
447
448 def memorypool_update(store):
449     conn = bitcoinrpc.connect_to_local()
450     try:
451         v = conn.getmemorypool()
452     except:
453         print "cannot contact bitcoin daemon"
454         return
455     v = v['transactions']
456     for hextx in v:
457         ds.clear()
458         ds.write(hextx.decode('hex'))
459         tx = deserialize.parse_Transaction(ds)
460         #print "new tx",tx
461
462         tx['hash'] = util.double_sha256(tx['tx'])
463             
464         if store.tx_find_id_and_value(tx):
465             pass
466         else:
467             store.import_tx(tx, False)
468
469     store.commit()
470
471
472
473
474 def clean_session_thread():
475     while not stopping:
476         time.sleep(30)
477         t = time.time()
478         for k,t0 in sessions_last_time.items():
479             if t - t0 > 5*60:
480                 print time.asctime(), "lost session",k
481                 sessions.pop(k)
482                 sessions_last_time.pop(k)
483             
484
485 def irc_thread():
486     global peer_list
487     NICK = 'E_'+random_string(10)
488     while not stopping:
489         try:
490             s = socket.socket()
491             s.connect(('irc.freenode.net', 6667))
492             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
493             s.send('NICK '+NICK+'\n')
494             s.send('JOIN #electrum\n')
495             sf = s.makefile('r', 0)
496             t = 0
497             while not stopping:
498                 line = sf.readline()
499                 line = line.rstrip('\r\n')
500                 line = line.split()
501                 if line[0]=='PING': 
502                     s.send('PONG '+line[1]+'\n')
503                 elif '353' in line: # answer to /names
504                     k = line.index('353')
505                     for item in line[k+1:]:
506                         if item[0:2] == 'E_':
507                             s.send('WHO %s\n'%item)
508                 elif '352' in line: # answer to /who
509                     # warning: this is a horrible hack which apparently works
510                     k = line.index('352')
511                     ip = line[k+4]
512                     ip = socket.gethostbyname(ip)
513                     name = line[k+6]
514                     host = line[k+9]
515                     peer_list[name] = (ip,host)
516                 if time.time() - t > 5*60:
517                     s.send('NAMES #electrum\n')
518                     t = time.time()
519                     peer_list = {}
520         except:
521             traceback.print_exc(file=sys.stdout)
522         finally:
523             sf.close()
524             s.close()
525
526
527 import traceback
528
529
530 if __name__ == '__main__':
531
532     if len(sys.argv)>1:
533         cmd = sys.argv[1]
534         if cmd == 'load':
535             request = "('load','%s')#"%config.get('server','password')
536         elif cmd == 'peers':
537             request = "('peers','')#"
538         elif cmd == 'stop':
539             request = "('stop','%s')#"%config.get('server','password')
540         elif cmd == 'h':
541             request = "('h','%s')#"%sys.argv[2]
542         elif cmd == 'b':
543             request = "('b','')#"
544
545         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
546         s.connect((config.get('server','host'), config.getint('server','port')))
547         s.send( request )
548         out = ''
549         while 1:
550             msg = s.recv(1024)
551             if msg: out += msg
552             else: break
553         s.close()
554         print out
555         sys.exit(0)
556
557
558     print "starting Electrum server"
559     print "cache:", config.get('server', 'cache')
560
561     conf = DataStore.CONFIG_DEFAULTS
562     args, argv = readconf.parse_argv( [], conf)
563     args.dbtype= config.get('database','type')
564     if args.dbtype == 'sqlite3':
565         args.connect_args = { 'database' : config.get('database','database') }
566     elif args.dbtype == 'MySQLdb':
567         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
568     elif args.dbtype == 'psycopg2':
569         args.connect_args = { 'database' : config.get('database','database') }
570     store = MyStore(args)
571     store.tx_cache = {}
572     store.ismempool = False
573
574     thread.start_new_thread(listen_thread, (store,))
575     thread.start_new_thread(clean_session_thread, ())
576     if (config.get('server','irc') == 'yes' ):
577         thread.start_new_thread(irc_thread, ())
578
579     while not stopping:
580         try:
581             dblock.acquire()
582             store.catch_up()
583             store.ismempool = True
584             memorypool_update(store)
585             store.ismempool = False
586             block_number = store.get_block_number(1)
587             dblock.release()
588         except:
589             traceback.print_exc(file=sys.stdout)
590         time.sleep(10)
591
592     print "server stopped"
593