config.set('server', 'port', 50000)
config.set('server', 'password', '')
config.set('server', 'irc', 'yes')
-config.set('server', 'cache', 'no')
config.set('server', 'ircname', 'Electrum server')
config.add_section('database')
config.set('database', 'type', 'psycopg2')
from Queue import Queue
input_queue = Queue()
output_queue = Queue()
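+# addresses whose cached status was invalidated; the main loop drains this queue and notifies subscribers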
+address_queue = Queue()
class MyStore(Datastore_class):
def import_tx(self, tx, is_coinbase):
tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
- if config.get('server', 'cache') == 'yes': self.update_tx_cache(tx_id)
+ self.update_tx_cache(tx_id)
def update_tx_cache(self, txid):
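# invalidate the cache entry of every address touched by this tx's inputs and outputs, and queue each address for notification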
inrows = self.get_tx_inputs(txid, False)
if self.tx_cache.has_key(address):
print "cache: invalidating", address
self.tx_cache.pop(address)
+ address_queue.put(address)
+
outrows = self.get_tx_outputs(txid, False)
for row in outrows:
_hash = store.binout(row[6])
if self.tx_cache.has_key(address):
print "cache: invalidating", address
self.tx_cache.pop(address)
+ address_queue.put(address)
def safe_sql(self,sql, params=(), lock=True):
try:
def get_history(self, addr):
- if config.get('server','cache') == 'yes':
- cached_version = self.tx_cache.get( addr )
- if cached_version is not None:
- return cached_version
+ cached_version = self.tx_cache.get( addr )
+ if cached_version is not None:
+ return cached_version
version, binaddr = decode_check_address(addr)
if binaddr is None:
tx_hash = self.hashout_hex(tx_hash)
txpoint = {
"nTime": int(nTime),
- #"chain_id": int(chain_id),
"height": int(height),
"is_in": int(is_in),
"blk_hash": self.hashout_hex(blk_hash),
#print "mempool", tx_hash
txpoint = {
"nTime": 0,
- #"chain_id": 1,
"height": 0,
"is_in": int(is_in),
"blk_hash": 'mempool',
if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
# cache result
- if config.get('server','cache') == 'yes' and not address_has_mempool:
+ if not address_has_mempool:
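+ # a mempool tx can confirm at any time, so addresses with unconfirmed history are not cached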
self.tx_cache[addr] = txpoints
return txpoints
return out
+
+def do_update_address(addr):
+ # an address was involved in a transaction; check whether it is subscribed to in any session
+ # the address can be subscribed to in several sessions; the cache ensures we do not make redundant requests
+ for session_id in sessions.keys():
+ session = sessions[session_id]
+ if session.get('type') != 'subscribe': continue
+ addresses = session['addresses'].keys()
+
+ if addr in addresses:
+ print "address ", addr, "is watched by", session_id
+ status = get_address_status( addr )
+ last_status = session['addresses'][addr]
+ if last_status != status:
+ print "sending new status for %s:"%addr, status
+ send_status(session_id,addr,status)
+ sessions[session_id]['addresses'][addr] = status
+
+
def get_address_status(addr):
- # get addtess status, i.e. the last block for that address.
+ # get address status, i.e. the last block for that address.
tx_points = store.get_history(addr)
if not tx_points:
status = None
out = json.dumps( {'method':'numblocks.subscribe', 'result':block_number} )
output_queue.put((session_id, out))
+def send_status(session_id, address, status):
+ out = json.dumps( { 'method':'address.subscribe', 'address':address, 'status':status } )
+ output_queue.put((session_id, out))
+
def subscribe_to_numblocks(session_id):
sessions_sub_numblocks.append(session_id)
send_numblocks(session_id)
def subscribe_to_address(session_id, address):
- #print "%s subscribing to %s"%(session_id,address)
status = get_address_status(address)
sessions[session_id]['type'] = 'subscribe'
sessions[session_id]['addresses'][address] = status
sessions[session_id]['last_time'] = time.time()
- out = json.dumps( { 'method':'address.subscribe', 'address':address, 'status':status } )
- output_queue.put((session_id, out))
+ send_status(session_id, address, status)
def new_session(version, addresses):
session_id = random_string(10)
traceback.print_exc(file=sys.stdout)
+def close_session(session_id):
+ print "lost connection", session_id
+ sessions.pop(session_id)
+ if session_id in sessions_sub_numblocks:
+ sessions_sub_numblocks.remove(session_id)
+
+
# one thread per client. put requests in a queue.
def tcp_client_thread(ipaddr,conn):
""" use a persistent connection. put commands in a queue."""
msg = ''
while not stopping:
- d = conn.recv(1024)
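+ # a socket error or an empty read means the client disconnected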
+ try:
+ d = conn.recv(1024)
+ except socket.error:
+ d = ''
+ if not d:
+ close_session(session_id)
+ break
+
msg += d
- if not d: break
while True:
s = msg.find('\n')
if s == -1:
break
else:
- c = msg[0:s]
+ c = msg[0:s].strip()
msg = msg[s+1:]
- c = json.loads(c)
+ if c == 'quit':
+ conn.close()
+ close_session(session_id)
+ return
+ try:
+ c = json.loads(c)
+ except ValueError:
+ print "json error", repr(c)
+ continue
try:
cmd = c['method']
data = c['params']
input_queue.put((session_id, cmd, data))
+
# read commands from the input queue. perform requests, etc. this should be called from the main thread.
def process_input_queue():
while not stopping:
session_id, cmd, data = input_queue.get()
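+ # the session may have been closed while the request was waiting in the queue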
+ if session_id not in sessions:
+ continue
out = None
if cmd == 'address.subscribe':
subscribe_to_address(session_id,data)
sessions[session_id]['version'] = data
elif cmd == 'server.banner':
out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } )
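+ # peer_list is presumably maintained by irc_thread (servers discovered over IRC)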
+ elif cmd == 'server.peers':
+ out = json.dumps( { 'method':'server.peers', 'result':peer_list.values() } )
elif cmd == 'address.get_history':
address = data
out = json.dumps( { 'method':'address.get_history', 'address':address, 'result':store.get_history( address ) } )
elif cmd == 'transaction.broadcast':
- out = json.dumps( { 'method':'transaction.broadcast', 'result':send_tx(data) } )
+ txo = send_tx(data)
+ print "sent tx:", txo
+ out = json.dumps( { 'method':'transaction.broadcast', 'result':txo } )
else:
print "unknown command", cmd
if out:
session_id, out = output_queue.get()
session = sessions.get(session_id)
if session:
- conn = session.get('conn')
- conn.send(out+'\n')
+ try:
+ conn = session.get('conn')
+ conn.send(out+'\n')
+ except: # the connection is broken; drop the session
+ close_session(session_id)
+
def memorypool_update(store):
+ """ when a tx is removed from memory pool, I need to notify subscribers"""
+
ds = BCDataStream.BCDataStream()
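+ # snapshot the previous mempool contents so that txs which left it can be detected below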
+ previous_transactions = store.mempool_keys
store.mempool_keys = []
postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
ds.write(hextx.decode('hex'))
tx = deserialize.parse_Transaction(ds)
tx['hash'] = util.double_sha256(tx['tx'])
- tx_hash = tx['hash'][::-1].encode('hex')
+ tx_hash = store.hashin(tx['hash'])
+
store.mempool_keys.append(tx_hash)
if store.tx_find_id_and_value(tx):
pass
else:
store.import_tx(tx, False)
-
store.commit()
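+ # invalidate the cache for txs that left the mempool (typically because they were mined), so subscribers get a new status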
+ for tx_hash in previous_transactions:
+ if tx_hash not in store.mempool_keys:
+ tx = { 'hash':store.hashout(tx_hash) }
+ tx_id = store.tx_find_id_and_value(tx)
+ if tx_id:
+ store.update_tx_cache(tx_id)
def clean_session_thread():
time.sleep(30)
t = time.time()
for k,s in sessions.items():
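+ # persistent subscription sessions are closed by close_session() on disconnect; only poll-style sessions time out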
+ if s.get('type') == 'subscribe': continue
t0 = s['last_time']
if t - t0 > 5*60:
sessions.pop(k)
+ print "lost session", k
def irc_thread():
print "starting Electrum server"
- print "cache:", config.get('server', 'cache')
conf = DataStore.CONFIG_DEFAULTS
args, argv = readconf.parse_argv( [], conf)
thread.start_new_thread(http_server_thread, (store,))
thread.start_new_thread(clean_session_thread, ())
+
if (config.get('server','irc') == 'yes' ):
thread.start_new_thread(irc_thread, ())
block_number = 0
finally:
dblock.release()
+
+ # process queued addresses: push status updates to subscribed sessions
+ while True:
+ try:
+ addr = address_queue.get(False)
+ except: # address_queue is empty
+ break
+ do_update_address(addr)
+
time.sleep(10)
print "server stopped"