fix: update last_time
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 from Abe.abe import hash_to_address, decode_check_address
29 from Abe.DataStore import DataStore as Datastore_class
30 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
31
32 import psycopg2, binascii
33
34 import thread, traceback, sys, urllib, operator
35 from json import dumps, loads
36
37
38 class MyStore(Datastore_class):
39
40     def __init__(self, config):
41         conf = DataStore.CONFIG_DEFAULTS
42         args, argv = readconf.parse_argv( [], conf)
43         args.dbtype = config.get('database','type')
44         if args.dbtype == 'sqlite3':
45             args.connect_args = { 'database' : config.get('database','database') }
46         elif args.dbtype == 'MySQLdb':
47             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
48         elif args.dbtype == 'psycopg2':
49             args.connect_args = { 'database' : config.get('database','database') }
50
51         Datastore_class.__init__(self,args)
52
53         self.tx_cache = {}
54         self.mempool_keys = {}
55         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
56
57         self.address_queue = Queue()
58
59         self.dblock = thread.allocate_lock()
60
61
62
63     def import_block(self, b, chain_ids=frozenset()):
64         block_id = super(MyStore, self).import_block(b, chain_ids)
65         for pos in xrange(len(b['transactions'])):
66             tx = b['transactions'][pos]
67             if 'hash' not in tx:
68                 tx['hash'] = util.double_sha256(tx['tx'])
69             tx_id = store.tx_find_id_and_value(tx)
70             if tx_id:
71                 self.update_tx_cache(tx_id)
72             else:
73                 print "error: import_block: no tx_id"
74         return block_id
75
76
77     def update_tx_cache(self, txid):
78         inrows = self.get_tx_inputs(txid, False)
79         for row in inrows:
80             _hash = self.binout(row[6])
81             address = hash_to_address(chr(0), _hash)
82             if self.tx_cache.has_key(address):
83                 print "cache: invalidating", address
84                 self.tx_cache.pop(address)
85             self.address_queue.put(address)
86
87         outrows = self.get_tx_outputs(txid, False)
88         for row in outrows:
89             _hash = self.binout(row[6])
90             address = hash_to_address(chr(0), _hash)
91             if self.tx_cache.has_key(address):
92                 print "cache: invalidating", address
93                 self.tx_cache.pop(address)
94             self.address_queue.put(address)
95
96     def safe_sql(self,sql, params=(), lock=True):
97         try:
98             if lock: self.dblock.acquire()
99             ret = self.selectall(sql,params)
100             if lock: self.dblock.release()
101             return ret
102         except:
103             print "sql error", sql
104             return []
105
106     def get_tx_outputs(self, tx_id, lock=True):
107         return self.safe_sql("""SELECT
108                 txout.txout_pos,
109                 txout.txout_scriptPubKey,
110                 txout.txout_value,
111                 nexttx.tx_hash,
112                 nexttx.tx_id,
113                 txin.txin_pos,
114                 pubkey.pubkey_hash
115               FROM txout
116               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
117               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
118               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
119              WHERE txout.tx_id = %d 
120              ORDER BY txout.txout_pos
121         """%(tx_id), (), lock)
122
123     def get_tx_inputs(self, tx_id, lock=True):
124         return self.safe_sql(""" SELECT
125                 txin.txin_pos,
126                 txin.txin_scriptSig,
127                 txout.txout_value,
128                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
129                 prevtx.tx_id,
130                 COALESCE(txout.txout_pos, u.txout_pos),
131                 pubkey.pubkey_hash
132               FROM txin
133               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
134               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
135               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
136               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
137              WHERE txin.tx_id = %d
138              ORDER BY txin.txin_pos
139              """%(tx_id,), (), lock)
140
141     def get_address_out_rows(self, dbhash):
142         return self.safe_sql(""" SELECT
143                 b.block_nTime,
144                 cc.chain_id,
145                 b.block_height,
146                 1,
147                 b.block_hash,
148                 tx.tx_hash,
149                 tx.tx_id,
150                 txin.txin_pos,
151                 -prevout.txout_value
152               FROM chain_candidate cc
153               JOIN block b ON (b.block_id = cc.block_id)
154               JOIN block_tx ON (block_tx.block_id = b.block_id)
155               JOIN tx ON (tx.tx_id = block_tx.tx_id)
156               JOIN txin ON (txin.tx_id = tx.tx_id)
157               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
158               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
159              WHERE pubkey.pubkey_hash = ?
160                AND cc.in_longest = 1""", (dbhash,))
161
162     def get_address_out_rows_memorypool(self, dbhash):
163         return self.safe_sql(""" SELECT
164                 1,
165                 tx.tx_hash,
166                 tx.tx_id,
167                 txin.txin_pos,
168                 -prevout.txout_value
169               FROM tx 
170               JOIN txin ON (txin.tx_id = tx.tx_id)
171               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
172               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
173              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
174
175     def get_address_in_rows(self, dbhash):
176         return self.safe_sql(""" SELECT
177                 b.block_nTime,
178                 cc.chain_id,
179                 b.block_height,
180                 0,
181                 b.block_hash,
182                 tx.tx_hash,
183                 tx.tx_id,
184                 txout.txout_pos,
185                 txout.txout_value
186               FROM chain_candidate cc
187               JOIN block b ON (b.block_id = cc.block_id)
188               JOIN block_tx ON (block_tx.block_id = b.block_id)
189               JOIN tx ON (tx.tx_id = block_tx.tx_id)
190               JOIN txout ON (txout.tx_id = tx.tx_id)
191               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
192              WHERE pubkey.pubkey_hash = ?
193                AND cc.in_longest = 1""", (dbhash,))
194
195     def get_address_in_rows_memorypool(self, dbhash):
196         return self.safe_sql( """ SELECT
197                 0,
198                 tx.tx_hash,
199                 tx.tx_id,
200                 txout.txout_pos,
201                 txout.txout_value
202               FROM tx
203               JOIN txout ON (txout.tx_id = tx.tx_id)
204               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
205              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
206
207     def get_history(self, addr):
208         
209         cached_version = self.tx_cache.get( addr )
210         if cached_version is not None:
211             return cached_version
212
213         version, binaddr = decode_check_address(addr)
214         if binaddr is None:
215             return None
216
217         dbhash = self.binin(binaddr)
218         rows = []
219         rows += self.get_address_out_rows( dbhash )
220         rows += self.get_address_in_rows( dbhash )
221
222         txpoints = []
223         known_tx = []
224
225         for row in rows:
226             try:
227                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
228             except:
229                 print "cannot unpack row", row
230                 break
231             tx_hash = self.hashout_hex(tx_hash)
232             txpoint = {
233                     "nTime":    int(nTime),
234                     "height":   int(height),
235                     "is_in":    int(is_in),
236                     "blk_hash": self.hashout_hex(blk_hash),
237                     "tx_hash":  tx_hash,
238                     "tx_id":    int(tx_id),
239                     "pos":      int(pos),
240                     "value":    int(value),
241                     }
242
243             txpoints.append(txpoint)
244             known_tx.append(self.hashout_hex(tx_hash))
245
246
247         # todo: sort them really...
248         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
249
250         # read memory pool
251         rows = []
252         rows += self.get_address_in_rows_memorypool( dbhash )
253         rows += self.get_address_out_rows_memorypool( dbhash )
254         address_has_mempool = False
255
256         for row in rows:
257             is_in, tx_hash, tx_id, pos, value = row
258             tx_hash = self.hashout_hex(tx_hash)
259             if tx_hash in known_tx:
260                 continue
261
262             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
263             address_has_mempool = True
264
265             # this means pending transactions are returned by getmemorypool
266             if tx_hash not in self.mempool_keys:
267                 continue
268
269             #print "mempool", tx_hash
270             txpoint = {
271                     "nTime":    0,
272                     "height":   0,
273                     "is_in":    int(is_in),
274                     "blk_hash": 'mempool', 
275                     "tx_hash":  tx_hash,
276                     "tx_id":    int(tx_id),
277                     "pos":      int(pos),
278                     "value":    int(value),
279                     }
280             txpoints.append(txpoint)
281
282
283         for txpoint in txpoints:
284             tx_id = txpoint['tx_id']
285             
286             txinputs = []
287             inrows = self.get_tx_inputs(tx_id)
288             for row in inrows:
289                 _hash = self.binout(row[6])
290                 address = hash_to_address(chr(0), _hash)
291                 txinputs.append(address)
292             txpoint['inputs'] = txinputs
293             txoutputs = []
294             outrows = self.get_tx_outputs(tx_id)
295             for row in outrows:
296                 _hash = self.binout(row[6])
297                 address = hash_to_address(chr(0), _hash)
298                 txoutputs.append(address)
299             txpoint['outputs'] = txoutputs
300
301             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
302             if not txpoint['is_in']:
303                 # detect if already redeemed...
304                 for row in outrows:
305                     if row[6] == dbhash: break
306                 else:
307                     raise
308                 #row = self.get_tx_output(tx_id,dbhash)
309                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
310                 # if not redeemed, we add the script
311                 if row:
312                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
313
314         # cache result
315         if not address_has_mempool:
316             self.tx_cache[addr] = txpoints
317         
318         return txpoints
319
320
321
322     def memorypool_update(store):
323
324         ds = BCDataStream.BCDataStream()
325         previous_transactions = store.mempool_keys
326         store.mempool_keys = []
327
328         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
329
330         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
331         r = loads(respdata)
332         if r['error'] != None:
333             return
334
335         v = r['result'].get('transactions')
336         for hextx in v:
337             ds.clear()
338             ds.write(hextx.decode('hex'))
339             tx = deserialize.parse_Transaction(ds)
340             tx['hash'] = util.double_sha256(tx['tx'])
341             tx_hash = store.hashin(tx['hash'])
342
343             store.mempool_keys.append(tx_hash)
344             if store.tx_find_id_and_value(tx):
345                 pass
346             else:
347                 tx_id = store.import_tx(tx, False)
348                 store.update_tx_cache(tx_id)
349     
350         store.commit()
351
352
353     def send_tx(self,tx):
354         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
355         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
356         r = loads(respdata)
357         if r['error'] != None:
358             out = "error: transaction rejected by memorypool\n"+tx
359         else:
360             out = r['result']
361         return out
362
363
364     def main_iteration(store):
365         try:
366             store.dblock.acquire()
367             store.catch_up()
368             store.memorypool_update()
369             block_number = store.get_block_number(1)
370
371         except IOError:
372             print "IOError: cannot reach bitcoind"
373             block_number = 0
374         except:
375             traceback.print_exc(file=sys.stdout)
376             block_number = 0
377         finally:
378             store.dblock.release()
379
380         return block_number
381
382
383
384 import time, json, socket, operator, thread, ast, sys, re, traceback
385 import ConfigParser
386 from json import dumps, loads
387 import urllib
388
389
390 config = ConfigParser.ConfigParser()
391 # set some defaults, which will be overwritten by the config file
392 config.add_section('server')
393 config.set('server','banner', 'Welcome to Electrum!')
394 config.set('server', 'host', 'localhost')
395 config.set('server', 'port', '50000')
396 config.set('server', 'password', '')
397 config.set('server', 'irc', 'yes')
398 config.set('server', 'ircname', 'Electrum server')
399 config.add_section('database')
400 config.set('database', 'type', 'psycopg2')
401 config.set('database', 'database', 'abe')
402
403 try:
404     f = open('/etc/electrum.conf','r')
405     config.readfp(f)
406     f.close()
407 except:
408     print "Could not read electrum.conf. I will use the default values."
409
410 try:
411     f = open('/etc/electrum.banner','r')
412     config.set('server','banner', f.read())
413     f.close()
414 except:
415     pass
416
417
418 password = config.get('server','password')
419
420 stopping = False
421 block_number = -1
422 sessions = {}
423 sessions_sub_numblocks = {} # sessions that have subscribed to the service
424
425 m_sessions = [{}] # served by http
426
427 peer_list = {}
428
429 wallets = {} # for ultra-light clients such as bccapi
430
431 from Queue import Queue
432 input_queue = Queue()
433 output_queue = Queue()
434
435
436
437
438 def random_string(N):
439     import random, string
440     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
441
442     
443
444 def cmd_stop(_,__,pw):
445     global stopping
446     if password == pw:
447         stopping = True
448         return 'ok'
449     else:
450         return 'wrong password'
451
452 def cmd_load(_,__,pw):
453     if password == pw:
454         return repr( len(sessions) )
455     else:
456         return 'wrong password'
457
458
459
460
461
462 def modified_addresses(a_session):
463     #t1 = time.time()
464     import copy
465     session = copy.deepcopy(a_session)
466     addresses = session['addresses']
467     session['last_time'] = time.time()
468     ret = {}
469     k = 0
470     for addr in addresses:
471         status = get_address_status( addr )
472         msg_id, last_status = addresses.get( addr )
473         if last_status != status:
474             addresses[addr] = msg_id, status
475             ret[addr] = status
476
477     #t2 = time.time() - t1 
478     #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
479     return ret, addresses
480
481
482 def poll_session(session_id): 
483     # native
484     session = sessions.get(session_id)
485     if session is None:
486         print time.asctime(), "session not found", session_id
487         return -1, {}
488     else:
489         sessions[session_id]['last_time'] = time.time()
490         ret, addresses = modified_addresses(session)
491         if ret: sessions[session_id]['addresses'] = addresses
492         return repr( (block_number,ret))
493
494
495 def poll_session_json(session_id, message_id):
496     session = m_sessions[0].get(session_id)
497     if session is None:
498         raise BaseException("session not found %s"%session_id)
499     else:
500         m_sessions[0][session_id]['last_time'] = time.time()
501         out = []
502         ret, addresses = modified_addresses(session)
503         if ret: 
504             m_sessions[0][session_id]['addresses'] = addresses
505             for addr in ret:
506                 msg_id, status = addresses[addr]
507                 out.append(  { 'id':msg_id, 'result':status } )
508
509         msg_id, last_nb = session.get('numblocks')
510         if last_nb:
511             if last_nb != block_number:
512                 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
513                 out.append( {'id':msg_id, 'result':block_number} )
514
515         return out
516
517
518 def do_update_address(addr):
519     # an address was involved in a transaction; we check if it was subscribed to in a session
520     # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
521
522     for session_id in sessions.keys():
523         session = sessions[session_id]
524         if session.get('type') != 'persistent': continue
525         addresses = session['addresses'].keys()
526
527         if addr in addresses:
528             status = get_address_status( addr )
529             message_id, last_status = session['addresses'][addr]
530             if last_status != status:
531                 #print "sending new status for %s:"%addr, status
532                 send_status(session_id,message_id,addr,status)
533                 sessions[session_id]['addresses'][addr] = (message_id,status)
534
535 def get_address_status(addr):
536     # get address status, i.e. the last block for that address.
537     tx_points = store.get_history(addr)
538     if not tx_points:
539         status = None
540     else:
541         lastpoint = tx_points[-1]
542         status = lastpoint['blk_hash']
543         # this is a temporary hack; move it up once old clients have disappeared
544         if status == 'mempool': # and session['version'] != "old":
545             status = status + ':%d'% len(tx_points)
546     return status
547
548
549 def send_numblocks(session_id):
550     message_id = sessions_sub_numblocks[session_id]
551     out = json.dumps( {'id':message_id, 'result':block_number} )
552     output_queue.put((session_id, out))
553
554 def send_status(session_id, message_id, address, status):
555     out = json.dumps( { 'id':message_id, 'result':status } )
556     output_queue.put((session_id, out))
557
558 def address_get_history_json(_,message_id,address):
559     return store.get_history(address)
560
561 def subscribe_to_numblocks(session_id, message_id):
562     sessions_sub_numblocks[session_id] = message_id
563     send_numblocks(session_id)
564
565 def subscribe_to_numblocks_json(session_id, message_id):
566     global m_sessions
567     m_sessions[0][session_id]['numblocks'] = message_id,block_number
568     return block_number
569
570 def subscribe_to_address(session_id, message_id, address):
571     status = get_address_status(address)
572     sessions[session_id]['addresses'][address] = (message_id, status)
573     sessions[session_id]['last_time'] = time.time()
574     send_status(session_id, message_id, address, status)
575
576 def add_address_to_session_json(session_id, message_id, address):
577     global m_sessions
578     sessions = m_sessions[0]
579     status = get_address_status(address)
580     sessions[session_id]['addresses'][address] = (message_id, status)
581     sessions[session_id]['last_time'] = time.time()
582     m_sessions[0] = sessions
583     return status
584
585 def add_address_to_session(session_id, address):
586     status = get_address_status(address)
587     sessions[session_id]['addresses'][address] = ("", status)
588     sessions[session_id]['last_time'] = time.time()
589     return status
590
591 def new_session(version, addresses):
592     session_id = random_string(10)
593     sessions[session_id] = { 'addresses':{}, 'version':version }
594     for a in addresses:
595         sessions[session_id]['addresses'][a] = ('','')
596     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
597     sessions[session_id]['last_time'] = time.time()
598     return out
599
600
601 def client_version_json(session_id, _, version):
602     global m_sessions
603     sessions = m_sessions[0]
604     sessions[session_id]['version'] = version
605     m_sessions[0] = sessions
606
607 def create_session_json(_, __):
608     sessions = m_sessions[0]
609     session_id = random_string(10)
610     print "creating session", session_id
611     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
612     sessions[session_id]['last_time'] = time.time()
613     m_sessions[0] = sessions
614     return session_id
615
616
617
618 def get_banner(_,__):
619     return config.get('server','banner').replace('\\n','\n')
620
621 def update_session(session_id,addresses):
622     """deprecated in 0.42"""
623     sessions[session_id]['addresses'] = {}
624     for a in addresses:
625         sessions[session_id]['addresses'][a] = ''
626     sessions[session_id]['last_time'] = time.time()
627     return 'ok'
628
629 def native_server_thread():
630     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
631     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
632     s.bind((config.get('server','host'), config.getint('server','port')))
633     s.listen(1)
634     while not stopping:
635         conn, addr = s.accept()
636         try:
637             thread.start_new_thread(native_client_thread, (addr, conn,))
638         except:
639             # can't start new thread if there is no memory..
640             traceback.print_exc(file=sys.stdout)
641
642
643 def native_client_thread(ipaddr,conn):
644     #print "client thread", ipaddr
645     try:
646         ipaddr = ipaddr[0]
647         msg = ''
648         while 1:
649             d = conn.recv(1024)
650             msg += d
651             if not d: 
652                 break
653             if '#' in msg:
654                 msg = msg.split('#', 1)[0]
655                 break
656         try:
657             cmd, data = ast.literal_eval(msg)
658         except:
659             print "syntax error", repr(msg), ipaddr
660             conn.close()
661             return
662
663         out = do_command(cmd, data, ipaddr)
664         if out:
665             #print ipaddr, cmd, len(out)
666             try:
667                 conn.send(out)
668             except:
669                 print "error, could not send"
670
671     finally:
672         conn.close()
673
674
675 def timestr():
676     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
677
678 # used by the native handler
679 def do_command(cmd, data, ipaddr):
680
681     if cmd=='b':
682         out = "%d"%block_number
683
684     elif cmd in ['session','new_session']:
685         try:
686             if cmd == 'session':
687                 addresses = ast.literal_eval(data)
688                 version = "old"
689             else:
690                 version, addresses = ast.literal_eval(data)
691                 if version[0]=="0": version = "v" + version
692         except:
693             print "error", data
694             return None
695         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
696         out = new_session(version, addresses)
697
698     elif cmd=='address.subscribe':
699         try:
700             session_id, addr = ast.literal_eval(data)
701         except:
702             traceback.print_exc(file=sys.stdout)
703             print data
704             return None
705         out = add_address_to_session(session_id,addr)
706
707     elif cmd=='update_session':
708         try:
709             session_id, addresses = ast.literal_eval(data)
710         except:
711             traceback.print_exc(file=sys.stdout)
712             return None
713         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
714         out = update_session(session_id,addresses)
715             
716     elif cmd=='poll': 
717         out = poll_session(data)
718
719     elif cmd == 'h': 
720         # history
721         address = data
722         out = repr( store.get_history( address ) )
723
724     elif cmd == 'load': 
725         out = cmd_load(None,None,data)
726
727     elif cmd =='tx':
728         out = store.send_tx(data)
729         print timestr(), "sent tx:", ipaddr, out
730
731     elif cmd == 'stop':
732         out = cmd_stop(data)
733
734     elif cmd == 'peers':
735         out = repr(peer_list.values())
736
737     else:
738         out = None
739
740     return out
741
742
743
744 ####################################################################
745
746 def tcp_server_thread():
747     thread.start_new_thread(process_input_queue, ())
748     thread.start_new_thread(process_output_queue, ())
749
750     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
751     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
752     s.bind((config.get('server','host'), 50001))
753     s.listen(1)
754     while not stopping:
755         conn, addr = s.accept()
756         try:
757             thread.start_new_thread(tcp_client_thread, (addr, conn,))
758         except:
759             # can't start new thread if there is no memory..
760             traceback.print_exc(file=sys.stdout)
761
762
763 def close_session(session_id):
764     #print "lost connection", session_id
765     sessions.pop(session_id)
766     if session_id in sessions_sub_numblocks:
767         sessions_sub_numblocks.pop(session_id)
768
769
770 # one thread per client. put requests in a queue.
771 def tcp_client_thread(ipaddr,conn):
772     """ use a persistent connection. put commands in a queue."""
773
774     print timestr(), "TCP session", ipaddr
775     global sessions
776
777     session_id = random_string(10)
778     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
779
780     ipaddr = ipaddr[0]
781     msg = ''
782
783     while not stopping:
784         try:
785             d = conn.recv(1024)
786         except socket.error:
787             d = ''
788         if not d:
789             close_session(session_id)
790             break
791
792         msg += d
793         while True:
794             s = msg.find('\n')
795             if s ==-1:
796                 break
797             else:
798                 c = msg[0:s].strip()
799                 msg = msg[s+1:]
800                 if c == 'quit': 
801                     conn.close()
802                     close_session(session_id)
803                     return
804                 try:
805                     c = json.loads(c)
806                 except:
807                     print "json error", repr(c)
808                     continue
809                 try:
810                     message_id = c.get('id')
811                     method = c.get('method')
812                     params = c.get('params')
813                 except:
814                     print "syntax error", repr(c), ipaddr
815                     continue
816
817                 # add to queue
818                 input_queue.put((session_id, message_id, method, params))
819
820
821
822 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
823 def process_input_queue():
824     while not stopping:
825         session_id, message_id, method, data = input_queue.get()
826         if session_id not in sessions.keys():
827             continue
828         out = None
829         if method == 'address.subscribe':
830             address = data[0]
831             subscribe_to_address(session_id,message_id,address)
832         elif method == 'numblocks.subscribe':
833             subscribe_to_numblocks(session_id,message_id)
834         elif method == 'client.version':
835             sessions[session_id]['version'] = data[0]
836         elif method == 'server.banner':
837             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
838         elif method == 'server.peers':
839             out = { 'result':peer_list.values() } 
840         elif method == 'address.get_history':
841             address = data[0]
842             out = { 'result':store.get_history( address ) } 
843         elif method == 'transaction.broadcast':
844             txo = store.send_tx(data[0])
845             print "sent tx:", txo
846             out = {'result':txo }
847         else:
848             print "unknown command", method
849         if out:
850             out['id'] = message_id
851             out = json.dumps( out )
852             output_queue.put((session_id, out))
853
854 # this is a separate thread
855 def process_output_queue():
856     while not stopping:
857         session_id, out = output_queue.get()
858         session = sessions.get(session_id)
859         if session: 
860             try:
861                 conn = session.get('conn')
862                 conn.send(out+'\n')
863             except:
864                 close_session(session_id)
865                 
866
867
868
869 ####################################################################
870
871
872
873
874 def clean_session_thread():
875     while not stopping:
876         time.sleep(30)
877         t = time.time()
878         for k,s in sessions.items():
879             if s.get('type') == 'persistent': continue
880             t0 = s['last_time']
881             if t - t0 > 5*60:
882                 sessions.pop(k)
883                 print "lost session", k
884             
885
886 def irc_thread():
887     global peer_list
888     NICK = 'E_'+random_string(10)
889     while not stopping:
890         try:
891             s = socket.socket()
892             s.connect(('irc.freenode.net', 6667))
893             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
894             s.send('NICK '+NICK+'\n')
895             s.send('JOIN #electrum\n')
896             sf = s.makefile('r', 0)
897             t = 0
898             while not stopping:
899                 line = sf.readline()
900                 line = line.rstrip('\r\n')
901                 line = line.split()
902                 if line[0]=='PING': 
903                     s.send('PONG '+line[1]+'\n')
904                 elif '353' in line: # answer to /names
905                     k = line.index('353')
906                     for item in line[k+1:]:
907                         if item[0:2] == 'E_':
908                             s.send('WHO %s\n'%item)
909                 elif '352' in line: # answer to /who
910                     # warning: this is a horrible hack which apparently works
911                     k = line.index('352')
912                     ip = line[k+4]
913                     ip = socket.gethostbyname(ip)
914                     name = line[k+6]
915                     host = line[k+9]
916                     peer_list[name] = (ip,host)
917                 if time.time() - t > 5*60:
918                     s.send('NAMES #electrum\n')
919                     t = time.time()
920                     peer_list = {}
921         except:
922             traceback.print_exc(file=sys.stdout)
923         finally:
924             sf.close()
925             s.close()
926
927
928 def get_peers_json(_,__):
929     return peer_list.values()
930
931 def http_server_thread():
932     # see http://code.google.com/p/jsonrpclib/
933     from SocketServer import ThreadingMixIn
934     from StratumJSONRPCServer import StratumJSONRPCServer
935     class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
936     server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
937     server.register_function(get_peers_json, 'server.peers')
938     server.register_function(cmd_stop, 'stop')
939     server.register_function(cmd_load, 'load')
940     server.register_function(get_banner, 'server.banner')
941     server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
942     server.register_function(address_get_history_json, 'address.get_history')
943     server.register_function(add_address_to_session_json, 'address.subscribe')
944     server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
945     server.register_function(client_version_json, 'client.version')
946     server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
947     server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
948     server.serve_forever()
949
950
951 if __name__ == '__main__':
952
953     if len(sys.argv)>1:
954         import jsonrpclib
955         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
956         cmd = sys.argv[1]
957         if cmd == 'load':
958             out = server.load(password)
959         elif cmd == 'peers':
960             out = server.server.peers()
961         elif cmd == 'stop':
962             out = server.stop(password)
963         elif cmd == 'clear_cache':
964             out = server.clear_cache(password)
965         elif cmd == 'get_cache':
966             out = server.get_cache(password,sys.argv[2])
967         elif cmd == 'h':
968             out = server.address.get_history(sys.argv[2])
969         elif cmd == 'tx':
970             out = server.transaction.broadcast(sys.argv[2])
971         elif cmd == 'b':
972             out = server.numblocks.subscribe()
973         else:
974             out = "Unknown command: '%s'" % cmd
975         print out
976         sys.exit(0)
977
978     # backend
979     # from db import MyStore
980     store = MyStore(config)
981
982     # supported protocols
983     thread.start_new_thread(native_server_thread, ())
984     thread.start_new_thread(tcp_server_thread, ())
985     thread.start_new_thread(http_server_thread, ())
986     thread.start_new_thread(clean_session_thread, ())
987
988     if (config.get('server','irc') == 'yes' ):
989         thread.start_new_thread(irc_thread, ())
990
991     print "starting Electrum server"
992
993     old_block_number = None
994     while not stopping:
995         block_number = store.main_iteration()
996
997         if block_number != old_block_number:
998             old_block_number = block_number
999             for session_id in sessions_sub_numblocks.keys():
1000                 send_numblocks(session_id)
1001         while True:
1002             try:
1003                 addr = store.address_queue.get(False)
1004             except:
1005                 break
1006             do_update_address(addr)
1007
1008         time.sleep(10)
1009     print "server stopped"
1010