server.peers rpc
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22    * command to check cache
23
24  mempool transactions do not need to be added to the database; it slows it down
25 """
26
27
28 import time, json, socket, operator, thread, ast, sys,re
29 import psycopg2, binascii
30
31 from Abe.abe import hash_to_address, decode_check_address
32 from Abe.DataStore import DataStore as Datastore_class
33 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
34
35 import ConfigParser
36 from json import dumps, loads
37 import urllib
38
39 # we need to import electrum
40 sys.path.append('../client/')
41 from wallet import Wallet
42 from interface import Interface
43
44
45 config = ConfigParser.ConfigParser()
46 # set some defaults, which will be overwritten by the config file
47 config.add_section('server')
48 config.set('server','banner', 'Welcome to Electrum!')
49 config.set('server', 'host', 'localhost')
50 config.set('server', 'port', 50000)
51 config.set('server', 'password', '')
52 config.set('server', 'irc', 'yes')
53 config.set('server', 'cache', 'no') 
54 config.set('server', 'ircname', 'Electrum server')
55 config.add_section('database')
56 config.set('database', 'type', 'psycopg2')
57 config.set('database', 'database', 'abe')
58
59 try:
60     f = open('/etc/electrum.conf','r')
61     config.readfp(f)
62     f.close()
63 except:
64     print "Could not read electrum.conf. I will use the default values."
65
66 try:
67     f = open('/etc/electrum.banner','r')
68     config.set('server','banner', f.read())
69     f.close()
70 except:
71     pass
72
73 password = config.get('server','password')
74 bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
75
76 stopping = False
77 block_number = -1
78 old_block_number = -1
79 sessions = {}
80 sessions_sub_numblocks = [] # sessions that have subscribed to the service
81
82 dblock = thread.allocate_lock()
83 peer_list = {}
84
85 wallets = {} # for ultra-light clients such as bccapi
86
87 from Queue import Queue
88 input_queue = Queue()
89 output_queue = Queue()
90 address_queue = Queue()
91
92 class MyStore(Datastore_class):
93
94     def import_tx(self, tx, is_coinbase):
95         tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
96         if config.get('server', 'cache') == 'yes': self.update_tx_cache(tx_id)
97
98     def update_tx_cache(self, txid):
99         inrows = self.get_tx_inputs(txid, False)
100         for row in inrows:
101             _hash = store.binout(row[6])
102             address = hash_to_address(chr(0), _hash)
103             if self.tx_cache.has_key(address):
104                 print "cache: invalidating", address
105                 self.tx_cache.pop(address)
106             address_queue.put(address)
107
108         outrows = self.get_tx_outputs(txid, False)
109         for row in outrows:
110             _hash = store.binout(row[6])
111             address = hash_to_address(chr(0), _hash)
112             if self.tx_cache.has_key(address):
113                 print "cache: invalidating", address
114                 self.tx_cache.pop(address)
115             address_queue.put(address)
116
117     def safe_sql(self,sql, params=(), lock=True):
118         try:
119             if lock: dblock.acquire()
120             ret = self.selectall(sql,params)
121             if lock: dblock.release()
122             return ret
123         except:
124             print "sql error", sql
125             return []
126
127     def get_tx_outputs(self, tx_id, lock=True):
128         return self.safe_sql("""SELECT
129                 txout.txout_pos,
130                 txout.txout_scriptPubKey,
131                 txout.txout_value,
132                 nexttx.tx_hash,
133                 nexttx.tx_id,
134                 txin.txin_pos,
135                 pubkey.pubkey_hash
136               FROM txout
137               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
138               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
139               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
140              WHERE txout.tx_id = %d 
141              ORDER BY txout.txout_pos
142         """%(tx_id), (), lock)
143
144     def get_tx_inputs(self, tx_id, lock=True):
145         return self.safe_sql(""" SELECT
146                 txin.txin_pos,
147                 txin.txin_scriptSig,
148                 txout.txout_value,
149                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
150                 prevtx.tx_id,
151                 COALESCE(txout.txout_pos, u.txout_pos),
152                 pubkey.pubkey_hash
153               FROM txin
154               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
155               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
156               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
157               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
158              WHERE txin.tx_id = %d
159              ORDER BY txin.txin_pos
160              """%(tx_id,), (), lock)
161
162     def get_address_out_rows(self, dbhash):
163         return self.safe_sql(""" SELECT
164                 b.block_nTime,
165                 cc.chain_id,
166                 b.block_height,
167                 1,
168                 b.block_hash,
169                 tx.tx_hash,
170                 tx.tx_id,
171                 txin.txin_pos,
172                 -prevout.txout_value
173               FROM chain_candidate cc
174               JOIN block b ON (b.block_id = cc.block_id)
175               JOIN block_tx ON (block_tx.block_id = b.block_id)
176               JOIN tx ON (tx.tx_id = block_tx.tx_id)
177               JOIN txin ON (txin.tx_id = tx.tx_id)
178               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
179               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
180              WHERE pubkey.pubkey_hash = ?
181                AND cc.in_longest = 1""", (dbhash,))
182
183     def get_address_out_rows_memorypool(self, dbhash):
184         return self.safe_sql(""" SELECT
185                 1,
186                 tx.tx_hash,
187                 tx.tx_id,
188                 txin.txin_pos,
189                 -prevout.txout_value
190               FROM tx 
191               JOIN txin ON (txin.tx_id = tx.tx_id)
192               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
193               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
194              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
195
196     def get_address_in_rows(self, dbhash):
197         return self.safe_sql(""" SELECT
198                 b.block_nTime,
199                 cc.chain_id,
200                 b.block_height,
201                 0,
202                 b.block_hash,
203                 tx.tx_hash,
204                 tx.tx_id,
205                 txout.txout_pos,
206                 txout.txout_value
207               FROM chain_candidate cc
208               JOIN block b ON (b.block_id = cc.block_id)
209               JOIN block_tx ON (block_tx.block_id = b.block_id)
210               JOIN tx ON (tx.tx_id = block_tx.tx_id)
211               JOIN txout ON (txout.tx_id = tx.tx_id)
212               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
213              WHERE pubkey.pubkey_hash = ?
214                AND cc.in_longest = 1""", (dbhash,))
215
216     def get_address_in_rows_memorypool(self, dbhash):
217         return self.safe_sql( """ SELECT
218                 0,
219                 tx.tx_hash,
220                 tx.tx_id,
221                 txout.txout_pos,
222                 txout.txout_value
223               FROM tx
224               JOIN txout ON (txout.tx_id = tx.tx_id)
225               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
226              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
227
228     def get_history(self, addr):
229         
230         if config.get('server','cache') == 'yes':
231             cached_version = self.tx_cache.get( addr )
232             if cached_version is not None:
233                 return cached_version
234
235         version, binaddr = decode_check_address(addr)
236         if binaddr is None:
237             return None
238
239         dbhash = self.binin(binaddr)
240         rows = []
241         rows += self.get_address_out_rows( dbhash )
242         rows += self.get_address_in_rows( dbhash )
243
244         txpoints = []
245         known_tx = []
246
247         for row in rows:
248             try:
249                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
250             except:
251                 print "cannot unpack row", row
252                 break
253             tx_hash = self.hashout_hex(tx_hash)
254             txpoint = {
255                     "nTime":    int(nTime),
256                     "height":   int(height),
257                     "is_in":    int(is_in),
258                     "blk_hash": self.hashout_hex(blk_hash),
259                     "tx_hash":  tx_hash,
260                     "tx_id":    int(tx_id),
261                     "pos":      int(pos),
262                     "value":    int(value),
263                     }
264
265             txpoints.append(txpoint)
266             known_tx.append(self.hashout_hex(tx_hash))
267
268
269         # todo: sort them really...
270         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
271
272         # read memory pool
273         rows = []
274         rows += self.get_address_in_rows_memorypool( dbhash )
275         rows += self.get_address_out_rows_memorypool( dbhash )
276         address_has_mempool = False
277
278         for row in rows:
279             is_in, tx_hash, tx_id, pos, value = row
280             tx_hash = self.hashout_hex(tx_hash)
281             if tx_hash in known_tx:
282                 continue
283
284             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
285             address_has_mempool = True
286
287             # this means pending transactions are returned by getmemorypool
288             if tx_hash not in self.mempool_keys:
289                 continue
290
291             #print "mempool", tx_hash
292             txpoint = {
293                     "nTime":    0,
294                     "height":   0,
295                     "is_in":    int(is_in),
296                     "blk_hash": 'mempool', 
297                     "tx_hash":  tx_hash,
298                     "tx_id":    int(tx_id),
299                     "pos":      int(pos),
300                     "value":    int(value),
301                     }
302             txpoints.append(txpoint)
303
304
305         for txpoint in txpoints:
306             tx_id = txpoint['tx_id']
307             
308             txinputs = []
309             inrows = self.get_tx_inputs(tx_id)
310             for row in inrows:
311                 _hash = self.binout(row[6])
312                 address = hash_to_address(chr(0), _hash)
313                 txinputs.append(address)
314             txpoint['inputs'] = txinputs
315             txoutputs = []
316             outrows = self.get_tx_outputs(tx_id)
317             for row in outrows:
318                 _hash = self.binout(row[6])
319                 address = hash_to_address(chr(0), _hash)
320                 txoutputs.append(address)
321             txpoint['outputs'] = txoutputs
322
323             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
324             if not txpoint['is_in']:
325                 # detect if already redeemed...
326                 for row in outrows:
327                     if row[6] == dbhash: break
328                 else:
329                     raise
330                 #row = self.get_tx_output(tx_id,dbhash)
331                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
332                 # if not redeemed, we add the script
333                 if row:
334                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
335
336         # cache result
337         if config.get('server','cache') == 'yes' and not address_has_mempool:
338             self.tx_cache[addr] = txpoints
339         
340         return txpoints
341
342
343
344 class Direct_Interface(Interface):
345     def __init__(self):
346         pass
347
348     def handler(self, method, params = ''):
349         cmds = {'session.new':new_session,
350                 'session.poll':poll_session,
351                 'session.update':update_session,
352                 'blockchain.transaction.broadcast':send_tx,
353                 'blockchain.address.get_history':store.get_history
354                 }
355         func = cmds[method]
356         return func( params )
357
358
359
360 def send_tx(tx):
361     postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
362     respdata = urllib.urlopen(bitcoind_url, postdata).read()
363     r = loads(respdata)
364     if r['error'] != None:
365         out = "error: transaction rejected by memorypool\n"+tx
366     else:
367         out = r['result']
368     return out
369
370
371
372 def random_string(N):
373     import random, string
374     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
375
376     
377
378 def cmd_stop(data):
379     global stopping
380     if password == data:
381         stopping = True
382         return 'ok'
383     else:
384         return 'wrong password'
385
386 def cmd_load(pw):
387     if password == pw:
388         return repr( len(sessions) )
389     else:
390         return 'wrong password'
391
392
393 def clear_cache(pw):
394     if password == pw:
395         store.tx_cache = {}
396         return 'ok'
397     else:
398         return 'wrong password'
399
400 def get_cache(pw,addr):
401     if password == pw:
402         return store.tx_cache.get(addr)
403     else:
404         return 'wrong password'
405
406
407 def poll_session(session_id):
408     session = sessions.get(session_id)
409     if session is None:
410         print time.asctime(), "session not found", session_id
411         out = repr( (-1, {}))
412     else:
413         t1 = time.time()
414         addresses = session['addresses']
415         session['last_time'] = time.time()
416         ret = {}
417         k = 0
418         for addr in addresses:
419             if store.tx_cache.get( addr ) is not None: k += 1
420             status = get_address_status( addr )
421             last_status = addresses.get( addr )
422             if last_status != status:
423                 addresses[addr] = status
424                 ret[addr] = status
425         if ret:
426             sessions[session_id]['addresses'] = addresses
427         out = repr( (block_number, ret ) )
428         t2 = time.time() - t1 
429         if t2 > 10:
430             print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
431
432         return out
433
434
435 def do_update_address(addr):
436     # an address was involved in a transaction; we check if it was subscribed to in a session
437     # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
438     for session_id in sessions.keys():
439         session = sessions[session_id]
440         if session.get('type') != 'subscribe': continue
441         addresses = session['addresses'].keys()
442
443         if addr in addresses:
444             print "address ", addr, "found in session", session_id
445             status = get_address_status( addr )
446             print "new_status:", status
447             last_status = session['addresses'][addr]
448             print "last_status", last_status
449             if last_status != status:
450                 print "status is new", addr
451                 send_status(session_id,addr,status)
452                 sessions[session_id]['addresses'][addr] = status
453
454
455 def get_address_status(addr):
456     # get address status, i.e. the last block for that address.
457     tx_points = store.get_history(addr)
458     if not tx_points:
459         status = None
460     else:
461         lastpoint = tx_points[-1]
462         status = lastpoint['blk_hash']
463         # this is a temporary hack; move it up once old clients have disappeared
464         if status == 'mempool': # and session['version'] != "old":
465             status = status + ':%d'% len(tx_points)
466     return status
467
468
469 def send_numblocks(session_id):
470     out = json.dumps( {'method':'numblocks.subscribe', 'result':block_number} )
471     output_queue.put((session_id, out))
472
473 def send_status(session_id, address, status):
474     out = json.dumps( { 'method':'address.subscribe', 'address':address, 'status':status } )
475     output_queue.put((session_id, out))
476
477 def subscribe_to_numblocks(session_id):
478     sessions_sub_numblocks.append(session_id)
479     send_numblocks(session_id)
480
481 def subscribe_to_address(session_id, address):
482     status = get_address_status(address)
483     sessions[session_id]['type'] = 'subscribe'
484     sessions[session_id]['addresses'][address] = status
485     sessions[session_id]['last_time'] = time.time()
486     send_status(session_id, address, status)
487
488 def new_session(version, addresses):
489     session_id = random_string(10)
490     sessions[session_id] = { 'addresses':{}, 'version':version }
491     for a in addresses:
492         sessions[session_id]['addresses'][a] = ''
493     out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
494     sessions[session_id]['last_time'] = time.time()
495     return out
496
497 def update_session(session_id,addresses):
498     sessions[session_id]['addresses'] = {}
499     for a in addresses:
500         sessions[session_id]['addresses'][a] = ''
501     sessions[session_id]['last_time'] = time.time()
502     return 'ok'
503
504 def native_server_thread():
505     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
506     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
507     s.bind((config.get('server','host'), config.getint('server','port')))
508     s.listen(1)
509     while not stopping:
510         conn, addr = s.accept()
511         try:
512             thread.start_new_thread(native_client_thread, (addr, conn,))
513         except:
514             # can't start new thread if there is no memory..
515             traceback.print_exc(file=sys.stdout)
516
517
518 def native_client_thread(ipaddr,conn):
519     #print "client thread", ipaddr
520     try:
521         ipaddr = ipaddr[0]
522         msg = ''
523         while 1:
524             d = conn.recv(1024)
525             msg += d
526             if not d: 
527                 break
528             if '#' in msg:
529                 msg = msg.split('#', 1)[0]
530                 break
531         try:
532             cmd, data = ast.literal_eval(msg)
533         except:
534             print "syntax error", repr(msg), ipaddr
535             conn.close()
536             return
537
538         out = do_command(cmd, data, ipaddr)
539         if out:
540             #print ipaddr, cmd, len(out)
541             try:
542                 conn.send(out)
543             except:
544                 print "error, could not send"
545
546     finally:
547         conn.close()
548
549
550
551 # used by the native handler
552 def do_command(cmd, data, ipaddr):
553
554     timestr = time.strftime("[%d/%m/%Y-%H:%M:%S]")
555
556     if cmd=='b':
557         out = "%d"%block_number
558
559     elif cmd in ['session','new_session']:
560         try:
561             if cmd == 'session':
562                 addresses = ast.literal_eval(data)
563                 version = "old"
564             else:
565                 version, addresses = ast.literal_eval(data)
566                 if version[0]=="0": version = "v" + version
567         except:
568             print "error", data
569             return None
570         print timestr, "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
571         out = new_session(version, addresses)
572
573     elif cmd=='update_session':
574         try:
575             session_id, addresses = ast.literal_eval(data)
576         except:
577             print "error"
578             return None
579         print timestr, "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
580         out = update_session(session_id,addresses)
581
582     elif cmd == 'bccapi_login':
583         import electrum
584         print "data",data
585         v, k = ast.literal_eval(data)
586         master_public_key = k.decode('hex') # todo: sanitize. no need to decode twice...
587         print master_public_key
588         wallet_id = random_string(10)
589         w = Wallet( Direct_Interface() )
590         w.master_public_key = master_public_key.decode('hex')
591         w.synchronize()
592         wallets[wallet_id] = w
593         out = wallet_id
594         print "wallets", wallets
595
596     elif cmd == 'bccapi_getAccountInfo':
597         from wallet import int_to_hex
598         v, wallet_id = ast.literal_eval(data)
599         w = wallets.get(wallet_id)
600         if w is not None:
601             num = len(w.addresses)
602             c, u = w.get_balance()
603             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 )
604             out = out.decode('hex')
605         else:
606             print "error",data
607             out = "error"
608
609     elif cmd == 'bccapi_getAccountStatement':
610         from wallet import int_to_hex
611         v, wallet_id = ast.literal_eval(data)
612         w = wallets.get(wallet_id)
613         if w is not None:
614             num = len(w.addresses)
615             c, u = w.get_balance()
616             total_records = num_records = 0
617             out = int_to_hex(num,4) + int_to_hex(c,8) + int_to_hex( c+u, 8 ) + int_to_hex( total_records ) + int_to_hex( num_records )
618             out = out.decode('hex')
619         else:
620             print "error",data
621             out = "error"
622
623     elif cmd == 'bccapi_getSendCoinForm':
624         out = ''
625
626     elif cmd == 'bccapi_submitTransaction':
627         out = ''
628             
629     elif cmd=='poll': 
630         out = poll_session(data)
631
632     elif cmd == 'h': 
633         # history
634         address = data
635         out = repr( store.get_history( address ) )
636
637     elif cmd == 'load': 
638         out = cmd_load(data)
639
640     elif cmd =='tx':
641         out = send_tx(data)
642         print timestr, "sent tx:", ipaddr, out
643
644     elif cmd == 'stop':
645         out = cmd_stop(data)
646
647     elif cmd == 'peers':
648         out = repr(peer_list.values())
649
650     else:
651         out = None
652
653     return out
654
655
656
657 ####################################################################
658
659 def tcp_server_thread():
660     thread.start_new_thread(process_input_queue, ())
661     thread.start_new_thread(process_output_queue, ())
662
663     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
664     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
665     s.bind((config.get('server','host'), 50001))
666     s.listen(1)
667     while not stopping:
668         conn, addr = s.accept()
669         try:
670             thread.start_new_thread(tcp_client_thread, (addr, conn,))
671         except:
672             # can't start new thread if there is no memory..
673             traceback.print_exc(file=sys.stdout)
674
675
676 def close_sesion(session_id):
677     print "lost connection", session_id
678     sessions.pop(session_id)
679     sessions_sub_numblocks.remove(session_id)
680
681
682 # one thread per client. put requests in a queue.
683 def tcp_client_thread(ipaddr,conn):
684     """ use a persistent connection. put commands in a queue."""
685     print "persistent client thread", ipaddr
686     global sessions
687
688     session_id = random_string(10)
689     sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown' }
690
691     ipaddr = ipaddr[0]
692     msg = ''
693
694     while not stopping:
695         d = conn.recv(1024)
696         msg += d
697         if not d:
698             close_sesion(session_id)
699             break
700
701         while True:
702             s = msg.find('\n')
703             if s ==-1:
704                 break
705             else:
706                 c = msg[0:s]
707                 msg = msg[s+1:]
708                 c = json.loads(c)
709                 try:
710                     cmd = c['method']
711                     data = c['params']
712                 except:
713                     print "syntax error", repr(c), ipaddr
714                     continue
715
716                 # add to queue
717                 input_queue.put((session_id, cmd, data))
718
719
720 # read commands from the input queue. perform requests, etc. this should be called from the main thread.
721 def process_input_queue():
722     while not stopping:
723         session_id, cmd, data = input_queue.get()
724         out = None
725         if cmd == 'address.subscribe':
726             subscribe_to_address(session_id,data)
727         elif cmd == 'numblocks.subscribe':
728             subscribe_to_numblocks(session_id)
729         elif cmd == 'client.version':
730             sessions[session_id]['version'] = data
731         elif cmd == 'server.banner':
732             out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } )
733         elif cmd == 'server.peers':
734             out = json.dumps( { 'method':'server.peers', 'result':peer_list.values() } )
735         elif cmd == 'address.get_history':
736             address = data
737             out = json.dumps( { 'method':'address.get_history', 'address':address, 'result':store.get_history( address ) } )
738         elif cmd == 'transaction.broadcast':
739             txo = send_tx(data)
740             print "sent tx:", txo
741             out = json.dumps( { 'method':'transaction.broadcast', 'result':txo } )
742         else:
743             print "unknown command", cmd
744         if out:
745             output_queue.put((session_id, out))
746
747 # this is a separate thread
748 def process_output_queue():
749     while not stopping:
750         session_id, out = output_queue.get()
751         session = sessions.get(session_id)
752         if session: 
753             try:
754                 conn = session.get('conn')
755                 conn.send(out+'\n')
756             except:
757                 close_session(session_id)
758                 
759
760
761
762 ####################################################################
763
764
765 def memorypool_update(store):
766     ds = BCDataStream.BCDataStream()
767     store.mempool_keys = []
768
769     postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
770     respdata = urllib.urlopen(bitcoind_url, postdata).read()
771     r = loads(respdata)
772     if r['error'] != None:
773         return
774
775     v = r['result'].get('transactions')
776     for hextx in v:
777         ds.clear()
778         ds.write(hextx.decode('hex'))
779         tx = deserialize.parse_Transaction(ds)
780         tx['hash'] = util.double_sha256(tx['tx'])
781         tx_hash = tx['hash'][::-1].encode('hex')
782         store.mempool_keys.append(tx_hash)
783         if store.tx_find_id_and_value(tx):
784             pass
785         else:
786             store.import_tx(tx, False)
787
788     store.commit()
789
790
791
792 def clean_session_thread():
793     while not stopping:
794         time.sleep(30)
795         t = time.time()
796         for k,s in sessions.items():
797             if s.get('type') == 'subscribe': continue
798             t0 = s['last_time']
799             if t - t0 > 5*60:
800                 sessions.pop(k)
801                 print "lost session", k
802             
803
804 def irc_thread():
805     global peer_list
806     NICK = 'E_'+random_string(10)
807     while not stopping:
808         try:
809             s = socket.socket()
810             s.connect(('irc.freenode.net', 6667))
811             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
812             s.send('NICK '+NICK+'\n')
813             s.send('JOIN #electrum\n')
814             sf = s.makefile('r', 0)
815             t = 0
816             while not stopping:
817                 line = sf.readline()
818                 line = line.rstrip('\r\n')
819                 line = line.split()
820                 if line[0]=='PING': 
821                     s.send('PONG '+line[1]+'\n')
822                 elif '353' in line: # answer to /names
823                     k = line.index('353')
824                     for item in line[k+1:]:
825                         if item[0:2] == 'E_':
826                             s.send('WHO %s\n'%item)
827                 elif '352' in line: # answer to /who
828                     # warning: this is a horrible hack which apparently works
829                     k = line.index('352')
830                     ip = line[k+4]
831                     ip = socket.gethostbyname(ip)
832                     name = line[k+6]
833                     host = line[k+9]
834                     peer_list[name] = (ip,host)
835                 if time.time() - t > 5*60:
836                     s.send('NAMES #electrum\n')
837                     t = time.time()
838                     peer_list = {}
839         except:
840             traceback.print_exc(file=sys.stdout)
841         finally:
842             sf.close()
843             s.close()
844
845
846
847 def http_server_thread(store):
848     # see http://code.google.com/p/jsonrpclib/
849     from SocketServer import ThreadingMixIn
850     from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
851     class SimpleThreadedJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer): pass
852     server = SimpleThreadedJSONRPCServer(( config.get('server','host'), 8081))
853     server.register_function(lambda : peer_list.values(), 'peers')
854     server.register_function(cmd_stop, 'stop')
855     server.register_function(cmd_load, 'load')
856     server.register_function(lambda : block_number, 'blocks')
857     server.register_function(clear_cache, 'clear_cache')
858     server.register_function(get_cache, 'get_cache')
859     server.register_function(send_tx, 'blockchain.transaction.broadcast')
860     server.register_function(store.get_history, 'blockchain.address.get_history')
861     server.register_function(new_session, 'session.new')
862     server.register_function(update_session, 'session.update')
863     server.register_function(poll_session, 'session.poll')
864     server.serve_forever()
865
866
867 import traceback
868
869
870 if __name__ == '__main__':
871
872     if len(sys.argv)>1:
873         import jsonrpclib
874         server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
875         cmd = sys.argv[1]
876         if cmd == 'load':
877             out = server.load(password)
878         elif cmd == 'peers':
879             out = server.peers()
880         elif cmd == 'stop':
881             out = server.stop(password)
882         elif cmd == 'clear_cache':
883             out = server.clear_cache(password)
884         elif cmd == 'get_cache':
885             out = server.get_cache(password,sys.argv[2])
886         elif cmd == 'h':
887             out = server.blockchain.address.get_history(sys.argv[2])
888         elif cmd == 'tx':
889             out = server.blockchain.transaction.broadcast(sys.argv[2])
890         elif cmd == 'b':
891             out = server.blocks()
892         else:
893             out = "Unknown command: '%s'" % cmd
894         print out
895         sys.exit(0)
896
897
898     print "starting Electrum server"
899     print "cache:", config.get('server', 'cache')
900
901     conf = DataStore.CONFIG_DEFAULTS
902     args, argv = readconf.parse_argv( [], conf)
903     args.dbtype= config.get('database','type')
904     if args.dbtype == 'sqlite3':
905         args.connect_args = { 'database' : config.get('database','database') }
906     elif args.dbtype == 'MySQLdb':
907         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
908     elif args.dbtype == 'psycopg2':
909         args.connect_args = { 'database' : config.get('database','database') }
910     store = MyStore(args)
911     store.tx_cache = {}
912     store.mempool_keys = {}
913
914     # supported protocols
915     thread.start_new_thread(native_server_thread, ())
916     thread.start_new_thread(tcp_server_thread, ())
917     thread.start_new_thread(http_server_thread, (store,))
918
919     thread.start_new_thread(clean_session_thread, ())
920
921     if (config.get('server','irc') == 'yes' ):
922         thread.start_new_thread(irc_thread, ())
923
924     while not stopping:
925         try:
926             dblock.acquire()
927             store.catch_up()
928             memorypool_update(store)
929             block_number = store.get_block_number(1)
930
931             if block_number != old_block_number:
932                 old_block_number = block_number
933                 for session_id in sessions_sub_numblocks:
934                     send_numblocks(session_id)
935
936         except IOError:
937             print "IOError: cannot reach bitcoind"
938             block_number = 0
939         except:
940             traceback.print_exc(file=sys.stdout)
941             block_number = 0
942         finally:
943             dblock.release()
944
945         # do addresses
946         while True:
947             try:
948                 addr = address_queue.get(False)
949             except:
950                 break
951             do_update_address(addr)
952
953         time.sleep(10)
954
955     print "server stopped"
956