messages
[electrum-server.git] / server.py
1 #!/usr/bin/env python
2 # Copyright(C) 2011 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 """
19 Todo:
20    * server should check and return bitcoind status..
21    * improve txpoint sorting
22 """
23
24
25 import time, socket, operator, thread, ast, sys
26 import psycopg2, binascii
27 import bitcoinrpc
28
29 from Abe.abe import hash_to_address, decode_check_address
30 from Abe.DataStore import DataStore as Datastore_class
31 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
32
33 try:
34     f = open('/etc/electrum.conf','r')
35     data = f.read()
36     f.close()
37     HOST, PORT, SERVER_MESSAGE = ast.literal_eval(data)
38 except:
39     print "could not read /etc/electrum.conf"
40     SERVER_MESSAGE = "Welcome to Electrum"
41     HOST = 'ecdsa.org'
42     PORT = 50000
43
44
45
46 sessions = {}
47 dblock = thread.allocate_lock()
48
49
50 class MyStore(Datastore_class):
51
52     def safe_sql(self,sql):
53         try:
54             dblock.acquire()
55             ret = self.selectall(sql)
56             dblock.release()
57             return ret
58         except:
59             print "sql error", sql
60             return []
61
62     def get_tx_outputs(self, tx_id):
63         return self.safe_sql("""SELECT
64                 txout.txout_pos,
65                 txout.txout_scriptPubKey,
66                 txout.txout_value,
67                 nexttx.tx_hash,
68                 nexttx.tx_id,
69                 txin.txin_pos,
70                 pubkey.pubkey_hash
71               FROM txout
72               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
73               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
74               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
75              WHERE txout.tx_id = %d 
76              ORDER BY txout.txout_pos
77         """%(tx_id))
78
79     def get_tx_inputs(self, tx_id):
80         return self.safe_sql(""" SELECT
81                 txin.txin_pos,
82                 txin.txin_scriptSig,
83                 txout.txout_value,
84                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
85                 prevtx.tx_id,
86                 COALESCE(txout.txout_pos, u.txout_pos),
87                 pubkey.pubkey_hash
88               FROM txin
89               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
90               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
91               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
92               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
93              WHERE txin.tx_id = %d
94              ORDER BY txin.txin_pos
95              """%(tx_id,))
96
97     def get_address_out_rows(self, dbhash):
98         return self.safe_sql(""" SELECT
99                 b.block_nTime,
100                 cc.chain_id,
101                 b.block_height,
102                 1,
103                 b.block_hash,
104                 tx.tx_hash,
105                 tx.tx_id,
106                 txin.txin_pos,
107                 -prevout.txout_value
108               FROM chain_candidate cc
109               JOIN block b ON (b.block_id = cc.block_id)
110               JOIN block_tx ON (block_tx.block_id = b.block_id)
111               JOIN tx ON (tx.tx_id = block_tx.tx_id)
112               JOIN txin ON (txin.tx_id = tx.tx_id)
113               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
114               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
115              WHERE pubkey.pubkey_hash = '%s'
116                AND cc.in_longest = 1"""%dbhash)
117
118     def get_address_out_rows_memorypool(self, dbhash):
119         return self.safe_sql(""" SELECT
120                 1,
121                 tx.tx_hash,
122                 tx.tx_id,
123                 txin.txin_pos,
124                 -prevout.txout_value
125               FROM tx 
126               JOIN txin ON (txin.tx_id = tx.tx_id)
127               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
128               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
129              WHERE pubkey.pubkey_hash ='%s' """%(dbhash))
130
131     def get_address_in_rows(self, dbhash):
132         return self.safe_sql(""" SELECT
133                 b.block_nTime,
134                 cc.chain_id,
135                 b.block_height,
136                 0,
137                 b.block_hash,
138                 tx.tx_hash,
139                 tx.tx_id,
140                 txout.txout_pos,
141                 txout.txout_value
142               FROM chain_candidate cc
143               JOIN block b ON (b.block_id = cc.block_id)
144               JOIN block_tx ON (block_tx.block_id = b.block_id)
145               JOIN tx ON (tx.tx_id = block_tx.tx_id)
146               JOIN txout ON (txout.tx_id = tx.tx_id)
147               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
148              WHERE pubkey.pubkey_hash = '%s'
149                AND cc.in_longest = 1"""%(dbhash))
150
151     def get_address_in_rows_memorypool(self, dbhash):
152         return self.safe_sql( """ SELECT
153                 0,
154                 tx.tx_hash,
155                 tx.tx_id,
156                 txout.txout_pos,
157                 txout.txout_value
158               FROM tx
159               JOIN txout ON (txout.tx_id = tx.tx_id)
160               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
161              WHERE pubkey.pubkey_hash = '%s' """%(dbhash))
162
163     def get_txpoints(self, addr):
164         version, binaddr = decode_check_address(addr)
165         if binaddr is None:
166             return "err"
167         dbhash = self.binin(binaddr)
168         rows = []
169         rows += self.get_address_out_rows( dbhash )
170         rows += self.get_address_in_rows( dbhash )
171
172         txpoints = []
173         known_tx = []
174
175         for row in rows:
176             try:
177                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
178             except:
179                 print "cannot unpack row", row
180                 break
181             tx_hash = self.hashout_hex(tx_hash)
182             txpoint = {
183                     "nTime":    int(nTime),
184                     #"chain_id": int(chain_id),
185                     "height":   int(height),
186                     "is_in":    int(is_in),
187                     "blk_hash": self.hashout_hex(blk_hash),
188                     "tx_hash":  tx_hash,
189                     "tx_id":    int(tx_id),
190                     "pos":      int(pos),
191                     "value":    int(value),
192                     }
193
194             txpoints.append(txpoint)
195             known_tx.append(self.hashout_hex(tx_hash))
196
197
198         # todo: sort them really...
199         txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
200
201         # read memory pool
202         rows = []
203         rows += self.get_address_in_rows_memorypool( dbhash )
204         rows += self.get_address_out_rows_memorypool( dbhash )
205         for row in rows:
206             is_in, tx_hash, tx_id, pos, value = row
207             tx_hash = self.hashout_hex(tx_hash)
208             if tx_hash in known_tx:
209                 continue
210             #print "mempool", tx_hash
211             txpoint = {
212                     "nTime":    0,
213                     "chain_id": 1,
214                     "height":   0,
215                     "is_in":    int(is_in),
216                     "blk_hash": 'mempool',
217                     "tx_hash":  tx_hash,
218                     "tx_id":    int(tx_id),
219                     "pos":      int(pos),
220                     "value":    int(value),
221                     }
222             txpoints.append(txpoint)
223
224
225         for txpoint in txpoints:
226             tx_id = txpoint['tx_id']
227             
228             txinputs = []
229             inrows = self.get_tx_inputs(tx_id)
230             for row in inrows:
231                 _hash = self.binout(row[6])
232                 address = hash_to_address(chr(0), _hash)
233                 txinputs.append(address)
234             txpoint['inputs'] = txinputs
235             txoutputs = []
236             outrows = self.get_tx_outputs(tx_id)
237             for row in outrows:
238                 _hash = self.binout(row[6])
239                 address = hash_to_address(chr(0), _hash)
240                 txoutputs.append(address)
241             txpoint['outputs'] = txoutputs
242
243             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
244             if not txpoint['is_in']:
245                 # detect if already redeemed...
246                 for row in outrows:
247                     if row[6] == dbhash: break
248                 else:
249                     raise
250                 #row = self.get_tx_output(tx_id,dbhash)
251                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
252                 # if not redeemed, we add the script
253                 if row:
254                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
255
256
257         return txpoints
258
259
260     def get_status(self, addr):
261         # last block for an address
262         tx_points = self.get_txpoints(addr)
263         if not tx_points:
264             return None
265         else:
266             return tx_points[-1]['blk_hash']
267
268
269 def send_tx(tx):
270     import bitcoinrpc
271     conn = bitcoinrpc.connect_to_local()
272     try:
273         v = conn.importtransaction(tx)
274     except:
275         v = "error: transaction rejected by memorypool"
276     return v
277
278
279 def listen_thread(store):
280
281     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
282     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
283     s.bind((HOST, PORT))
284     s.listen(1)
285     while True:
286         conn, addr = s.accept()
287         thread.start_new_thread(client_thread, (addr, conn,))
288
289
290 def client_thread(ipaddr,conn):
291     #print "client thread", ipaddr
292     try:
293         ipaddr = ipaddr[0]
294         msg = ''
295         while 1:
296             d = conn.recv(1024)
297             msg += d
298             if d[-1]=='#':
299                 break
300
301         #print msg
302
303         try:
304             cmd, data = ast.literal_eval(msg[:-1])
305         except:
306             print "syntax error", repr(msg)
307             conn.close()
308             return
309
310         if cmd=='b':
311             out = "%d"%store.get_block_number(1)
312
313         elif cmd=='session':
314             import random, string
315             session_id = ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(10))
316             try:
317                 addresses = ast.literal_eval(data)
318             except:
319                 print "error"
320                 conn.close()
321                 return
322
323             print time.asctime(), "session", ipaddr, session_id, addresses[0], len(addresses)
324
325             sessions[session_id] = {}
326             for a in addresses:
327                 sessions[session_id][a] = ''
328             out = repr( (session_id, SERVER_MESSAGE) )
329
330         elif cmd=='poll': 
331             session_id = data
332             addresses = sessions.get(session_id)
333             if not addresses:
334                 print "session not found", ipaddr
335                 out = repr( (-1, {}))
336             else:
337                 ret = {}
338                 for addr in addresses:
339                     status = store.get_status( addr )
340                     last_status = sessions[session_id].get( addr )
341                     if last_status != status:
342                         sessions[session_id][addr] = status
343                         ret[addr] = status
344                 out = repr( (store.get_block_number(1), ret ) )
345
346         elif cmd == 'h': 
347             # history
348             addr = data
349             h = store.get_txpoints( addr )
350             out = repr(h)
351
352         elif cmd =='tx':        
353             # transaction
354             out = send_tx(data)
355         else:
356             out = None
357
358         if out:
359             #print ipaddr, cmd, len(out)
360             try:
361                 conn.send(out)
362             except:
363                 print "error, could not send"
364
365     finally:
366         conn.close()
367     
368
369 ds = BCDataStream.BCDataStream()
370
371
372 def memorypool_update(store):
373
374     conn = bitcoinrpc.connect_to_local()
375     try:
376         v = conn.getmemorypool()
377     except:
378         print "cannot contact bitcoin daemmon"
379         return
380     v = v['transactions']
381     for hextx in v:
382         ds.clear()
383         ds.write(hextx.decode('hex'))
384         tx = deserialize.parse_Transaction(ds)
385         tx['hash'] = util.double_sha256(tx['tx'])
386             
387         if store.tx_find_id_and_value(tx):
388             pass
389         else:
390             store.import_tx(tx, False)
391             #print tx['hash'][::-1].encode('hex')
392     store.commit()
393
394 import traceback
395
396
397 if __name__ == '__main__':
398
399     print "starting Electrum server"
400     conf = DataStore.CONFIG_DEFAULTS
401     args, argv = readconf.parse_argv( [], conf)
402     args.dbtype='psycopg2'
403     args.connect_args = {"database":"abe"}
404     store = MyStore(args)
405
406     thread.start_new_thread(listen_thread, (store,))
407     #listen_thread(store)
408     #exit(0)
409
410     while True:
411         try:
412             dblock.acquire()
413             store.catch_up()
414             memorypool_update(store)
415             dblock.release()
416         except:
417             traceback.print_exc(file=sys.stdout)
418             print "continuing..."
419         time.sleep(10)
420