#!/usr/bin/env python
# Copyright(C) 2012 thomasv@gitorious

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/agpl.html>.

"""
Todo:
   * the server should check and return bitcoind's status
   * improve txpoint sorting
   * add a command to inspect the cache

Note: mempool transactions do not need to be added to the database; doing so slows it down.
"""


from Abe.abe import hash_to_address, decode_check_address
from Abe.DataStore import DataStore as Datastore_class
from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58

import psycopg2, binascii

import thread, traceback, sys, urllib, operator
from json import dumps, loads

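# MyStore wraps Abe's DataStore and adds what the Electrum server needs on top of it:
# a per-address history cache, mempool tracking, and lock-protected SQL access.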
class MyStore(Datastore_class):

    def __init__(self, config):
        conf = DataStore.CONFIG_DEFAULTS
        args, argv = readconf.parse_argv( [], conf)
        args.dbtype = config.get('database','type')
        if args.dbtype == 'sqlite3':
            args.connect_args = { 'database' : config.get('database','database') }
        elif args.dbtype == 'MySQLdb':
            args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
        elif args.dbtype == 'psycopg2':
            args.connect_args = { 'database' : config.get('database','database') }

        Datastore_class.__init__(self,args)

        self.tx_cache = {}
        self.mempool_keys = {}
        self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))

        self.address_queue = Queue()

        self.dblock = thread.allocate_lock()


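    # Override of Abe's import_block: once the block is imported, invalidate the
    # cached history of every address touched by its transactions.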
    def import_block(self, b, chain_ids=frozenset()):
        block_id = super(MyStore, self).import_block(b, chain_ids)
        for pos in xrange(len(b['transactions'])):
            tx = b['transactions'][pos]
            if 'hash' not in tx:
                tx['hash'] = util.double_sha256(tx['tx'])
            tx_id = self.tx_find_id_and_value(tx)
            if tx_id:
                self.update_tx_cache(tx_id)
            else:
                print "error: import_block: no tx_id"
        return block_id


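    # Invalidate the cached history of every address that appears in the inputs or
    # outputs of txid, and queue those addresses so subscribed clients get notified.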
    def update_tx_cache(self, txid):
        inrows = self.get_tx_inputs(txid, False)
        for row in inrows:
            _hash = self.binout(row[6])
            address = hash_to_address(chr(0), _hash)
            if self.tx_cache.has_key(address):
                print "cache: invalidating", address
                self.tx_cache.pop(address)
            self.address_queue.put(address)

        outrows = self.get_tx_outputs(txid, False)
        for row in outrows:
            _hash = self.binout(row[6])
            address = hash_to_address(chr(0), _hash)
            if self.tx_cache.has_key(address):
                print "cache: invalidating", address
                self.tx_cache.pop(address)
            self.address_queue.put(address)

    def safe_sql(self, sql, params=(), lock=True):
        # run a query under the database lock; release the lock in all cases,
        # including on error, so a failed query cannot deadlock the server
        try:
            if lock: self.dblock.acquire()
            return self.selectall(sql, params)
        except:
            print "sql error", sql
            return []
        finally:
            if lock: self.dblock.release()

    def get_tx_outputs(self, tx_id, lock=True):
        return self.safe_sql("""SELECT
                txout.txout_pos,
                txout.txout_scriptPubKey,
                txout.txout_value,
                nexttx.tx_hash,
                nexttx.tx_id,
                txin.txin_pos,
                pubkey.pubkey_hash
              FROM txout
              LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
              LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
              LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
             WHERE txout.tx_id = %d
             ORDER BY txout.txout_pos
        """%(tx_id), (), lock)

    def get_tx_inputs(self, tx_id, lock=True):
        return self.safe_sql(""" SELECT
                txin.txin_pos,
                txin.txin_scriptSig,
                txout.txout_value,
                COALESCE(prevtx.tx_hash, u.txout_tx_hash),
                prevtx.tx_id,
                COALESCE(txout.txout_pos, u.txout_pos),
                pubkey.pubkey_hash
              FROM txin
              LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
              LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
              LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
              LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
             WHERE txin.tx_id = %d
             ORDER BY txin.txin_pos
             """%(tx_id,), (), lock)

    def get_address_out_rows(self, dbhash):
        return self.safe_sql(""" SELECT
                b.block_nTime,
                cc.chain_id,
                b.block_height,
                1,
                b.block_hash,
                tx.tx_hash,
                tx.tx_id,
                txin.txin_pos,
                -prevout.txout_value
              FROM chain_candidate cc
              JOIN block b ON (b.block_id = cc.block_id)
              JOIN block_tx ON (block_tx.block_id = b.block_id)
              JOIN tx ON (tx.tx_id = block_tx.tx_id)
              JOIN txin ON (txin.tx_id = tx.tx_id)
              JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
              JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
             WHERE pubkey.pubkey_hash = ?
               AND cc.in_longest = 1""", (dbhash,))

    def get_address_out_rows_memorypool(self, dbhash):
        return self.safe_sql(""" SELECT
                1,
                tx.tx_hash,
                tx.tx_id,
                txin.txin_pos,
                -prevout.txout_value
              FROM tx
              JOIN txin ON (txin.tx_id = tx.tx_id)
              JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
              JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
             WHERE pubkey.pubkey_hash = ? """, (dbhash,))

    def get_address_in_rows(self, dbhash):
        return self.safe_sql(""" SELECT
                b.block_nTime,
                cc.chain_id,
                b.block_height,
                0,
                b.block_hash,
                tx.tx_hash,
                tx.tx_id,
                txout.txout_pos,
                txout.txout_value
              FROM chain_candidate cc
              JOIN block b ON (b.block_id = cc.block_id)
              JOIN block_tx ON (block_tx.block_id = b.block_id)
              JOIN tx ON (tx.tx_id = block_tx.tx_id)
              JOIN txout ON (txout.tx_id = tx.tx_id)
              JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
             WHERE pubkey.pubkey_hash = ?
               AND cc.in_longest = 1""", (dbhash,))

    def get_address_in_rows_memorypool(self, dbhash):
        return self.safe_sql( """ SELECT
                0,
                tx.tx_hash,
                tx.tx_id,
                txout.txout_pos,
                txout.txout_value
              FROM tx
              JOIN txout ON (txout.tx_id = tx.tx_id)
              JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
             WHERE pubkey.pubkey_hash = ? """, (dbhash,))

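    # Return the transaction history of an address as a list of "txpoint" dicts:
    # confirmed rows first, then unconfirmed rows from the memory pool. The result
    # is cached per address until a new transaction touches it.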
    def get_history(self, addr):

        cached_version = self.tx_cache.get( addr )
        if cached_version is not None:
            return cached_version

        version, binaddr = decode_check_address(addr)
        if binaddr is None:
            return None

        dbhash = self.binin(binaddr)
        rows = []
        rows += self.get_address_out_rows( dbhash )
        rows += self.get_address_in_rows( dbhash )

        txpoints = []
        known_tx = []

        for row in rows:
            try:
                nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
            except:
                print "cannot unpack row", row
                break
            tx_hash = self.hashout_hex(tx_hash)
            txpoint = {
                    "nTime":    int(nTime),
                    "height":   int(height),
                    "is_in":    int(is_in),
                    "blk_hash": self.hashout_hex(blk_hash),
                    "tx_hash":  tx_hash,
                    "tx_id":    int(tx_id),
                    "pos":      int(pos),
                    "value":    int(value),
                    }

            txpoints.append(txpoint)
            known_tx.append(tx_hash)  # tx_hash is already hex-encoded at this point


        # todo: sort them really...
        txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))

        # read memory pool
        rows = []
        rows += self.get_address_in_rows_memorypool( dbhash )
        rows += self.get_address_out_rows_memorypool( dbhash )
        address_has_mempool = False

        for row in rows:
            is_in, tx_hash, tx_id, pos, value = row
            tx_hash = self.hashout_hex(tx_hash)
            if tx_hash in known_tx:
                continue

            # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
            address_has_mempool = True

            # this means pending transactions are returned by getmemorypool
            if tx_hash not in self.mempool_keys:
                continue

            #print "mempool", tx_hash
            txpoint = {
                    "nTime":    0,
                    "height":   0,
                    "is_in":    int(is_in),
                    "blk_hash": 'mempool',
                    "tx_hash":  tx_hash,
                    "tx_id":    int(tx_id),
                    "pos":      int(pos),
                    "value":    int(value),
                    }
            txpoints.append(txpoint)


        for txpoint in txpoints:
            tx_id = txpoint['tx_id']

            txinputs = []
            inrows = self.get_tx_inputs(tx_id)
            for row in inrows:
                _hash = self.binout(row[6])
                address = hash_to_address(chr(0), _hash)
                txinputs.append(address)
            txpoint['inputs'] = txinputs
            txoutputs = []
            outrows = self.get_tx_outputs(tx_id)
            for row in outrows:
                _hash = self.binout(row[6])
                address = hash_to_address(chr(0), _hash)
                txoutputs.append(address)
            txpoint['outputs'] = txoutputs

            # for outputs paying to this address, also return the scriptPubKey
            # (it could actually be deduced from the address)
            if not txpoint['is_in']:
                # find the output row that pays to this address
                for row in outrows:
                    if row[6] == dbhash: break
                else:
                    raise Exception("output to %s not found in tx %s" % (addr, txpoint['tx_hash']))
                # row is (pos, script, value, next_hash, next_id, next_pos, binaddr)
                # if the output has not been redeemed yet, add its script
                if row:
                    if not row[4]: txpoint['raw_scriptPubKey'] = row[1]

        # cache result
        if not address_has_mempool:
            self.tx_cache[addr] = txpoints

        return txpoints


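    # Fetch pending transactions from bitcoind with the 'getmemorypool' RPC, record
    # their hashes in mempool_keys, and import any transaction that is not already
    # in the database.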
    def memorypool_update(store):

        ds = BCDataStream.BCDataStream()
        previous_transactions = store.mempool_keys
        store.mempool_keys = []

        postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})

        respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
        r = loads(respdata)
        if r['error'] != None:
            return

        v = r['result'].get('transactions')
        for hextx in v:
            ds.clear()
            ds.write(hextx.decode('hex'))
            tx = deserialize.parse_Transaction(ds)
            tx['hash'] = util.double_sha256(tx['tx'])
            tx_hash = store.hashin(tx['hash'])

            store.mempool_keys.append(tx_hash)
            if store.tx_find_id_and_value(tx):
                pass
            else:
                tx_id = store.import_tx(tx, False)
                store.update_tx_cache(tx_id)

        store.commit()


    def send_tx(self,tx):
        postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        r = loads(respdata)
        if r['error'] != None:
            out = "error: transaction rejected by memorypool\n"+tx
        else:
            out = r['result']
        return out


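    # One pass of the main loop: catch up with the blockchain, refresh the memory
    # pool, and return the current block count (0 if bitcoind is unreachable).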
    def main_iteration(store):
        try:
            store.dblock.acquire()
            store.catch_up()
            store.memorypool_update()
            block_number = store.get_block_number(1)

        except IOError:
            print "IOError: cannot reach bitcoind"
            block_number = 0
        except:
            traceback.print_exc(file=sys.stdout)
            block_number = 0
        finally:
            store.dblock.release()

        return block_number



import time, json, socket, operator, thread, ast, sys, re, traceback
import ConfigParser
from json import dumps, loads
import urllib


config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
config.add_section('server')
config.set('server','banner', 'Welcome to Electrum!')
config.set('server', 'host', 'localhost')
config.set('server', 'port', '50000')
config.set('server', 'password', '')
config.set('server', 'irc', 'yes')
config.set('server', 'ircname', 'Electrum server')
config.add_section('database')
config.set('database', 'type', 'psycopg2')
config.set('database', 'database', 'abe')

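# The defaults above define no [bitcoind] section and no database credentials, so a
# working setup needs a config file. For illustration only, /etc/electrum.conf could
# look like this (all values are placeholders):
#
#   [server]
#   host = localhost
#   port = 50000
#   password = changeme
#   irc = yes
#
#   [database]
#   type = psycopg2
#   database = abe
#
#   [bitcoind]
#   user = rpcuser
#   password = rpcpassword
#   host = localhost
#   port = 8332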
try:
    f = open('/etc/electrum.conf','r')
    config.readfp(f)
    f.close()
except:
    print "Could not read electrum.conf. I will use the default values."

try:
    f = open('/etc/electrum.banner','r')
    config.set('server','banner', f.read())
    f.close()
except:
    pass


password = config.get('server','password')

stopping = False
block_number = -1
sessions = {}
sessions_sub_numblocks = {} # sessions that have subscribed to the numblocks service

m_sessions = [{}] # sessions served by the http (json-rpc) interface

peer_list = {}

wallets = {} # for ultra-light clients such as bccapi

from Queue import Queue
input_queue = Queue()
output_queue = Queue()




def random_string(N):
    import random, string
    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))


def cmd_stop(_,__,pw):
    global stopping
    if password == pw:
        stopping = True
        return 'ok'
    else:
        return 'wrong password'

def cmd_load(_,__,pw):
    if password == pw:
        return repr( len(sessions) )
    else:
        return 'wrong password'




def modified_addresses(a_session):
    #t1 = time.time()
    import copy
    session = copy.deepcopy(a_session)
    addresses = session['addresses']
    session['last_time'] = time.time()
    ret = {}
    k = 0
    for addr in addresses:
        status = get_address_status( addr )
        msg_id, last_status = addresses.get( addr )
        if last_status != status:
            addresses[addr] = msg_id, status
            ret[addr] = status

    #t2 = time.time() - t1
    #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
    return ret, addresses

def poll_session(session_id):
    # native protocol
    session = sessions.get(session_id)
    if session is None:
        print time.asctime(), "session not found", session_id
        return repr( (-1, {}) )
    else:
        ret, addresses = modified_addresses(session)
        if ret: sessions[session_id]['addresses'] = addresses
        return repr( (block_number, ret) )


def poll_session_json(session_id, message_id):
    session = m_sessions[0].get(session_id)
    if session is None:
        raise BaseException("session not found %s"%session_id)
    else:
        out = []
        ret, addresses = modified_addresses(session)
        if ret:
            m_sessions[0][session_id]['addresses'] = addresses
            for addr in ret:
                msg_id, status = addresses[addr]
                out.append(  { 'id':msg_id, 'result':status } )

        msg_id, last_nb = session.get('numblocks')
        if last_nb:
            if last_nb != block_number:
                m_sessions[0][session_id]['numblocks'] = msg_id, block_number
                out.append( {'id':msg_id, 'result':block_number} )

        return out


def do_update_address(addr):
    # an address was involved in a transaction; we check if it was subscribed to in a session
    # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests

    for session_id in sessions.keys():
        session = sessions[session_id]
        if session.get('type') != 'persistent': continue
        addresses = session['addresses'].keys()

        if addr in addresses:
            status = get_address_status( addr )
            message_id, last_status = session['addresses'][addr]
            if last_status != status:
                #print "sending new status for %s:"%addr, status
                send_status(session_id,message_id,addr,status)
                sessions[session_id]['addresses'][addr] = (message_id,status)

def get_address_status(addr):
    # get address status, i.e. the last block for that address.
    tx_points = store.get_history(addr)
    if not tx_points:
        status = None
    else:
        lastpoint = tx_points[-1]
        status = lastpoint['blk_hash']
        # this is a temporary hack; move it up once old clients have disappeared
        if status == 'mempool': # and session['version'] != "old":
            status = status + ':%d'% len(tx_points)
    return status


def send_numblocks(session_id):
    message_id = sessions_sub_numblocks[session_id]
    out = json.dumps( {'id':message_id, 'result':block_number} )
    output_queue.put((session_id, out))

def send_status(session_id, message_id, address, status):
    out = json.dumps( { 'id':message_id, 'result':status } )
    output_queue.put((session_id, out))

def address_get_history_json(_,message_id,address):
    return store.get_history(address)

def subscribe_to_numblocks(session_id, message_id):
    sessions_sub_numblocks[session_id] = message_id
    send_numblocks(session_id)

def subscribe_to_numblocks_json(session_id, message_id):
    global m_sessions
    m_sessions[0][session_id]['numblocks'] = message_id,block_number
    return block_number

def subscribe_to_address(session_id, message_id, address):
    status = get_address_status(address)
    sessions[session_id]['addresses'][address] = (message_id, status)
    sessions[session_id]['last_time'] = time.time()
    send_status(session_id, message_id, address, status)

def add_address_to_session_json(session_id, message_id, address):
    global m_sessions
    sessions = m_sessions[0]
    status = get_address_status(address)
    sessions[session_id]['addresses'][address] = (message_id, status)
    sessions[session_id]['last_time'] = time.time()
    m_sessions[0] = sessions
    return status

def add_address_to_session(session_id, address):
    status = get_address_status(address)
    sessions[session_id]['addresses'][address] = ("", status)
    sessions[session_id]['last_time'] = time.time()
    return status

def new_session(version, addresses):
    session_id = random_string(10)
    sessions[session_id] = { 'addresses':{}, 'version':version }
    for a in addresses:
        sessions[session_id]['addresses'][a] = ('','')
    out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
    sessions[session_id]['last_time'] = time.time()
    return out


def client_version_json(session_id, _, version):
    global m_sessions
    sessions = m_sessions[0]
    sessions[session_id]['version'] = version
    m_sessions[0] = sessions

def create_session_json(_, __):
    sessions = m_sessions[0]
    session_id = random_string(10)
    print "creating session", session_id
    sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
    sessions[session_id]['last_time'] = time.time()
    m_sessions[0] = sessions
    return session_id



def get_banner(_,__):
    return config.get('server','banner').replace('\\n','\n')

def update_session(session_id,addresses):
    """deprecated in 0.42"""
    sessions[session_id]['addresses'] = {}
    for a in addresses:
        sessions[session_id]['addresses'][a] = ''
    sessions[session_id]['last_time'] = time.time()
    return 'ok'

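# The 'native' protocol: one short-lived TCP connection per request on the
# configured port; the request is a python literal terminated by '#'.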
def native_server_thread():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), config.getint('server','port')))
    s.listen(1)
    while not stopping:
        conn, addr = s.accept()
        try:
            thread.start_new_thread(native_client_thread, (addr, conn,))
        except:
            # can't start new thread if there is no memory..
            traceback.print_exc(file=sys.stdout)


def native_client_thread(ipaddr,conn):
    #print "client thread", ipaddr
    try:
        ipaddr = ipaddr[0]
        msg = ''
        while 1:
            d = conn.recv(1024)
            msg += d
            if not d:
                break
            if '#' in msg:
                msg = msg.split('#', 1)[0]
                break
        try:
            cmd, data = ast.literal_eval(msg)
        except:
            print "syntax error", repr(msg), ipaddr
            conn.close()
            return

        out = do_command(cmd, data, ipaddr)
        if out:
            #print ipaddr, cmd, len(out)
            try:
                conn.send(out)
            except:
                print "error, could not send"

    finally:
        conn.close()


def timestr():
    return time.strftime("[%d/%m/%Y-%H:%M:%S]")

# used by the native handler
def do_command(cmd, data, ipaddr):

    if cmd=='b':
        out = "%d"%block_number

    elif cmd in ['session','new_session']:
        try:
            if cmd == 'session':
                addresses = ast.literal_eval(data)
                version = "old"
            else:
                version, addresses = ast.literal_eval(data)
                if version[0]=="0": version = "v" + version
        except:
            print "error", data
            return None
        print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
        out = new_session(version, addresses)

    elif cmd=='address.subscribe':
        try:
            session_id, addr = ast.literal_eval(data)
        except:
            traceback.print_exc(file=sys.stdout)
            print data
            return None
        out = add_address_to_session(session_id,addr)

    elif cmd=='update_session':
        try:
            session_id, addresses = ast.literal_eval(data)
        except:
            traceback.print_exc(file=sys.stdout)
            return None
        print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
        out = update_session(session_id,addresses)

    elif cmd=='poll':
        out = poll_session(data)

    elif cmd == 'h':
        # history
        address = data
        out = repr( store.get_history( address ) )

    elif cmd == 'load':
        out = cmd_load(None,None,data)

    elif cmd =='tx':
        out = store.send_tx(data)
        print timestr(), "sent tx:", ipaddr, out

    elif cmd == 'stop':
        out = cmd_stop(None, None, data)

    elif cmd == 'peers':
        out = repr(peer_list.values())

    else:
        out = None

    return out



####################################################################

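# Persistent TCP protocol on port 50001: newline-terminated JSON requests from each
# client are dispatched through the input/output queues below.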
def tcp_server_thread():
    thread.start_new_thread(process_input_queue, ())
    thread.start_new_thread(process_output_queue, ())

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), 50001))
    s.listen(1)
    while not stopping:
        conn, addr = s.accept()
        try:
            thread.start_new_thread(tcp_client_thread, (addr, conn,))
        except:
            # can't start new thread if there is no memory..
            traceback.print_exc(file=sys.stdout)


def close_session(session_id):
    #print "lost connection", session_id
    sessions.pop(session_id)
    if session_id in sessions_sub_numblocks:
        sessions_sub_numblocks.pop(session_id)


# one thread per client. put requests in a queue.
def tcp_client_thread(ipaddr,conn):
    """ use a persistent connection. put commands in a queue."""

    print timestr(), "TCP session", ipaddr
    global sessions

    session_id = random_string(10)
    sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }

    ipaddr = ipaddr[0]
    msg = ''

    while not stopping:
        try:
            d = conn.recv(1024)
        except socket.error:
            d = ''
        if not d:
            close_session(session_id)
            break

        msg += d
        while True:
            s = msg.find('\n')
            if s ==-1:
                break
            else:
                c = msg[0:s].strip()
                msg = msg[s+1:]
                if c == 'quit':
                    conn.close()
                    close_session(session_id)
                    return
                try:
                    c = json.loads(c)
                except:
                    print "json error", repr(c)
                    continue
                try:
                    message_id = c.get('id')
                    method = c.get('method')
                    params = c.get('params')
                except:
                    print "syntax error", repr(c), ipaddr
                    continue

                # add to queue
                input_queue.put((session_id, message_id, method, params))



# read commands from the input queue. perform requests, etc. this should be called from the main thread.
def process_input_queue():
    while not stopping:
        session_id, message_id, method, data = input_queue.get()
        if session_id not in sessions.keys():
            continue
        out = None
        if method == 'address.subscribe':
            address = data[0]
            subscribe_to_address(session_id,message_id,address)
        elif method == 'numblocks.subscribe':
            subscribe_to_numblocks(session_id,message_id)
        elif method == 'client.version':
            sessions[session_id]['version'] = data[0]
        elif method == 'server.banner':
            out = { 'result':config.get('server','banner').replace('\\n','\n') }
        elif method == 'server.peers':
            out = { 'result':peer_list.values() }
        elif method == 'address.get_history':
            address = data[0]
            out = { 'result':store.get_history( address ) }
        elif method == 'transaction.broadcast':
            txo = store.send_tx(data[0])
            print "sent tx:", txo
            out = {'result':txo }
        else:
            print "unknown command", method
        if out:
            out['id'] = message_id
            out = json.dumps( out )
            output_queue.put((session_id, out))

# this is a separate thread
def process_output_queue():
    while not stopping:
        session_id, out = output_queue.get()
        session = sessions.get(session_id)
        if session:
            try:
                conn = session.get('conn')
                conn.send(out+'\n')
            except:
                close_session(session_id)




####################################################################



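# Expire polling sessions that have not been polled for more than five minutes;
# persistent (TCP) sessions are closed when their connection drops instead.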
def clean_session_thread():
    while not stopping:
        time.sleep(30)
        t = time.time()
        for k,s in sessions.items():
            if s.get('type') == 'persistent': continue
            t0 = s['last_time']
            if t - t0 > 5*60:
                sessions.pop(k)
                print "lost session", k


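# Announce this server on the #electrum IRC channel (Freenode) and build the peer
# list from the other 'E_' nicks found there via NAMES/WHO.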
def irc_thread():
    global peer_list
    NICK = 'E_'+random_string(10)
    while not stopping:
        try:
            s = socket.socket()
            s.connect(('irc.freenode.net', 6667))
            s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
            s.send('NICK '+NICK+'\n')
            s.send('JOIN #electrum\n')
            sf = s.makefile('r', 0)
            t = 0
            while not stopping:
                line = sf.readline()
                line = line.rstrip('\r\n')
                line = line.split()
                if line[0]=='PING':
                    s.send('PONG '+line[1]+'\n')
                elif '353' in line: # answer to /names
                    k = line.index('353')
                    for item in line[k+1:]:
                        if item[0:2] == 'E_':
                            s.send('WHO %s\n'%item)
                elif '352' in line: # answer to /who
                    # warning: this is a horrible hack which apparently works
                    k = line.index('352')
                    ip = line[k+4]
                    ip = socket.gethostbyname(ip)
                    name = line[k+6]
                    host = line[k+9]
                    peer_list[name] = (ip,host)
                if time.time() - t > 5*60:
                    s.send('NAMES #electrum\n')
                    t = time.time()
                    peer_list = {}
        except:
            traceback.print_exc(file=sys.stdout)
        finally:
            sf.close()
            s.close()


def get_peers_json(_,__):
    return peer_list.values()

def http_server_thread():
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer
    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
    server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
    server.register_function(get_peers_json, 'server.peers')
    server.register_function(cmd_stop, 'stop')
    server.register_function(cmd_load, 'load')
    server.register_function(get_banner, 'server.banner')
    server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
    server.register_function(address_get_history_json, 'address.get_history')
    server.register_function(add_address_to_session_json, 'address.subscribe')
    server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
    server.register_function(client_version_json, 'client.version')
    server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
    server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
    server.serve_forever()


if __name__ == '__main__':

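    # With a command-line argument, act as a JSON-RPC client against a running
    # server instead of starting one (e.g. 'python server.py load').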
    if len(sys.argv)>1:
        import jsonrpclib
        server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
        cmd = sys.argv[1]
        if cmd == 'load':
            out = server.load(password)
        elif cmd == 'peers':
            out = server.server.peers()
        elif cmd == 'stop':
            out = server.stop(password)
        elif cmd == 'clear_cache':
            out = server.clear_cache(password)
        elif cmd == 'get_cache':
            out = server.get_cache(password,sys.argv[2])
        elif cmd == 'h':
            out = server.address.get_history(sys.argv[2])
        elif cmd == 'tx':
            out = server.transaction.broadcast(sys.argv[2])
        elif cmd == 'b':
            out = server.numblocks.subscribe()
        else:
            out = "Unknown command: '%s'" % cmd
        print out
        sys.exit(0)

    # backend
    # from db import MyStore
    store = MyStore(config)

    # supported protocols
    thread.start_new_thread(native_server_thread, ())
    thread.start_new_thread(tcp_server_thread, ())
    thread.start_new_thread(http_server_thread, ())
    thread.start_new_thread(clean_session_thread, ())

    if (config.get('server','irc') == 'yes' ):
        thread.start_new_thread(irc_thread, ())

    print "starting Electrum server"

    old_block_number = None
    while not stopping:
        block_number = store.main_iteration()

        if block_number != old_block_number:
            old_block_number = block_number
            for session_id in sessions_sub_numblocks.keys():
                send_numblocks(session_id)
        while True:
            try:
                addr = store.address_queue.get(False)
            except:
                break
            do_update_address(addr)

        time.sleep(10)
    print "server stopped"