log = stratum.logger.get_logger('bitcoin_rpc')
class BitcoinRPC(object):
-
+
def __init__(self, host, port, username, password):
self.bitcoin_url = 'http://%s:%d' % (host, port)
self.credentials = base64.b64encode("%s:%s" % (username, password))
'Content-Type': 'text/json',
'Authorization': 'Basic %s' % self.credentials,
}
-
+
def _call_raw(self, data):
return client.getPage(
url=self.bitcoin_url,
headers=self.headers,
postdata=data,
)
-
+
def _call(self, method, params):
return self._call_raw(json.dumps({
'jsonrpc': '2.0',
def getinfo(self):
resp = (yield self._call('getinfo', []))
defer.returnValue(json.loads(resp)['result'])
-
+
@defer.inlineCallbacks
def getblocktemplate(self):
resp = (yield self._call('getblocktemplate', []))
defer.returnValue(json.loads(resp)['result'])
-
+
@defer.inlineCallbacks
def prevhash(self):
resp = (yield self._call('getwork', []))
except Exception as e:
log.exception("Cannot decode prevhash %s" % str(e))
raise
-
+
@defer.inlineCallbacks
def validateaddress(self, address):
resp = (yield self._call('validateaddress', [address,]))
- defer.returnValue(json.loads(resp)['result'])
\ No newline at end of file
+ defer.returnValue(json.loads(resp)['result'])
'''Template is used for generating new jobs for clients.
Let's iterate extranonce1, extranonce2, ntime and nonce
to find out valid bitcoin block!'''
-
+
coinbase_transaction_class = CoinbaseTransaction
-
+
def __init__(self, timestamper, coinbaser, job_id):
super(BlockTemplate, self).__init__()
-
- self.job_id = job_id
+
+ self.job_id = job_id
self.timestamper = timestamper
self.coinbaser = coinbaser
-
+
self.prevhash_bin = '' # reversed binary form of prevhash
self.prevhash_hex = ''
self.timedelta = 0
self.curtime = 0
self.target = 0
- #self.coinbase_hex = None
+ #self.coinbase_hex = None
self.merkletree = None
-
+
self.broadcast_args = []
-
+
# List of 4-tuples (extranonce1, extranonce2, ntime, nonce)
# registers already submitted and checked shares
# There may be registered also invalid shares inside!
- self.submits = []
-
+ self.submits = []
+
def fill_from_rpc(self, data):
'''Convert getblocktemplate result into BlockTemplate instance'''
-
+
#txhashes = [None] + [ binascii.unhexlify(t['hash']) for t in data['transactions'] ]
txhashes = [None] + [ util.ser_uint256(int(t['hash'], 16)) for t in data['transactions'] ]
mt = merkletree.MerkleTree(txhashes)
coinbase = self.coinbase_transaction_class(self.timestamper, self.coinbaser, data['coinbasevalue'],
data['coinbaseaux']['flags'], data['height'], settings.COINBASE_EXTRAS)
-
+
self.height = data['height']
self.nVersion = data['version']
self.hashPrevBlock = int(data['previousblockhash'], 16)
self.nTime = 0
self.nNonce = 0
self.vtx = [ coinbase, ]
-
+
for tx in data['transactions']:
t = halfnode.CTransaction()
t.deserialize(StringIO.StringIO(binascii.unhexlify(tx['data'])))
self.vtx.append(t)
-
+
self.curtime = data['curtime']
- self.timedelta = self.curtime - int(self.timestamper.time())
+ self.timedelta = self.curtime - int(self.timestamper.time())
self.merkletree = mt
self.target = util.uint256_from_compact(self.nBits)
-
+
# Reversed prevhash
self.prevhash_bin = binascii.unhexlify(util.reverse_hash(data['previousblockhash']))
self.prevhash_hex = "%064x" % self.hashPrevBlock
-
+
self.broadcast_args = self.build_broadcast_args()
-
+
def register_submit(self, extranonce1, extranonce2, ntime, nonce):
'''Client submitted some solution. Let's register it to
prevent double submissions.'''
-
+
t = (extranonce1, extranonce2, ntime, nonce)
if t not in self.submits:
self.submits.append(t)
return True
return False
-
+
def build_broadcast_args(self):
'''Build parameters of mining.notify call. All clients
may receive the same params, because they include
nbits = binascii.hexlify(struct.pack(">I", self.nBits))
ntime = binascii.hexlify(struct.pack(">I", self.curtime))
clean_jobs = True
-
+
return (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs)
def serialize_coinbase(self, extranonce1, extranonce2):
in binary form'''
(part1, part2) = self.vtx[0]._serialized
return part1 + extranonce1 + extranonce2 + part2
-
+
def check_ntime(self, ntime):
'''Check for ntime restrictions.'''
if ntime < self.curtime:
return False
-
+
if ntime > (self.timestamper.time() + 1000):
# Be strict on ntime into the near future
# may be unnecessary
return False
-
+
return True
def serialize_header(self, merkle_root_int, ntime_bin, nonce_bin):
r += util.ser_uint256_be(merkle_root_int)
r += ntime_bin
r += struct.pack(">I", self.nBits)
- r += nonce_bin
- return r
+ r += nonce_bin
+ return r
def finalize(self, merkle_root_int, extranonce1_bin, extranonce2_bin, ntime, nonce):
'''Take all parameters required to compile block candidate.
self.is_valid() should return True then...'''
-
+
self.hashMerkleRoot = merkle_root_int
self.nTime = ntime
self.nNonce = nonce
- self.vtx[0].set_extranonce(extranonce1_bin + extranonce2_bin)
+ self.vtx[0].set_extranonce(extranonce1_bin + extranonce2_bin)
self.sha256 = None # We changed block parameters, let's reset sha256 cache
'''
Polls upstream's getinfo() and detecting new block on the network.
This will call registry.update_block when new prevhash appear.
-
+
This is just failback alternative when something
- with ./bitcoind -blocknotify will go wrong.
+ with ./bitcoind -blocknotify will go wrong.
'''
-
+
def __init__(self, registry, bitcoin_rpc):
self.bitcoin_rpc = bitcoin_rpc
self.registry = registry
self.clock = None
self.schedule()
-
+
def schedule(self):
when = self._get_next_time()
#log.debug("Next prevhash update in %.03f sec" % when)
#log.debug("Merkle update in next %.03f sec" % \
# ((self.registry.last_update + settings.MERKLE_REFRESH_INTERVAL)-Interfaces.timestamper.time()))
self.clock = reactor.callLater(when, self.run)
-
+
def _get_next_time(self):
when = settings.PREVHASH_REFRESH_INTERVAL - (Interfaces.timestamper.time() - self.registry.last_update) % \
settings.PREVHASH_REFRESH_INTERVAL
- return when
-
+ return when
+
@defer.inlineCallbacks
def run(self):
update = False
-
- try:
+
+ try:
if self.registry.last_block:
current_prevhash = "%064x" % self.registry.last_block.hashPrevBlock
else:
current_prevhash = None
-
+
prevhash = util.reverse_hash((yield self.bitcoin_rpc.prevhash()))
if prevhash and prevhash != current_prevhash:
log.info("New block! Prevhash: %s" % prevhash)
update = True
-
+
elif Interfaces.timestamper.time() - self.registry.last_update >= settings.MERKLE_REFRESH_INTERVAL:
log.info("Merkle update! Prevhash: %s" % prevhash)
update = True
-
+
if update:
self.registry.update_block()
finally:
self.schedule()
-
\ No newline at end of file
+
log = stratum.logger.get_logger('coinbaser')
# TODO: Add on_* hooks in the app
-
+
class SimpleCoinbaser(object):
'''This very simple coinbaser uses constant bitcoin address
for all generated blocks.'''
-
+
def __init__(self, bitcoin_rpc, address):
# Fire callback when coinbaser is ready
self.on_load = defer.Deferred()
-
+
self.address = address
self.is_valid = False # We need to check if pool can use this address
-
+
self.bitcoin_rpc = bitcoin_rpc
self._validate()
d = self.bitcoin_rpc.validateaddress(self.address)
d.addCallback(self._address_check)
d.addErrback(self._failure)
-
+
def _address_check(self, result):
if result['isvalid'] and result['ismine']:
self.is_valid = True
log.info("Coinbase address '%s' is valid" % self.address)
-
+
if not self.on_load.called:
self.on_load.callback(True)
-
+
else:
self.is_valid = False
log.error("Coinbase address '%s' is NOT valid!" % self.address)
-
+
def _failure(self, failure):
log.error("Cannot validate Bitcoin address '%s'" % self.address)
raise
-
+
#def on_new_block(self):
# pass
-
+
#def on_new_template(self):
# pass
-
+
def get_script_pubkey(self):
if not self.is_valid:
# Try again, maybe bitcoind was down?
self._validate()
raise Exception("Coinbase address is not validated!")
- return util.script_to_address(self.address)
-
+ return util.script_to_address(self.address)
+
def get_coinbase_data(self):
- return ''
\ No newline at end of file
+ return ''
'''Construct special transaction used for coinbase tx.
It also implements quick serialization using pre-cached
scriptSig template.'''
-
+
extranonce_type = '>Q'
extranonce_placeholder = struct.pack(extranonce_type, int('f000000ff111111f', 16))
extranonce_size = struct.calcsize(extranonce_type)
def __init__(self, timestamper, coinbaser, value, flags, height, data):
super(CoinbaseTransaction, self).__init__()
-
+
#self.extranonce = 0
-
+
if len(self.extranonce_placeholder) != self.extranonce_size:
raise Exception("Extranonce placeholder don't match expected length!")
chr(self.extranonce_size),
util.ser_string(coinbaser.get_coinbase_data() + data)
)
-
+
tx_in.scriptSig = tx_in._scriptSig_template[0] + self.extranonce_placeholder + tx_in._scriptSig_template[1]
-
+
tx_out = halfnode.CTxOut()
tx_out.nValue = value
tx_out.scriptPubKey = coinbaser.get_script_pubkey()
-
+
self.vin.append(tx_in)
self.vout.append(tx_out)
-
+
# Two parts of serialized coinbase, just put part1 + extranonce + part2 to have final serialized tx
self._serialized = super(CoinbaseTransaction, self).serialize().split(self.extranonce_placeholder)
def set_extranonce(self, extranonce):
if len(extranonce) != self.extranonce_size:
raise Exception("Incorrect extranonce size")
-
+
(part1, part2) = self.vin[0]._scriptSig_template
- self.vin[0].scriptSig = part1 + extranonce + part2
\ No newline at end of file
+ self.vin[0].scriptSig = part1 + extranonce + part2
'''Implementation of a counter producing
unique extranonce across all pool instances.
This is just dumb "quick&dirty" solution,
- but it can be changed at any time without breaking anything.'''
+ but it can be changed at any time without breaking anything.'''
def __init__(self, instance_id):
if instance_id < 0 or instance_id > 31:
raise Exception("Current ExtranonceCounter implementation needs an instance_id in <0, 31>.")
-
+
# Last 5 most-significant bits represents instance_id
# The rest is just an iterator of jobs.
self.counter = instance_id << 27
self.size = struct.calcsize('>L')
-
+
def get_size(self):
'''Return expected size of generated extranonce in bytes'''
return self.size
-
+
def get_new_bin(self):
self.counter += 1
return struct.pack('>L', self.counter)
-
\ No newline at end of file
+
r += ser_vector(self.vout)
r += struct.pack("<I", self.nLockTime)
return r
-
+
def calc_sha256(self):
if self.sha256 is None:
self.sha256 = uint256_from_str(SHA256.new(SHA256.new(self.serialize()).digest()).digest())
return self.sha256
-
+
def is_valid(self):
self.calc_sha256()
for tout in self.vout:
return False
tx.calc_sha256()
hashes.append(ser_uint256(tx.sha256))
-
+
while len(hashes) > 1:
newhashes = []
for i in xrange(0, len(hashes), 2):
i2 = min(i+1, len(hashes)-1)
newhashes.append(SHA256.new(SHA256.new(hashes[i] + hashes[i2]).digest()).digest())
hashes = newhashes
-
+
if uint256_from_str(hashes[0]) != self.hashMerkleRoot:
return False
return True
self.nNonce = random.getrandbits(64)
self.strSubVer = MY_SUBVERSION
self.nStartingHeight = 0
-
+
def deserialize(self, f):
self.nVersion = struct.unpack("<i", f.read(4))[0]
if self.nVersion == 10300:
return ""
    def __repr__(self):
        # Debug representation; the alert payload carries no decoded fields to show.
        return "msg_alert()"
-
+
class BitcoinP2PProtocol(Protocol):
messagemap = {
"version": msg_version,
"ping": msg_ping,
"alert": msg_alert,
}
-
+
def connectionMade(self):
peer = self.transport.getPeer()
self.dstaddr = peer.host
self.dstport = peer.port
self.recvbuf = ""
self.last_sent = 0
-
+
t = msg_version()
t.nStartingHeight = getattr(self, 'nStartingHeight', 0)
t.addrTo.ip = self.dstaddr
t.addrFrom.port = 0
t.addrFrom.nTime = time.time()
self.send_message(t)
-
+
def dataReceived(self, data):
self.recvbuf += data
self.got_data()
-
+
def got_data(self):
while True:
if len(self.recvbuf) < 4:
self.got_message(t)
else:
print "UNKNOWN COMMAND", command, repr(msg)
-
- def prepare_message(self, message):
+
+ def prepare_message(self, message):
command = message.command
data = message.serialize()
tmsg = "\xf9\xbe\xb4\xd9"
tmsg += h[:4]
tmsg += data
return tmsg
-
+
def send_serialized_message(self, tmsg):
if not self.connected:
return
-
+
self.transport.write(tmsg)
- self.last_sent = time.time()
-
+ self.last_sent = time.time()
+
def send_message(self, message):
if not self.connected:
return
-
+
#print message.command
-
+
#print "send %s" % repr(message)
command = message.command
data = message.serialize()
h = SHA256.new(th).digest()
tmsg += h[:4]
tmsg += data
-
+
#print tmsg, len(tmsg)
self.transport.write(tmsg)
self.last_sent = time.time()
-
+
def got_message(self, message):
if self.last_sent + 30 * 60 < time.time():
self.send_message(msg_ping())
self.data = data
self.recalculate(detailed)
self._hash_steps = None
-
+
def recalculate(self, detailed=False):
L = self.data
steps = []
Ll = len(L)
self._steps = steps
self.detail = detail
-
+
def hash_steps(self):
if self._hash_steps == None:
self._hash_steps = doublesha(''.join(self._steps))
return self._hash_steps
-
+
def withFirst(self, f):
steps = self._steps
for s in steps:
f = doublesha(f + s)
return f
-
+
def merkleRoot(self):
return self.withFirst(self.data[0])
# MerkleTree tests
def _test():
import binascii
- import time
-
+ import time
+
mt = MerkleTree([None] + [binascii.unhexlify(a) for a in [
'999d2c8bb6bda0bf784d9ebeb631d711dbbbfe1bc006ea13d6ad0d6a2649a971',
'3f92594d5a3d7b4df29d7dd7c46a0dac39a96e751ba0fc9bab5435ea5e22a19d',
b'82293f182d5db07d08acf334a5a907012bbb9990851557ac0ec028116081bd5a' ==
binascii.b2a_hex(mt.withFirst(binascii.unhexlify('d43b669fb42cfa84695b844c0402d410213faa4f3e66cb7248f688ff19d5e5f7')))
)
-
+
print '82293f182d5db07d08acf334a5a907012bbb9990851557ac0ec028116081bd5a'
txes = [binascii.unhexlify(a) for a in [
'd43b669fb42cfa84695b844c0402d410213faa4f3e66cb7248f688ff19d5e5f7',
'a5633f03855f541d8e60a6340fc491d49709dc821f3acb571956a856637adcb6',
'28d97c850eaf917a4c76c02474b05b70a197eaefb468d21c22ed110afe8ec9e0',
]]
-
+
s = time.time()
mt = MerkleTree(txes)
for x in range(100):
print time.time() - s
if __name__ == '__main__':
- _test()
\ No newline at end of file
+ _test()
'''Generate pseudo-unique job_id. It does not need to be absolutely unique,
because pool sends "clean_jobs" flag to clients and they should drop all previous jobs.'''
counter = 0
-
+
@classmethod
def get_new_id(cls):
cls.counter += 1
if cls.counter % 0xffff == 0:
cls.counter = 1
return "%x" % cls.counter
-
+
class TemplateRegistry(object):
'''Implements the main logic of the pool. Keep track
on valid block templates, provide internal interface for stratum
service and implements block validation and submits.'''
-
+
def __init__(self, block_template_class, coinbaser, bitcoin_rpc, instance_id,
on_template_callback, on_block_callback):
self.prevhashes = {}
self.jobs = weakref.WeakValueDictionary()
-
+
self.extranonce_counter = ExtranonceCounter(instance_id)
self.extranonce2_size = block_template_class.coinbase_transaction_class.extranonce_size \
- self.extranonce_counter.get_size()
-
+
self.coinbaser = coinbaser
self.block_template_class = block_template_class
self.bitcoin_rpc = bitcoin_rpc
self.on_block_callback = on_block_callback
self.on_template_callback = on_template_callback
-
+
self.last_block = None
self.update_in_progress = False
self.last_update = None
-
+
# Create first block template on startup
self.update_block()
-
+
def get_new_extranonce1(self):
'''Generates unique extranonce1 (e.g. for newly
subscribed connection.'''
return self.extranonce_counter.get_new_bin()
-
+
def get_last_broadcast_args(self):
'''Returns arguments for mining.notify
from last known template.'''
return self.last_block.broadcast_args
-
+
def add_template(self, block):
'''Adds new template to the registry.
It also clean up templates which should
not be used anymore.'''
-
+
prevhash = block.prevhash_hex
if prevhash in self.prevhashes.keys():
else:
new_block = True
self.prevhashes[prevhash] = []
-
+
# Blocks sorted by prevhash, so it's easy to drop
# them on blockchain update
self.prevhashes[prevhash].append(block)
-
+
# Weak reference for fast lookup using job_id
self.jobs[block.job_id] = block
-
+
# Use this template for every new request
self.last_block = block
-
+
# Drop templates of obsolete blocks
for ph in self.prevhashes.keys():
if ph != prevhash:
del self.prevhashes[ph]
-
+
log.info("New template for %s" % prevhash)
if new_block:
# Everything is ready, let's broadcast jobs!
self.on_template_callback(new_block)
-
+
#from twisted.internet import reactor
- #reactor.callLater(10, self.on_block_callback, new_block)
-
+ #reactor.callLater(10, self.on_block_callback, new_block)
+
def update_block(self):
'''Registry calls the getblocktemplate() RPC
and build new block template.'''
-
+
if self.update_in_progress:
# Block has been already detected
return
-
+
self.update_in_progress = True
self.last_update = Interfaces.timestamper.time()
-
+
d = self.bitcoin_rpc.getblocktemplate()
d.addCallback(self._update_block)
d.addErrback(self._update_block_failed)
-
+
def _update_block_failed(self, failure):
log.error(str(failure))
self.update_in_progress = False
-
+
def _update_block(self, data):
start = Interfaces.timestamper.time()
-
+
template = self.block_template_class(Interfaces.timestamper, self.coinbaser, JobIdGenerator.get_new_id())
template.fill_from_rpc(data)
self.add_template(template)
log.info("Update finished, %.03f sec, %d txes" % \
(Interfaces.timestamper.time() - start, len(template.vtx)))
-
- self.update_in_progress = False
+
+ self.update_in_progress = False
return data
-
+
def diff_to_target(self, difficulty):
'''Converts difficulty to target'''
- diff1 = 0x00000000ffff0000000000000000000000000000000000000000000000000000
+ diff1 = 0x00000000ffff0000000000000000000000000000000000000000000000000000
return diff1 / difficulty
-
+
def get_job(self, job_id):
'''For given job_id returns BlockTemplate instance or None'''
try:
except:
log.info("Job id '%s' not found" % job_id)
return None
-
+
# Now we have to check if job is still valid.
# Unfortunately weak references are not bulletproof and
# old reference can be found until next run of garbage collector.
if j.prevhash_hex not in self.prevhashes:
log.info("Prevhash of job '%s' is unknown" % job_id)
return None
-
+
if j not in self.prevhashes[j.prevhash_hex]:
log.info("Job %s is unknown" % job_id)
return None
-
+
return j
-
+
def submit_share(self, job_id, worker_name, extranonce1_bin, extranonce2, ntime, nonce,
difficulty):
'''Check parameters and finalize block template. If it leads
to valid block candidate, asynchronously submits the block
back to the bitcoin network.
-
+
- extranonce1_bin is binary. No checks performed, it should be from session data
- job_id, extranonce2, ntime, nonce - in hex form sent by the client
- difficulty - decimal number from session, again no checks performed
- submitblock_callback - reference to method which receive result of submitblock()
'''
-
+
# Check if extranonce2 looks correctly. extranonce2 is in hex form...
if len(extranonce2) != self.extranonce2_size * 2:
raise SubmitException("Incorrect size of extranonce2. Expected %d chars" % (self.extranonce2_size*2))
-
+
# Check for job
job = self.get_job(job_id)
if job == None:
raise SubmitException("Job '%s' not found" % job_id)
-
+
# Check if ntime looks correct
if len(ntime) != 8:
raise SubmitException("Incorrect size of ntime. Expected 8 chars")
if not job.check_ntime(int(ntime, 16)):
raise SubmitException("Ntime out of range")
-
- # Check nonce
+
+ # Check nonce
if len(nonce) != 8:
raise SubmitException("Incorrect size of nonce. Expected 8 chars")
-
+
# Check for duplicated submit
if not job.register_submit(extranonce1_bin, extranonce2, ntime, nonce):
log.info("Duplicate from %s, (%s %s %s %s)" % \
(worker_name, binascii.hexlify(extranonce1_bin), extranonce2, ntime, nonce))
raise SubmitException("Duplicate share")
-
+
# Now let's do the hard work!
# ---------------------------
-
+
# 0. Some sugar
extranonce2_bin = binascii.unhexlify(extranonce2)
ntime_bin = binascii.unhexlify(ntime)
nonce_bin = binascii.unhexlify(nonce)
-
+
# 1. Build coinbase
coinbase_bin = job.serialize_coinbase(extranonce1_bin, extranonce2_bin)
coinbase_hash = util.doublesha(coinbase_bin)
-
+
# 2. Calculate merkle root
merkle_root_bin = job.merkletree.withFirst(coinbase_hash)
merkle_root_int = util.uint256_from_str(merkle_root_bin)
-
+
# 3. Serialize header with given merkle, ntime and nonce
header_bin = job.serialize_header(merkle_root_int, ntime_bin, nonce_bin)
-
+
# 4. Reverse header and compare it with target of the user
hash_bin = util.doublesha(''.join([ header_bin[i*4:i*4+4][::-1] for i in range(0, 20) ]))
hash_int = util.uint256_from_str(hash_bin)
block_hash_hex = "%064x" % hash_int
header_hex = binascii.hexlify(header_bin)
-
- target_user = self.diff_to_target(difficulty)
+
+ target_user = self.diff_to_target(difficulty)
if hash_int > target_user:
raise SubmitException("Share is above target")
if hash_int <= target_info:
log.info("Yay, share with diff above 100000")
- # 5. Compare hash with target of the network
+ # 5. Compare hash with target of the network
if hash_int <= job.target:
- # Yay! It is block candidate!
+ # Yay! It is block candidate!
log.info("We found a block candidate! %s" % block_hash_hex)
-
- # 6. Finalize and serialize block object
+
+ # 6. Finalize and serialize block object
job.finalize(merkle_root_int, extranonce1_bin, extranonce2_bin, int(ntime, 16), int(nonce, 16))
-
+
if not job.is_valid():
# Should not happen
log.error("Final job validation failed!")
-
+
# 7. Submit block to the network
serialized = binascii.hexlify(job.serialize())
on_submit = self.bitcoin_rpc.submitblock(serialized)
-
+
return (header_hex, block_hash_hex, on_submit)
-
- return (header_hex, block_hash_hex, None)
\ No newline at end of file
+
+ return (header_hex, block_hash_hex, None)
result = chr(0)*nPad + result
if length is not None and len(result) != length:
return None
-
+
return result
def reverse_hash(h):
# This only revert byte order, nothing more
if len(h) != 64:
raise Exception('hash must have 64 hexa chars')
-
+
return ''.join([ h[56-i:64-i] for i in range(0, 64, 8) ])
def doublesha(b):
addr = b58decode(addr, 25)
except:
return None
-
+
if addr is None:
return None
-
+
ver = addr[0]
cksumA = addr[-4:]
cksumB = doublesha(addr[:-4])[:4]
-
+
if cksumA != cksumB:
return None
-
+
return (ver, addr[1:-4])
def ser_uint256_be(u):
for i in xrange(8):
rs += struct.pack(">I", u & 0xFFFFFFFFL)
u >>= 32
- return rs
+ return rs
def deser_uint256_be(f):
r = 0L
if not d:
raise ValueError('invalid address')
(ver, pubkeyhash) = d
- return b'\x76\xa9\x14' + pubkeyhash + b'\x88\xac'
\ No newline at end of file
+ return b'\x76\xa9\x14' + pubkeyhash + b'\x88\xac'
from stratum import settings
from interfaces import Interfaces
-
+
# Let's wait until share manager and worker manager boot up
(yield Interfaces.share_manager.on_load)
(yield Interfaces.worker_manager.on_load)
-
+
from lib.block_updater import BlockUpdater
from lib.template_registry import TemplateRegistry
from lib.bitcoin_rpc import BitcoinRPC
from lib.block_template import BlockTemplate
from lib.coinbaser import SimpleCoinbaser
-
+
bitcoin_rpc = BitcoinRPC(settings.BITCOIN_TRUSTED_HOST,
settings.BITCOIN_TRUSTED_PORT,
settings.BITCOIN_TRUSTED_USER,
break
except:
time.sleep(1)
-
+
coinbaser = SimpleCoinbaser(bitcoin_rpc, settings.CENTRAL_WALLET)
(yield coinbaser.on_load)
-
+
registry = TemplateRegistry(BlockTemplate,
coinbaser,
bitcoin_rpc,
# Template registry is the main interface between Stratum service
# and pool core logic
Interfaces.set_template_registry(registry)
-
+
# Set up polling mechanism for detecting new block on the network
# This is just failsafe solution when -blocknotify
- # mechanism is not working properly
+ # mechanism is not working properly
BlockUpdater(registry, bitcoin_rpc)
log.info("MINING SERVICE IS READY")
- on_startup.callback(True)
+ on_startup.callback(True)
Default implementation do almost nothing, you probably want to override these classes
and customize references to interface instances in your launcher.
(see launcher_demo.tac for an example).
-'''
+'''
import time
from twisted.internet import reactor, defer
# Fire deferred when manager is ready
self.on_load = defer.Deferred()
self.on_load.callback(True)
-
+
def authorize(self, worker_name, worker_password):
return True
class ShareLimiterInterface(object):
'''Implement difficulty adjustments here'''
-
+
def submit(self, connection_ref, current_difficulty, timestamp):
'''connection - weak reference to Protocol instance
current_difficulty - difficulty of the connection
timestamp - submission time of current share
-
+
- raise SubmitException for stop processing this request
- call mining.set_difficulty on connection to adjust the difficulty'''
pass
-
+
class ShareManagerInterface(object):
def __init__(self):
# Fire deferred when manager is ready
def on_network_block(self, prevhash):
'''Prints when there's new block coming from the network (possibly new round)'''
pass
-
+
def on_submit_share(self, worker_name, block_header, block_hash, shares, timestamp, is_valid):
log.info("%s %s %s" % (block_hash, 'valid' if is_valid else 'INVALID', worker_name))
-
+
def on_submit_block(self, is_accepted, worker_name, block_header, block_hash, timestamp):
log.info("Block %s %s" % (block_hash, 'ACCEPTED' if is_accepted else 'REJECTED'))
-
+
class TimestamperInterface(object):
'''This is the only source for current time in the application.
Override this for generating unix timestamp in different way.'''
'''Predictable timestamper may be useful for unit testing.'''
start_time = 1345678900 # Some day in year 2012
delta = 0
-
+
def time(self):
self.delta += 1
return self.start_time + self.delta
-
+
class Interfaces(object):
worker_manager = None
share_manager = None
share_limiter = None
timestamper = None
template_registry = None
-
+
@classmethod
def set_worker_manager(cls, manager):
- cls.worker_manager = manager
-
- @classmethod
+ cls.worker_manager = manager
+
+ @classmethod
def set_share_manager(cls, manager):
cls.share_manager = manager
- @classmethod
+ @classmethod
def set_share_limiter(cls, limiter):
cls.share_limiter = limiter
-
+
@classmethod
def set_timestamper(cls, manager):
cls.timestamper = manager
-
+
@classmethod
def set_template_registry(cls, registry):
- cls.template_registry = registry
\ No newline at end of file
+ cls.template_registry = registry
import stratum.logger
log = stratum.logger.get_logger('mining')
-
+
class MiningService(GenericService):
'''This service provides public API for Stratum mining proxy
or any Stratum-compatible miner software.
-
+
Warning - any callable argument of this class will be propagated
over Stratum protocol for public audience!'''
-
+
service_type = 'mining'
service_vendor = 'stratum'
is_default = True
-
+
@admin
def update_block(self):
- '''Connect this RPC call to 'bitcoind -blocknotify' for
+ '''Connect this RPC call to 'bitcoind -blocknotify' for
instant notification about new block on the network.
See blocknotify.sh in /scripts/ for more info.'''
-
+
log.info("New block notification received")
Interfaces.template_registry.update_block()
- return True
-
+ return True
+
def authorize(self, worker_name, worker_password):
'''Let authorize worker on this connection.'''
-
+
session = self.connection_ref().get_session()
session.setdefault('authorized', {})
-
+
if Interfaces.worker_manager.authorize(worker_name, worker_password):
session['authorized'][worker_name] = worker_password
return True
-
+
else:
if worker_name in session['authorized']:
del session['authorized'][worker_name]
return False
-
+
def subscribe(self, *args):
'''Subscribe for receiving mining jobs. This will
return subscription details, extranonce1_hex and extranonce2_size'''
-
+
extranonce1 = Interfaces.template_registry.get_new_extranonce1()
extranonce2_size = Interfaces.template_registry.extranonce2_size
extranonce1_hex = binascii.hexlify(extranonce1)
-
+
session = self.connection_ref().get_session()
session['extranonce1'] = extranonce1
session['difficulty'] = 1 # Following protocol specs, default diff is 1
return Pubsub.subscribe(self.connection_ref(), MiningSubscription()) + (extranonce1_hex, extranonce2_size)
-
- '''
+
+ '''
def submit(self, worker_name, job_id, extranonce2, ntime, nonce):
import time
start = time.time()
-
+
for x in range(100):
try:
ret = self.submit2(worker_name, job_id, extranonce2, ntime, nonce)
except:
pass
-
+
log.info("LEN %.03f" % (time.time() - start))
return ret
'''
-
+
def submit(self, worker_name, job_id, extranonce2, ntime, nonce):
'''Try to solve block candidate using given parameters.'''
-
+
session = self.connection_ref().get_session()
session.setdefault('authorized', {})
-
+
# Check if worker is authorized to submit shares
if not Interfaces.worker_manager.authorize(worker_name,
session['authorized'].get(worker_name)):
extranonce1_bin = session.get('extranonce1', None)
if not extranonce1_bin:
raise SubmitException("Connection is not subscribed for mining")
-
+
difficulty = session['difficulty']
submit_time = Interfaces.timestamper.time()
-
+
Interfaces.share_limiter.submit(self.connection_ref, difficulty, submit_time)
-
+
# This checks if submitted share meet all requirements
# and it is valid proof of work.
try:
except SubmitException:
# block_header and block_hash are None when submitted data are corrupted
Interfaces.share_manager.on_submit_share(worker_name, None, None, difficulty,
- submit_time, False)
+ submit_time, False)
raise
-
-
+
+
Interfaces.share_manager.on_submit_share(worker_name, block_header, block_hash, difficulty,
submit_time, True)
-
+
if on_submit != None:
# Pool performs submitblock() to bitcoind. Let's hook
# to result and report it to share manager
worker_name, block_header, block_hash, submit_time)
return True
-
+
# Service documentation for remote discovery
update_block.help_text = "Notify Stratum server about new block on the network."
update_block.params = [('password', 'string', 'Administrator password'),]
-
+
authorize.help_text = "Authorize worker for submitting shares on this connection."
authorize.params = [('worker_name', 'string', 'Name of the worker, usually in the form of user_login.worker_id.'),
('worker_password', 'string', 'Worker password'),]
-
+
subscribe.help_text = "Subscribes current connection for receiving new mining jobs."
subscribe.params = []
-
+
submit.help_text = "Submit solved share back to the server. Excessive sending of invalid shares "\
"or shares above indicated target (see Stratum mining docs for set_target()) may lead "\
"to temporary or permanent ban of user,worker or IP address."
('extranonce2', 'string', 'hex-encoded big-endian extranonce2, length depends on extranonce2_size from mining.notify.'),
('ntime', 'string', 'UNIX timestamp (32bit integer, big-endian, hex-encoded), must be >= ntime provided by mining,notify and <= current time'),
('nonce', 'string', '32bit integer, hex-encoded, big-endian'),]
-
+
class MiningSubscription(Subscription):
'''This subscription object implements
logic for broadcasting new jobs to the clients.'''
-
+
event = 'mining.notify'
-
+
@classmethod
def on_template(cls, is_new_block):
'''This is called when TemplateRegistry registers
new block which we have to broadcast clients.'''
-
+
start = Interfaces.timestamper.time()
-
+
clean_jobs = is_new_block
(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \
Interfaces.template_registry.get_last_broadcast_args()
-
+
# Push new job to subscribed clients
cls.emit(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs)
-
+
cnt = Pubsub.get_subscription_count(cls.event)
log.info("BROADCASTED to %d connections in %.03f sec" % (cnt, (Interfaces.timestamper.time() - start)))
-
+
def _finish_after_subscribe(self, result):
'''Send new job to newly subscribed client'''
- try:
+ try:
(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \
Interfaces.template_registry.get_last_broadcast_args()
except Exception:
log.error("Template not ready yet")
return result
-
+
# Force set higher difficulty
# TODO
#self.connection_ref().rpc('mining.set_difficulty', [2,], is_notification=True)
#self.connection_ref().rpc('client.get_version', [])
-
+
# Force client to remove previous jobs if any (eg. from previous connection)
clean_jobs = True
self.emit_single(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, True)
-
+
return result
-
+
def after_subscribe(self, *args):
'''This will send new job to the client *after* he receive subscription details.
on_finish callback solve the issue that job is broadcasted *during*
the subscription request and client receive messages in wrong order.'''
- self.connection_ref().on_finish.addCallback(self._finish_after_subscribe)
\ No newline at end of file
+ self.connection_ref().on_finish.addCallback(self._finish_after_subscribe)