X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=lib%2Fbitcoin.py;h=af0a6ed2ee59558a394b3ecb61606b892700c6ab;hb=0ac02bf040c9e0761030622074405dd2d59fd0fd;hp=c83d5d5630257f2ae579c6d7e92dcb54c2c3b9eb;hpb=93f61f1717deb868b73a6cf2f600c7e2f77c4908;p=electrum-nvc.git diff --git a/lib/bitcoin.py b/lib/bitcoin.py index c83d5d5..af0a6ed 100644 --- a/lib/bitcoin.py +++ b/lib/bitcoin.py @@ -17,16 +17,35 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . - -import hashlib, base64, ecdsa, re +import hashlib +import base64 +import re +import sys import hmac -import aes + from util import print_error +from version import SEED_PREFIX + +try: + import ecdsa +except ImportError: + sys.exit("Error: python-ecdsa does not seem to be installed. Try 'sudo pip install ecdsa'") + +try: + import aes +except ImportError: + sys.exit("Error: AES does not seem to be installed. Try 'sudo pip install slowaes'") + +################################## transactions + +MIN_RELAY_TX_FEE = 1000 + # AES encryption EncodeAES = lambda secret, s: base64.b64encode(aes.encryptData(secret,s)) DecodeAES = lambda secret, e: aes.decryptData(secret, base64.b64decode(e)) + def pw_encode(s, password): if password: secret = Hash(password) @@ -34,6 +53,7 @@ def pw_encode(s, password): else: return s + def pw_decode(s, password): if password is not None: secret = Hash(password) @@ -46,17 +66,16 @@ def pw_decode(s, password): return s - - - def rev_hex(s): return s.decode('hex')[::-1].encode('hex') + def int_to_hex(i, length=1): s = hex(i)[2:].rstrip('L') s = "0"*(2*length - len(s)) + s return rev_hex(s) + def var_int(i): # https://en.bitcoin.it/wiki/Protocol_specification#Variable_length_integer if i<0xfd: @@ -68,6 +87,7 @@ def var_int(i): else: return "ff"+int_to_hex(i,8) + def op_push(i): if i<0x4c: return int_to_hex(i) @@ -77,27 +97,29 @@ def op_push(i): return '4d' + int_to_hex(i,2) else: return '4e' + int_to_hex(i,4) - def sha256(x): return hashlib.sha256(x).digest() + def Hash(x): if type(x) is unicode: x=x.encode('utf-8') return sha256(sha256(x)) + hash_encode = lambda x: x[::-1].encode('hex') hash_decode = lambda x: x.decode('hex')[::-1] hmac_sha_512 = lambda x,y: hmac.new(x, y, hashlib.sha512).digest() + def mnemonic_to_seed(mnemonic, passphrase): from pbkdf2 import PBKDF2 import hmac PBKDF2_ROUNDS = 2048 return PBKDF2(mnemonic, 'mnemonic' + passphrase, iterations = PBKDF2_ROUNDS, macmodule = hmac, digestmodule = hashlib.sha512).read(64) -from version import SEED_PREFIX + is_new_seed = lambda x: hmac_sha_512("Seed version", x.encode('utf8')).encode('hex')[0:2].startswith(SEED_PREFIX) def is_old_seed(seed): @@ -114,7 +136,7 @@ def is_old_seed(seed): is_hex = (len(seed) == 32) except Exception: is_hex = False - + return is_hex or (uses_electrum_words and len(words) == 12) @@ -142,9 +164,9 @@ def i2d_ECPrivateKey(pkey, compressed=False): '022100' + \ '%064x' % _r + \ '020101a144034200' - + return key.decode('hex') + i2o_ECPublicKey(pkey.pubkey, compressed) - + def i2o_ECPublicKey(pubkey, compressed=False): # public keys are 65 bytes long (520 bits) # 0x04 + 32-byte X-coordinate + 32-byte Y-coordinate @@ -159,14 +181,14 @@ def i2o_ECPublicKey(pubkey, compressed=False): key = '04' + \ '%064x' % pubkey.point.x() + \ '%064x' % pubkey.point.y() - + return key.decode('hex') - + # end pywallet openssl private key implementation - - -############ functions from pywallet ##################### + + +############ functions from pywallet ##################### def hash_160(public_key): try: @@ -197,6 +219,7 @@ def bc_address_to_hash_160(addr): __b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' __b58base = len(__b58chars) + def b58encode(v): """ encode v, which is a string of bytes, to base58.""" @@ -220,6 +243,7 @@ def b58encode(v): return (__b58chars[0]*nPad) + result + def b58decode(v, length): """ decode v into a string of len bytes.""" long_value = 0L @@ -249,6 +273,7 @@ def EncodeBase58Check(vchIn): hash = Hash(vchIn) return b58encode(vchIn + hash[0:4]) + def DecodeBase58Check(psz): vchRet = b58decode(psz, None) key = vchRet[0:-4] @@ -260,9 +285,11 @@ def DecodeBase58Check(psz): else: return key + def PrivKeyToSecret(privkey): return privkey[9:9+32] + def SecretToASecret(secret, compressed=False, addrtype=0): vchIn = chr((addrtype+128)&255) + secret if compressed: vchIn += '\01' @@ -282,15 +309,19 @@ def regenerate_key(sec): b = b[0:32] return EC_KEY(b) + def GetPubKey(pubkey, compressed=False): return i2o_ECPublicKey(pubkey, compressed) + def GetPrivKey(pkey, compressed=False): return i2d_ECPrivateKey(pkey, compressed) + def GetSecret(pkey): return ('%064x' % pkey.secret).decode('hex') + def is_compressed(sec): b = ASecretToSecret(sec) return len(b) == 33 @@ -327,7 +358,7 @@ def is_address(addr): def is_private_key(key): try: - k = ASecretToSecret(key) + k = ASecretToSecret(key) return k is not False except: return False @@ -400,12 +431,40 @@ def ser_to_point(Aser): _r = generator.order() assert Aser[0] in ['\x02','\x03','\x04'] if Aser[0] == '\x04': - return Point( curve, str_to_long(Aser[1:33]), str_to_long(Aser[33:]), _r ) + return Point( curve, string_to_number(Aser[1:33]), string_to_number(Aser[33:]), _r ) Mx = string_to_number(Aser[1:]) return Point( curve, Mx, ECC_YfromX(Mx, curve, Aser[0]=='\x03')[0], _r ) +class MyVerifyingKey(ecdsa.VerifyingKey): + @classmethod + def from_signature(klass, sig, recid, h, curve): + """ See http://www.secg.org/download/aid-780/sec1-v2.pdf, chapter 4.1.6 """ + from ecdsa import util, numbertheory + import msqr + curveFp = curve.curve + G = curve.generator + order = G.order() + # extract r,s from signature + r, s = util.sigdecode_string(sig, order) + # 1.1 + x = r + (recid/2) * order + # 1.3 + alpha = ( x * x * x + curveFp.a() * x + curveFp.b() ) % curveFp.p() + beta = msqr.modular_sqrt(alpha, curveFp.p()) + y = beta if (beta - recid) % 2 == 0 else curveFp.p() - beta + # 1.4 the constructor checks that nR is at infinity + R = Point(curveFp, x, y, order) + # 1.5 compute e from message: + e = string_to_number(h) + minus_e = -e % order + # 1.6 compute Q = r^-1 (sR - eG) + inv_r = numbertheory.inverse_mod(r,order) + Q = inv_r * ( s * R + minus_e * G ) + return klass.from_public_point( Q, curve ) + + class EC_KEY(object): def __init__( self, k ): secret = string_to_number(k) @@ -434,16 +493,9 @@ class EC_KEY(object): @classmethod def verify_message(self, address, signature, message): - """ See http://www.secg.org/download/aid-780/sec1-v2.pdf for the math """ - from ecdsa import numbertheory, util - import msqr - curve = curve_secp256k1 - G = generator_secp256k1 - order = G.order() - # extract r,s from signature sig = base64.b64decode(signature) if len(sig) != 65: raise Exception("Wrong encoding") - r,s = util.sigdecode_string(sig[1:], order) + nV = ord(sig[0]) if nV < 27 or nV >= 35: raise Exception("Bad encoding") @@ -454,24 +506,12 @@ class EC_KEY(object): compressed = False recid = nV - 27 - # 1.1 - x = r + (recid/2) * order - # 1.3 - alpha = ( x * x * x + curve.a() * x + curve.b() ) % curve.p() - beta = msqr.modular_sqrt(alpha, curve.p()) - y = beta if (beta - recid) % 2 == 0 else curve.p() - beta - # 1.4 the constructor checks that nR is at infinity - R = Point(curve, x, y, order) - # 1.5 compute e from message: h = Hash( msg_magic(message) ) - e = string_to_number(h) - minus_e = -e % order - # 1.6 compute Q = r^-1 (sR - eG) - inv_r = numbertheory.inverse_mod(r,order) - Q = inv_r * ( s * R + minus_e * G ) - public_key = ecdsa.VerifyingKey.from_public_point( Q, curve = SECP256k1 ) - # check that Q is the public key + public_key = MyVerifyingKey.from_signature( sig[1:], recid, h, curve = SECP256k1 ) + + # check public key public_key.verify_digest( sig[1:], h, sigdecode = ecdsa.util.sigdecode_string) + # check that we get the original signing address addr = public_key_to_bc_address( point_to_ser(public_key.pubkey.point, compressed) ) if address != addr: @@ -482,49 +522,43 @@ class EC_KEY(object): @classmethod def encrypt_message(self, message, pubkey): - + pk = ser_to_point(pubkey) if not ecdsa.ecdsa.point_is_valid(generator_secp256k1, pk.x(), pk.y()): raise Exception('invalid pubkey') - + ephemeral_exponent = number_to_string(ecdsa.util.randrange(pow(2,256)), generator_secp256k1.order()) ephemeral = EC_KEY(ephemeral_exponent) - + ecdh_key = (pk * ephemeral.privkey.secret_multiplier).x() ecdh_key = ('%064x' % ecdh_key).decode('hex') key = hashlib.sha512(ecdh_key).digest() key_e, key_m = key[:32], key[32:] - + iv_ciphertext = aes.encryptData(key_e, message) - iv, ciphertext = iv_ciphertext[:16], iv_ciphertext[16:] - mac = hmac.new(key_m, ciphertext, hashlib.sha256).digest() ephemeral_pubkey = ephemeral.get_public_key(compressed=True).decode('hex') - - encrypted = 'BIE1' + hash_160(pubkey) + ephemeral_pubkey + iv + ciphertext + mac - return base64.b64encode(encrypted) + encrypted = 'BIE1' + ephemeral_pubkey + iv_ciphertext + mac = hmac.new(key_m, encrypted, hashlib.sha256).digest() + + return base64.b64encode(encrypted + mac) def decrypt_message(self, encrypted): - + encrypted = base64.b64decode(encrypted) - - if len(encrypted) < 105: + + if len(encrypted) < 85: raise Exception('invalid ciphertext: length') - + magic = encrypted[:4] - recipient_pubkeyhash = encrypted[4:24] - ephemeral_pubkey = encrypted[24:57] - iv = encrypted[57:73] - ciphertext = encrypted[73:-32] + ephemeral_pubkey = encrypted[4:37] + iv_ciphertext = encrypted[37:-32] mac = encrypted[-32:] - + if magic != 'BIE1': raise Exception('invalid ciphertext: invalid magic bytes') - - if hash_160(self.get_public_key().decode('hex')) != recipient_pubkeyhash: - raise Exception('invalid ciphertext: invalid key') - + try: ephemeral_pubkey = ser_to_point(ephemeral_pubkey) except AssertionError, e: @@ -537,11 +571,10 @@ class EC_KEY(object): ecdh_key = ('%064x' % ecdh_key).decode('hex') key = hashlib.sha512(ecdh_key).digest() key_e, key_m = key[:32], key[32:] - if mac != hmac.new(key_m, ciphertext, hashlib.sha256).digest(): + if mac != hmac.new(key_m, encrypted[:-32], hashlib.sha256).digest(): raise Exception('invalid ciphertext: invalid mac') - return aes.decryptData(key_e, iv + ciphertext) - + return aes.decryptData(key_e, iv_ciphertext) ###################################### BIP32 ############################## @@ -584,10 +617,10 @@ def _CKD_priv(k, c, s, is_prime): return k_n, c_n # Child public key derivation function (from public key only) -# K = master public key +# K = master public key # c = master chain code # n = index of key we want to derive -# This function allows us to find the nth public key, as long as n is +# This function allows us to find the nth public key, as long as n is # non-negative. If n is negative, we need the master private key to find it. def CKD_pub(cK, c, n): if n & BIP32_PRIME: raise @@ -607,37 +640,91 @@ def _CKD_pub(cK, c, s): return cK_n, c_n +BITCOIN_HEADERS = ["0488ade4", "0488b21e"] +TESTNET_HEADERS = ["043587cf", "04358394"] + +BITCOIN_HEAD = "0488ade4" +TESTNET_HEAD = "04358394" + +BITCOIN_HEADER_PRIV = "0488ADE4" +BITCOIN_HEADER_PUB = "0488B21E" + +TESTNET_HEADER_PRIV = "04358394" +TESTNET_HEADER_PUB = "043587CF" + + +def _get_headers(testnet): + """Returns the correct headers for either testnet or bitcoin, in the form + of a 2-tuple, like (public, private).""" + if testnet: + return (TESTNET_HEADER_PUB, TESTNET_HEADER_PRIV) + else: + return (BITCOIN_HEADER_PUB, BITCOIN_HEADER_PRIV) + def deserialize_xkey(xkey): - xkey = DecodeBase58Check(xkey) + + xkey = DecodeBase58Check(xkey) assert len(xkey) == 78 - assert xkey[0:4].encode('hex') in ["0488ade4", "0488b21e"] + + xkey_header = xkey[0:4].encode('hex') + # Determine if the key is a bitcoin key or a testnet key. + if xkey_header in TESTNET_HEADERS: + head = TESTNET_HEAD + elif xkey_header in BITCOIN_HEADERS: + head = BITCOIN_HEAD + else: + raise Exception("Unknown xkey header: '%s'" % xkey_header) + depth = ord(xkey[4]) fingerprint = xkey[5:9] child_number = xkey[9:13] c = xkey[13:13+32] - if xkey[0:4].encode('hex') == "0488ade4": + if xkey[0:4].encode('hex') == head: K_or_k = xkey[13+33:] else: K_or_k = xkey[13+32:] return depth, fingerprint, child_number, c, K_or_k +def get_xkey_name(xkey, testnet=False): + depth, fingerprint, child_number, c, K = deserialize_xkey(xkey) + n = int(child_number.encode('hex'), 16) + if n & BIP32_PRIME: + child_id = "%d'"%(n - BIP32_PRIME) + else: + child_id = "%d"%n + if depth == 0: + return '' + elif depth == 1: + return child_id + else: + raise BaseException("xpub depth error") + + +def xpub_from_xprv(xprv, testnet=False): + depth, fingerprint, child_number, c, k = deserialize_xkey(xprv) + K, cK = get_pubkeys_from_secret(k) + header_pub, _ = _get_headers(testnet) + xpub = header_pub.decode('hex') + chr(depth) + fingerprint + child_number + c + cK + return EncodeBase58Check(xpub) + -def bip32_root(seed): +def bip32_root(seed, testnet=False): import hmac - seed = seed.decode('hex') + header_pub, header_priv = _get_headers(testnet) + seed = seed.decode('hex') I = hmac.new("Bitcoin seed", seed, hashlib.sha512).digest() master_k = I[0:32] master_c = I[32:] K, cK = get_pubkeys_from_secret(master_k) - xprv = ("0488ADE4" + "00" + "00000000" + "00000000").decode("hex") + master_c + chr(0) + master_k - xpub = ("0488B21E" + "00" + "00000000" + "00000000").decode("hex") + master_c + cK + xprv = (header_priv + "00" + "00000000" + "00000000").decode("hex") + master_c + chr(0) + master_k + xpub = (header_pub + "00" + "00000000" + "00000000").decode("hex") + master_c + cK return EncodeBase58Check(xprv), EncodeBase58Check(xpub) - -def bip32_private_derivation(xprv, branch, sequence): +def bip32_private_derivation(xprv, branch, sequence, testnet=False): + header_pub, header_priv = _get_headers(testnet) depth, fingerprint, child_number, c, k = deserialize_xkey(xprv) assert sequence.startswith(branch) sequence = sequence[len(branch):] @@ -652,13 +739,13 @@ def bip32_private_derivation(xprv, branch, sequence): fingerprint = hash_160(parent_cK)[0:4] child_number = ("%08X"%i).decode('hex') K, cK = get_pubkeys_from_secret(k) - xprv = "0488ADE4".decode('hex') + chr(depth) + fingerprint + child_number + c + chr(0) + k - xpub = "0488B21E".decode('hex') + chr(depth) + fingerprint + child_number + c + cK + xprv = header_priv.decode('hex') + chr(depth) + fingerprint + child_number + c + chr(0) + k + xpub = header_pub.decode('hex') + chr(depth) + fingerprint + child_number + c + cK return EncodeBase58Check(xprv), EncodeBase58Check(xpub) - -def bip32_public_derivation(xpub, branch, sequence): +def bip32_public_derivation(xpub, branch, sequence, testnet=False): + header_pub, _ = _get_headers(testnet) depth, fingerprint, child_number, c, cK = deserialize_xkey(xpub) assert sequence.startswith(branch) sequence = sequence[len(branch):] @@ -671,87 +758,11 @@ def bip32_public_derivation(xpub, branch, sequence): fingerprint = hash_160(parent_cK)[0:4] child_number = ("%08X"%i).decode('hex') - xpub = "0488B21E".decode('hex') + chr(depth) + fingerprint + child_number + c + cK + xpub = header_pub.decode('hex') + chr(depth) + fingerprint + child_number + c + cK return EncodeBase58Check(xpub) - - def bip32_private_key(sequence, k, chain): for i in sequence: k, chain = CKD_priv(k, chain, i) return SecretToASecret(k, True) - - - - -################################## transactions - -MIN_RELAY_TX_FEE = 1000 - - - -def test_bip32(seed, sequence): - """ - run a test vector, - see https://en.bitcoin.it/wiki/BIP_0032_TestVectors - """ - - xprv, xpub = bip32_root(seed) - print xpub - print xprv - - assert sequence[0:2] == "m/" - path = 'm' - sequence = sequence[2:] - for n in sequence.split('/'): - child_path = path + '/' + n - if n[-1] != "'": - xpub2 = bip32_public_derivation(xpub, path, child_path) - xprv, xpub = bip32_private_derivation(xprv, path, child_path) - if n[-1] != "'": - assert xpub == xpub2 - - - path = child_path - print path - print xpub - print xprv - - print "----" - - - -def test_crypto(): - - G = generator_secp256k1 - _r = G.order() - pvk = ecdsa.util.randrange( pow(2,256) ) %_r - - Pub = pvk*G - pubkey_c = point_to_ser(Pub,True) - pubkey_u = point_to_ser(Pub,False) - addr_c = public_key_to_bc_address(pubkey_c) - addr_u = public_key_to_bc_address(pubkey_u) - - print "Private key ", '%064x'%pvk - print "Compressed public key ", pubkey_c.encode('hex') - print "Uncompressed public key", pubkey_u.encode('hex') - - message = "Chancellor on brink of second bailout for banks" - enc = EC_KEY.encrypt_message(message,pubkey_c) - eck = EC_KEY(number_to_string(pvk,_r)) - dec = eck.decrypt_message(enc) - print "decrypted", dec - - signature = eck.sign_message(message, True, addr_c) - print signature - EC_KEY.verify_message(addr_c, signature, message) - - -if __name__ == '__main__': - test_crypto() - test_bip32("000102030405060708090a0b0c0d0e0f", "m/0'/1/2'/2/1000000000") - test_bip32("fffcf9f6f3f0edeae7e4e1dedbd8d5d2cfccc9c6c3c0bdbab7b4b1aeaba8a5a29f9c999693908d8a8784817e7b7875726f6c696663605d5a5754514e4b484542","m/0/2147483647'/1/2147483646'/2") - -