Memory cache for addresses. Not activated for the moment, but it will log errors.
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24
25 import time, socket, operator, thread, ast, sys,re
26 import psycopg2, binascii
27 import bitcoinrpc
28
29 from Abe.abe import hash_to_address, decode_check_address
30 from Abe.DataStore import DataStore as Datastore_class
31 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
32
33 import ConfigParser
34
35 config = ConfigParser.ConfigParser()
36 # set some defaults, which will be overwritten by the config file
37 config.add_section('server')
38 config.set('server','banner', 'Welcome to Electrum!')
39 config.set('server', 'host', 'ecdsa.org')
40 config.set('server', 'port', 50000)
41 config.set('server', 'password', '')
42 config.set('server', 'irc', 'yes')
43 config.set('server', 'ircname', 'Electrum server')
44 config.add_section('database')
45 config.set('database', 'type', 'psycopg2')
46 config.set('database', 'database', 'abe')
47
48 try:
49     f = open('/etc/electrum.conf','r')
50     config.readfp(f)
51     f.close()
52 except:
53     print "Could not read electrum.conf. I will use the default values."
54
55 stopping = False
56 block_number = -1
57 sessions = {}
58 sessions_last_time = {}
59 dblock = thread.allocate_lock()
60
61 peer_list = {}
62
63
64
65 class MyStore(Datastore_class):
66
67     def import_tx(self, tx, is_coinbase):
68         tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
69         self.update_tx_cache(tx_id)
70
71     def update_tx_cache(self, txid):
72         inrows = self.get_tx_inputs(txid, False)
73         for row in inrows:
74             _hash = store.binout(row[6])
75             address = hash_to_address(chr(0), _hash)
76             if self.tx_cache.has_key(address):
77                 print "cache: popping", address
78                 self.tx_cache.pop(address)
79         outrows = self.get_tx_outputs(txid, False)
80         for row in outrows:
81             _hash = store.binout(row[6])
82             address = hash_to_address(chr(0), _hash)
83             if self.tx_cache.has_key(address):
84                 print "cache: popping", address
85                 self.tx_cache.pop(address)
86
87     def safe_sql(self,sql, params=(), lock=True):
88         try:
89             if lock: dblock.acquire()
90             ret = self.selectall(sql,params)
91             if lock: dblock.release()
92             return ret
93         except:
94             print "sql error", sql
95             return []
96
97     def get_tx_outputs(self, tx_id, lock=True):
98         return self.safe_sql("""SELECT
99                 txout.txout_pos,
100                 txout.txout_scriptPubKey,
101                 txout.txout_value,
102                 nexttx.tx_hash,
103                 nexttx.tx_id,
104                 txin.txin_pos,
105                 pubkey.pubkey_hash
106               FROM txout
107               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
108               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
109               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
110              WHERE txout.tx_id = %d 
111              ORDER BY txout.txout_pos
112         """%(tx_id), (), lock)
113
114     def get_tx_inputs(self, tx_id, lock=True):
115         return self.safe_sql(""" SELECT
116                 txin.txin_pos,
117                 txin.txin_scriptSig,
118                 txout.txout_value,
119                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
120                 prevtx.tx_id,
121                 COALESCE(txout.txout_pos, u.txout_pos),
122                 pubkey.pubkey_hash
123               FROM txin
124               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
125               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
126               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
127               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
128              WHERE txin.tx_id = %d
129              ORDER BY txin.txin_pos
130              """%(tx_id,), (), lock)
131
132     def get_address_out_rows(self, dbhash):
133         return self.safe_sql(""" SELECT
134                 b.block_nTime,
135                 cc.chain_id,
136                 b.block_height,
137                 1,
138                 b.block_hash,
139                 tx.tx_hash,
140                 tx.tx_id,
141                 txin.txin_pos,
142                 -prevout.txout_value
143               FROM chain_candidate cc
144               JOIN block b ON (b.block_id = cc.block_id)
145               JOIN block_tx ON (block_tx.block_id = b.block_id)
146               JOIN tx ON (tx.tx_id = block_tx.tx_id)
147               JOIN txin ON (txin.tx_id = tx.tx_id)
148               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
149               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
150              WHERE pubkey.pubkey_hash = ?
151                AND cc.in_longest = 1""", (dbhash,))
152
153     def get_address_out_rows_memorypool(self, dbhash):
154         return self.safe_sql(""" SELECT
155                 1,
156                 tx.tx_hash,
157                 tx.tx_id,
158                 txin.txin_pos,
159                 -prevout.txout_value
160               FROM tx 
161               JOIN txin ON (txin.tx_id = tx.tx_id)
162               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
163               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
164              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
165
166     def get_address_in_rows(self, dbhash):
167         return self.safe_sql(""" SELECT
168                 b.block_nTime,
169                 cc.chain_id,
170                 b.block_height,
171                 0,
172                 b.block_hash,
173                 tx.tx_hash,
174                 tx.tx_id,
175                 txout.txout_pos,
176                 txout.txout_value
177               FROM chain_candidate cc
178               JOIN block b ON (b.block_id = cc.block_id)
179               JOIN block_tx ON (block_tx.block_id = b.block_id)
180               JOIN tx ON (tx.tx_id = block_tx.tx_id)
181               JOIN txout ON (txout.tx_id = tx.tx_id)
182               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
183              WHERE pubkey.pubkey_hash = ?
184                AND cc.in_longest = 1""", (dbhash,))
185
186     def get_address_in_rows_memorypool(self, dbhash):
187         return self.safe_sql( """ SELECT
188                 0,
189                 tx.tx_hash,
190                 tx.tx_id,
191                 txout.txout_pos,
192                 txout.txout_value
193               FROM tx
194               JOIN txout ON (txout.tx_id = tx.tx_id)
195               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
196              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
197
198     def get_txpoints(self, addr):
199
200         cached_version = self.tx_cache.get( addr ) 
201
202         version, binaddr = decode_check_address(addr)
203         if binaddr is None:
204             return "err"
205         dbhash = self.binin(binaddr)
206         rows = []
207         rows += self.get_address_out_rows( dbhash )
208         rows += self.get_address_in_rows( dbhash )
209
210         txpoints = []
211         known_tx = []
212
213         for row in rows:
214             try:
215                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
216             except:
217                 print "cannot unpack row", row
218                 break
219             tx_hash = self.hashout_hex(tx_hash)
220             txpoint = {
221                     "nTime":    int(nTime),
222                     #"chain_id": int(chain_id),
223                     "height":   int(height),
224                     "is_in":    int(is_in),
225                     "blk_hash": self.hashout_hex(blk_hash),
226                     "tx_hash":  tx_hash,
227                     "tx_id":    int(tx_id),
228                     "pos":      int(pos),
229                     "value":    int(value),
230                     }
231
232             txpoints.append(txpoint)
233             known_tx.append(self.hashout_hex(tx_hash))
234
235
236         # todo: sort them really...
237         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
238
239         # read memory pool
240         rows = []
241         rows += self.get_address_in_rows_memorypool( dbhash )
242         rows += self.get_address_out_rows_memorypool( dbhash )
243         for row in rows:
244             is_in, tx_hash, tx_id, pos, value = row
245             tx_hash = self.hashout_hex(tx_hash)
246             if tx_hash in known_tx:
247                 continue
248             #print "mempool", tx_hash
249             txpoint = {
250                     "nTime":    0,
251                     "chain_id": 1,
252                     "height":   0,
253                     "is_in":    int(is_in),
254                     "blk_hash": 'mempool',
255                     "tx_hash":  tx_hash,
256                     "tx_id":    int(tx_id),
257                     "pos":      int(pos),
258                     "value":    int(value),
259                     }
260             txpoints.append(txpoint)
261
262
263         for txpoint in txpoints:
264             tx_id = txpoint['tx_id']
265             
266             txinputs = []
267             inrows = self.get_tx_inputs(tx_id)
268             for row in inrows:
269                 _hash = self.binout(row[6])
270                 address = hash_to_address(chr(0), _hash)
271                 txinputs.append(address)
272             txpoint['inputs'] = txinputs
273             txoutputs = []
274             outrows = self.get_tx_outputs(tx_id)
275             for row in outrows:
276                 _hash = self.binout(row[6])
277                 address = hash_to_address(chr(0), _hash)
278                 txoutputs.append(address)
279             txpoint['outputs'] = txoutputs
280
281             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
282             if not txpoint['is_in']:
283                 # detect if already redeemed...
284                 for row in outrows:
285                     if row[6] == dbhash: break
286                 else:
287                     raise
288                 #row = self.get_tx_output(tx_id,dbhash)
289                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
290                 # if not redeemed, we add the script
291                 if row:
292                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
293
294
295         if cached_version is None:
296             #print "cache: adding", addr
297             self.tx_cache[addr] = txpoints
298             return txpoints
299         else:
300             if cached_version != txpoints: 
301                 print "cache error: ", addr
302             return txpoints
303
304
305     def get_status(self, addr):
306         # last block for an address.
307         tx_points = self.get_txpoints(addr)
308         if not tx_points:
309             return None
310         else:
311             return tx_points[-1]['blk_hash']
312
313
314 def send_tx(tx):
315     import bitcoinrpc
316     conn = bitcoinrpc.connect_to_local()
317     try:
318         v = conn.importtransaction(tx)
319     except:
320         v = "error: transaction rejected by memorypool"
321     return v
322
323
324 def listen_thread(store):
325     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
326     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
327     s.bind((config.get('server','host'), config.getint('server','port')))
328     s.listen(1)
329     while not stopping:
330         conn, addr = s.accept()
331         thread.start_new_thread(client_thread, (addr, conn,))
332
333 def random_string(N):
334     import random, string
335     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
336
337 def client_thread(ipaddr,conn):
338     #print "client thread", ipaddr
339     try:
340         ipaddr = ipaddr[0]
341         msg = ''
342         while 1:
343             d = conn.recv(1024)
344             msg += d
345             if d[-1]=='#':
346                 break
347
348         #print msg
349
350         try:
351             cmd, data = ast.literal_eval(msg[:-1])
352         except:
353             print "syntax error", repr(msg)
354             conn.close()
355             return
356
357         if cmd=='b':
358             out = "%d"%block_number
359         elif cmd=='session':
360             session_id = random_string(10)
361             try:
362                 addresses = ast.literal_eval(data)
363             except:
364                 print "error"
365                 conn.close()
366                 return
367
368             print time.asctime(), "session", ipaddr, session_id, addresses[0], len(addresses)
369
370             sessions[session_id] = {}
371             for a in addresses:
372                 sessions[session_id][a] = ''
373             out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
374             sessions_last_time[session_id] = time.time()
375
376         elif cmd=='poll': 
377             session_id = data
378             addresses = sessions.get(session_id)
379             if not addresses:
380                 print "session not found", ipaddr
381                 out = repr( (-1, {}))
382             else:
383                 sessions_last_time[session_id] = time.time()
384                 ret = {}
385                 for addr in addresses:
386                     status = store.get_status( addr )
387                     last_status = sessions[session_id].get( addr )
388                     if last_status != status:
389                         sessions[session_id][addr] = status
390                         ret[addr] = status
391                 out = repr( (block_number, ret ) )
392
393         elif cmd == 'h': 
394             # history
395             address = data
396             out = repr( store.get_txpoints( address ) )
397
398         elif cmd == 'load': 
399             if config.get('server','password') == data:
400                 out = repr( len(sessions) )
401             else:
402                 out = 'wrong password'
403
404         elif cmd =='tx':        
405             out = send_tx(data)
406
407         elif cmd == 'stop':
408             global stopping
409             if config.get('server','password') == data:
410                 stopping = True
411                 out = 'ok'
412             else:
413                 out = 'wrong password'
414
415         elif cmd == 'peers':
416             out = repr(peer_list.values())
417
418         else:
419             out = None
420
421         if out:
422             #print ipaddr, cmd, len(out)
423             try:
424                 conn.send(out)
425             except:
426                 print "error, could not send"
427
428     finally:
429         conn.close()
430     
431
432 ds = BCDataStream.BCDataStream()
433
434
435
436
437 def memorypool_update(store):
438     conn = bitcoinrpc.connect_to_local()
439     try:
440         v = conn.getmemorypool()
441     except:
442         print "cannot contact bitcoin daemon"
443         return
444     v = v['transactions']
445     for hextx in v:
446         ds.clear()
447         ds.write(hextx.decode('hex'))
448         tx = deserialize.parse_Transaction(ds)
449         #print "new tx",tx
450
451         tx['hash'] = util.double_sha256(tx['tx'])
452             
453         if store.tx_find_id_and_value(tx):
454             pass
455         else:
456             store.import_tx(tx, False)
457
458     store.commit()
459
460
461
462
463 def clean_session_thread():
464     while not stopping:
465         time.sleep(30)
466         t = time.time()
467         for k,t0 in sessions_last_time.items():
468             if t - t0 > 60:
469                 print "lost session",k
470                 sessions.pop(k)
471                 sessions_last_time.pop(k)
472             
473
474 def irc_thread():
475     global peer_list
476     NICK = 'E_'+random_string(10)
477     while not stopping:
478         try:
479             s = socket.socket()
480             s.connect(('irc.freenode.net', 6667))
481             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
482             s.send('NICK '+NICK+'\n')
483             s.send('JOIN #electrum\n')
484             sf = s.makefile('r', 0)
485             t = 0
486             while not stopping:
487                 line = sf.readline()
488                 line = line.rstrip('\r\n')
489                 line = line.split()
490                 if line[0]=='PING': 
491                     s.send('PONG '+line[1]+'\n')
492                 elif '353' in line: # answer to /names
493                     k = line.index('353')
494                     for item in line[k+1:]:
495                         if item[0:2] == 'E_':
496                             s.send('WHO %s\n'%item)
497                 elif '352' in line: # answer to /who
498                     # warning: this is a horrible hack which apparently works
499                     k = line.index('352')
500                     ip = line[k+4]
501                     ip = socket.gethostbyname(ip)
502                     name = line[k+6]
503                     host = line[k+9]
504                     peer_list[name] = (ip,host)
505                 elif time.time() - t > 5*60:
506                     s.send('NAMES #electrum\n')
507                     t = time.time()
508                     peer_list = {}
509         except:
510             traceback.print_exc(file=sys.stdout)
511         finally:
512             sf.close()
513             s.close()
514
515
516 import traceback
517
518
519 if __name__ == '__main__':
520
521     if len(sys.argv)>1:
522         cmd = sys.argv[1]
523         if cmd == 'load':
524             request = "('load','%s')#"%config.get('server','password')
525         elif cmd == 'peers':
526             request = "('peers','')#"
527         elif cmd == 'stop':
528             request = "('stop','%s')#"%config.get('server','password')
529
530         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
531         s.connect((config.get('server','host'), config.getint('server','port')))
532         s.send( request )
533         out = ''
534         while 1:
535             msg = s.recv(1024)
536             if msg: out += msg
537             else: break
538         s.close()
539         print out
540         sys.exit(0)
541
542
543     print "starting Electrum server"
544     conf = DataStore.CONFIG_DEFAULTS
545     args, argv = readconf.parse_argv( [], conf)
546     args.dbtype= config.get('database','type')
547     if args.dbtype == 'sqlite3':
548         args.connect_args = { 'database' : config.get('database','database') }
549     elif args.dbtype == 'MySQLdb':
550         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
551     elif args.dbtype == 'psycopg2':
552         args.connect_args = { 'database' : config.get('database','database') }
553     store = MyStore(args)
554     store.tx_cache = {}
555
556     thread.start_new_thread(listen_thread, (store,))
557     thread.start_new_thread(clean_session_thread, ())
558     if (config.get('server','irc') == 'yes' ):
559         thread.start_new_thread(irc_thread, ())
560
561     while not stopping:
562         try:
563             dblock.acquire()
564             store.catch_up()
565             memorypool_update(store)
566             block_number = store.get_block_number(1)
567             dblock.release()
568         except:
569             traceback.print_exc(file=sys.stdout)
570         time.sleep(10)
571
572     print "server stopped"
573