Privacy feature: allow running a private server, that does not register on IRC. New...
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24
25 import time, socket, operator, thread, ast, sys,re
26 import psycopg2, binascii
27 import bitcoinrpc
28
29 from Abe.abe import hash_to_address, decode_check_address
30 from Abe.DataStore import DataStore as Datastore_class
31 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
32
33 import ConfigParser
34
35 config = ConfigParser.ConfigParser()
36 # set some defaults, which will be overwritten by the config file
37 config.add_section('server')
38 config.set('server','banner', 'Welcome to Electrum!')
39 config.set('server', 'host', 'ecdsa.org')
40 config.set('server', 'port', 50000)
41 config.set('server', 'password', '')
42 config.set('server', 'irc', 'yes')
43 config.add_section('database')
44 config.set('database', 'type', 'psycopg2')
45 config.set('database', 'database', 'abe')
46
47 try:
48     f = open('/etc/electrum.conf','r')
49     config.readfp(f)
50     f.close()
51 except:
52     print "Could not read electrum.conf. I will use the dafault values."
53
54 stopping = False
55 block_number = -1
56 sessions = {}
57 sessions_last_time = {}
58 dblock = thread.allocate_lock()
59
60 peer_list = {}
61
62 class MyStore(Datastore_class):
63
64     def safe_sql(self,sql, params=()):
65         try:
66             dblock.acquire()
67             ret = self.selectall(sql,params)
68             dblock.release()
69             return ret
70         except:
71             print "sql error", sql
72             return []
73
74     def get_tx_outputs(self, tx_id):
75         return self.safe_sql("""SELECT
76                 txout.txout_pos,
77                 txout.txout_scriptPubKey,
78                 txout.txout_value,
79                 nexttx.tx_hash,
80                 nexttx.tx_id,
81                 txin.txin_pos,
82                 pubkey.pubkey_hash
83               FROM txout
84               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
85               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
86               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
87              WHERE txout.tx_id = %d 
88              ORDER BY txout.txout_pos
89         """%(tx_id))
90
91     def get_tx_inputs(self, tx_id):
92         return self.safe_sql(""" SELECT
93                 txin.txin_pos,
94                 txin.txin_scriptSig,
95                 txout.txout_value,
96                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
97                 prevtx.tx_id,
98                 COALESCE(txout.txout_pos, u.txout_pos),
99                 pubkey.pubkey_hash
100               FROM txin
101               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
102               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
103               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
104               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
105              WHERE txin.tx_id = %d
106              ORDER BY txin.txin_pos
107              """%(tx_id,))
108
109     def get_address_out_rows(self, dbhash):
110         return self.safe_sql(""" SELECT
111                 b.block_nTime,
112                 cc.chain_id,
113                 b.block_height,
114                 1,
115                 b.block_hash,
116                 tx.tx_hash,
117                 tx.tx_id,
118                 txin.txin_pos,
119                 -prevout.txout_value
120               FROM chain_candidate cc
121               JOIN block b ON (b.block_id = cc.block_id)
122               JOIN block_tx ON (block_tx.block_id = b.block_id)
123               JOIN tx ON (tx.tx_id = block_tx.tx_id)
124               JOIN txin ON (txin.tx_id = tx.tx_id)
125               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
126               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
127              WHERE pubkey.pubkey_hash = ?
128                AND cc.in_longest = 1""", (dbhash,))
129
130     def get_address_out_rows_memorypool(self, dbhash):
131         return self.safe_sql(""" SELECT
132                 1,
133                 tx.tx_hash,
134                 tx.tx_id,
135                 txin.txin_pos,
136                 -prevout.txout_value
137               FROM tx 
138               JOIN txin ON (txin.tx_id = tx.tx_id)
139               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
140               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
141              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
142
143     def get_address_in_rows(self, dbhash):
144         return self.safe_sql(""" SELECT
145                 b.block_nTime,
146                 cc.chain_id,
147                 b.block_height,
148                 0,
149                 b.block_hash,
150                 tx.tx_hash,
151                 tx.tx_id,
152                 txout.txout_pos,
153                 txout.txout_value
154               FROM chain_candidate cc
155               JOIN block b ON (b.block_id = cc.block_id)
156               JOIN block_tx ON (block_tx.block_id = b.block_id)
157               JOIN tx ON (tx.tx_id = block_tx.tx_id)
158               JOIN txout ON (txout.tx_id = tx.tx_id)
159               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
160              WHERE pubkey.pubkey_hash = ?
161                AND cc.in_longest = 1""", (dbhash,))
162
163     def get_address_in_rows_memorypool(self, dbhash):
164         return self.safe_sql( """ SELECT
165                 0,
166                 tx.tx_hash,
167                 tx.tx_id,
168                 txout.txout_pos,
169                 txout.txout_value
170               FROM tx
171               JOIN txout ON (txout.tx_id = tx.tx_id)
172               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
173              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
174
175     def get_txpoints(self, addr):
176         version, binaddr = decode_check_address(addr)
177         if binaddr is None:
178             return "err"
179         dbhash = self.binin(binaddr)
180         rows = []
181         rows += self.get_address_out_rows( dbhash )
182         rows += self.get_address_in_rows( dbhash )
183
184         txpoints = []
185         known_tx = []
186
187         for row in rows:
188             try:
189                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
190             except:
191                 print "cannot unpack row", row
192                 break
193             tx_hash = self.hashout_hex(tx_hash)
194             txpoint = {
195                     "nTime":    int(nTime),
196                     #"chain_id": int(chain_id),
197                     "height":   int(height),
198                     "is_in":    int(is_in),
199                     "blk_hash": self.hashout_hex(blk_hash),
200                     "tx_hash":  tx_hash,
201                     "tx_id":    int(tx_id),
202                     "pos":      int(pos),
203                     "value":    int(value),
204                     }
205
206             txpoints.append(txpoint)
207             known_tx.append(self.hashout_hex(tx_hash))
208
209
210         # todo: sort them really...
211         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
212
213         # read memory pool
214         rows = []
215         rows += self.get_address_in_rows_memorypool( dbhash )
216         rows += self.get_address_out_rows_memorypool( dbhash )
217         for row in rows:
218             is_in, tx_hash, tx_id, pos, value = row
219             tx_hash = self.hashout_hex(tx_hash)
220             if tx_hash in known_tx:
221                 continue
222             #print "mempool", tx_hash
223             txpoint = {
224                     "nTime":    0,
225                     "chain_id": 1,
226                     "height":   0,
227                     "is_in":    int(is_in),
228                     "blk_hash": 'mempool',
229                     "tx_hash":  tx_hash,
230                     "tx_id":    int(tx_id),
231                     "pos":      int(pos),
232                     "value":    int(value),
233                     }
234             txpoints.append(txpoint)
235
236
237         for txpoint in txpoints:
238             tx_id = txpoint['tx_id']
239             
240             txinputs = []
241             inrows = self.get_tx_inputs(tx_id)
242             for row in inrows:
243                 _hash = self.binout(row[6])
244                 address = hash_to_address(chr(0), _hash)
245                 txinputs.append(address)
246             txpoint['inputs'] = txinputs
247             txoutputs = []
248             outrows = self.get_tx_outputs(tx_id)
249             for row in outrows:
250                 _hash = self.binout(row[6])
251                 address = hash_to_address(chr(0), _hash)
252                 txoutputs.append(address)
253             txpoint['outputs'] = txoutputs
254
255             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
256             if not txpoint['is_in']:
257                 # detect if already redeemed...
258                 for row in outrows:
259                     if row[6] == dbhash: break
260                 else:
261                     raise
262                 #row = self.get_tx_output(tx_id,dbhash)
263                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
264                 # if not redeemed, we add the script
265                 if row:
266                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
267
268
269         return txpoints
270
271
272     def get_status(self, addr):
273         # last block for an address
274         tx_points = self.get_txpoints(addr)
275         if not tx_points:
276             return None
277         else:
278             return tx_points[-1]['blk_hash']
279
280
281 def send_tx(tx):
282     import bitcoinrpc
283     conn = bitcoinrpc.connect_to_local()
284     try:
285         v = conn.importtransaction(tx)
286     except:
287         v = "error: transaction rejected by memorypool"
288     return v
289
290
291 def listen_thread(store):
292     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
293     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
294     s.bind((config.get('server','host'), config.getint('server','port')))
295     s.listen(1)
296     while not stopping:
297         conn, addr = s.accept()
298         thread.start_new_thread(client_thread, (addr, conn,))
299
300 def random_string(N):
301     import random, string
302     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
303
304 def client_thread(ipaddr,conn):
305     #print "client thread", ipaddr
306     try:
307         ipaddr = ipaddr[0]
308         msg = ''
309         while 1:
310             d = conn.recv(1024)
311             msg += d
312             if d[-1]=='#':
313                 break
314
315         #print msg
316
317         try:
318             cmd, data = ast.literal_eval(msg[:-1])
319         except:
320             print "syntax error", repr(msg)
321             conn.close()
322             return
323
324         if cmd=='b':
325             out = "%d"%block_number
326         elif cmd=='session':
327             session_id = random_string(10)
328             try:
329                 addresses = ast.literal_eval(data)
330             except:
331                 print "error"
332                 conn.close()
333                 return
334
335             print time.asctime(), "session", ipaddr, session_id, addresses[0], len(addresses)
336
337             sessions[session_id] = {}
338             for a in addresses:
339                 sessions[session_id][a] = ''
340             out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
341             sessions_last_time[session_id] = time.time()
342
343         elif cmd=='poll': 
344             session_id = data
345             addresses = sessions.get(session_id)
346             if not addresses:
347                 print "session not found", ipaddr
348                 out = repr( (-1, {}))
349             else:
350                 sessions_last_time[session_id] = time.time()
351                 ret = {}
352                 for addr in addresses:
353                     status = store.get_status( addr )
354                     last_status = sessions[session_id].get( addr )
355                     if last_status != status:
356                         sessions[session_id][addr] = status
357                         ret[addr] = status
358                 out = repr( (block_number, ret ) )
359
360         elif cmd == 'h': 
361             # history
362             addr = data
363             h = store.get_txpoints( addr )
364             out = repr(h)
365
366         elif cmd == 'load': 
367             if config.get('server','password') == data:
368                 out = repr( len(sessions) )
369             else:
370                 out = 'wrong password'
371
372         elif cmd =='tx':        
373             out = send_tx(data)
374
375         elif cmd == 'stop':
376             global stopping
377             if config.get('server','password') == data:
378                 stopping = True
379                 out = 'ok'
380             else:
381                 out = 'wrong password'
382
383         elif cmd == 'peers':
384             out = repr(peer_list.values())
385
386         else:
387             out = None
388
389         if out:
390             #print ipaddr, cmd, len(out)
391             try:
392                 conn.send(out)
393             except:
394                 print "error, could not send"
395
396     finally:
397         conn.close()
398     
399
400 ds = BCDataStream.BCDataStream()
401
402
403 def memorypool_update(store):
404
405     conn = bitcoinrpc.connect_to_local()
406     try:
407         v = conn.getmemorypool()
408     except:
409         print "cannot contact bitcoin daemmon"
410         return
411     v = v['transactions']
412     for hextx in v:
413         ds.clear()
414         ds.write(hextx.decode('hex'))
415         tx = deserialize.parse_Transaction(ds)
416         tx['hash'] = util.double_sha256(tx['tx'])
417             
418         if store.tx_find_id_and_value(tx):
419             pass
420         else:
421             store.import_tx(tx, False)
422             #print tx['hash'][::-1].encode('hex')
423     store.commit()
424
425
426
427
428 def clean_session_thread():
429     while not stopping:
430         time.sleep(30)
431         t = time.time()
432         for k,t0 in sessions_last_time.items():
433             if t - t0 > 60:
434                 print "lost session",k
435                 sessions.pop(k)
436                 sessions_last_time.pop(k)
437             
438
439 def irc_thread():
440     global peer_list
441     NICK = 'E_'+random_string(10)
442     while not stopping:
443         try:
444             s = socket.socket()
445             s.connect(('irc.freenode.net', 6667))
446             s.send('USER '+config.get('server','host')+' '+NICK+' bla :'+NICK+'\n') 
447             s.send('NICK '+NICK+'\n')
448             s.send('JOIN #electrum\n')
449             t = 0
450             while not stopping:
451                 line = s.recv(2048)
452                 line = line.rstrip('\r\n')
453                 line = line.split()
454                 if line[0]=='PING': 
455                     s.send('PONG '+line[1]+'\n')
456                 elif '353' in line: # answer to /names
457                     k = line.index('353')
458                     try:
459                         k2 = line.index('366')
460                     except:
461                         continue
462                     for item in line[k+1:k2]:
463                         if item[0:2] == 'E_':
464                             s.send('USERHOST %s\n'%item)
465                 elif '302' in line: # answer to /userhost
466                     k = line.index('302')
467                     m = re.match( "^:(.*?)=\+~(.*?)@(.*?)$", line[k+2] )
468                     if m:
469                         name = m.group(1)
470                         host = m.group(2)
471                         ip = m.group(3)
472                         peer_list[name] = (ip,host)
473                 elif time.time() - t > 5*60:
474                     s.send('NAMES #electrum\n')
475                     t = time.time()
476         except:
477             traceback.print_exc(file=sys.stdout)
478         finally:
479             s.close()
480
481
482 import traceback
483
484
485 if __name__ == '__main__':
486
487     if len(sys.argv)>1:
488         cmd = sys.argv[1]
489         if cmd == 'load':
490             request = "('load','%s')#"%config.get('server','password')
491         elif cmd == 'peers':
492             request = "('peers','')#"
493         elif cmd == 'stop':
494             request = "('stop','%s')#"%config.get('server','password')
495
496         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
497         s.connect((config.get('server','host'), config.getint('server','port')))
498         s.send( request )
499         out = ''
500         while 1:
501             msg = s.recv(1024)
502             if msg: out += msg
503             else: break
504         s.close()
505         print out
506         sys.exit(0)
507
508
509     print "starting Electrum server"
510     conf = DataStore.CONFIG_DEFAULTS
511     args, argv = readconf.parse_argv( [], conf)
512     args.dbtype= config.get('database','type')
513     if args.dbtype == 'sqlite3':
514         args.connect_args = { 'database' : config.get('database','database') }
515     elif args.dbtype == 'MySQLdb':
516         args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','user'), 'passwd' : config.get('database','pass') }
517     elif args.dbtype == 'psycopg2':
518         args.connect_args = { 'database' : config.get('database','database') }
519     store = MyStore(args)
520
521     thread.start_new_thread(listen_thread, (store,))
522     thread.start_new_thread(clean_session_thread, ())
523     if (config.get('server','irc') == 'yes' ):
524         thread.start_new_thread(irc_thread, ())
525
526     while not stopping:
527         try:
528             dblock.acquire()
529             store.catch_up()
530             memorypool_update(store)
531             block_number = store.get_block_number(1)
532             dblock.release()
533         except:
534             traceback.print_exc(file=sys.stdout)
535         time.sleep(10)
536
537     print "server stopped"
538