show hostnames
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24
25 import time, socket, operator, thread, ast, sys,re
26 import psycopg2, binascii
27 import bitcoinrpc
28
29 from Abe.abe import hash_to_address, decode_check_address
30 from Abe.DataStore import DataStore as Datastore_class
31 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
32
33 try:
34     f = open('/etc/electrum.conf','r')
35     data = f.read()
36     f.close()
37     HOST, PORT, PASSWORD, SERVER_MESSAGE = ast.literal_eval(data)
38 except:
39     print "could not read /etc/electrum.conf"
40     SERVER_MESSAGE = "Welcome to Electrum"
41     HOST = 'ecdsa.org'
42     PORT = 50000
43     PASSWORD = ''
44
45
46 stopping = False
47 block_number = -1
48 sessions = {}
49 sessions_last_time = {}
50 dblock = thread.allocate_lock()
51
52 peer_list = {}
53
54 class MyStore(Datastore_class):
55
56     def safe_sql(self,sql):
57         try:
58             dblock.acquire()
59             ret = self.selectall(sql)
60             dblock.release()
61             return ret
62         except:
63             print "sql error", sql
64             return []
65
66     def get_tx_outputs(self, tx_id):
67         return self.safe_sql("""SELECT
68                 txout.txout_pos,
69                 txout.txout_scriptPubKey,
70                 txout.txout_value,
71                 nexttx.tx_hash,
72                 nexttx.tx_id,
73                 txin.txin_pos,
74                 pubkey.pubkey_hash
75               FROM txout
76               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
77               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
78               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
79              WHERE txout.tx_id = %d 
80              ORDER BY txout.txout_pos
81         """%(tx_id))
82
83     def get_tx_inputs(self, tx_id):
84         return self.safe_sql(""" SELECT
85                 txin.txin_pos,
86                 txin.txin_scriptSig,
87                 txout.txout_value,
88                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
89                 prevtx.tx_id,
90                 COALESCE(txout.txout_pos, u.txout_pos),
91                 pubkey.pubkey_hash
92               FROM txin
93               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
94               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
95               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
96               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
97              WHERE txin.tx_id = %d
98              ORDER BY txin.txin_pos
99              """%(tx_id,))
100
101     def get_address_out_rows(self, dbhash):
102         return self.safe_sql(""" SELECT
103                 b.block_nTime,
104                 cc.chain_id,
105                 b.block_height,
106                 1,
107                 b.block_hash,
108                 tx.tx_hash,
109                 tx.tx_id,
110                 txin.txin_pos,
111                 -prevout.txout_value
112               FROM chain_candidate cc
113               JOIN block b ON (b.block_id = cc.block_id)
114               JOIN block_tx ON (block_tx.block_id = b.block_id)
115               JOIN tx ON (tx.tx_id = block_tx.tx_id)
116               JOIN txin ON (txin.tx_id = tx.tx_id)
117               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
118               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
119              WHERE pubkey.pubkey_hash = '%s'
120                AND cc.in_longest = 1"""%dbhash)
121
122     def get_address_out_rows_memorypool(self, dbhash):
123         return self.safe_sql(""" SELECT
124                 1,
125                 tx.tx_hash,
126                 tx.tx_id,
127                 txin.txin_pos,
128                 -prevout.txout_value
129               FROM tx 
130               JOIN txin ON (txin.tx_id = tx.tx_id)
131               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
132               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
133              WHERE pubkey.pubkey_hash ='%s' """%(dbhash))
134
135     def get_address_in_rows(self, dbhash):
136         return self.safe_sql(""" SELECT
137                 b.block_nTime,
138                 cc.chain_id,
139                 b.block_height,
140                 0,
141                 b.block_hash,
142                 tx.tx_hash,
143                 tx.tx_id,
144                 txout.txout_pos,
145                 txout.txout_value
146               FROM chain_candidate cc
147               JOIN block b ON (b.block_id = cc.block_id)
148               JOIN block_tx ON (block_tx.block_id = b.block_id)
149               JOIN tx ON (tx.tx_id = block_tx.tx_id)
150               JOIN txout ON (txout.tx_id = tx.tx_id)
151               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
152              WHERE pubkey.pubkey_hash = '%s'
153                AND cc.in_longest = 1"""%(dbhash))
154
155     def get_address_in_rows_memorypool(self, dbhash):
156         return self.safe_sql( """ SELECT
157                 0,
158                 tx.tx_hash,
159                 tx.tx_id,
160                 txout.txout_pos,
161                 txout.txout_value
162               FROM tx
163               JOIN txout ON (txout.tx_id = tx.tx_id)
164               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
165              WHERE pubkey.pubkey_hash = '%s' """%(dbhash))
166
167     def get_txpoints(self, addr):
168         version, binaddr = decode_check_address(addr)
169         if binaddr is None:
170             return "err"
171         dbhash = self.binin(binaddr)
172         rows = []
173         rows += self.get_address_out_rows( dbhash )
174         rows += self.get_address_in_rows( dbhash )
175
176         txpoints = []
177         known_tx = []
178
179         for row in rows:
180             try:
181                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
182             except:
183                 print "cannot unpack row", row
184                 break
185             tx_hash = self.hashout_hex(tx_hash)
186             txpoint = {
187                     "nTime":    int(nTime),
188                     #"chain_id": int(chain_id),
189                     "height":   int(height),
190                     "is_in":    int(is_in),
191                     "blk_hash": self.hashout_hex(blk_hash),
192                     "tx_hash":  tx_hash,
193                     "tx_id":    int(tx_id),
194                     "pos":      int(pos),
195                     "value":    int(value),
196                     }
197
198             txpoints.append(txpoint)
199             known_tx.append(self.hashout_hex(tx_hash))
200
201
202         # todo: sort them really...
203         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
204
205         # read memory pool
206         rows = []
207         rows += self.get_address_in_rows_memorypool( dbhash )
208         rows += self.get_address_out_rows_memorypool( dbhash )
209         for row in rows:
210             is_in, tx_hash, tx_id, pos, value = row
211             tx_hash = self.hashout_hex(tx_hash)
212             if tx_hash in known_tx:
213                 continue
214             #print "mempool", tx_hash
215             txpoint = {
216                     "nTime":    0,
217                     "chain_id": 1,
218                     "height":   0,
219                     "is_in":    int(is_in),
220                     "blk_hash": 'mempool',
221                     "tx_hash":  tx_hash,
222                     "tx_id":    int(tx_id),
223                     "pos":      int(pos),
224                     "value":    int(value),
225                     }
226             txpoints.append(txpoint)
227
228
229         for txpoint in txpoints:
230             tx_id = txpoint['tx_id']
231             
232             txinputs = []
233             inrows = self.get_tx_inputs(tx_id)
234             for row in inrows:
235                 _hash = self.binout(row[6])
236                 address = hash_to_address(chr(0), _hash)
237                 txinputs.append(address)
238             txpoint['inputs'] = txinputs
239             txoutputs = []
240             outrows = self.get_tx_outputs(tx_id)
241             for row in outrows:
242                 _hash = self.binout(row[6])
243                 address = hash_to_address(chr(0), _hash)
244                 txoutputs.append(address)
245             txpoint['outputs'] = txoutputs
246
247             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
248             if not txpoint['is_in']:
249                 # detect if already redeemed...
250                 for row in outrows:
251                     if row[6] == dbhash: break
252                 else:
253                     raise
254                 #row = self.get_tx_output(tx_id,dbhash)
255                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
256                 # if not redeemed, we add the script
257                 if row:
258                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
259
260
261         return txpoints
262
263
264     def get_status(self, addr):
265         # last block for an address
266         tx_points = self.get_txpoints(addr)
267         if not tx_points:
268             return None
269         else:
270             return tx_points[-1]['blk_hash']
271
272
273 def send_tx(tx):
274     import bitcoinrpc
275     conn = bitcoinrpc.connect_to_local()
276     try:
277         v = conn.importtransaction(tx)
278     except:
279         v = "error: transaction rejected by memorypool"
280     return v
281
282
283 def listen_thread(store):
284     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
285     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
286     s.bind((HOST, PORT))
287     s.listen(1)
288     while not stopping:
289         conn, addr = s.accept()
290         thread.start_new_thread(client_thread, (addr, conn,))
291
292 def random_string(N):
293     import random, string
294     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
295
296 def client_thread(ipaddr,conn):
297     #print "client thread", ipaddr
298     try:
299         ipaddr = ipaddr[0]
300         msg = ''
301         while 1:
302             d = conn.recv(1024)
303             msg += d
304             if d[-1]=='#':
305                 break
306
307         #print msg
308
309         try:
310             cmd, data = ast.literal_eval(msg[:-1])
311         except:
312             print "syntax error", repr(msg)
313             conn.close()
314             return
315
316         if cmd=='b':
317             out = "%d"%block_number
318         elif cmd=='session':
319             session_id = random_string(10)
320             try:
321                 addresses = ast.literal_eval(data)
322             except:
323                 print "error"
324                 conn.close()
325                 return
326
327             print time.asctime(), "session", ipaddr, session_id, addresses[0], len(addresses)
328
329             sessions[session_id] = {}
330             for a in addresses:
331                 sessions[session_id][a] = ''
332             out = repr( (session_id, SERVER_MESSAGE) )
333             sessions_last_time[session_id] = time.time()
334
335         elif cmd=='poll': 
336             session_id = data
337             addresses = sessions.get(session_id)
338             if not addresses:
339                 print "session not found", ipaddr
340                 out = repr( (-1, {}))
341             else:
342                 sessions_last_time[session_id] = time.time()
343                 ret = {}
344                 for addr in addresses:
345                     status = store.get_status( addr )
346                     last_status = sessions[session_id].get( addr )
347                     if last_status != status:
348                         sessions[session_id][addr] = status
349                         ret[addr] = status
350                 out = repr( (block_number, ret ) )
351
352         elif cmd == 'h': 
353             # history
354             addr = data
355             h = store.get_txpoints( addr )
356             out = repr(h)
357
358         elif cmd == 'load': 
359             if PASSWORD == data:
360                 out = repr( len(sessions) )
361             else:
362                 out = 'wrong password'
363
364         elif cmd =='tx':        
365             out = send_tx(data)
366
367         elif cmd == 'stop':
368             global stopping
369             if PASSWORD == data:
370                 stopping = True
371                 out = 'ok'
372             else:
373                 out = 'wrong password'
374
375         elif cmd == 'peers':
376             out = repr(peer_list.values())
377
378         else:
379             out = None
380
381         if out:
382             #print ipaddr, cmd, len(out)
383             try:
384                 conn.send(out)
385             except:
386                 print "error, could not send"
387
388     finally:
389         conn.close()
390     
391
392 ds = BCDataStream.BCDataStream()
393
394
395 def memorypool_update(store):
396
397     conn = bitcoinrpc.connect_to_local()
398     try:
399         v = conn.getmemorypool()
400     except:
401         print "cannot contact bitcoin daemmon"
402         return
403     v = v['transactions']
404     for hextx in v:
405         ds.clear()
406         ds.write(hextx.decode('hex'))
407         tx = deserialize.parse_Transaction(ds)
408         tx['hash'] = util.double_sha256(tx['tx'])
409             
410         if store.tx_find_id_and_value(tx):
411             pass
412         else:
413             store.import_tx(tx, False)
414             #print tx['hash'][::-1].encode('hex')
415     store.commit()
416
417
418
419
420 def clean_session_thread():
421     while not stopping:
422         time.sleep(3)
423         t = time.time()
424         for k,t0 in sessions_last_time.items():
425             if t - t0 > 60:
426                 print "lost session",k
427                 sessions.pop(k)
428                 sessions_last_time.pop(k)
429             
430
431 def irc_thread():
432     global peer_list
433     NICK = 'E_'+random_string(10)
434     while not stopping:
435         try:
436             s = socket.socket()
437             s.connect(('irc.freenode.net', 6667))
438             s.send('USER '+HOST+' '+NICK+' bla :'+NICK+'\n') 
439             s.send('NICK '+NICK+'\n')
440             s.send('JOIN #electrum\n')
441             t = 0
442             while not stopping:
443                 line = s.recv(2048)
444                 line = line.rstrip('\r\n')
445                 line = line.split()
446                 if line[0]=='PING': 
447                     s.send('PONG '+line[1]+'\n')
448                 elif '353' in line: # answer to /names
449                     k = line.index('353')
450                     try:
451                         k2 = line.index('366')
452                     except:
453                         continue
454                     for item in line[k+1:k2]:
455                         if item[0:2] == 'E_':
456                             s.send('USERHOST %s\n'%item)
457                 elif '302' in line: # answer to /userhost
458                     k = line.index('302')
459                     m = re.match( "^:(.*?)=\+~(.*?)@(.*?)$", line[k+2] )
460                     if m:
461                         name = m.group(1)
462                         host = m.group(2)
463                         ip = m.group(3)
464                         peer_list[name] = (ip,host)
465                 elif time.time() - t > 5*60:
466                     s.send('NAMES #electrum\n')
467                     t = time.time()
468         except:
469             traceback.print_exc(file=sys.stdout)
470         finally:
471             s.close()
472
473
474 import traceback
475
476
477 if __name__ == '__main__':
478
479     if len(sys.argv)>1:
480         cmd = sys.argv[1]
481         if cmd == 'load':
482             request = "('load','%s')#"%PASSWORD
483         elif cmd == 'peers':
484             request = "('peers','')#"
485         elif cmd == 'stop':
486             request = "('stop','%s')#"%PASSWORD
487
488         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
489         s.connect(( HOST, PORT))
490         s.send( request )
491         out = ''
492         while 1:
493             msg = s.recv(1024)
494             if msg: out += msg
495             else: break
496         s.close()
497         print out
498         sys.exit(0)
499
500
501     print "starting Electrum server"
502     conf = DataStore.CONFIG_DEFAULTS
503     args, argv = readconf.parse_argv( [], conf)
504     args.dbtype='psycopg2'
505     args.connect_args = {"database":"abe"}
506     store = MyStore(args)
507
508     thread.start_new_thread(listen_thread, (store,))
509     thread.start_new_thread(clean_session_thread, ())
510     thread.start_new_thread(irc_thread, ())
511
512     while not stopping:
513         try:
514             dblock.acquire()
515             store.catch_up()
516             memorypool_update(store)
517             block_number = store.get_block_number(1)
518             dblock.release()
519         except:
520             traceback.print_exc(file=sys.stdout)
521         time.sleep(10)
522
523     print "server stopped"
524