added donation argument and use of new_generate_tx
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht, net):
    # Fetch current block-template work from bitcoind.
    #
    # Prefers the 'getmemorypool' RPC (gives us the transactions to include);
    # falls back to plain 'getwork' (no transactions) on old bitcoinds that
    # return JSON-RPC error -32601 (method not found).
    #
    # Returns (via Deferred) a dict with keys: version, previous_block_hash,
    # transactions, subsidy, time, target.
    try:
        work = yield bitcoind.rpc_getmemorypool()
        defer.returnValue(dict(
            version=work['version'],
            previous_block_hash=int(work['previousblockhash'], 16),
            transactions=[bitcoin.data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
            subsidy=work['coinbasevalue'],
            time=work['time'],
            # 'bits' may arrive as a hex string (decode, byte-reverse) or as an int,
            # depending on bitcoind version
            target=bitcoin.data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin.data.FloatingInteger(work['bits']),
        ))
    except jsonrpc.Error, e:
        if e.code != -32601:
            raise
        
        print "---> Update your bitcoind to support the 'getmemorypool' RPC call. Not including transactions in generated blocks! <---"
        work = bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork()))
        try:
            subsidy = net.BITCOIN_SUBSIDY_FUNC(ht.getHeight(work.previous_block))
        except ValueError:
            # height unknown for this block - approximate with an arbitrary early height
            subsidy = net.BITCOIN_SUBSIDY_FUNC(1000)
        
        defer.returnValue(dict(
            version=work.version,
            previous_block_hash=work.previous_block,
            transactions=[],
            subsidy=subsidy,
            time=work.timestamp,
            target=work.block_target,
        ))
61
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    # Ask bitcoind over its P2P protocol (checkorder with a null order) for a
    # payout script. Returns the script on success, or None if the IP
    # transaction was denied; any other reply is an error.
    protocol = yield factory.getProtocol()
    response = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = response['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply != 'success':
        raise ValueError('Unexpected reply: %r' % (response,))
    defer.returnValue(response['script'])
72
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    # Fallback payout script: pay to the address of bitcoind's 'p2pool' account.
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
77
78 @defer.inlineCallbacks
79 def main(args):
80     try:
81         print 'p2pool (version %s)' % (p2pool_init.__version__,)
82         print
83         try:
84             from . import draw
85         except ImportError:
86             draw = None
87             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
88             print
89         
90         # connect to bitcoind over JSON-RPC and do initial getwork
91         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
92         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
93         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
94         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
95         if not good:
96             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
97             return
98         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
99         print '    ...success!'
100         print '    Current block hash: %x' % (temp_work.previous_block,)
101         print
102         
103         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
104         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
105         factory = bitcoin.p2p.ClientFactory(args.net)
106         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
107         my_script = yield get_payout_script(factory)
108         if args.pubkey_hash is None:
109             if my_script is None:
110                 print '    IP transaction denied ... falling back to sending to address.'
111                 my_script = yield get_payout_script2(bitcoind, args.net)
112         else:
113             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
114         print '    ...success!'
115         print '    Payout script:', bitcoin.data.script2_to_human(my_script, args.net)
116         print
117         
118         print 'Loading cached block headers...'
119         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
120         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
121         print
122         
123         tracker = p2pool.OkayTracker(args.net)
124         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
125         known_verified = set()
126         print "Loading shares..."
127         for i, (mode, contents) in enumerate(ss.get_shares()):
128             if mode == 'share':
129                 if contents.hash in tracker.shares:
130                     continue
131                 contents.shared = True
132                 contents.stored = True
133                 contents.time_seen = 0
134                 tracker.add(contents)
135                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136                     print "    %i" % (len(tracker.shares),)
137             elif mode == 'verified_hash':
138                 known_verified.add(contents)
139             else:
140                 raise AssertionError()
141         print "    ...inserting %i verified shares..." % (len(known_verified),)
142         for h in known_verified:
143             if h not in tracker.shares:
144                 ss.forget_verified_share(h)
145                 continue
146             tracker.verified.add(tracker.shares[h])
147         print "    ...done loading %i shares!" % (len(tracker.shares),)
148         print
149         tracker.added.watch(lambda share: ss.add_share(share))
150         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
151         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
152         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
153         
154         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155         
156         # information affecting work that should trigger a long-polling update
157         current_work = variable.Variable(None)
158         # information affecting work that should not trigger a long-polling update
159         current_work2 = variable.Variable(None)
160         
161         work_updated = variable.Event()
162         
163         requested = expiring_dict.ExpiringDict(300)
164         
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for fresh work and publish it into current_work /
            # current_work2. Triggers set_real_work2 when the bitcoin chain tip
            # changed (long-polling update).
            work = yield getwork(bitcoind, ht, args.net)
            # a new previous-block hash means the bitcoin chain advanced
            changed = work['previous_block_hash'] != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
                # carry over share-chain / merged-mining state; maintained elsewhere
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
                aux_work=current_work.value['aux_work'] if current_work.value is not None else None,
            ))
            # current_work2 changes do NOT trigger a long-polling update
            current_work2.set(dict(
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'], # local clock minus bitcoind's reported time
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
184         
        def set_real_work2():
            # Recompute the best verified share to mine on, and request any
            # desired-but-missing parent shares from peers (with backoff).
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                # exponential backoff on re-requesting the same share hash
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # prefer peers known to hold a chain descending from this share
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    # first attempt: ask the peer that told us about the share
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop near our known heads so the peer doesn't resend shares we have
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
219         
220         print 'Initializing work...'
221         yield set_real_work1()
222         set_real_work2()
223         print '    ...success!'
224         print
225         
        @defer.inlineCallbacks
        def set_merged_work():
            # Background loop: once a second, fetch merged-mining (aux chain)
            # work from the merged daemon and attach it to current_work.
            # No-op unless --merged-url was given.
            if not args.merged_url:
                return
            while True:
                # NOTE(review): credentials are passed as a 1-tuple here, unlike
                # the (user, pass) 2-tuple used for bitcoind above - confirm
                # jsonrpc.Proxy accepts this form for merged_userpass.
                merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                x = dict(current_work.value)
                x['aux_work'] = dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin.data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                )
                #print x['aux_work']
                current_work.set(x)
                yield deferral.sleep(1)
242         set_merged_work()
243         
244         start_time = time.time() - current_work2.value['clock_offset']
245         
246         # setup p2p logic and join p2pool network
247         
248         def share_share(share, ignore_peer=None):
249             for peer in p2p_node.peers.itervalues():
250                 if peer is ignore_peer:
251                     continue
252                 #if p2pool_init.DEBUG:
253                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
254                 peer.send_shares([share])
255             share.flag_shared()
256         
257         def p2p_shares(shares, peer=None):
258             if len(shares) > 5:
259                 print 'Processing %i shares...' % (len(shares),)
260             
261             new_count = 0
262             for share in shares:
263                 if share.hash in tracker.shares:
264                     #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
265                     continue
266                 
267                 new_count += 1
268                 
269                 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
270                 
271                 tracker.add(share)
272             
273             if shares and peer is not None:
274                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
275             
276             if new_count:
277                 set_real_work2()
278             
279             if len(shares) > 5:
280                 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*args.net.CHAIN_LENGTH)
281         
        @tracker.verified.added.watch
        def _(share):
            # When a newly verified share also meets the bitcoin block target,
            # it is a full block - reassemble and submit it to bitcoind.
            if share.bitcoin_hash <= share.header['target']:
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                print
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                else:
                    print 'No bitcoind connection! Erp!'
292         
293         def p2p_share_hashes(share_hashes, peer):
294             t = time.time()
295             get_hashes = []
296             for share_hash in share_hashes:
297                 if share_hash in tracker.shares:
298                     continue
299                 last_request_time, count = requested.get(share_hash, (None, 0))
300                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
301                     continue
302                 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
303                 get_hashes.append(share_hash)
304                 requested[share_hash] = t, count + 1
305             
306             if share_hashes and peer is not None:
307                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
308             if get_hashes:
309                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
310         
311         def p2p_get_shares(share_hashes, parents, stops, peer):
312             parents = min(parents, 1000//len(share_hashes))
313             stops = set(stops)
314             shares = []
315             for share_hash in share_hashes:
316                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
317                     if share.hash in stops:
318                         break
319                     shares.append(share)
320             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
321             peer.send_shares(shares, full=True)
322         
323         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
324         
325         def parse(x):
326             if ':' in x:
327                 ip, port = x.split(':')
328                 return ip, int(port)
329             else:
330                 return x, args.net.P2P_PORT
331         
332         nodes = set([
333             ('72.14.191.28', args.net.P2P_PORT),
334             ('62.204.197.159', args.net.P2P_PORT),
335             ('142.58.248.28', args.net.P2P_PORT),
336             ('94.23.34.145', args.net.P2P_PORT),
337         ])
338         for host in [
339             'p2pool.forre.st',
340             'dabuttonfactory.com',
341         ]:
342             try:
343                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
344             except:
345                 log.err(None, 'Error resolving bootstrap node IP:')
346
347         if args.net_name == 'litecoin':
348             nodes.add(((yield reactor.resolve('liteco.in')), args.net.P2P_PORT))
349         
350         p2p_node = p2p.Node(
351             current_work=current_work,
352             port=args.p2pool_port,
353             net=args.net,
354             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
355             mode=0 if args.low_bandwidth else 1,
356             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
357         )
358         p2p_node.handle_shares = p2p_shares
359         p2p_node.handle_share_hashes = p2p_share_hashes
360         p2p_node.handle_get_shares = p2p_get_shares
361         
362         p2p_node.start()
363         
364         # send share when the chain changes to their chain
365         def work_changed(new_work):
366             #print 'Work changed:', new_work
367             for share in tracker.get_chain_known(new_work['best_share_hash']):
368                 if share.shared:
369                     break
370                 share_share(share, share.peer)
371         current_work.changed.watch(work_changed)
372         
373         print '    ...success!'
374         print
375         
        @defer.inlineCallbacks
        def upnp_thread():
            # Best-effort background loop: keep a UPnP port mapping for our
            # p2p port alive on the LAN gateway. Failures are non-fatal.
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    # deliberately broad: UPnP is optional, never crash the node
                    if p2pool_init.DEBUG:
                        log.err(None, "UPnP error:")
                # refresh on average every ~120 seconds
                yield deferral.sleep(random.expovariate(1/120))
390         
391         if args.upnp:
392             upnp_thread()
393         
394         # start listening for workers with a JSON-RPC server
395         
396         print 'Listening for workers on port %i...' % (args.worker_port,)
397         
398         # setup worker logic
399         
400         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
401         run_identifier = struct.pack('<I', random.randrange(2**32))
402         
403         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
404         removed_unstales = set()
405         def get_share_counts(doa=False):
406             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
407             matching_in_chain = share_counter(current_work.value['best_share_hash'], max(0, height - 1)) | removed_unstales
408             shares_in_chain = my_shares & matching_in_chain
409             stale_shares = my_shares - matching_in_chain
410             if doa:
411                 stale_doa_shares = stale_shares & doa_shares
412                 stale_not_doa_shares = stale_shares - stale_doa_shares
413                 return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
414             return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # If one of our own shares is pruned from the tracker while still
            # an ancestor of the best chain, remember it so stale-rate
            # accounting (get_share_counts) keeps counting it as accepted.
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
419
        def compute(state, payout_script):
            # Build a getwork job for a miner: construct the generate
            # (coinbase) transaction committing to our share chain, register
            # the transaction set under its merkle root, and return a
            # BlockAttempt for the worker to grind on.
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                # worker fee: redirect this fraction of jobs' payouts to us
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            if state['aux_work'] is not None:
                # merged-mining commitment appended to the coinbase:
                # marker bytes + reversed aux chain hash + packed (1, 0)
                aux_str = '\xfa\xbemm' + bitcoin.data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0)
            else:
                aux_str = ''
            
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # Encode our observed stale fraction as a doubled little-endian
                # 16-bit value (decoded by read_stale_frac; the doubling lets
                # readers validate that the field is really present).
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            subsidy = current_work2.value['subsidy']
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # enforce monotonicity vs. the median of the last 11 share timestamps
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            
            if timestamp > 42:
                # NOTE(review): 'timestamp > 42' is always true for any sane
                # clock, so the new_generate_transaction path (with donation
                # support) is effectively unconditional - confirm whether the
                # legacy branch below is still reachable/needed.
                generate_tx = p2pool.new_generate_transaction(
                    tracker=tracker,
                    new_share_data=dict(
                        previous_share_hash=state['best_share_hash'],
                        pre_coinbase="",
                        post_coinbase=aux_str,
                        nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
                        new_script=payout_script,
                        subsidy=subsidy,
                        # donation percentage scaled to a 16-bit fixed-point value
                        donation=math.perfect_round(65535*args.donation_amount/100),
                    ),
                    block_target=state['target'],
                    net=args.net,
                )
            else:
                generate_tx = p2pool.generate_transaction(
                    tracker=tracker,
                    previous_share_hash=state['best_share_hash'],
                    new_script=payout_script,
                    subsidy=subsidy,
                    nonce=run_identifier + struct.pack('<H', random.randrange(2**16)) + aux_str + get_stale_frac(),
                    block_target=state['target'],
                    net=args.net,
                )
            
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL, subsidy*1e-8, args.net.BITCOIN_SYMBOL, len(current_work2.value['transactions']))
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin.data.merkle_hash(transactions)
            # remembered (until the ExpiringDict drops it) so got_response can
            # match returned work back to its transactions
            merkle_root_to_transactions[merkle_root] = transactions
            
            # share target and send-time, recovered in got_response via the coinbase
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
488         
489         my_shares = set()
490         doa_shares = set()
491         times = {}
492         
        def got_response(data, user):
            # Handle a getwork submission from a worker: reconstruct the block,
            # check it against the bitcoin target, the merged-mining aux target,
            # and finally the (easier) share target; feed valid shares into the
            # share chain. Returns True iff the share extended the best chain.
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                pow = hash_; # NOTE: shadows the builtin pow() within this function

                # use scrypt for Litecoin
                if (getattr(args.net, 'BITCOIN_POW_SCRYPT', False)):
                    pow = bitcoin.data.block_header_type.scrypt(block['header']);
#                    print 'LTC: hash256 %x' % hash_
#                    print 'LTC: scrypt  %x' % pow
#                    print 'LTC: target  %x' % block['header']['target']
#                    print 'LTC: starget %x' % p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']

                # meets the bitcoin block target (DEBUG forces submission regardless)
                if pow <= block['header']['target'] or p2pool_init.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow <= block['header']['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                # also solves the merged-mining aux chain: build the aux
                # proof-of-work and submit it to the merged daemon
                if current_work.value['aux_work'] is not None and pow <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool.calculate_merkle_branch(transactions, 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        # a: aux hash extracted from the coinbase script; b: packed aux POW
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin.data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                # share target is read back out of the coinbase we built in compute()
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if pow > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow, target)
                    return False
                share = p2pool.Share.from_block(block, args.net)
                my_shares.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    # dead-on-arrival: built on a stale chain tip
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
563         
564         web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
565         
566         def get_rate():
567             if current_work.value['best_share_hash'] is not None:
568                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
569                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
570                 fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
571                 return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
572             return json.dumps(None)
573         
574         def get_users():
575             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
576             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
577             res = {}
578             for script in sorted(weights, key=lambda s: weights[s]):
579                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
580             return json.dumps(res)
581         
582         class WebInterface(resource.Resource):
583             def __init__(self, func, mime_type):
584                 self.func, self.mime_type = func, mime_type
585             
586             def render_GET(self, request):
587                 request.setHeader('Content-Type', self.mime_type)
588                 return self.func()
589         
        # expose pool statistics over the worker HTTP port
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        if draw is not None:
            # chain visualization is optional (depends on an optional module)
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            # any new network block invalidates current work; trigger a refresh
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        # recompute merged work whenever the height tracker learns something new
        ht.updated.watch(set_real_work2)
613         
        @defer.inlineCallbacks
        def work1_thread():
            # Poll bitcoind for fresh work. Wakes either when work_updated
            # fires or after a random 1-10s delay, whichever happens first;
            # the jitter avoids synchronized polling across nodes.
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    # keep the loop alive on any failure; error is logged
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            # Periodically recompute derived work state, on an exponentially
            # distributed interval averaging 20 seconds.
            while True:
                try:
                    set_real_work2()
                except:
                    # keep the loop alive on any failure; error is logged
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        # start both background loops (they run forever under the reactor)
        work1_thread()
        work2_thread()
635         
636         
        # Watchdog: rearm a 30s alarm every second; if the reactor stalls for
        # 30s the SIGALRM fires and we dump the stack. POSIX-only (SIGALRM).
        if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
644         
645         
646         def read_stale_frac(share):
647             if len(share.nonce) < 4:
648                 return None
649             a, b = struct.unpack("<HH", share.nonce[-4:])
650             if a == 0 or a != b:
651                 return None
652             return a/65535
653
654         pool_str = None;
655         while True:
656             yield deferral.sleep(3)
657             try:
658                 if time.time() > current_work2.value['last_update'] + 60:
659                     print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
660                 if current_work.value['best_share_hash'] is not None:
661                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
662                     if height > 2:
663                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
664                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**100)
665                         shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
666                         stale_shares = stale_doa_shares + stale_not_doa_shares
667                         fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
668                         str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
669                             math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
670                             height,
671                             len(tracker.verified.shares),
672                             len(tracker.shares),
673                             weights.get(my_script, 0)/total_weight*100,
674                             math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
675                             shares,
676                             stale_not_doa_shares,
677                             stale_doa_shares,
678                             len(p2p_node.peers),
679                         ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
680                         if (str != pool_str):
681                             print str;
682                             pool_str = str;
683                         if fracs:
684                             med = math.median(fracs)
685                             print 'Median stale proportion:', med
686                             if shares:
687                                 print '    Own:', stale_shares/shares
688                                 if med < .99:
689                                     print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
690             
691             
692             except:
693                 log.err()
694     except:
695         log.err(None, 'Fatal error:')
696     finally:
697         reactor.stop()
698
def run():
    """Parse command-line options, set up logging, and launch the node.

    Fills in network-dependent defaults for any ports the user did not
    specify, installs the timestamping tee logger over stdout/stderr, then
    hands control to main() under the twisted reactor.
    """
    class FixedArgumentParser(argparse.ArgumentParser):
        # Like ArgumentParser with fromfile_prefix_chars, but a referenced
        # @file may contain several whitespace-separated arguments per line
        # (the stock implementation treats each line as a single argument).
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so an @file may itself reference other @files
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # split each line on whitespace, dropping empty tokens
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merge daemon user and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='percentage amount to donate to author of p2pool. Default: 0.5',
        type=float, action='store', default=0.5, dest='donation')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329 solidcoin: 9328 litecoin: 9327, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:9332/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332 ixcoin: 8338 i0coin: 7332 litecoin: 9332)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 namecoin: 8334 ixcoin: 8337 i0coin: 7333 solidcoin: 7555 litecoin: 9333, +10000 for testnets)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments: RPC credentials (username optional, password required)
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # default logfile lives next to the launcher script, named after the network
    if args.logfile is None:
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
    
    class LogFile(object):
        # Append-only log file that can be reopened (for rotation) and that
        # self-truncates to roughly the last 1 MB once it exceeds 100 MB.
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            # ensure the file exists before reading it
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                # keep only the final ~1 MB, starting at a line boundary
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = open(self.filename, 'a')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        # File-like object that duplicates writes to several outputs.
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        # File-like wrapper that prefixes every complete line with a
        # HH:MM:SS.microseconds timestamp; partial lines are buffered.
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    logfile = LogFile(args.logfile)
    # route all program and twisted log output through the timestamping tee
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 reopens the log file, cooperating with external log rotation
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # fill in network-specific defaults for anything the user left unset
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        # no address given; main() will request one from bitcoind
        args.pubkey_hash = None
    
    # merged mining options must be given together or not at all
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    reactor.callWhenRunning(main, args)
    reactor.run()