move address_queue
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 from Abe.abe import hash_to_address, decode_check_address
29 from Abe.DataStore import DataStore as Datastore_class
30 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
31
32 import psycopg2, binascii
33
34 import thread, traceback, sys, urllib, operator
35 from json import dumps, loads
36
37
38 class MyStore(Datastore_class):
39
40     def __init__(self, config):
41         conf = DataStore.CONFIG_DEFAULTS
42         args, argv = readconf.parse_argv( [], conf)
43         args.dbtype = config.get('database','type')
44         if args.dbtype == 'sqlite3':
45             args.connect_args = { 'database' : config.get('database','database') }
46         elif args.dbtype == 'MySQLdb':
47             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
48         elif args.dbtype == 'psycopg2':
49             args.connect_args = { 'database' : config.get('database','database') }
50
51         Datastore_class.__init__(self,args)
52
53         self.tx_cache = {}
54         self.mempool_keys = {}
55         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
56
57         self.address_queue = Queue()
58
59         self.dblock = thread.allocate_lock()
60
61
62
63     def import_block(self, b, chain_ids=frozenset()):
64         block_id = super(MyStore, self).import_block(b, chain_ids)
65         for pos in xrange(len(b['transactions'])):
66             tx = b['transactions'][pos]
67             if 'hash' not in tx:
68                 tx['hash'] = util.double_sha256(tx['tx'])
69             tx_id = store.tx_find_id_and_value(tx)
70             if tx_id:
71                 self.update_tx_cache(tx_id)
72             else:
73                 print "error: import_block: no tx_id"
74         return block_id
75
76
77     def update_tx_cache(self, txid):
78         inrows = self.get_tx_inputs(txid, False)
79         for row in inrows:
80             _hash = self.binout(row[6])
81             address = hash_to_address(chr(0), _hash)
82             if self.tx_cache.has_key(address):
83                 print "cache: invalidating", address
84                 self.tx_cache.pop(address)
85             self.address_queue.put(address)
86
87         outrows = self.get_tx_outputs(txid, False)
88         for row in outrows:
89             _hash = self.binout(row[6])
90             address = hash_to_address(chr(0), _hash)
91             if self.tx_cache.has_key(address):
92                 print "cache: invalidating", address
93                 self.tx_cache.pop(address)
94             self.address_queue.put(address)
95
96     def safe_sql(self,sql, params=(), lock=True):
97         try:
98             if lock: self.dblock.acquire()
99             ret = self.selectall(sql,params)
100             if lock: self.dblock.release()
101             return ret
102         except:
103             print "sql error", sql
104             return []
105
106     def get_tx_outputs(self, tx_id, lock=True):
107         return self.safe_sql("""SELECT
108                 txout.txout_pos,
109                 txout.txout_scriptPubKey,
110                 txout.txout_value,
111                 nexttx.tx_hash,
112                 nexttx.tx_id,
113                 txin.txin_pos,
114                 pubkey.pubkey_hash
115               FROM txout
116               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
117               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
118               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
119              WHERE txout.tx_id = %d 
120              ORDER BY txout.txout_pos
121         """%(tx_id), (), lock)
122
123     def get_tx_inputs(self, tx_id, lock=True):
124         return self.safe_sql(""" SELECT
125                 txin.txin_pos,
126                 txin.txin_scriptSig,
127                 txout.txout_value,
128                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
129                 prevtx.tx_id,
130                 COALESCE(txout.txout_pos, u.txout_pos),
131                 pubkey.pubkey_hash
132               FROM txin
133               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
134               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
135               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
136               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
137              WHERE txin.tx_id = %d
138              ORDER BY txin.txin_pos
139              """%(tx_id,), (), lock)
140
141     def get_address_out_rows(self, dbhash):
142         return self.safe_sql(""" SELECT
143                 b.block_nTime,
144                 cc.chain_id,
145                 b.block_height,
146                 1,
147                 b.block_hash,
148                 tx.tx_hash,
149                 tx.tx_id,
150                 txin.txin_pos,
151                 -prevout.txout_value
152               FROM chain_candidate cc
153               JOIN block b ON (b.block_id = cc.block_id)
154               JOIN block_tx ON (block_tx.block_id = b.block_id)
155               JOIN tx ON (tx.tx_id = block_tx.tx_id)
156               JOIN txin ON (txin.tx_id = tx.tx_id)
157               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
158               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
159              WHERE pubkey.pubkey_hash = ?
160                AND cc.in_longest = 1""", (dbhash,))
161
162     def get_address_out_rows_memorypool(self, dbhash):
163         return self.safe_sql(""" SELECT
164                 1,
165                 tx.tx_hash,
166                 tx.tx_id,
167                 txin.txin_pos,
168                 -prevout.txout_value
169               FROM tx 
170               JOIN txin ON (txin.tx_id = tx.tx_id)
171               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
172               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
173              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
174
175     def get_address_in_rows(self, dbhash):
176         return self.safe_sql(""" SELECT
177                 b.block_nTime,
178                 cc.chain_id,
179                 b.block_height,
180                 0,
181                 b.block_hash,
182                 tx.tx_hash,
183                 tx.tx_id,
184                 txout.txout_pos,
185                 txout.txout_value
186               FROM chain_candidate cc
187               JOIN block b ON (b.block_id = cc.block_id)
188               JOIN block_tx ON (block_tx.block_id = b.block_id)
189               JOIN tx ON (tx.tx_id = block_tx.tx_id)
190               JOIN txout ON (txout.tx_id = tx.tx_id)
191               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
192              WHERE pubkey.pubkey_hash = ?
193                AND cc.in_longest = 1""", (dbhash,))
194
195     def get_address_in_rows_memorypool(self, dbhash):
196         return self.safe_sql( """ SELECT
197                 0,
198                 tx.tx_hash,
199                 tx.tx_id,
200                 txout.txout_pos,
201                 txout.txout_value
202               FROM tx
203               JOIN txout ON (txout.tx_id = tx.tx_id)
204               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
205              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
206
207     def get_history(self, addr):
208         
209         cached_version = self.tx_cache.get( addr )
210         if cached_version is not None:
211             return cached_version
212
213         version, binaddr = decode_check_address(addr)
214         if binaddr is None:
215             return None
216
217         dbhash = self.binin(binaddr)
218         rows = []
219         rows += self.get_address_out_rows( dbhash )
220         rows += self.get_address_in_rows( dbhash )
221
222         txpoints = []
223         known_tx = []
224
225         for row in rows:
226             try:
227                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
228             except:
229                 print "cannot unpack row", row
230                 break
231             tx_hash = self.hashout_hex(tx_hash)
232             txpoint = {
233                     "nTime":    int(nTime),
234                     "height":   int(height),
235                     "is_in":    int(is_in),
236                     "blk_hash": self.hashout_hex(blk_hash),
237                     "tx_hash":  tx_hash,
238                     "tx_id":    int(tx_id),
239                     "pos":      int(pos),
240                     "value":    int(value),
241                     }
242
243             txpoints.append(txpoint)
244             known_tx.append(self.hashout_hex(tx_hash))
245
246
247         # todo: sort them really...
248         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
249
250         # read memory pool
251         rows = []
252         rows += self.get_address_in_rows_memorypool( dbhash )
253         rows += self.get_address_out_rows_memorypool( dbhash )
254         address_has_mempool = False
255
256         for row in rows:
257             is_in, tx_hash, tx_id, pos, value = row
258             tx_hash = self.hashout_hex(tx_hash)
259             if tx_hash in known_tx:
260                 continue
261
262             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
263             address_has_mempool = True
264
265             # this means pending transactions are returned by getmemorypool
266             if tx_hash not in self.mempool_keys:
267                 continue
268
269             #print "mempool", tx_hash
270             txpoint = {
271                     "nTime":    0,
272                     "height":   0,
273                     "is_in":    int(is_in),
274                     "blk_hash": 'mempool', 
275                     "tx_hash":  tx_hash,
276                     "tx_id":    int(tx_id),
277                     "pos":      int(pos),
278                     "value":    int(value),
279                     }
280             txpoints.append(txpoint)
281
282
283         for txpoint in txpoints:
284             tx_id = txpoint['tx_id']
285             
286             txinputs = []
287             inrows = self.get_tx_inputs(tx_id)
288             for row in inrows:
289                 _hash = self.binout(row[6])
290                 address = hash_to_address(chr(0), _hash)
291                 txinputs.append(address)
292             txpoint['inputs'] = txinputs
293             txoutputs = []
294             outrows = self.get_tx_outputs(tx_id)
295             for row in outrows:
296                 _hash = self.binout(row[6])
297                 address = hash_to_address(chr(0), _hash)
298                 txoutputs.append(address)
299             txpoint['outputs'] = txoutputs
300
301             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
302             if not txpoint['is_in']:
303                 # detect if already redeemed...
304                 for row in outrows:
305                     if row[6] == dbhash: break
306                 else:
307                     raise
308                 #row = self.get_tx_output(tx_id,dbhash)
309                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
310                 # if not redeemed, we add the script
311                 if row:
312                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
313
314         # cache result
315         if not address_has_mempool:
316             self.tx_cache[addr] = txpoints
317         
318         return txpoints
319
320
321
322     def memorypool_update(store):
323
324         ds = BCDataStream.BCDataStream()
325         previous_transactions = store.mempool_keys
326         store.mempool_keys = []
327
328         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
329
330         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
331         r = loads(respdata)
332         if r['error'] != None:
333             return
334
335         v = r['result'].get('transactions')
336         for hextx in v:
337             ds.clear()
338             ds.write(hextx.decode('hex'))
339             tx = deserialize.parse_Transaction(ds)
340             tx['hash'] = util.double_sha256(tx['tx'])
341             tx_hash = store.hashin(tx['hash'])
342
343     def send_tx(self,tx):
344         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
345         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
346         r = loads(respdata)
347         if r['error'] != None:
348             out = "error: transaction rejected by memorypool\n"+tx
349         else:
350             out = r['result']
351         return out
352
353
354     def main_iteration(store):
355         try:
356             store.dblock.acquire()
357             store.catch_up()
358             store.memorypool_update()
359             block_number = store.get_block_number(1)
360
361         except IOError:
362             print "IOError: cannot reach bitcoind"
363             block_number = 0
364         except:
365             traceback.print_exc(file=sys.stdout)
366             block_number = 0
367         finally:
368             store.dblock.release()
369
370         return block_number
371
372
373
374 import time, json, socket, operator, thread, ast, sys, re, traceback
375 import ConfigParser
376 from json import dumps, loads
377 import urllib
378
379
380 config = ConfigParser.ConfigParser()
381 # set some defaults, which will be overwritten by the config file
382 config.add_section('server')
383 config.set('server','banner', 'Welcome to Electrum!')
384 config.set('server', 'host', 'localhost')
385 config.set('server', 'port', '50000')
386 config.set('server', 'password', '')
387 config.set('server', 'irc', 'yes')
388 config.set('server', 'ircname', 'Electrum server')
389 config.add_section('database')
390 config.set('database', 'type', 'psycopg2')
391 config.set('database', 'database', 'abe')
392
393 try:
394     f = open('/etc/electrum.conf','r')
395     config.readfp(f)
396     f.close()
397 except:
398     print "Could not read electrum.conf. I will use the default values."
399
400 try:
401     f = open('/etc/electrum.banner','r')
402     config.set('server','banner', f.read())
403     f.close()
404 except:
405     pass
406
407
408 password = config.get('server','password')
409
410 stopping = False
411 block_number = -1
412 sessions = {}
413 sessions_sub_numblocks = {} # sessions that have subscribed to the service
414
415 m_sessions = [{}] # served by http
416
417 peer_list = {}
418
419 wallets = {} # for ultra-light clients such as bccapi
420
421 from Queue import Queue
422 input_queue = Queue()
423 output_queue = Queue()
424
425
426
427
428 def random_string(N):
429     import random, string
430     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
431
432     
433
434 def cmd_stop(_,__,pw):
435     global stopping
436     if password == pw:
437         stopping = True
438         return 'ok'
439     else:
440         return 'wrong password'
441
442 def cmd_load(_,__,pw):
443     if password == pw:
444         return repr( len(sessions) )
445     else:
446         return 'wrong password'
447
448
449
450
451
452 def modified_addresses(session):
453     if 1:
454         t1 = time.time()
455         addresses = session['addresses']
456         session['last_time'] = time.time()
457         ret = {}
458         k = 0
459         for addr in addresses:
460             status = get_address_status( addr )
461             msg_id, last_status = addresses.get( addr )
462             if last_status != status:
463                 addresses[addr] = msg_id, status
464                 ret[addr] = status
465
466         t2 = time.time() - t1 
467         #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
468         return ret, addresses
469
470
471 def poll_session(session_id): 
472     # native
473     session = sessions.get(session_id)
474     if session is None:
475         print time.asctime(), "session not found", session_id
476         return -1, {}
477     else:
478         ret, addresses = modified_addresses(session)
479         if ret: sessions[session_id]['addresses'] = addresses
480         return repr( (block_number,ret))
481
482
483 def poll_session_json(session_id, message_id):
484     session = m_sessions[0].get(session_id)
485     if session is None:
486         raise BaseException("session not found %s"%session_id)
487     else:
488         out = []
489         ret, addresses = modified_addresses(session)
490         if ret: 
491             m_sessions[0][session_id]['addresses'] = addresses
492             for addr in ret:
493                 msg_id, status = addresses[addr]
494                 out.append(  { 'id':msg_id, 'result':status } )
495
496         msg_id, last_nb = session.get('numblocks')
497         if last_nb:
498             if last_nb != block_number:
499                 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
500                 out.append( {'id':msg_id, 'result':block_number} )
501
502         return out
503
504
505 def do_update_address(addr):
506     # an address was involved in a transaction; we check if it was subscribed to in a session
507     # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
508
509     for session_id in sessions.keys():
510         session = sessions[session_id]
511         if session.get('type') != 'persistent': continue
512         addresses = session['addresses'].keys()
513
514         if addr in addresses:
515             status = get_address_status( addr )
516             message_id, last_status = session['addresses'][addr]
517             if last_status != status:
518                 #print "sending new status for %s:"%addr, status
519                 send_status(session_id,message_id,addr,status)
520                 sessions[session_id]['addresses'][addr] = (message_id,status)
521
522 def get_address_status(addr):
523     # get address status, i.e. the last block for that address.
524     tx_points = store.get_history(addr)
525     if not tx_points:
526         status = None
527     else:
528         lastpoint = tx_points[-1]
529         status = lastpoint['blk_hash']
530         # this is a temporary hack; move it up once old clients have disappeared
531         if status == 'mempool': # and session['version'] != "old":
532             status = status + ':%d'% len(tx_points)
533     return status
534
535
536 def send_numblocks(session_id):
537     message_id = sessions_sub_numblocks[session_id]
538     out = json.dumps( {'id':message_id, 'result':block_number} )
539     output_queue.put((session_id, out))
540
541 def send_status(session_id, message_id, address, status):
542     out = json.dumps( { 'id':message_id, 'result':status } )
543     output_queue.put((session_id, out))
544
545 def address_get_history_json(_,message_id,address):
546     return store.get_history(address)
547
548 def subscribe_to_numblocks(session_id, message_id):
549     sessions_sub_numblocks[session_id] = message_id
550     send_numblocks(session_id)
551
552 def subscribe_to_numblocks_json(session_id, message_id):
553     global m_sessions
554     m_sessions[0][session_id]['numblocks'] = message_id,block_number
555     return block_number
556
557 def subscribe_to_address(session_id, message_id, address):
558     status = get_address_status(address)
559     sessions[session_id]['addresses'][address] = (message_id, status)
560     sessions[session_id]['last_time'] = time.time()
561     send_status(session_id, message_id, address, status)
562
563 def add_address_to_session_json(session_id, message_id, address):
564     global m_sessions
565     sessions = m_sessions[0]
566     status = get_address_status(address)
567     sessions[session_id]['addresses'][address] = (message_id, status)
568     sessions[session_id]['last_time'] = time.time()
569     m_sessions[0] = sessions
570     return status
571
572 def add_address_to_session(session_id, address):
573     status = get_address_status(address)
574     sessions[session_id]['addresses'][address] = ("", status)
575     sessions[session_id]['last_time'] = time.time()
576     return status
577
578 def new_session(version, addresses):
579     session_id = random_string(10)
580     sessions[session_id] = { 'addresses':{}, 'version':version }
581     for a in addresses:
582         sessions[session_id]['addresses'][a] = ('','')
583     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
584     sessions[session_id]['last_time'] = time.time()
585     return out
586
587
588 def client_version_json(session_id, _, version):
589     global m_sessions
590     sessions = m_sessions[0]
591     sessions[session_id]['version'] = version
592     m_sessions[0] = sessions
593
594 def create_session_json(_, __):
595     sessions = m_sessions[0]
596     session_id = random_string(10)
597     print "creating session", session_id
598     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
599     sessions[session_id]['last_time'] = time.time()
600     m_sessions[0] = sessions
601     return session_id
602
603
604
605 def get_banner(_,__):
606     return config.get('server','banner').replace('\\n','\n')
607
608 def update_session(session_id,addresses):
609     """deprecated in 0.42"""
610     sessions[session_id]['addresses'] = {}
611     for a in addresses:
612         sessions[session_id]['addresses'][a] = ''
613     sessions[session_id]['last_time'] = time.time()
614     return 'ok'
615
616 def native_server_thread():
617     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
618     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
619     s.bind((config.get('server','host'), config.getint('server','port')))
620     s.listen(1)
621     while not stopping:
622         conn, addr = s.accept()
623         try:
624             thread.start_new_thread(native_client_thread, (addr, conn,))
625         except:
626             # can't start new thread if there is no memory..
627             traceback.print_exc(file=sys.stdout)
628
629
630 def native_client_thread(ipaddr,conn):
631     #print "client thread", ipaddr
632     try:
633         ipaddr = ipaddr[0]
634         msg = ''
635         while 1:
636             d = conn.recv(1024)
637             msg += d
638             if not d: 
639                 break
640             if '#' in msg:
641                 msg = msg.split('#', 1)[0]
642                 break
643         try:
644             cmd, data = ast.literal_eval(msg)
645         except:
646             print "syntax error", repr(msg), ipaddr
647             conn.close()
648             return
649
650         out = do_command(cmd, data, ipaddr)
651         if out:
652             #print ipaddr, cmd, len(out)
653             try:
654                 conn.send(out)
655             except:
656                 print "error, could not send"
657
658     finally:
659         conn.close()
660
661
662 def timestr():
663     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
664
665 # used by the native handler
666 def do_command(cmd, data, ipaddr):
667
668     if cmd=='b':
669         out = "%d"%block_number
670
671     elif cmd in ['session','new_session']:
672         try:
673             if cmd == 'session':
674                 addresses = ast.literal_eval(data)
675                 version = "old"
676             else:
677                 version, addresses = ast.literal_eval(data)
678                 if version[0]=="0": version = "v" + version
679         except:
680             print "error", data
681             return None
682         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
683         out = new_session(version, addresses)
684
685     elif cmd=='address.subscribe':
686         try:
687             session_id, addr = ast.literal_eval(data)
688         except:
689             traceback.print_exc(file=sys.stdout)
690             print data
691             return None
692         out = add_address_to_session(session_id,addr)
693
694     elif cmd=='update_session':
695         try:
696             session_id, addresses = ast.literal_eval(data)
697         except:
698             traceback.print_exc(file=sys.stdout)
699             return None
700         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
701         out = update_session(session_id,addresses)
702             
703     elif cmd=='poll': 
704         out = poll_session(data)
705
706     elif cmd == 'h': 
707         # history
708         address = data
709         out = repr( store.get_history( address ) )
710
711     elif cmd == 'load': 
712         out = cmd_load(None,None,data)
713
714     elif cmd =='tx':
715         out = store.send_tx(data)
716         print timestr(), "sent tx:", ipaddr, out
717
718     elif cmd == 'stop':
719         out = cmd_stop(data)
720
721     elif cmd == 'peers':
722         out = repr(peer_list.values())
723
724     else:
725         out = None
726
727     return out
728
729
730
731 ####################################################################
732
733 def tcp_server_thread():
734     thread.start_new_thread(process_input_queue, ())
735     thread.start_new_thread(process_output_queue, ())
736
737     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
738     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
739     s.bind((config.get('server','host'), 50001))
740     s.listen(1)
741     while not stopping:
742         conn, addr = s.accept()
743         try:
744             thread.start_new_thread(tcp_client_thread, (addr, conn,))
745         except:
746             # can't start new thread if there is no memory..
747             traceback.print_exc(file=sys.stdout)
748
749
750 def close_session(session_id):
751     #print "lost connection", session_id
752     sessions.pop(session_id)
753     if session_id in sessions_sub_numblocks:
754         sessions_sub_numblocks.pop(session_id)
755
756
757 # one thread per client. put requests in a queue.
758 def tcp_client_thread(ipaddr,conn):
759     """ use a persistent connection. put commands in a queue."""
760
761     print timestr(), "TCP session", ipaddr
762     global sessions
763
764     session_id = random_string(10)
765     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
766
767     ipaddr = ipaddr[0]
768     msg = ''
769
770     while not stopping:
771         try:
772             d = conn.recv(1024)
773         except socket.error:
774             d = ''
775         if not d:
776             close_session(session_id)
777             break
778
779         msg += d
780         while True:
781             s = msg.find('\n')
782             if s ==-1:
783                 break
784             else:
785                 c = msg[0:s].strip()
786                 msg = msg[s+1:]
787                 if c == 'quit': 
788                     conn.close()
789                     close_session(session_id)
790                     return
791                 try:
792                     c = json.loads(c)
793                 except:
794                     print "json error", repr(c)
795                     continue
796                 try:
797                     message_id = c.get('id')
798                     method = c.get('method')
799                     params = c.get('params')
800                 except:
801                     print "syntax error", repr(c), ipaddr
802                     continue
803
804                 # add to queue
805                 input_queue.put((session_id, message_id, method, params))
806
807
808
809 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
810 def process_input_queue():
811     while not stopping:
812         session_id, message_id, method, data = input_queue.get()
813         if session_id not in sessions.keys():
814             continue
815         out = None
816         if method == 'address.subscribe':
817             address = data[0]
818             subscribe_to_address(session_id,message_id,address)
819         elif method == 'numblocks.subscribe':
820             subscribe_to_numblocks(session_id,message_id)
821         elif method == 'client.version':
822             sessions[session_id]['version'] = data[0]
823         elif method == 'server.banner':
824             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
825         elif method == 'server.peers':
826             out = { 'result':peer_list.values() } 
827         elif method == 'address.get_history':
828             address = data[0]
829             out = { 'result':store.get_history( address ) } 
830         elif method == 'transaction.broadcast':
831             postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
832             txo = urllib.urlopen(bitcoind_url, postdata).read()
833             print "sent tx:", txo
834             out = json.loads(txo)
835         else:
836             print "unknown command", method
837         if out:
838             out['id'] = message_id
839             out = json.dumps( out )
840             output_queue.put((session_id, out))
841
842 # this is a separate thread
843 def process_output_queue():
844     while not stopping:
845         session_id, out = output_queue.get()
846         session = sessions.get(session_id)
847         if session: 
848             try:
849                 conn = session.get('conn')
850                 conn.send(out+'\n')
851             except:
852                 close_session(session_id)
853                 
854
855
856
857 ####################################################################
858
859
860
861
862 def clean_session_thread():
863     while not stopping:
864         time.sleep(30)
865         t = time.time()
866         for k,s in sessions.items():
867             if s.get('type') == 'persistent': continue
868             t0 = s['last_time']
869             if t - t0 > 5*60:
870                 sessions.pop(k)
871                 print "lost session", k
872             
873
874 def irc_thread():
875     global peer_list
876     NICK = 'E_'+random_string(10)
877     while not stopping:
878         try:
879             s = socket.socket()
880             s.connect(('irc.freenode.net', 6667))
881             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
882             s.send('NICK '+NICK+'\n')
883             s.send('JOIN #electrum\n')
884             sf = s.makefile('r', 0)
885             t = 0
886             while not stopping:
887                 line = sf.readline()
888                 line = line.rstrip('\r\n')
889                 line = line.split()
890                 if line[0]=='PING': 
891                     s.send('PONG '+line[1]+'\n')
892                 elif '353' in line: # answer to /names
893                     k = line.index('353')
894                     for item in line[k+1:]:
895                         if item[0:2] == 'E_':
896                             s.send('WHO %s\n'%item)
897                 elif '352' in line: # answer to /who
898                     # warning: this is a horrible hack which apparently works
899                     k = line.index('352')
900                     ip = line[k+4]
901                     ip = socket.gethostbyname(ip)
902                     name = line[k+6]
903                     host = line[k+9]
904                     peer_list[name] = (ip,host)
905                 if time.time() - t > 5*60:
906                     s.send('NAMES #electrum\n')
907                     t = time.time()
908                     peer_list = {}
909         except:
910             traceback.print_exc(file=sys.stdout)
911         finally:
912             sf.close()
913             s.close()
914
915
916 def get_peers_json(_,__):
917     return peer_list.values()
918
919 def http_server_thread():
920     # see http://code.google.com/p/jsonrpclib/
921     from SocketServer import ThreadingMixIn
922     from StratumJSONRPCServer import StratumJSONRPCServer
923     class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
924     server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
925     server.register_function(get_peers_json, 'server.peers')
926     server.register_function(cmd_stop, 'stop')
927     server.register_function(cmd_load, 'load')
928     server.register_function(get_banner, 'server.banner')
929     server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
930     server.register_function(address_get_history_json, 'address.get_history')
931     server.register_function(add_address_to_session_json, 'address.subscribe')
932     server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
933     server.register_function(client_version_json, 'client.version')
934     server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
935     server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
936     server.serve_forever()
937
938
939 if __name__ == '__main__':
940
941     if len(sys.argv)>1:
942         import jsonrpclib
943         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
944         cmd = sys.argv[1]
945         if cmd == 'load':
946             out = server.load(password)
947         elif cmd == 'peers':
948             out = server.server.peers()
949         elif cmd == 'stop':
950             out = server.stop(password)
951         elif cmd == 'clear_cache':
952             out = server.clear_cache(password)
953         elif cmd == 'get_cache':
954             out = server.get_cache(password,sys.argv[2])
955         elif cmd == 'h':
956             out = server.address.get_history(sys.argv[2])
957         elif cmd == 'tx':
958             out = server.transaction.broadcast(sys.argv[2])
959         elif cmd == 'b':
960             out = server.numblocks.subscribe()
961         else:
962             out = "Unknown command: '%s'" % cmd
963         print out
964         sys.exit(0)
965
966     # backend
967     # from db import MyStore
968     store = MyStore(config)
969
970     # supported protocols
971     thread.start_new_thread(native_server_thread, ())
972     thread.start_new_thread(tcp_server_thread, ())
973     thread.start_new_thread(http_server_thread, ())
974     thread.start_new_thread(clean_session_thread, ())
975
976     if (config.get('server','irc') == 'yes' ):
977         thread.start_new_thread(irc_thread, ())
978
979     print "starting Electrum server"
980
981     old_block_number = None
982     while not stopping:
983         block_number = store.main_iteration()
984
985         if block_number != old_block_number:
986             old_block_number = block_number
987             for session_id in sessions_sub_numblocks.keys():
988                 send_numblocks(session_id)
989         while True:
990             try:
991                 addr = store.address_queue.get(False)
992             except:
993                 break
994             do_update_address(addr)
995
996         time.sleep(10)
997     print "server stopped"
998