move store back into main
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 from Abe.abe import hash_to_address, decode_check_address
29 from Abe.DataStore import DataStore as Datastore_class
30 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
31
32 import psycopg2, binascii
33
34 import thread, traceback, sys, urllib, operator
35 from json import dumps, loads
36
37
38 class MyStore(Datastore_class):
39
40     def __init__(self, config, address_queue):
41         conf = DataStore.CONFIG_DEFAULTS
42         args, argv = readconf.parse_argv( [], conf)
43         args.dbtype = config.get('database','type')
44         if args.dbtype == 'sqlite3':
45             args.connect_args = { 'database' : config.get('database','database') }
46         elif args.dbtype == 'MySQLdb':
47             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
48         elif args.dbtype == 'psycopg2':
49             args.connect_args = { 'database' : config.get('database','database') }
50
51         Datastore_class.__init__(self,args)
52
53         self.tx_cache = {}
54         self.mempool_keys = {}
55         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
56
57         self.address_queue = address_queue
58         self.dblock = thread.allocate_lock()
59
60
61
62     def import_block(self, b, chain_ids=frozenset()):
63         block_id = super(MyStore, self).import_block(b, chain_ids)
64         for pos in xrange(len(b['transactions'])):
65             tx = b['transactions'][pos]
66             if 'hash' not in tx:
67                 tx['hash'] = util.double_sha256(tx['tx'])
68             tx_id = store.tx_find_id_and_value(tx)
69             if tx_id:
70                 self.update_tx_cache(tx_id)
71             else:
72                 print "error: import_block: no tx_id"
73         return block_id
74
75
76     def update_tx_cache(self, txid):
77         inrows = self.get_tx_inputs(txid, False)
78         for row in inrows:
79             _hash = self.binout(row[6])
80             address = hash_to_address(chr(0), _hash)
81             if self.tx_cache.has_key(address):
82                 print "cache: invalidating", address
83                 self.tx_cache.pop(address)
84             self.address_queue.put(address)
85
86         outrows = self.get_tx_outputs(txid, False)
87         for row in outrows:
88             _hash = self.binout(row[6])
89             address = hash_to_address(chr(0), _hash)
90             if self.tx_cache.has_key(address):
91                 print "cache: invalidating", address
92                 self.tx_cache.pop(address)
93             self.address_queue.put(address)
94
95     def safe_sql(self,sql, params=(), lock=True):
96         try:
97             if lock: self.dblock.acquire()
98             ret = self.selectall(sql,params)
99             if lock: self.dblock.release()
100             return ret
101         except:
102             print "sql error", sql
103             return []
104
105     def get_tx_outputs(self, tx_id, lock=True):
106         return self.safe_sql("""SELECT
107                 txout.txout_pos,
108                 txout.txout_scriptPubKey,
109                 txout.txout_value,
110                 nexttx.tx_hash,
111                 nexttx.tx_id,
112                 txin.txin_pos,
113                 pubkey.pubkey_hash
114               FROM txout
115               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
116               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
117               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
118              WHERE txout.tx_id = %d 
119              ORDER BY txout.txout_pos
120         """%(tx_id), (), lock)
121
122     def get_tx_inputs(self, tx_id, lock=True):
123         return self.safe_sql(""" SELECT
124                 txin.txin_pos,
125                 txin.txin_scriptSig,
126                 txout.txout_value,
127                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
128                 prevtx.tx_id,
129                 COALESCE(txout.txout_pos, u.txout_pos),
130                 pubkey.pubkey_hash
131               FROM txin
132               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
133               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
134               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
135               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
136              WHERE txin.tx_id = %d
137              ORDER BY txin.txin_pos
138              """%(tx_id,), (), lock)
139
140     def get_address_out_rows(self, dbhash):
141         return self.safe_sql(""" SELECT
142                 b.block_nTime,
143                 cc.chain_id,
144                 b.block_height,
145                 1,
146                 b.block_hash,
147                 tx.tx_hash,
148                 tx.tx_id,
149                 txin.txin_pos,
150                 -prevout.txout_value
151               FROM chain_candidate cc
152               JOIN block b ON (b.block_id = cc.block_id)
153               JOIN block_tx ON (block_tx.block_id = b.block_id)
154               JOIN tx ON (tx.tx_id = block_tx.tx_id)
155               JOIN txin ON (txin.tx_id = tx.tx_id)
156               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
157               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
158              WHERE pubkey.pubkey_hash = ?
159                AND cc.in_longest = 1""", (dbhash,))
160
161     def get_address_out_rows_memorypool(self, dbhash):
162         return self.safe_sql(""" SELECT
163                 1,
164                 tx.tx_hash,
165                 tx.tx_id,
166                 txin.txin_pos,
167                 -prevout.txout_value
168               FROM tx 
169               JOIN txin ON (txin.tx_id = tx.tx_id)
170               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
171               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
172              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
173
174     def get_address_in_rows(self, dbhash):
175         return self.safe_sql(""" SELECT
176                 b.block_nTime,
177                 cc.chain_id,
178                 b.block_height,
179                 0,
180                 b.block_hash,
181                 tx.tx_hash,
182                 tx.tx_id,
183                 txout.txout_pos,
184                 txout.txout_value
185               FROM chain_candidate cc
186               JOIN block b ON (b.block_id = cc.block_id)
187               JOIN block_tx ON (block_tx.block_id = b.block_id)
188               JOIN tx ON (tx.tx_id = block_tx.tx_id)
189               JOIN txout ON (txout.tx_id = tx.tx_id)
190               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
191              WHERE pubkey.pubkey_hash = ?
192                AND cc.in_longest = 1""", (dbhash,))
193
194     def get_address_in_rows_memorypool(self, dbhash):
195         return self.safe_sql( """ SELECT
196                 0,
197                 tx.tx_hash,
198                 tx.tx_id,
199                 txout.txout_pos,
200                 txout.txout_value
201               FROM tx
202               JOIN txout ON (txout.tx_id = tx.tx_id)
203               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
204              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
205
206     def get_history(self, addr):
207         
208         cached_version = self.tx_cache.get( addr )
209         if cached_version is not None:
210             return cached_version
211
212         version, binaddr = decode_check_address(addr)
213         if binaddr is None:
214             return None
215
216         dbhash = self.binin(binaddr)
217         rows = []
218         rows += self.get_address_out_rows( dbhash )
219         rows += self.get_address_in_rows( dbhash )
220
221         txpoints = []
222         known_tx = []
223
224         for row in rows:
225             try:
226                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
227             except:
228                 print "cannot unpack row", row
229                 break
230             tx_hash = self.hashout_hex(tx_hash)
231             txpoint = {
232                     "nTime":    int(nTime),
233                     "height":   int(height),
234                     "is_in":    int(is_in),
235                     "blk_hash": self.hashout_hex(blk_hash),
236                     "tx_hash":  tx_hash,
237                     "tx_id":    int(tx_id),
238                     "pos":      int(pos),
239                     "value":    int(value),
240                     }
241
242             txpoints.append(txpoint)
243             known_tx.append(self.hashout_hex(tx_hash))
244
245
246         # todo: sort them really...
247         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
248
249         # read memory pool
250         rows = []
251         rows += self.get_address_in_rows_memorypool( dbhash )
252         rows += self.get_address_out_rows_memorypool( dbhash )
253         address_has_mempool = False
254
255         for row in rows:
256             is_in, tx_hash, tx_id, pos, value = row
257             tx_hash = self.hashout_hex(tx_hash)
258             if tx_hash in known_tx:
259                 continue
260
261             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
262             address_has_mempool = True
263
264             # this means pending transactions are returned by getmemorypool
265             if tx_hash not in self.mempool_keys:
266                 continue
267
268             #print "mempool", tx_hash
269             txpoint = {
270                     "nTime":    0,
271                     "height":   0,
272                     "is_in":    int(is_in),
273                     "blk_hash": 'mempool', 
274                     "tx_hash":  tx_hash,
275                     "tx_id":    int(tx_id),
276                     "pos":      int(pos),
277                     "value":    int(value),
278                     }
279             txpoints.append(txpoint)
280
281
282         for txpoint in txpoints:
283             tx_id = txpoint['tx_id']
284             
285             txinputs = []
286             inrows = self.get_tx_inputs(tx_id)
287             for row in inrows:
288                 _hash = self.binout(row[6])
289                 address = hash_to_address(chr(0), _hash)
290                 txinputs.append(address)
291             txpoint['inputs'] = txinputs
292             txoutputs = []
293             outrows = self.get_tx_outputs(tx_id)
294             for row in outrows:
295                 _hash = self.binout(row[6])
296                 address = hash_to_address(chr(0), _hash)
297                 txoutputs.append(address)
298             txpoint['outputs'] = txoutputs
299
300             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
301             if not txpoint['is_in']:
302                 # detect if already redeemed...
303                 for row in outrows:
304                     if row[6] == dbhash: break
305                 else:
306                     raise
307                 #row = self.get_tx_output(tx_id,dbhash)
308                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
309                 # if not redeemed, we add the script
310                 if row:
311                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
312
313         # cache result
314         if not address_has_mempool:
315             self.tx_cache[addr] = txpoints
316         
317         return txpoints
318
319
320
321     def memorypool_update(store):
322
323         ds = BCDataStream.BCDataStream()
324         previous_transactions = store.mempool_keys
325         store.mempool_keys = []
326
327         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
328
329         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
330         r = loads(respdata)
331         if r['error'] != None:
332             return
333
334         v = r['result'].get('transactions')
335         for hextx in v:
336             ds.clear()
337             ds.write(hextx.decode('hex'))
338             tx = deserialize.parse_Transaction(ds)
339             tx['hash'] = util.double_sha256(tx['tx'])
340             tx_hash = store.hashin(tx['hash'])
341
342     def send_tx(self,tx):
343         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
344         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
345         r = loads(respdata)
346         if r['error'] != None:
347             out = "error: transaction rejected by memorypool\n"+tx
348         else:
349             out = r['result']
350         return out
351
352
353     def main_iteration(store):
354         try:
355             store.dblock.acquire()
356             store.catch_up()
357             store.memorypool_update()
358             block_number = store.get_block_number(1)
359
360         except IOError:
361             print "IOError: cannot reach bitcoind"
362             block_number = 0
363         except:
364             traceback.print_exc(file=sys.stdout)
365             block_number = 0
366         finally:
367             store.dblock.release()
368
369         return block_number
370
371
372
373 import time, json, socket, operator, thread, ast, sys, re, traceback
374 import ConfigParser
375 from json import dumps, loads
376 import urllib
377
378
379 config = ConfigParser.ConfigParser()
380 # set some defaults, which will be overwritten by the config file
381 config.add_section('server')
382 config.set('server','banner', 'Welcome to Electrum!')
383 config.set('server', 'host', 'localhost')
384 config.set('server', 'port', '50000')
385 config.set('server', 'password', '')
386 config.set('server', 'irc', 'yes')
387 config.set('server', 'ircname', 'Electrum server')
388 config.add_section('database')
389 config.set('database', 'type', 'psycopg2')
390 config.set('database', 'database', 'abe')
391
392 try:
393     f = open('/etc/electrum.conf','r')
394     config.readfp(f)
395     f.close()
396 except:
397     print "Could not read electrum.conf. I will use the default values."
398
399 try:
400     f = open('/etc/electrum.banner','r')
401     config.set('server','banner', f.read())
402     f.close()
403 except:
404     pass
405
406
407 password = config.get('server','password')
408
409 stopping = False
410 block_number = -1
411 sessions = {}
412 sessions_sub_numblocks = {} # sessions that have subscribed to the service
413
414 m_sessions = [{}] # served by http
415
416 peer_list = {}
417
418 wallets = {} # for ultra-light clients such as bccapi
419
420 from Queue import Queue
421 input_queue = Queue()
422 output_queue = Queue()
423 address_queue = Queue()
424
425
426
427
428
429
430
431 def random_string(N):
432     import random, string
433     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
434
435     
436
437 def cmd_stop(_,__,pw):
438     global stopping
439     if password == pw:
440         stopping = True
441         return 'ok'
442     else:
443         return 'wrong password'
444
445 def cmd_load(_,__,pw):
446     if password == pw:
447         return repr( len(sessions) )
448     else:
449         return 'wrong password'
450
451
452
453
454
455 def modified_addresses(session):
456     if 1:
457         t1 = time.time()
458         addresses = session['addresses']
459         session['last_time'] = time.time()
460         ret = {}
461         k = 0
462         for addr in addresses:
463             status = get_address_status( addr )
464             msg_id, last_status = addresses.get( addr )
465             if last_status != status:
466                 addresses[addr] = msg_id, status
467                 ret[addr] = status
468
469         t2 = time.time() - t1 
470         #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
471         return ret, addresses
472
473
474 def poll_session(session_id): 
475     # native
476     session = sessions.get(session_id)
477     if session is None:
478         print time.asctime(), "session not found", session_id
479         return -1, {}
480     else:
481         ret, addresses = modified_addresses(session)
482         if ret: sessions[session_id]['addresses'] = addresses
483         return repr( (block_number,ret))
484
485
486 def poll_session_json(session_id, message_id):
487     session = m_sessions[0].get(session_id)
488     if session is None:
489         raise BaseException("session not found %s"%session_id)
490     else:
491         out = []
492         ret, addresses = modified_addresses(session)
493         if ret: 
494             m_sessions[0][session_id]['addresses'] = addresses
495             for addr in ret:
496                 msg_id, status = addresses[addr]
497                 out.append(  { 'id':msg_id, 'result':status } )
498
499         msg_id, last_nb = session.get('numblocks')
500         if last_nb:
501             if last_nb != block_number:
502                 m_sessions[0][session_id]['numblocks'] = msg_id, block_number
503                 out.append( {'id':msg_id, 'result':block_number} )
504
505         return out
506
507
508 def do_update_address(addr):
509     # an address was involved in a transaction; we check if it was subscribed to in a session
510     # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
511
512     for session_id in sessions.keys():
513         session = sessions[session_id]
514         if session.get('type') != 'persistent': continue
515         addresses = session['addresses'].keys()
516
517         if addr in addresses:
518             status = get_address_status( addr )
519             message_id, last_status = session['addresses'][addr]
520             if last_status != status:
521                 #print "sending new status for %s:"%addr, status
522                 send_status(session_id,message_id,addr,status)
523                 sessions[session_id]['addresses'][addr] = (message_id,status)
524
525 def get_address_status(addr):
526     # get address status, i.e. the last block for that address.
527     tx_points = store.get_history(addr)
528     if not tx_points:
529         status = None
530     else:
531         lastpoint = tx_points[-1]
532         status = lastpoint['blk_hash']
533         # this is a temporary hack; move it up once old clients have disappeared
534         if status == 'mempool': # and session['version'] != "old":
535             status = status + ':%d'% len(tx_points)
536     return status
537
538
539 def send_numblocks(session_id):
540     message_id = sessions_sub_numblocks[session_id]
541     out = json.dumps( {'id':message_id, 'result':block_number} )
542     output_queue.put((session_id, out))
543
544 def send_status(session_id, message_id, address, status):
545     out = json.dumps( { 'id':message_id, 'result':status } )
546     output_queue.put((session_id, out))
547
548 def address_get_history_json(_,message_id,address):
549     return store.get_history(address)
550
551 def subscribe_to_numblocks(session_id, message_id):
552     sessions_sub_numblocks[session_id] = message_id
553     send_numblocks(session_id)
554
555 def subscribe_to_numblocks_json(session_id, message_id):
556     global m_sessions
557     m_sessions[0][session_id]['numblocks'] = message_id,block_number
558     return block_number
559
560 def subscribe_to_address(session_id, message_id, address):
561     status = get_address_status(address)
562     sessions[session_id]['addresses'][address] = (message_id, status)
563     sessions[session_id]['last_time'] = time.time()
564     send_status(session_id, message_id, address, status)
565
566 def add_address_to_session_json(session_id, message_id, address):
567     global m_sessions
568     sessions = m_sessions[0]
569     status = get_address_status(address)
570     sessions[session_id]['addresses'][address] = (message_id, status)
571     sessions[session_id]['last_time'] = time.time()
572     m_sessions[0] = sessions
573     return status
574
575 def add_address_to_session(session_id, address):
576     status = get_address_status(address)
577     sessions[session_id]['addresses'][address] = ("", status)
578     sessions[session_id]['last_time'] = time.time()
579     return status
580
581 def new_session(version, addresses):
582     session_id = random_string(10)
583     sessions[session_id] = { 'addresses':{}, 'version':version }
584     for a in addresses:
585         sessions[session_id]['addresses'][a] = ('','')
586     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
587     sessions[session_id]['last_time'] = time.time()
588     return out
589
590
591 def client_version_json(session_id, _, version):
592     global m_sessions
593     sessions = m_sessions[0]
594     sessions[session_id]['version'] = version
595     m_sessions[0] = sessions
596
597 def create_session_json(_, __):
598     sessions = m_sessions[0]
599     session_id = random_string(10)
600     print "creating session", session_id
601     sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
602     sessions[session_id]['last_time'] = time.time()
603     m_sessions[0] = sessions
604     return session_id
605
606
607
608 def get_banner(_,__):
609     return config.get('server','banner').replace('\\n','\n')
610
611 def update_session(session_id,addresses):
612     """deprecated in 0.42"""
613     sessions[session_id]['addresses'] = {}
614     for a in addresses:
615         sessions[session_id]['addresses'][a] = ''
616     sessions[session_id]['last_time'] = time.time()
617     return 'ok'
618
619 def native_server_thread():
620     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
621     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
622     s.bind((config.get('server','host'), config.getint('server','port')))
623     s.listen(1)
624     while not stopping:
625         conn, addr = s.accept()
626         try:
627             thread.start_new_thread(native_client_thread, (addr, conn,))
628         except:
629             # can't start new thread if there is no memory..
630             traceback.print_exc(file=sys.stdout)
631
632
633 def native_client_thread(ipaddr,conn):
634     #print "client thread", ipaddr
635     try:
636         ipaddr = ipaddr[0]
637         msg = ''
638         while 1:
639             d = conn.recv(1024)
640             msg += d
641             if not d: 
642                 break
643             if '#' in msg:
644                 msg = msg.split('#', 1)[0]
645                 break
646         try:
647             cmd, data = ast.literal_eval(msg)
648         except:
649             print "syntax error", repr(msg), ipaddr
650             conn.close()
651             return
652
653         out = do_command(cmd, data, ipaddr)
654         if out:
655             #print ipaddr, cmd, len(out)
656             try:
657                 conn.send(out)
658             except:
659                 print "error, could not send"
660
661     finally:
662         conn.close()
663
664
665 def timestr():
666     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
667
668 # used by the native handler
669 def do_command(cmd, data, ipaddr):
670
671     if cmd=='b':
672         out = "%d"%block_number
673
674     elif cmd in ['session','new_session']:
675         try:
676             if cmd == 'session':
677                 addresses = ast.literal_eval(data)
678                 version = "old"
679             else:
680                 version, addresses = ast.literal_eval(data)
681                 if version[0]=="0": version = "v" + version
682         except:
683             print "error", data
684             return None
685         print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
686         out = new_session(version, addresses)
687
688     elif cmd=='address.subscribe':
689         try:
690             session_id, addr = ast.literal_eval(data)
691         except:
692             traceback.print_exc(file=sys.stdout)
693             print data
694             return None
695         out = add_address_to_session(session_id,addr)
696
697     elif cmd=='update_session':
698         try:
699             session_id, addresses = ast.literal_eval(data)
700         except:
701             traceback.print_exc(file=sys.stdout)
702             return None
703         print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
704         out = update_session(session_id,addresses)
705             
706     elif cmd=='poll': 
707         out = poll_session(data)
708
709     elif cmd == 'h': 
710         # history
711         address = data
712         out = repr( store.get_history( address ) )
713
714     elif cmd == 'load': 
715         out = cmd_load(None,None,data)
716
717     elif cmd =='tx':
718         out = store.send_tx(data)
719         print timestr(), "sent tx:", ipaddr, out
720
721     elif cmd == 'stop':
722         out = cmd_stop(data)
723
724     elif cmd == 'peers':
725         out = repr(peer_list.values())
726
727     else:
728         out = None
729
730     return out
731
732
733
734 ####################################################################
735
736 def tcp_server_thread():
737     thread.start_new_thread(process_input_queue, ())
738     thread.start_new_thread(process_output_queue, ())
739
740     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
741     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
742     s.bind((config.get('server','host'), 50001))
743     s.listen(1)
744     while not stopping:
745         conn, addr = s.accept()
746         try:
747             thread.start_new_thread(tcp_client_thread, (addr, conn,))
748         except:
749             # can't start new thread if there is no memory..
750             traceback.print_exc(file=sys.stdout)
751
752
753 def close_session(session_id):
754     #print "lost connection", session_id
755     sessions.pop(session_id)
756     if session_id in sessions_sub_numblocks:
757         sessions_sub_numblocks.pop(session_id)
758
759
760 # one thread per client. put requests in a queue.
761 def tcp_client_thread(ipaddr,conn):
762     """ use a persistent connection. put commands in a queue."""
763
764     print timestr(), "TCP session", ipaddr
765     global sessions
766
767     session_id = random_string(10)
768     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
769
770     ipaddr = ipaddr[0]
771     msg = ''
772
773     while not stopping:
774         try:
775             d = conn.recv(1024)
776         except socket.error:
777             d = ''
778         if not d:
779             close_session(session_id)
780             break
781
782         msg += d
783         while True:
784             s = msg.find('\n')
785             if s ==-1:
786                 break
787             else:
788                 c = msg[0:s].strip()
789                 msg = msg[s+1:]
790                 if c == 'quit': 
791                     conn.close()
792                     close_session(session_id)
793                     return
794                 try:
795                     c = json.loads(c)
796                 except:
797                     print "json error", repr(c)
798                     continue
799                 try:
800                     message_id = c.get('id')
801                     method = c.get('method')
802                     params = c.get('params')
803                 except:
804                     print "syntax error", repr(c), ipaddr
805                     continue
806
807                 # add to queue
808                 input_queue.put((session_id, message_id, method, params))
809
810
811
812 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
813 def process_input_queue():
814     while not stopping:
815         session_id, message_id, method, data = input_queue.get()
816         if session_id not in sessions.keys():
817             continue
818         out = None
819         if method == 'address.subscribe':
820             address = data[0]
821             subscribe_to_address(session_id,message_id,address)
822         elif method == 'numblocks.subscribe':
823             subscribe_to_numblocks(session_id,message_id)
824         elif method == 'client.version':
825             sessions[session_id]['version'] = data[0]
826         elif method == 'server.banner':
827             out = { 'result':config.get('server','banner').replace('\\n','\n') } 
828         elif method == 'server.peers':
829             out = { 'result':peer_list.values() } 
830         elif method == 'address.get_history':
831             address = data[0]
832             out = { 'result':store.get_history( address ) } 
833         elif method == 'transaction.broadcast':
834             postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
835             txo = urllib.urlopen(bitcoind_url, postdata).read()
836             print "sent tx:", txo
837             out = json.loads(txo)
838         else:
839             print "unknown command", method
840         if out:
841             out['id'] = message_id
842             out = json.dumps( out )
843             output_queue.put((session_id, out))
844
845 # this is a separate thread
846 def process_output_queue():
847     while not stopping:
848         session_id, out = output_queue.get()
849         session = sessions.get(session_id)
850         if session: 
851             try:
852                 conn = session.get('conn')
853                 conn.send(out+'\n')
854             except:
855                 close_session(session_id)
856                 
857
858
859
860 ####################################################################
861
862
863
864
865 def clean_session_thread():
866     while not stopping:
867         time.sleep(30)
868         t = time.time()
869         for k,s in sessions.items():
870             if s.get('type') == 'persistent': continue
871             t0 = s['last_time']
872             if t - t0 > 5*60:
873                 sessions.pop(k)
874                 print "lost session", k
875             
876
877 def irc_thread():
878     global peer_list
879     NICK = 'E_'+random_string(10)
880     while not stopping:
881         try:
882             s = socket.socket()
883             s.connect(('irc.freenode.net', 6667))
884             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
885             s.send('NICK '+NICK+'\n')
886             s.send('JOIN #electrum\n')
887             sf = s.makefile('r', 0)
888             t = 0
889             while not stopping:
890                 line = sf.readline()
891                 line = line.rstrip('\r\n')
892                 line = line.split()
893                 if line[0]=='PING': 
894                     s.send('PONG '+line[1]+'\n')
895                 elif '353' in line: # answer to /names
896                     k = line.index('353')
897                     for item in line[k+1:]:
898                         if item[0:2] == 'E_':
899                             s.send('WHO %s\n'%item)
900                 elif '352' in line: # answer to /who
901                     # warning: this is a horrible hack which apparently works
902                     k = line.index('352')
903                     ip = line[k+4]
904                     ip = socket.gethostbyname(ip)
905                     name = line[k+6]
906                     host = line[k+9]
907                     peer_list[name] = (ip,host)
908                 if time.time() - t > 5*60:
909                     s.send('NAMES #electrum\n')
910                     t = time.time()
911                     peer_list = {}
912         except:
913             traceback.print_exc(file=sys.stdout)
914         finally:
915             sf.close()
916             s.close()
917
918
919 def get_peers_json(_,__):
920     return peer_list.values()
921
922 def http_server_thread():
923     # see http://code.google.com/p/jsonrpclib/
924     from SocketServer import ThreadingMixIn
925     from StratumJSONRPCServer import StratumJSONRPCServer
926     class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
927     server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
928     server.register_function(get_peers_json, 'server.peers')
929     server.register_function(cmd_stop, 'stop')
930     server.register_function(cmd_load, 'load')
931     server.register_function(get_banner, 'server.banner')
932     server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
933     server.register_function(address_get_history_json, 'address.get_history')
934     server.register_function(add_address_to_session_json, 'address.subscribe')
935     server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
936     server.register_function(client_version_json, 'client.version')
937     server.register_function(create_session_json, 'session.create')   # internal message (not part of protocol)
938     server.register_function(poll_session_json, 'session.poll')       # internal message (not part of protocol)
939     server.serve_forever()
940
941
942 if __name__ == '__main__':
943
944     if len(sys.argv)>1:
945         import jsonrpclib
946         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
947         cmd = sys.argv[1]
948         if cmd == 'load':
949             out = server.load(password)
950         elif cmd == 'peers':
951             out = server.server.peers()
952         elif cmd == 'stop':
953             out = server.stop(password)
954         elif cmd == 'clear_cache':
955             out = server.clear_cache(password)
956         elif cmd == 'get_cache':
957             out = server.get_cache(password,sys.argv[2])
958         elif cmd == 'h':
959             out = server.address.get_history(sys.argv[2])
960         elif cmd == 'tx':
961             out = server.transaction.broadcast(sys.argv[2])
962         elif cmd == 'b':
963             out = server.numblocks.subscribe()
964         else:
965             out = "Unknown command: '%s'" % cmd
966         print out
967         sys.exit(0)
968
969     # backend
970     # from db import MyStore
971     store = MyStore(config,address_queue)
972
973     # supported protocols
974     thread.start_new_thread(native_server_thread, ())
975     thread.start_new_thread(tcp_server_thread, ())
976     thread.start_new_thread(http_server_thread, ())
977     thread.start_new_thread(clean_session_thread, ())
978
979     if (config.get('server','irc') == 'yes' ):
980         thread.start_new_thread(irc_thread, ())
981
982     print "starting Electrum server"
983
984     old_block_number = None
985     while not stopping:
986         block_number = store.main_iteration()
987
988         if block_number != old_block_number:
989             old_block_number = block_number
990             for session_id in sessions_sub_numblocks.keys():
991                 send_numblocks(session_id)
992         while True:
993             try:
994                 addr = address_queue.get(False)
995             except:
996                 break
997             do_update_address(addr)
998
999         time.sleep(10)
1000     print "server stopped"
1001