self.address_queue = Queue()
self.dblock = thread.allocate_lock()
- self.block_number = -1
- self.watched_addresses = []
def import_block(self, b, chain_ids=frozenset()):
+ #print "import block"
block_id = super(AbeStore, self).import_block(b, chain_ids)
for pos in xrange(len(b['transactions'])):
tx = b['transactions'][pos]
return block_number
- def watch_address(self, addr):
- if addr not in self.watched_addresses:
- self.watched_addresses.append(addr)
- def run(self, processor):
-
- old_block_number = None
- while not processor.shared.stopped():
- self.block_number = self.main_iteration()
+from processor import Processor
- if self.block_number != old_block_number:
- old_block_number = self.block_number
- processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+class AbeProcessor(Processor):
- while True:
- try:
- addr = self.address_queue.get(False)
- except:
- break
- if addr in self.watched_addresses:
- status = self.get_status( addr )
- processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
-
- time.sleep(10)
-
-
-
-class AbeBackend:
-
- def __init__(self,config, processor):
+ def __init__(self, config):
+ Processor.__init__(self)
self.store = AbeStore(config)
- self.store.processor = processor
- thread.start_new_thread(self.store.run,(processor,))
+ self.block_number = -1
+ self.watched_addresses = []
- def process(self, request, queue):
+ def process(self, request):
message_id = request['id']
method = request['method']
params = request.get('params',[])
result = ''
if method == 'blockchain.numblocks.subscribe':
- result = self.store.block_number
+ result = self.block_number
elif method == 'blockchain.address.subscribe':
address = params[0]
- self.store.watch_address(address)
+ self.watch_address(address)
status = self.store.get_status(address)
result = status
elif method == 'blockchain.address.get_history':
if result != '':
response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
- queue.put(response)
+ self.push_response(response)
+
+
+ def watch_address(self, addr):
+ if addr not in self.watched_addresses:
+ self.watched_addresses.append(addr)
+
+
+ def run(self):
+
+ old_block_number = None
+ while not self.shared.stopped():
+ self.block_number = self.store.main_iteration()
+
+ if self.block_number != old_block_number:
+ old_block_number = self.block_number
+ self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+
+ while True:
+ try:
+ addr = self.store.address_queue.get(False)
+ except:
+ break
+ if addr in self.watched_addresses:
+ status = self.get_status( addr )
+ self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
+
+ time.sleep(10)
import random, string
return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
+from processor import Processor
-class Irc(threading.Thread):
+class ServerProcessor(Processor):
- def __init__(self, processor, host, nick):
- self.processor = processor
- threading.Thread.__init__(self)
+ def __init__(self, config):
+ Processor.__init__(self)
self.daemon = True
self.peers = {}
- self.host = host
- self.nick = nick
+ self.banner = config.get('server','banner')
+ self.host = config.get('server','host')
+ self.nick = config.get('server','ircname')
+ self.irc = config.get('server','irc') == 'yes'
def get_peers(self):
return self.peers.values()
def run(self):
+ if not self.irc:
+ return
NICK = 'E_'+random_string(10)
- while not self.processor.shared.stopped():
+ while not self.shared.stopped():
try:
s = socket.socket()
s.connect(('irc.freenode.net', 6667))
s.send('JOIN #electrum\n')
sf = s.makefile('r', 0)
t = 0
- while not self.processor.shared.stopped():
+ while not self.shared.stopped():
line = sf.readline()
line = line.rstrip('\r\n')
line = line.split()
host = line[k+9]
self.peers[name] = (ip,host)
if time.time() - t > 5*60:
- self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
+ self.push_response({'method':'server.peers', 'result':[self.get_peers()]})
s.send('NAMES #electrum\n')
t = time.time()
self.peers = {}
s.close()
-class ServerBackend:
-
- def __init__(self, config, processor):
- self.banner = config.get('server','banner')
- self.irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
- self.irc.processor = processor
- if (config.get('server','irc') == 'yes' ):
- self.irc.start()
- def process(self, request, queue):
+ def process(self, request):
method = request['method']
result = ''
if method == 'server.banner':
result = self.banner.replace('\\n','\n')
elif method == 'server.peers.subscribe':
- result = self.irc.get_peers()
+ result = self.get_peers()
else:
print "unknown method", request
if result!='':
response = { 'id':request['id'], 'method':method, 'params':request['params'], 'result':result }
- queue.put(response)
+ self.push_response(response)
class NumblocksSubscribe:
- def __init__(self, backend):
+ def __init__(self, backend, processor):
self.backend = backend
+ self.processor = processor
self.lock = threading.Lock()
self.backend.blockchain.subscribe_reorganize(self.reorganize)
self.backend.blockchain.fetch_last_depth(self.set_last_depth)
def reorganize(self, ec, fork_point, arrivals, replaced):
latest = fork_point + len(arrivals)
self.latest.set(latest)
- self.push_response({"method":"numblocks.subscribe", "result": latest})
+ self.processor.push_response({"method":"numblocks.subscribe", "result": latest})
self.backend.blockchain.subscribe_reorganize(self.reorganize)
class AddressGetHistory:
- def __init__(self, backend):
+ def __init__(self, backend, processor):
self.backend = backend
+ self.processor = processor
def get(self, request):
address = str(request["params"])
bitcoin.bind(self.respond, request, bitcoin._1))
def respond(self, request, result):
- self.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result})
+ self.processor.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result})
+
class LibbitcoinProcessor(Processor):
- def __init__(self):
- self.backend = Backend()
- self.numblocks_subscribe = NumblocksSubscribe(self.backend)
- self.address_get_history = AddressGetHistory(self.backend)
+ def __init__(self, config):
Processor.__init__(self)
+ self.backend = Backend()
+ self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
+ self.address_get_history = AddressGetHistory(self.backend, self)
def stop(self):
self.backend.stop()
response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": tx_hash}
self.push_response(response)
-
-
-def run(processor):
- #processor = LibbitcoinProcessor()
- print "Warning: pre-alpha prototype. Full of bugs."
- while not processor.shared.stopped():
- if raw_input() == "quit":
- shared.stop()
- time.sleep(1)
+ def run(self):
+ # this class is a thread. it does nothing in this example.
+ print "Warning: pre-alpha prototype. Full of bugs."
+ while not self.shared.stopped():
+ time.sleep(1)
class Processor(threading.Thread):
def __init__(self):
- self.shared = None
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.dispatcher = None
+
+ def process(self, request):
+ pass
+
+ def push_response(self, response):
+ self.dispatcher.request_dispatcher.push_response(response)
+
+
+
+class Dispatcher:
+
+ def __init__(self):
+ self.shared = Shared()
+ self.request_dispatcher = RequestDispatcher(self.shared)
+ self.request_dispatcher.start()
+ self.response_dispatcher = ResponseDispatcher(self.shared, self.request_dispatcher)
+ self.response_dispatcher.start()
+
+ def register(self, prefix, processor):
+ processor.dispatcher = self
+ processor.shared = self.shared
+ processor.start()
+ self.request_dispatcher.processors[prefix] = processor
+
+
+
+class RequestDispatcher(threading.Thread):
+
+ def __init__(self, shared):
+ self.shared = shared
threading.Thread.__init__(self)
self.daemon = True
self.request_queue = queue.Queue()
self.internal_id += 1
return r
- def register(self, prefix, function):
- self.processors[prefix] = function
-
def run(self):
if self.shared is None:
raise TypeError("self.shared not set in Processor")
# dispatch request to the relevant module..
prefix = request['method'].split('.')[0]
try:
- func = self.processors[prefix]
+ p = self.processors[prefix]
except:
print "error: no processor for", prefix
return
try:
- func(request,self.response_queue)
+ p.process(request)
except:
traceback.print_exc(file=sys.stdout)
self.subscriptions.append((method, params))
-class Dispatcher(threading.Thread):
+class ResponseDispatcher(threading.Thread):
def __init__(self, shared, processor):
self.shared = shared
# License along with this program. If not, see
# <http://www.gnu.org/licenses/agpl.html>.
-import time, json, socket, operator, thread, ast, sys, re, traceback
+import time, sys, traceback
import ConfigParser
-from json import dumps, loads
-import urllib
-import threading
config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
except:
pass
-
password = config.get('server','password')
+host = config.get('server','host')
+use_libbitcoin = False
-from processor import Shared, Processor, Dispatcher
-
-
+from processor import Dispatcher
from transports.stratum_http import HttpServer
from transports.stratum_tcp import TcpServer
from transports.native import NativeServer
+from irc import ServerProcessor
+from abe_backend import AbeProcessor
-
-import irc
-import abe_backend
-from processor import Processor
-
-
-
-
-
+if use_libbitcoin:
+ from modules.python_bitcoin import LibBitcoinProcessor as BlockchainProcessor
+else:
+ from abe_backend import AbeProcessor as BlockchainProcessor
if __name__ == '__main__':
if len(sys.argv)>1:
import jsonrpclib
- server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
+ server = jsonrpclib.Server('http://%s:8081'%host)
cmd = sys.argv[1]
if cmd == 'stop':
out = server.stop(password)
print out
sys.exit(0)
- processor = Processor()
- shared = Shared()
- # Bind shared to processor since constructor is user defined
- processor.shared = shared
- processor.start()
+ # Create hub
+ dispatcher = Dispatcher()
+ shared = dispatcher.shared
- abe = abe_backend.AbeBackend(config, processor)
- processor.register('blockchain', abe.process)
+ # Create and register processors
+ abe = BlockchainProcessor(config)
+ dispatcher.register('blockchain', abe)
- sb = irc.ServerBackend(config, processor)
- processor.register('server', sb.process)
-
- # dispatcher
- dispatcher = Dispatcher(shared, processor)
- dispatcher.start()
+ sb = ServerProcessor(config)
+ dispatcher.register('server', sb)
# Create various transports we need
- host = config.get('server','host')
- transports = [ NativeServer(shared, abe.store, sb.irc, config.get('server','banner'), host, 50000),
- TcpServer(shared, processor, host, 50001),
- HttpServer(shared, processor, host, 8081),
+ transports = [ NativeServer(shared, abe, sb, config.get('server','banner'), host, 50000),
+ TcpServer(dispatcher, host, 50001),
+ HttpServer(dispatcher, host, 8081),
]
for server in transports:
server.start()
while not shared.stopped():
time.sleep(1)
print "server stopped"
-
-import thread, threading, time, socket, traceback, ast
+import thread, threading, time, socket, traceback, ast, sys
class NativeServer(threading.Thread):
- def __init__(self, shared, store, irc, banner, host, port):
+ def __init__(self, shared, abe, irc, banner, host, port):
threading.Thread.__init__(self)
self.banner = banner
- self.store = store
+ self.abe = abe
+ self.store = abe.store
self.irc = irc
self.sessions = {}
self.host = host
self.sessions[session_id]['last_time'] = time.time()
ret, addresses = self.modified_addresses(session)
if ret: self.sessions[session_id]['addresses'] = addresses
- return repr( (self.store.block_number,ret))
+ return repr( (self.abe.block_number,ret))
def add_address_to_session(self, session_id, address):
def do_command(self, cmd, data, ipaddr):
if cmd=='b':
- out = "%d"%block_number
+ out = "%d"%self.abe.block_number
elif cmd in ['session','new_session']:
try:
self.pending_responses.append(response)
class HttpServer(threading.Thread):
- def __init__(self, shared, _processor, host, port):
- self.shared = shared
- self.processor = _processor
+ def __init__(self, dispatcher, host, port):
+ self.shared = dispatcher.shared
+ self.dispatcher = dispatcher.request_dispatcher
threading.Thread.__init__(self)
self.daemon = True
self.host = host
#print session, request
session = self.server.sessions.get(session_id)
if session:
- self.processor.process(session, request)
+ self.dispatcher.process(session, request)
def do_stop(self, session, request):
self.shared.stop()
import time
import Queue as queue
-from processor import Session, Dispatcher, Shared
+from processor import Session, Dispatcher
class TcpSession(Session):
class TcpClientRequestor(threading.Thread):
- def __init__(self, shared, processor, session):
- self.shared = shared
- self.processor = processor
+ def __init__(self, dispatcher, session):
+ self.shared = dispatcher.shared
+ self.dispatcher = dispatcher
self.message = ""
self.session = session
threading.Thread.__init__(self)
try:
command = json.loads(raw_command)
except:
- self.processor.push_response({"error": "bad JSON", "request": raw_command})
+ self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
return True
try:
method = command['method']
except KeyError:
# Return an error JSON in response.
- self.processor.push_response({"error": "syntax error", "request": raw_command})
+ self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
else:
- self.processor.push_request(self.session,command)
+ self.dispatcher.push_request(self.session,command)
return True
class TcpServer(threading.Thread):
- def __init__(self, shared, processor, host, port):
- self.shared = shared
- self.processor = processor
+ def __init__(self, dispatcher, host, port):
+ self.shared = dispatcher.shared
+ self.dispatcher = dispatcher.request_dispatcher
threading.Thread.__init__(self)
self.daemon = True
self.host = host
sock.listen(1)
while not self.shared.stopped():
session = TcpSession(*sock.accept())
- client_req = TcpClientRequestor(self.shared, self.processor, session)
+ client_req = TcpClientRequestor(self.dispatcher, session)
client_req.start()
- self.processor.add_session(session)
- self.processor.collect_garbage()
+ self.dispatcher.add_session(session)
+ self.dispatcher.collect_garbage()