Removed unused argument 'full' from p2p.Protocol.send_shares
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind, ht, net):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34         defer.returnValue(dict(
35             version=work['version'],
36             previous_block_hash=int(work['previousblockhash'], 16),
37             transactions=[bitcoin.data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
38             subsidy=work['coinbasevalue'],
39             time=work['time'],
40             target=bitcoin.data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin.data.FloatingInteger(work['bits']),
41         ))
42     except jsonrpc.Error, e:
43         if e.code != -32601:
44             raise
45         
46         print "---> Update your bitcoind to support the 'getmemorypool' RPC call. Not including transactions in generated blocks! <---"
47         work = bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork()))
48         try:
49             subsidy = net.BITCOIN_SUBSIDY_FUNC(ht.getHeight(work.previous_block))
50         except ValueError:
51             subsidy = net.BITCOIN_SUBSIDY_FUNC(1000)
52         
53         defer.returnValue(dict(
54             version=work.version,
55             previous_block_hash=work.previous_block,
56             transactions=[],
57             subsidy=subsidy,
58             time=work.timestamp,
59             target=work.block_target,
60         ))
61
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Obtain a payout script via bitcoin's IP-transaction 'checkorder' message.

    Returns the script bitcoind offers, None if bitcoind denies the
    request, and raises ValueError on any other reply.
    """
    protocol = yield factory.getProtocol()
    response = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = response['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply != 'success':
        raise ValueError('Unexpected reply: %r' % (response,))
    defer.returnValue(response['script'])
72
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout script: pay to the bitcoind wallet's 'p2pool' account address."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
77
78 @defer.inlineCallbacks
79 def main(args):
80     try:
81         print 'p2pool (version %s)' % (p2pool_init.__version__,)
82         print
83         try:
84             from . import draw
85         except ImportError:
86             draw = None
87             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
88             print
89         
90         # connect to bitcoind over JSON-RPC and do initial getwork
91         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
92         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
93         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
94         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
95         if not good:
96             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
97             return
98         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
99         print '    ...success!'
100         print '    Current block hash: %x' % (temp_work.previous_block,)
101         print
102         
103         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
104         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
105         factory = bitcoin.p2p.ClientFactory(args.net)
106         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
107         my_script = yield get_payout_script(factory)
108         if args.pubkey_hash is None:
109             if my_script is None:
110                 print '    IP transaction denied ... falling back to sending to address.'
111                 my_script = yield get_payout_script2(bitcoind, args.net)
112         else:
113             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
114         print '    ...success!'
115         print '    Payout script:', bitcoin.data.script2_to_human(my_script, args.net)
116         print
117         
118         print 'Loading cached block headers...'
119         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
120         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
121         print
122         
123         tracker = p2pool.OkayTracker(args.net)
124         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
125         known_verified = set()
126         print "Loading shares..."
127         for i, (mode, contents) in enumerate(ss.get_shares()):
128             if mode == 'share':
129                 if contents.hash in tracker.shares:
130                     continue
131                 contents.shared = True
132                 contents.stored = True
133                 contents.time_seen = 0
134                 tracker.add(contents)
135                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136                     print "    %i" % (len(tracker.shares),)
137             elif mode == 'verified_hash':
138                 known_verified.add(contents)
139             else:
140                 raise AssertionError()
141         print "    ...inserting %i verified shares..." % (len(known_verified),)
142         for h in known_verified:
143             if h not in tracker.shares:
144                 ss.forget_verified_share(h)
145                 continue
146             tracker.verified.add(tracker.shares[h])
147         print "    ...done loading %i shares!" % (len(tracker.shares),)
148         print
149         tracker.added.watch(lambda share: ss.add_share(share))
150         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
151         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
152         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
153         
154         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155         
156         # information affecting work that should trigger a long-polling update
157         current_work = variable.Variable(None)
158         # information affecting work that should not trigger a long-polling update
159         current_work2 = variable.Variable(None)
160         
161         work_updated = variable.Event()
162         
163         requested = expiring_dict.ExpiringDict(300)
164         
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for the current block template and publish it into
            # current_work (long-poll-triggering fields) and current_work2
            # (non-triggering fields). Calls set_real_work2() when the chain
            # tip changed, since the best share then needs recomputing.
            work = yield getwork(bitcoind, ht, args.net)
            changed = work['previous_block_hash'] != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
                # preserve values maintained elsewhere (set_real_work2 /
                # set_merged_work) across this refresh
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
                aux_work=current_work.value['aux_work'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
184         
        def set_real_work2():
            # Recompute the best share from the tracker and request any
            # missing parent shares from peers.
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # back off exponentially on repeated requests for the same share
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # prefer a connected peer known to have a head descending from
                # this share; otherwise fall back to the peer that told us
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100], # cap the number of stop hashes sent
                )
                requested[share_hash] = t, count + 1
219         
220         print 'Initializing work...'
221         yield set_real_work1()
222         set_real_work2()
223         print '    ...success!'
224         print
225         
226         @defer.inlineCallbacks
227         def set_merged_work():
228             if not args.merged_url:
229                 return
230             while True:
231                 merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
232                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
233                 x = dict(current_work.value)
234                 x['aux_work'] = dict(
235                     hash=int(auxblock['hash'], 16),
236                     target=bitcoin.data.HashType().unpack(auxblock['target'].decode('hex')),
237                     chain_id=auxblock['chainid'],
238                 )
239                 #print x['aux_work']
240                 current_work.set(x)
241                 yield deferral.sleep(1)
242         set_merged_work()
243         
244         start_time = time.time() - current_work2.value['clock_offset']
245         
246         # setup p2p logic and join p2pool network
247         
248         def share_share(share, ignore_peer=None):
249             for peer in p2p_node.peers.itervalues():
250                 if peer is ignore_peer:
251                     continue
252                 #if p2pool_init.DEBUG:
253                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
254                 peer.send_shares([share])
255             share.flag_shared()
256         
        def p2p_shares(shares, peer=None):
            # Ingest shares received from the p2p network (or generated
            # locally): add unknown ones to the tracker, remember which peer
            # knew about this chain head, and recompute the best share if
            # anything new arrived.
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*args.net.CHAIN_LENGTH)
281         
282         @tracker.verified.added.watch
283         def _(share):
284             if share.bitcoin_hash <= share.header['target']:
285                 print
286                 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
287                 print
288                 if factory.conn.value is not None:
289                     factory.conn.value.send_block(block=share.as_block(tracker, args.net))
290                 else:
291                     print 'No bitcoind connection! Erp!'
292         
293         def p2p_share_hashes(share_hashes, peer):
294             t = time.time()
295             get_hashes = []
296             for share_hash in share_hashes:
297                 if share_hash in tracker.shares:
298                     continue
299                 last_request_time, count = requested.get(share_hash, (None, 0))
300                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
301                     continue
302                 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
303                 get_hashes.append(share_hash)
304                 requested[share_hash] = t, count + 1
305             
306             if share_hashes and peer is not None:
307                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
308             if get_hashes:
309                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
310         
        def p2p_get_shares(share_hashes, parents, stops, peer):
            # Serve a peer's getshares request: walk each requested chain
            # backwards, stopping at any hash the peer says it already has.
            parents = min(parents, 1000//len(share_hashes)) # cap total shares sent per request
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.send_shares(shares)
322         
323         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
324         
325         def parse(x):
326             if ':' in x:
327                 ip, port = x.split(':')
328                 return ip, int(port)
329             else:
330                 return x, args.net.P2P_PORT
331         
332         nodes = set([
333             ('72.14.191.28', args.net.P2P_PORT),
334             ('62.204.197.159', args.net.P2P_PORT),
335             ('142.58.248.28', args.net.P2P_PORT),
336             ('94.23.34.145', args.net.P2P_PORT),
337         ])
338         for host in [
339             'p2pool.forre.st',
340             'dabuttonfactory.com',
341         ]:
342             try:
343                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
344             except:
345                 log.err(None, 'Error resolving bootstrap node IP:')
346
347         if args.net_name == 'litecoin':
348             nodes.add(((yield reactor.resolve('liteco.in')), args.net.P2P_PORT))
349         
350         p2p_node = p2p.Node(
351             current_work=current_work,
352             port=args.p2pool_port,
353             net=args.net,
354             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
355             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
356         )
357         p2p_node.handle_shares = p2p_shares
358         p2p_node.handle_share_hashes = p2p_share_hashes
359         p2p_node.handle_get_shares = p2p_get_shares
360         
361         p2p_node.start()
362         
363         # send share when the chain changes to their chain
364         def work_changed(new_work):
365             #print 'Work changed:', new_work
366             for share in tracker.get_chain_known(new_work['best_share_hash']):
367                 if share.shared:
368                     break
369                 share_share(share, share.peer)
370         current_work.changed.watch(work_changed)
371         
372         print '    ...success!'
373         print
374         
375         @defer.inlineCallbacks
376         def upnp_thread():
377             while True:
378                 try:
379                     is_lan, lan_ip = yield ipdiscover.get_local_ip()
380                     if is_lan:
381                         pm = yield portmapper.get_port_mapper()
382                         yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
383                 except defer.TimeoutError:
384                     pass
385                 except:
386                     if p2pool_init.DEBUG:
387                         log.err(None, "UPnP error:")
388                 yield deferral.sleep(random.expovariate(1/120))
389         
390         if args.upnp:
391             upnp_thread()
392         
393         # start listening for workers with a JSON-RPC server
394         
395         print 'Listening for workers on port %i...' % (args.worker_port,)
396         
397         # setup worker logic
398         
399         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
400         run_identifier = struct.pack('<I', random.randrange(2**32))
401         
402         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
403         removed_unstales = set()
        def get_share_counts(doa=False):
            # Count this node's shares that made it into the current best
            # chain versus those that went stale. With doa=True, stale shares
            # are further split into dead-on-arrival and other stales, and a
            # 3-tuple (total, stale_doa, stale_not_doa) is returned instead of
            # (total, stale).
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            # removed_unstales compensates for shares pruned from the tracker
            # that were in-chain when removed
            matching_in_chain = share_counter(current_work.value['best_share_hash'], max(0, height - 1)) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
414         @tracker.verified.removed.watch
415         def _(share):
416             if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
417                 removed_unstales.add(share.hash)
418
        def compute(state, payout_script):
            # Build a getwork job for a miner: create the generate (coinbase)
            # transaction, merge in the pending transactions, and return a
            # BlockAttempt for the worker to grind on.
            # payout_script is the miner's requested payout; it is overridden
            # by our own script args.worker_fee percent of the time.
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            # Refuse to hand out work we can't usefully accept yet.
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            # Merged-mining commitment embedded in the coinbase nonce:
            # '\xfa\xbemm' magic + aux chain block hash + two int32 fields.
            if state['aux_work'] is not None:
                aux_str = '\xfa\xbemm' + bitcoin.data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0)
            else:
                aux_str = ''
            
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # Encode our stale-share fraction as a doubled uint16 (the
                # format read_stale_frac expects); empty string if we have no
                # shares yet.
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            subsidy = current_work2.value['subsidy']
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                subsidy=subsidy,
                # run_identifier lets us recognize our own shares later
                nonce=run_identifier + struct.pack('<H', random.randrange(2**16)) + aux_str + get_stale_frac(),
                block_target=state['target'],
                net=args.net,
            )
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL, subsidy*1e-8, args.net.BITCOIN_SYMBOL, len(current_work2.value['transactions']))
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # never time-travel below the median of the last 11 share
                # timestamps + 1
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            # remember when this job was handed out, keyed by its nonce,
            # so got_response can report the share's age
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
468         
469         my_shares = set()
470         doa_shares = set()
471         times = {}
472         
        def got_response(data, user):
            # Handle a getwork submission from a miner: reconstruct the block
            # from the returned header, submit it upstream if it meets the
            # bitcoin (or merged-mining) target, and record it as a share if
            # it meets the share target. Returns True iff the share builds on
            # the current best share.
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                pow = hash_;

                # use scrypt for Litecoin
                if (getattr(args.net, 'BITCOIN_POW_SCRYPT', False)):
                    pow = bitcoin.data.block_header_type.scrypt(block['header']);
#                    print 'LTC: hash256 %x' % hash_
#                    print 'LTC: scrypt  %x' % pow
#                    print 'LTC: target  %x' % block['header']['target']
#                    print 'LTC: starget %x' % p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']

                # in DEBUG mode blocks are submitted regardless, but only
                # announced when they actually meet the target
                if pow <= block['header']['target'] or p2pool_init.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow <= block['header']['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                # if it also satisfies the aux chain's target, submit the
                # proof-of-work to the merged-mining daemon (fire and forget)
                if current_work.value['aux_work'] is not None and pow <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool.calculate_merkle_branch(transactions, 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin.data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                # validate against the share target before accepting the share
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if pow > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow, target)
                    return False
                share = p2pool.Share.from_block(block, args.net)
                my_shares.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    # stale work: built on something other than the current best share
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
543         
544         web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
545         
546         def get_rate():
547             if current_work.value['best_share_hash'] is not None:
548                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
549                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
550                 fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
551                 return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
552             return json.dumps(None)
553         
554         def get_users():
555             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
556             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
557             res = {}
558             for script in sorted(weights, key=lambda s: weights[s]):
559                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
560             return json.dumps(res)
561         
        class WebInterface(resource.Resource):
            # Minimal twisted.web resource: on GET, serve func()'s result
            # with the given MIME type.
            def __init__(self, func, mime_type):
                # func: zero-argument callable producing the response body
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
569         
570         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
571         web_root.putChild('users', WebInterface(get_users, 'application/json'))
572         web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
573         if draw is not None:
574             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
575         
576         reactor.listenTCP(args.worker_port, server.Site(web_root))
577         
578         print '    ...success!'
579         print
580         
581         # done!
582         
583         # do new getwork when a block is heard on the p2p interface
584         
585         def new_block(block_hash):
586             work_updated.happened()
587         factory.new_block.watch(new_block)
588         
589         print 'Started successfully!'
590         print
591         
592         ht.updated.watch(set_real_work2)
593         
594         @defer.inlineCallbacks
595         def work1_thread():
596             while True:
597                 flag = work_updated.get_deferred()
598                 try:
599                     yield set_real_work1()
600                 except:
601                     log.err()
602                 yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
603         
604         @defer.inlineCallbacks
605         def work2_thread():
606             while True:
607                 try:
608                     set_real_work2()
609                 except:
610                     log.err()
611                 yield deferral.sleep(random.expovariate(1/20))
612         
        # kick off both maintenance loops
        work1_thread()
        work2_thread()
        
        
        if hasattr(signal, 'SIGALRM'):
            # Watchdog: re-arm a 30-second alarm once per second from the
            # reactor. If the reactor stalls for more than 30s the alarm is
            # not re-armed in time, SIGALRM fires, and the stack is dumped.
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
624         
625         
626         def read_stale_frac(share):
627             if len(share.nonce) < 4:
628                 return None
629             a, b = struct.unpack("<HH", share.nonce[-4:])
630             if a == 0 or a != b:
631                 return None
632             return a/65535
633
634         pool_str = None;
635         while True:
636             yield deferral.sleep(3)
637             try:
638                 if time.time() > current_work2.value['last_update'] + 60:
639                     print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
640                 if current_work.value['best_share_hash'] is not None:
641                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
642                     if height > 2:
643                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
644                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**100)
645                         shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
646                         stale_shares = stale_doa_shares + stale_not_doa_shares
647                         fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
648                         str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
649                             math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
650                             height,
651                             len(tracker.verified.shares),
652                             len(tracker.shares),
653                             weights.get(my_script, 0)/total_weight*100,
654                             math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
655                             shares,
656                             stale_not_doa_shares,
657                             stale_doa_shares,
658                             len(p2p_node.peers),
659                         ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
660                         if (str != pool_str):
661                             print str;
662                             pool_str = str;
663                         if fracs:
664                             med = math.median(fracs)
665                             print 'Median stale proportion:', med
666                             if shares:
667                                 print '    Own:', stale_shares/shares
668                                 if med < .99:
669                                     print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
670             
671             
672             except:
673                 log.err()
674     except:
675         log.err(None, 'Fatal error:')
676     finally:
677         reactor.stop()
678
679 def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        '''ArgumentParser whose @file argument expansion splits each line of
        the file into whitespace-separated arguments (instead of one
        argument per line) and expands nested @file references.'''
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so that an @file can itself reference further @files
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # split each line on whitespace, dropping empty fragments
            return [arg for arg in arg_line.split() if arg.strip()]
712     
    # Command-line interface. fromfile_prefix_chars='@' lets users keep
    # their options in a file and pass @filename on the command line.
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merge daemon user and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329 solidcoin: 9328 litecoin: 9327, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:9332/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332 ixcoin: 8338 i0coin: 7332 litecoin: 9332)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 namecoin: 8334 ixcoin: 8337 i0coin: 7333 solidcoin: 7555 litecoin: 9333, +10000 for testnets)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments: RPC username (optional, defaults to '') and password
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # default log file: <net>[_testnet].log next to the startup script
    if args.logfile is None:
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
781     
    class LogFile(object):
        '''Append-mode log file that can be re-opened (for rotation) and that
        truncates itself to roughly the last megabyte, starting at a line
        boundary, once it grows past 100 MB.'''
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None  # current open handle, set by reopen()
            self.reopen()
        def reopen(self):
            # close any previous handle before re-opening
            if self.inner_file is not None:
                self.inner_file.close()
            # touch the file so it exists and can be opened for reading
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                # keep only the last ~1 MB; skip forward to the next newline
                # so the kept data starts on a whole line ('' means EOF)
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = open(self.filename, 'a')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
809     class TeePipe(object):
810         def __init__(self, outputs):
811             self.outputs = outputs
812         def write(self, data):
813             for output in self.outputs:
814                 output.write(data)
815         def flush(self):
816             for output in self.outputs:
817                 output.flush()
818     class TimestampingPipe(object):
819         def __init__(self, inner_file):
820             self.inner_file = inner_file
821             self.buf = ''
822             self.softspace = 0
823         def write(self, data):
824             buf = self.buf + data
825             lines = buf.split('\n')
826             for line in lines[:-1]:
827                 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
828                 self.inner_file.flush()
829             self.buf = lines[-1]
830         def flush(self):
831             pass
    logfile = LogFile(args.logfile)
    # mirror all stdout/stderr (and twisted log) output to both the original
    # stderr and the log file, timestamping each line
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 triggers a log reopen, for use with external log rotators
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # also reopen every 5 seconds so LogFile's size-based truncation runs
    task.LoopingCall(logfile.reopen).start(5)
841     
    # resolve the network object, honoring --testnet
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # fill in network-dependent port defaults for any port not given explicitly
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        # no address given - per the --address help, one is requested from bitcoind
        args.pubkey_hash = None
    
    # merged mining needs both the url and the credentials, or neither (XOR)
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    reactor.callWhenRunning(main, args)
    reactor.run()