self.address_queue = Queue()
self.dblock = thread.allocate_lock()
- self.block_number = -1
- self.watched_addresses = []
def import_block(self, b, chain_ids=frozenset()):
+ #print "import block"
block_id = super(AbeStore, self).import_block(b, chain_ids)
for pos in xrange(len(b['transactions'])):
tx = b['transactions'][pos]
return block_number
- def watch_address(self, addr):
- if addr not in self.watched_addresses:
- self.watched_addresses.append(addr)
- def run(self, processor):
-
- old_block_number = None
- while not processor.shared.stopped():
- self.block_number = self.main_iteration()
+from processor import Processor
- if self.block_number != old_block_number:
- old_block_number = self.block_number
- processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+class AbeProcessor(Processor):
- while True:
- try:
- addr = self.address_queue.get(False)
- except:
- break
- if addr in self.watched_addresses:
- status = self.get_status( addr )
- processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
-
- time.sleep(10)
-
-
-
-class AbeBackend:
-
- def __init__(self,config, processor):
+ def __init__(self, config):
+ Processor.__init__(self)
self.store = AbeStore(config)
- self.store.processor = processor
- thread.start_new_thread(self.store.run,(processor,))
+ self.block_number = -1
+ self.watched_addresses = []
- def process(self, request, queue):
+ def process(self, request):
message_id = request['id']
method = request['method']
params = request.get('params',[])
result = ''
if method == 'blockchain.numblocks.subscribe':
- result = self.store.block_number
+ result = self.block_number
elif method == 'blockchain.address.subscribe':
address = params[0]
- self.store.watch_address(address)
+ self.watch_address(address)
status = self.store.get_status(address)
result = status
elif method == 'blockchain.address.get_history':
if result != '':
response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
- queue.put(response)
+ self.push_response(response)
+
+
+ def watch_address(self, addr):
+ if addr not in self.watched_addresses:
+ self.watched_addresses.append(addr)
+
+
+ def run(self):
+
+ old_block_number = None
+ while not self.shared.stopped():
+ self.block_number = self.store.main_iteration()
+
+ if self.block_number != old_block_number:
+ old_block_number = self.block_number
+ self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+
+ while True:
+ try:
+ addr = self.store.address_queue.get(False)
+ except:
+ break
+ if addr in self.watched_addresses:
+ status = self.get_status( addr )
+ self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
+
+ time.sleep(10)