if self.block_number != old_block_number:
old_block_number = self.block_number
- processor.push_response({ 'method':'numblocks.subscribe', 'result':self.block_number })
+ processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
while True:
try:
break
if addr in self.watched_addresses:
status = self.get_status( addr )
- processor.push_response({ 'method':'address.subscribe', 'params':[addr], 'result':status })
+ processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
time.sleep(10)
+
+
+
+class AbeBackend:
+
+ def __init__(self,config, processor):
+ self.store = AbeStore(config)
+ self.store.processor = processor
+ thread.start_new_thread(self.store.run,(processor,))
+
+ def process(self, request, queue):
+ message_id = request['id']
+ method = request['method']
+ params = request.get('params',[])
+ result = ''
+ if method == 'blockchain.numblocks.subscribe':
+ result = self.store.block_number
+ elif method == 'blockchain.address.subscribe':
+ address = params[0]
+ self.store.watch_address(address)
+ status = self.store.get_status(address)
+ result = status
+ elif method == 'blockchain.address.get_history':
+ address = params[0]
+ result = self.store.get_history( address )
+ elif method == 'blockchain.transaction.broadcast':
+ txo = self.store.send_tx(params[0])
+ print "sent tx:", txo
+ result = txo
+ else:
+ print "unknown method", request
+
+ if result != '':
+ response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
+ queue.put(response)
+
+
+
finally:
sf.close()
s.close()
+
+
+class ServerBackend:
+
+ def __init__(self, config, processor):
+ self.banner = config.get('server','banner')
+ self.irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
+ self.irc.processor = processor
+ if (config.get('server','irc') == 'yes' ):
+ self.irc.start()
+
+ def process(self, request, queue):
+ method = request['method']
+
+ result = ''
+ if method == 'server.banner':
+ result = self.banner.replace('\\n','\n')
+ elif method == 'server.peers.subscribe':
+ result = self.irc.get_peers()
+ else:
+ print "unknown method", request
+
+ if result!='':
+ response = { 'id':request['id'], 'method':method, 'params':request['params'], 'result':result }
+ queue.put(response)
+
+
+
import bitcoin
-import stratum
+from processor import Processor
import threading
import time
self.backend.blockchain.subscribe_reorganize(self.reorganize)
self.backend.blockchain.fetch_last_depth(self.set_last_depth)
self.latest = GhostValue()
- self.subscribed = []
-
- def subscribe(self, session, request):
- last = self.latest.get()
- self.push_response(session,{"id": request["id"], "result": last})
- with self.lock:
- self.subscribed.append((session, request))
def set_last_depth(self, ec, last_depth):
if ec:
def reorganize(self, ec, fork_point, arrivals, replaced):
latest = fork_point + len(arrivals)
self.latest.set(latest)
- subscribed = self.spring_clean()
- for session, request in subscribed:
- self.push_response(session,{"id": request["id"], "result": latest})
+ self.push_response({"method":"numblocks.subscribe", "result": latest})
self.backend.blockchain.subscribe_reorganize(self.reorganize)
- def spring_clean(self):
- with self.lock:
- self.subscribed = [sub for sub in self.subscribed
- if not sub[0].stopped()]
- return self.subscribed[:]
class AddressGetHistory:
def __init__(self, backend):
self.backend = backend
- def get(self, session, request):
+ def get(self, request):
address = str(request["params"])
composed.payment_history(self.backend.blockchain, address,
- bitcoin.bind(self.respond, session, request, bitcoin._1))
+ bitcoin.bind(self.respond, request, bitcoin._1))
- def respond(self, session, request, result):
- self.push_response(session,{"id": request["id"], "result": result})
+ def respond(self, request, result):
+ self.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result})
-class LibbitcoinProcessor(stratum.Processor):
+class LibbitcoinProcessor(Processor):
def __init__(self):
self.backend = Backend()
self.numblocks_subscribe = NumblocksSubscribe(self.backend)
self.address_get_history = AddressGetHistory(self.backend)
- stratum.Processor.__init__(self)
+ Processor.__init__(self)
def stop(self):
self.backend.stop()
- def process(self, session, request):
+ def process(self, request):
print "New request (lib)", request
if request["method"] == "numblocks.subscribe":
self.numblocks_subscribe.subscribe(session, request)
elif request["method"] == "address.get_history":
- self.address_get_history.get(session, request)
+ self.address_get_history.get(request)
elif request["method"] == "server.banner":
- self.push_response(session, {"id": request["id"],
+ self.push_response({"id": request["id"], "method": request["method"], "params":request["params"],
"result": "libbitcoin using python-bitcoin bindings"})
elif request["method"] == "transaction.broadcast":
- self.broadcast_transaction(session, request)
+ self.broadcast_transaction(request)
# Execute and when ready, you call
- # self.push_response(session,response)
+ # self.push_response(response)
- def broadcast_transaction(self, session, request):
+ def broadcast_transaction(self, request):
raw_tx = bitcoin.data_chunk(str(request["params"]))
exporter = bitcoin.satoshi_exporter()
try:
tx = exporter.load_transaction(raw_tx)
except RuntimeError:
- response = {"id": request["id"], "result": None,
+ response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": None,
"error": {"message":
"Exception while parsing the transaction data.",
"code": -4}}
else:
self.backend.protocol.broadcast_transaction(tx)
tx_hash = str(bitcoin.hash_transaction(tx))
- response = {"id": request["id"], "result": tx_hash}
- self.push_response(session,response)
+ response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": tx_hash}
+ self.push_response(response)
+
+
-def run(stratum):
+def run(processor):
+ #processor = LibbitcoinProcessor()
print "Warning: pre-alpha prototype. Full of bugs."
- processor = LibbitcoinProcessor()
- stratum.start(processor)
+ while not processor.shared.stopped():
+ if raw_input() == "quit":
+ shared.stop()
+ time.sleep(1)
import socket
import threading
import time
+import traceback, sys
import Queue as queue
class Shared:
self.internal_id = 1
self.lock = threading.Lock()
self.sessions = []
+ self.processors = {}
def push_response(self, item):
self.response_queue.put(item)
self.internal_id += 1
return r
+ def register(self, prefix, function):
+ self.processors[prefix] = function
+
def run(self):
if self.shared is None:
raise TypeError("self.shared not set in Processor")
method = request['method']
params = request.get('params',[])
- if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']:
+ suffix = method.split('.')[-1]
+ if suffix == 'subscribe':
session.subscribe_to_service(method, params)
# store session and id locally
request['id'] = self.store_session_id(session, request['id'])
- self.process(request)
+
+ # dispatch request to the relevant module..
+ prefix = method.split('.')[0]
+ try:
+ func = self.processors[prefix]
+ except:
+ print "error: no processor for", prefix
+ continue
+
+ try:
+ func(request,self.response_queue)
+ except:
+ traceback.print_exc(file=sys.stdout)
+ continue
self.stop()
from stratum_http import HttpServer
from stratum import TcpServer
from native import NativeServer
-from irc import Irc
-from abe_backend import AbeStore
-
-class AbeProcessor(Processor):
- def process(self,request):
- message_id = request['id']
- method = request['method']
- params = request.get('params',[])
- #print request
-
- result = ''
- if method == 'numblocks.subscribe':
- result = store.block_number
- elif method == 'address.subscribe':
- address = params[0]
- store.watch_address(address)
- status = store.get_status(address)
- result = status
- elif method == 'client.version':
- #session.version = params[0]
- pass
- elif method == 'server.banner':
- result = config.get('server','banner').replace('\\n','\n')
- elif method == 'server.peers':
- result = irc.get_peers()
- elif method == 'address.get_history':
- address = params[0]
- result = store.get_history( address )
- elif method == 'transaction.broadcast':
- txo = store.send_tx(params[0])
- print "sent tx:", txo
- result = txo
- else:
- print "unknown method", request
- if result!='':
- response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
- self.push_response(response)
- def get_status(self,addr):
- return store.get_status(addr)
+import irc
+import abe_backend
+from processor import Processor
print out
sys.exit(0)
- processor = AbeProcessor()
+ processor = Processor()
shared = Shared()
# Bind shared to processor since constructor is user defined
processor.shared = shared
processor.start()
- irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
- if (config.get('server','irc') == 'yes' ): irc.start()
+ abe = abe_backend.AbeBackend(config, processor)
+ processor.register('blockchain', abe.process)
- # backend
- store = AbeStore(config)
+ sb = irc.ServerBackend(config, processor)
+ processor.register('server', sb.process)
# dispatcher
dispatcher = Dispatcher(shared, processor)
dispatcher.start()
- host = config.get('server','host')
# Create various transports we need
- transports = [ NativeServer(shared, store, irc, config.get('server','banner'), host, 50000),
+ host = config.get('server','host')
+ transports = [ NativeServer(shared, abe.store, sb.irc, config.get('server','banner'), host, 50000),
TcpServer(shared, processor, host, 50001),
HttpServer(shared, processor, host, 8081),
]
server.start()
print "starting Electrum server on", host
- store.run(processor)
+ while not shared.stopped():
+ time.sleep(1)
print "server stopped"
self._connection = connection
self.address = address
Session.__init__(self)
+ print "New session", address
def connection(self):
if self.stopped():
-class Stratum:
-
- def start(self, processor):
- shared = Shared()
- # Bind shared to processor since constructor is user defined
- processor.shared = shared
- processor.start()
- # Create various transports we need
- transports = TcpServer(shared, processor, "176.31.24.241", 50001),
- for server in transports:
- server.start()
- while not shared.stopped():
- if raw_input() == "quit":
- shared.stop()
- time.sleep(1)
-
-if __name__ == "__main__":
- processor = Processor()
- app = Stratum()
- app.start(processor)
-