reset peer list before update
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24
25 import time, socket, operator, thread, ast, sys,re
26 import psycopg2, binascii
27 import bitcoinrpc
28
29 from Abe.abe import hash_to_address, decode_check_address
30 from Abe.DataStore import DataStore as Datastore_class
31 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
32
33 import ConfigParser
34
35 config = ConfigParser.ConfigParser()
36 # set some defaults, which will be overwritten by the config file
37 config.add_section('server')
38 config.set('server','banner', 'Welcome to Electrum!')
39 config.set('server', 'host', 'ecdsa.org')
40 config.set('server', 'port', 50000)
41 config.set('server', 'password', '')
42 config.set('server', 'irc', 'yes')
43 config.set('server', 'ircname', 'Electrum server')
44 config.add_section('database')
45 config.set('database', 'type', 'psycopg2')
46 config.set('database', 'database', 'abe')
47
48 try:
49     f = open('/etc/electrum.conf','r')
50     config.readfp(f)
51     f.close()
52 except:
53     print "Could not read electrum.conf. I will use the default values."
54
55 stopping = False
56 block_number = -1
57 sessions = {}
58 sessions_last_time = {}
59 dblock = thread.allocate_lock()
60
61 peer_list = {}
62
63 class MyStore(Datastore_class):
64
65     def safe_sql(self,sql, params=()):
66         try:
67             dblock.acquire()
68             ret = self.selectall(sql,params)
69             dblock.release()
70             return ret
71         except:
72             print "sql error", sql
73             return []
74
75     def get_tx_outputs(self, tx_id):
76         return self.safe_sql("""SELECT
77                 txout.txout_pos,
78                 txout.txout_scriptPubKey,
79                 txout.txout_value,
80                 nexttx.tx_hash,
81                 nexttx.tx_id,
82                 txin.txin_pos,
83                 pubkey.pubkey_hash
84               FROM txout
85               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
86               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
87               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
88              WHERE txout.tx_id = %d 
89              ORDER BY txout.txout_pos
90         """%(tx_id))
91
92     def get_tx_inputs(self, tx_id):
93         return self.safe_sql(""" SELECT
94                 txin.txin_pos,
95                 txin.txin_scriptSig,
96                 txout.txout_value,
97                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
98                 prevtx.tx_id,
99                 COALESCE(txout.txout_pos, u.txout_pos),
100                 pubkey.pubkey_hash
101               FROM txin
102               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
103               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
104               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
105               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
106              WHERE txin.tx_id = %d
107              ORDER BY txin.txin_pos
108              """%(tx_id,))
109
110     def get_address_out_rows(self, dbhash):
111         return self.safe_sql(""" SELECT
112                 b.block_nTime,
113                 cc.chain_id,
114                 b.block_height,
115                 1,
116                 b.block_hash,
117                 tx.tx_hash,
118                 tx.tx_id,
119                 txin.txin_pos,
120                 -prevout.txout_value
121               FROM chain_candidate cc
122               JOIN block b ON (b.block_id = cc.block_id)
123               JOIN block_tx ON (block_tx.block_id = b.block_id)
124               JOIN tx ON (tx.tx_id = block_tx.tx_id)
125               JOIN txin ON (txin.tx_id = tx.tx_id)
126               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
127               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
128              WHERE pubkey.pubkey_hash = ?
129                AND cc.in_longest = 1""", (dbhash,))
130
131     def get_address_out_rows_memorypool(self, dbhash):
132         return self.safe_sql(""" SELECT
133                 1,
134                 tx.tx_hash,
135                 tx.tx_id,
136                 txin.txin_pos,
137                 -prevout.txout_value
138               FROM tx 
139               JOIN txin ON (txin.tx_id = tx.tx_id)
140               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
141               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
142              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
143
144     def get_address_in_rows(self, dbhash):
145         return self.safe_sql(""" SELECT
146                 b.block_nTime,
147                 cc.chain_id,
148                 b.block_height,
149                 0,
150                 b.block_hash,
151                 tx.tx_hash,
152                 tx.tx_id,
153                 txout.txout_pos,
154                 txout.txout_value
155               FROM chain_candidate cc
156               JOIN block b ON (b.block_id = cc.block_id)
157               JOIN block_tx ON (block_tx.block_id = b.block_id)
158               JOIN tx ON (tx.tx_id = block_tx.tx_id)
159               JOIN txout ON (txout.tx_id = tx.tx_id)
160               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
161              WHERE pubkey.pubkey_hash = ?
162                AND cc.in_longest = 1""", (dbhash,))
163
164     def get_address_in_rows_memorypool(self, dbhash):
165         return self.safe_sql( """ SELECT
166                 0,
167                 tx.tx_hash,
168                 tx.tx_id,
169                 txout.txout_pos,
170                 txout.txout_value
171               FROM tx
172               JOIN txout ON (txout.tx_id = tx.tx_id)
173               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
174              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
175
176     def get_txpoints(self, addr):
177         version, binaddr = decode_check_address(addr)
178         if binaddr is None:
179             return "err"
180         dbhash = self.binin(binaddr)
181         rows = []
182         rows += self.get_address_out_rows( dbhash )
183         rows += self.get_address_in_rows( dbhash )
184
185         txpoints = []
186         known_tx = []
187
188         for row in rows:
189             try:
190                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
191             except:
192                 print "cannot unpack row", row
193                 break
194             tx_hash = self.hashout_hex(tx_hash)
195             txpoint = {
196                     "nTime":    int(nTime),
197                     #"chain_id": int(chain_id),
198                     "height":   int(height),
199                     "is_in":    int(is_in),
200                     "blk_hash": self.hashout_hex(blk_hash),
201                     "tx_hash":  tx_hash,
202                     "tx_id":    int(tx_id),
203                     "pos":      int(pos),
204                     "value":    int(value),
205                     }
206
207             txpoints.append(txpoint)
208             known_tx.append(self.hashout_hex(tx_hash))
209
210
211         # todo: sort them really...
212         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
213
214         # read memory pool
215         rows = []
216         rows += self.get_address_in_rows_memorypool( dbhash )
217         rows += self.get_address_out_rows_memorypool( dbhash )
218         for row in rows:
219             is_in, tx_hash, tx_id, pos, value = row
220             tx_hash = self.hashout_hex(tx_hash)
221             if tx_hash in known_tx:
222                 continue
223             #print "mempool", tx_hash
224             txpoint = {
225                     "nTime":    0,
226                     "chain_id": 1,
227                     "height":   0,
228                     "is_in":    int(is_in),
229                     "blk_hash": 'mempool',
230                     "tx_hash":  tx_hash,
231                     "tx_id":    int(tx_id),
232                     "pos":      int(pos),
233                     "value":    int(value),
234                     }
235             txpoints.append(txpoint)
236
237
238         for txpoint in txpoints:
239             tx_id = txpoint['tx_id']
240             
241             txinputs = []
242             inrows = self.get_tx_inputs(tx_id)
243             for row in inrows:
244                 _hash = self.binout(row[6])
245                 address = hash_to_address(chr(0), _hash)
246                 txinputs.append(address)
247             txpoint['inputs'] = txinputs
248             txoutputs = []
249             outrows = self.get_tx_outputs(tx_id)
250             for row in outrows:
251                 _hash = self.binout(row[6])
252                 address = hash_to_address(chr(0), _hash)
253                 txoutputs.append(address)
254             txpoint['outputs'] = txoutputs
255
256             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
257             if not txpoint['is_in']:
258                 # detect if already redeemed...
259                 for row in outrows:
260                     if row[6] == dbhash: break
261                 else:
262                     raise
263                 #row = self.get_tx_output(tx_id,dbhash)
264                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
265                 # if not redeemed, we add the script
266                 if row:
267                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
268
269
270         return txpoints
271
272
273     def get_status(self, addr):
274         # last block for an address
275         tx_points = self.get_txpoints(addr)
276         if not tx_points:
277             return None
278         else:
279             return tx_points[-1]['blk_hash']
280
281
282 def send_tx(tx):
283     import bitcoinrpc
284     conn = bitcoinrpc.connect_to_local()
285     try:
286         v = conn.importtransaction(tx)
287     except:
288         v = "error: transaction rejected by memorypool"
289     return v
290
291
292 def listen_thread(store):
293     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
294     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
295     s.bind((config.get('server','host'), config.getint('server','port')))
296     s.listen(1)
297     while not stopping:
298         conn, addr = s.accept()
299         thread.start_new_thread(client_thread, (addr, conn,))
300
301 def random_string(N):
302     import random, string
303     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
304
305 def client_thread(ipaddr,conn):
306     #print "client thread", ipaddr
307     try:
308         ipaddr = ipaddr[0]
309         msg = ''
310         while 1:
311             d = conn.recv(1024)
312             msg += d
313             if d[-1]=='#':
314                 break
315
316         #print msg
317
318         try:
319             cmd, data = ast.literal_eval(msg[:-1])
320         except:
321             print "syntax error", repr(msg)
322             conn.close()
323             return
324
325         if cmd=='b':
326             out = "%d"%block_number
327         elif cmd=='session':
328             session_id = random_string(10)
329             try:
330                 addresses = ast.literal_eval(data)
331             except:
332                 print "error"
333                 conn.close()
334                 return
335
336             print time.asctime(), "session", ipaddr, session_id, addresses[0], len(addresses)
337
338             sessions[session_id] = {}
339             for a in addresses:
340                 sessions[session_id][a] = ''
341             out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
342             sessions_last_time[session_id] = time.time()
343
344         elif cmd=='poll': 
345             session_id = data
346             addresses = sessions.get(session_id)
347             if not addresses:
348                 print "session not found", ipaddr
349                 out = repr( (-1, {}))
350             else:
351                 sessions_last_time[session_id] = time.time()
352                 ret = {}
353                 for addr in addresses:
354                     status = store.get_status( addr )
355                     last_status = sessions[session_id].get( addr )
356                     if last_status != status:
357                         sessions[session_id][addr] = status
358                         ret[addr] = status
359                 out = repr( (block_number, ret ) )
360
361         elif cmd == 'h': 
362             # history
363             addr = data
364             h = store.get_txpoints( addr )
365             out = repr(h)
366
367         elif cmd == 'load': 
368             if config.get('server','password') == data:
369                 out = repr( len(sessions) )
370             else:
371                 out = 'wrong password'
372
373         elif cmd =='tx':        
374             out = send_tx(data)
375
376         elif cmd == 'stop':
377             global stopping
378             if config.get('server','password') == data:
379                 stopping = True
380                 out = 'ok'
381             else:
382                 out = 'wrong password'
383
384         elif cmd == 'peers':
385             out = repr(peer_list.values())
386
387         else:
388             out = None
389
390         if out:
391             #print ipaddr, cmd, len(out)
392             try:
393                 conn.send(out)
394             except:
395                 print "error, could not send"
396
397     finally:
398         conn.close()
399     
400
401 ds = BCDataStream.BCDataStream()
402
403
404 def memorypool_update(store):
405
406     conn = bitcoinrpc.connect_to_local()
407     try:
408         v = conn.getmemorypool()
409     except:
410         print "cannot contact bitcoin daemmon"
411         return
412     v = v['transactions']
413     for hextx in v:
414         ds.clear()
415         ds.write(hextx.decode('hex'))
416         tx = deserialize.parse_Transaction(ds)
417         tx['hash'] = util.double_sha256(tx['tx'])
418             
419         if store.tx_find_id_and_value(tx):
420             pass
421         else:
422             store.import_tx(tx, False)
423             #print tx['hash'][::-1].encode('hex')
424     store.commit()
425
426
427
428
429 def clean_session_thread():
430     while not stopping:
431         time.sleep(30)
432         t = time.time()
433         for k,t0 in sessions_last_time.items():
434             if t - t0 > 60:
435                 print "lost session",k
436                 sessions.pop(k)
437                 sessions_last_time.pop(k)
438             
439
440 def irc_thread():
441     global peer_list
442     NICK = 'E_'+random_string(10)
443     while not stopping:
444         try:
445             s = socket.socket()
446             s.connect(('irc.freenode.net', 6667))
447             s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
448             s.send('NICK '+NICK+'\n')
449             s.send('JOIN #electrum\n')
450             sf = s.makefile('r', 0)
451             t = 0
452             while not stopping:
453                 line = sf.readline()
454                 line = line.rstrip('\r\n')
455                 line = line.split()
456                 if line[0]=='PING': 
457                     s.send('PONG '+line[1]+'\n')
458                 elif '353' in line: # answer to /names
459                     k = line.index('353')
460                     for item in line[k+1:]:
461                         if item[0:2] == 'E_':
462                             s.send('WHO %s\n'%item)
463                 elif '352' in line: # answer to /who
464                     # warning: this is a horrible hack which apparently works
465                     k = line.index('352')
466                     ip = line[k+4]
467                     ip = socket.gethostbyname(ip)
468                     name = line[k+6]
469                     host = line[k+9]
470                     peer_list[name] = (ip,host)
471                 elif time.time() - t > 5*60:
472                     s.send('NAMES #electrum\n')
473                     t = time.time()
474                     peer_list = {}
475         except:
476             traceback.print_exc(file=sys.stdout)
477         finally:
478             sf.close()
479             s.close()
480
481
482 import traceback
483
484
485 if __name__ == '__main__':
486
487     if len(sys.argv)>1:
488         cmd = sys.argv[1]
489         if cmd == 'load':
490             request = "('load','%s')#"%config.get('server','password')
491         elif cmd == 'peers':
492             request = "('peers','')#"
493         elif cmd == 'stop':
494             request = "('stop','%s')#"%config.get('server','password')
495
496         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
497         s.connect((config.get('server','host'), config.getint('server','port')))
498         s.send( request )
499         out = ''
500         while 1:
501             msg = s.recv(1024)
502             if msg: out += msg
503             else: break
504         s.close()
505         print out
506         sys.exit(0)
507
508
509     print "starting Electrum server"
510     conf = DataStore.CONFIG_DEFAULTS
511     args, argv = readconf.parse_argv( [], conf)
512     args.dbtype= config.get('database','type')
513     if args.dbtype == 'sqlite3':
514         args.connect_args = { 'database' : config.get('database','database') }
515     elif args.dbtype == 'MySQLdb':
516         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
517     elif args.dbtype == 'psycopg2':
518         args.connect_args = { 'database' : config.get('database','database') }
519     store = MyStore(args)
520
521     thread.start_new_thread(listen_thread, (store,))
522     thread.start_new_thread(clean_session_thread, ())
523     if (config.get('server','irc') == 'yes' ):
524         thread.start_new_thread(irc_thread, ())
525
526     while not stopping:
527         try:
528             dblock.acquire()
529             store.catch_up()
530             memorypool_update(store)
531             block_number = store.get_block_number(1)
532             dblock.release()
533         except:
534             traceback.print_exc(file=sys.stdout)
535         time.sleep(10)
536
537     print "server stopped"
538