Ok, *really* fix to use the correct bitcoinrpc module.
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24
25 import time, socket, operator, thread, ast, sys,re
26 import psycopg2, binascii
27 import bitcoinrpc
28
29 from Abe.abe import hash_to_address, decode_check_address
30 from Abe.DataStore import DataStore as Datastore_class
31 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
32
33 import ConfigParser
34
35 config = ConfigParser.ConfigParser()
36 # set some defaults, which will be overwritten by the config file
37 config.add_section('server')
38 config.set('server','banner', 'Welcome to Electrum!')
39 config.set('server', 'host', 'ecdsa.org')
40 config.set('server', 'port', 50000)
41 config.set('server', 'password', '')
42 config.add_section('database')
43 config.set('database', 'type', 'psycopg2')
44 config.set('database', 'database', 'abe')
45
46 try:
47     f = open('/etc/electrum.conf','r')
48     config.readfp(f)
49     f.close()
50 except:
51     print "Could not read electrum.conf. I will use the dafault values."
52
53 stopping = False
54 block_number = -1
55 sessions = {}
56 sessions_last_time = {}
57 dblock = thread.allocate_lock()
58
59 peer_list = {}
60
61 class MyStore(Datastore_class):
62
63     def safe_sql(self,sql):
64         try:
65             dblock.acquire()
66             ret = self.selectall(sql)
67             dblock.release()
68             return ret
69         except:
70             print "sql error", sql
71             return []
72
73     def get_tx_outputs(self, tx_id):
74         return self.safe_sql("""SELECT
75                 txout.txout_pos,
76                 txout.txout_scriptPubKey,
77                 txout.txout_value,
78                 nexttx.tx_hash,
79                 nexttx.tx_id,
80                 txin.txin_pos,
81                 pubkey.pubkey_hash
82               FROM txout
83               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
84               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
85               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
86              WHERE txout.tx_id = %d 
87              ORDER BY txout.txout_pos
88         """%(tx_id))
89
90     def get_tx_inputs(self, tx_id):
91         return self.safe_sql(""" SELECT
92                 txin.txin_pos,
93                 txin.txin_scriptSig,
94                 txout.txout_value,
95                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
96                 prevtx.tx_id,
97                 COALESCE(txout.txout_pos, u.txout_pos),
98                 pubkey.pubkey_hash
99               FROM txin
100               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
101               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
102               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
103               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
104              WHERE txin.tx_id = %d
105              ORDER BY txin.txin_pos
106              """%(tx_id,))
107
108     def get_address_out_rows(self, dbhash):
109         return self.safe_sql(""" SELECT
110                 b.block_nTime,
111                 cc.chain_id,
112                 b.block_height,
113                 1,
114                 b.block_hash,
115                 tx.tx_hash,
116                 tx.tx_id,
117                 txin.txin_pos,
118                 -prevout.txout_value
119               FROM chain_candidate cc
120               JOIN block b ON (b.block_id = cc.block_id)
121               JOIN block_tx ON (block_tx.block_id = b.block_id)
122               JOIN tx ON (tx.tx_id = block_tx.tx_id)
123               JOIN txin ON (txin.tx_id = tx.tx_id)
124               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
125               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
126              WHERE pubkey.pubkey_hash = '%s'
127                AND cc.in_longest = 1"""%dbhash)
128
129     def get_address_out_rows_memorypool(self, dbhash):
130         return self.safe_sql(""" SELECT
131                 1,
132                 tx.tx_hash,
133                 tx.tx_id,
134                 txin.txin_pos,
135                 -prevout.txout_value
136               FROM tx 
137               JOIN txin ON (txin.tx_id = tx.tx_id)
138               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
139               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
140              WHERE pubkey.pubkey_hash ='%s' """%(dbhash))
141
142     def get_address_in_rows(self, dbhash):
143         return self.safe_sql(""" SELECT
144                 b.block_nTime,
145                 cc.chain_id,
146                 b.block_height,
147                 0,
148                 b.block_hash,
149                 tx.tx_hash,
150                 tx.tx_id,
151                 txout.txout_pos,
152                 txout.txout_value
153               FROM chain_candidate cc
154               JOIN block b ON (b.block_id = cc.block_id)
155               JOIN block_tx ON (block_tx.block_id = b.block_id)
156               JOIN tx ON (tx.tx_id = block_tx.tx_id)
157               JOIN txout ON (txout.tx_id = tx.tx_id)
158               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
159              WHERE pubkey.pubkey_hash = '%s'
160                AND cc.in_longest = 1"""%(dbhash))
161
162     def get_address_in_rows_memorypool(self, dbhash):
163         return self.safe_sql( """ SELECT
164                 0,
165                 tx.tx_hash,
166                 tx.tx_id,
167                 txout.txout_pos,
168                 txout.txout_value
169               FROM tx
170               JOIN txout ON (txout.tx_id = tx.tx_id)
171               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
172              WHERE pubkey.pubkey_hash = '%s' """%(dbhash))
173
174     def get_txpoints(self, addr):
175         version, binaddr = decode_check_address(addr)
176         if binaddr is None:
177             return "err"
178         dbhash = self.binin(binaddr)
179         rows = []
180         rows += self.get_address_out_rows( dbhash )
181         rows += self.get_address_in_rows( dbhash )
182
183         txpoints = []
184         known_tx = []
185
186         for row in rows:
187             try:
188                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
189             except:
190                 print "cannot unpack row", row
191                 break
192             tx_hash = self.hashout_hex(tx_hash)
193             txpoint = {
194                     "nTime":    int(nTime),
195                     #"chain_id": int(chain_id),
196                     "height":   int(height),
197                     "is_in":    int(is_in),
198                     "blk_hash": self.hashout_hex(blk_hash),
199                     "tx_hash":  tx_hash,
200                     "tx_id":    int(tx_id),
201                     "pos":      int(pos),
202                     "value":    int(value),
203                     }
204
205             txpoints.append(txpoint)
206             known_tx.append(self.hashout_hex(tx_hash))
207
208
209         # todo: sort them really...
210         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
211
212         # read memory pool
213         rows = []
214         rows += self.get_address_in_rows_memorypool( dbhash )
215         rows += self.get_address_out_rows_memorypool( dbhash )
216         for row in rows:
217             is_in, tx_hash, tx_id, pos, value = row
218             tx_hash = self.hashout_hex(tx_hash)
219             if tx_hash in known_tx:
220                 continue
221             #print "mempool", tx_hash
222             txpoint = {
223                     "nTime":    0,
224                     "chain_id": 1,
225                     "height":   0,
226                     "is_in":    int(is_in),
227                     "blk_hash": 'mempool',
228                     "tx_hash":  tx_hash,
229                     "tx_id":    int(tx_id),
230                     "pos":      int(pos),
231                     "value":    int(value),
232                     }
233             txpoints.append(txpoint)
234
235
236         for txpoint in txpoints:
237             tx_id = txpoint['tx_id']
238             
239             txinputs = []
240             inrows = self.get_tx_inputs(tx_id)
241             for row in inrows:
242                 _hash = self.binout(row[6])
243                 address = hash_to_address(chr(0), _hash)
244                 txinputs.append(address)
245             txpoint['inputs'] = txinputs
246             txoutputs = []
247             outrows = self.get_tx_outputs(tx_id)
248             for row in outrows:
249                 _hash = self.binout(row[6])
250                 address = hash_to_address(chr(0), _hash)
251                 txoutputs.append(address)
252             txpoint['outputs'] = txoutputs
253
254             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
255             if not txpoint['is_in']:
256                 # detect if already redeemed...
257                 for row in outrows:
258                     if row[6] == dbhash: break
259                 else:
260                     raise
261                 #row = self.get_tx_output(tx_id,dbhash)
262                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
263                 # if not redeemed, we add the script
264                 if row:
265                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
266
267
268         return txpoints
269
270
271     def get_status(self, addr):
272         # last block for an address
273         tx_points = self.get_txpoints(addr)
274         if not tx_points:
275             return None
276         else:
277             return tx_points[-1]['blk_hash']
278
279
280 def send_tx(tx):
281     import bitcoinrpc
282     conn = bitcoinrpc.connect_to_local()
283     try:
284         v = conn.importtransaction(tx)
285     except:
286         v = "error: transaction rejected by memorypool"
287     return v
288
289
290 def listen_thread(store):
291     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
292     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
293     s.bind((config.get('server','host'), config.getint('server','port')))
294     s.listen(1)
295     while not stopping:
296         conn, addr = s.accept()
297         thread.start_new_thread(client_thread, (addr, conn,))
298
299 def random_string(N):
300     import random, string
301     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
302
303 def client_thread(ipaddr,conn):
304     #print "client thread", ipaddr
305     try:
306         ipaddr = ipaddr[0]
307         msg = ''
308         while 1:
309             d = conn.recv(1024)
310             msg += d
311             if d[-1]=='#':
312                 break
313
314         #print msg
315
316         try:
317             cmd, data = ast.literal_eval(msg[:-1])
318         except:
319             print "syntax error", repr(msg)
320             conn.close()
321             return
322
323         if cmd=='b':
324             out = "%d"%block_number
325         elif cmd=='session':
326             session_id = random_string(10)
327             try:
328                 addresses = ast.literal_eval(data)
329             except:
330                 print "error"
331                 conn.close()
332                 return
333
334             print time.asctime(), "session", ipaddr, session_id, addresses[0], len(addresses)
335
336             sessions[session_id] = {}
337             for a in addresses:
338                 sessions[session_id][a] = ''
339             out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
340             sessions_last_time[session_id] = time.time()
341
342         elif cmd=='poll': 
343             session_id = data
344             addresses = sessions.get(session_id)
345             if not addresses:
346                 print "session not found", ipaddr
347                 out = repr( (-1, {}))
348             else:
349                 sessions_last_time[session_id] = time.time()
350                 ret = {}
351                 for addr in addresses:
352                     status = store.get_status( addr )
353                     last_status = sessions[session_id].get( addr )
354                     if last_status != status:
355                         sessions[session_id][addr] = status
356                         ret[addr] = status
357                 out = repr( (block_number, ret ) )
358
359         elif cmd == 'h': 
360             # history
361             addr = data
362             h = store.get_txpoints( addr )
363             out = repr(h)
364
365         elif cmd == 'load': 
366             if config.get('server','password') == data:
367                 out = repr( len(sessions) )
368             else:
369                 out = 'wrong password'
370
371         elif cmd =='tx':        
372             out = send_tx(data)
373
374         elif cmd == 'stop':
375             global stopping
376             if config.get('server','password') == data:
377                 stopping = True
378                 out = 'ok'
379             else:
380                 out = 'wrong password'
381
382         elif cmd == 'peers':
383             out = repr(peer_list.values())
384
385         else:
386             out = None
387
388         if out:
389             #print ipaddr, cmd, len(out)
390             try:
391                 conn.send(out)
392             except:
393                 print "error, could not send"
394
395     finally:
396         conn.close()
397     
398
399 ds = BCDataStream.BCDataStream()
400
401
402 def memorypool_update(store):
403
404     conn = bitcoinrpc.connect_to_local()
405     try:
406         v = conn.getmemorypool()
407     except:
408         print "cannot contact bitcoin daemmon"
409         return
410     v = v['transactions']
411     for hextx in v:
412         ds.clear()
413         ds.write(hextx.decode('hex'))
414         tx = deserialize.parse_Transaction(ds)
415         tx['hash'] = util.double_sha256(tx['tx'])
416             
417         if store.tx_find_id_and_value(tx):
418             pass
419         else:
420             store.import_tx(tx, False)
421             #print tx['hash'][::-1].encode('hex')
422     store.commit()
423
424
425
426
427 def clean_session_thread():
428     while not stopping:
429         time.sleep(30)
430         t = time.time()
431         for k,t0 in sessions_last_time.items():
432             if t - t0 > 60:
433                 print "lost session",k
434                 sessions.pop(k)
435                 sessions_last_time.pop(k)
436             
437
438 def irc_thread():
439     global peer_list
440     NICK = 'E_'+random_string(10)
441     while not stopping:
442         try:
443             s = socket.socket()
444             s.connect(('irc.freenode.net', 6667))
445             s.send('USER '+config.get('server','host')+' '+NICK+' bla :'+NICK+'\n') 
446             s.send('NICK '+NICK+'\n')
447             s.send('JOIN #electrum\n')
448             t = 0
449             while not stopping:
450                 line = s.recv(2048)
451                 line = line.rstrip('\r\n')
452                 line = line.split()
453                 if line[0]=='PING': 
454                     s.send('PONG '+line[1]+'\n')
455                 elif '353' in line: # answer to /names
456                     k = line.index('353')
457                     try:
458                         k2 = line.index('366')
459                     except:
460                         continue
461                     for item in line[k+1:k2]:
462                         if item[0:2] == 'E_':
463                             s.send('USERHOST %s\n'%item)
464                 elif '302' in line: # answer to /userhost
465                     k = line.index('302')
466                     m = re.match( "^:(.*?)=\+~(.*?)@(.*?)$", line[k+2] )
467                     if m:
468                         name = m.group(1)
469                         host = m.group(2)
470                         ip = m.group(3)
471                         peer_list[name] = (ip,host)
472                 elif time.time() - t > 5*60:
473                     s.send('NAMES #electrum\n')
474                     t = time.time()
475         except:
476             traceback.print_exc(file=sys.stdout)
477         finally:
478             s.close()
479
480
481 import traceback
482
483
484 if __name__ == '__main__':
485
486     if len(sys.argv)>1:
487         cmd = sys.argv[1]
488         if cmd == 'load':
489             request = "('load','%s')#"%config.get('server','password')
490         elif cmd == 'peers':
491             request = "('peers','')#"
492         elif cmd == 'stop':
493             request = "('stop','%s')#"%config.get('server','password')
494
495         s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
496         s.connect((config.get('server','host'), config.getint('server','port')))
497         s.send( request )
498         out = ''
499         while 1:
500             msg = s.recv(1024)
501             if msg: out += msg
502             else: break
503         s.close()
504         print out
505         sys.exit(0)
506
507
508     print "starting Electrum server"
509     conf = DataStore.CONFIG_DEFAULTS
510     args, argv = readconf.parse_argv( [], conf)
511     args.dbtype= config.get('database','type')
512     args.connect_args = {'database' : config.get('database','database') }
513     store = MyStore(args)
514
515     thread.start_new_thread(listen_thread, (store,))
516     thread.start_new_thread(clean_session_thread, ())
517     thread.start_new_thread(irc_thread, ())
518
519     while not stopping:
520         try:
521             dblock.acquire()
522             store.catch_up()
523             memorypool_update(store)
524             block_number = store.get_block_number(1)
525             dblock.release()
526         except:
527             traceback.print_exc(file=sys.stdout)
528         time.sleep(10)
529
530     print "server stopped"
531