18b850fc7b486cc4e9881222d0a0a4efcccad8f3
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2 # coding=utf-8
3
4 from __future__ import division
5
6 import argparse
7 import datetime
8 import itertools
9 import os
10 import random
11 import sqlite3
12 import struct
13 import sys
14 import time
15 import json
16 import signal
17 import traceback
18
19 from twisted.internet import defer, reactor, task
20 from twisted.web import server, resource
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
23
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface
26 from util import db, expiring_dict, jsonrpc, variable, deferral, math
27 from . import p2p, skiplists, networks
28 import p2pool, p2pool.data as p2pool_data
29
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    # Fetch a block template from bitcoind via getmemorypool and normalize
    # it into the dict shape the rest of p2pool expects.
    mp = yield bitcoind.rpc_getmemorypool()
    raw_bits = mp['bits']
    if isinstance(raw_bits, (str, unicode)):
        # 'bits' arrived as a hex string, little-endian on the wire.
        bits = bitcoin_data.FloatingIntegerType().unpack(raw_bits.decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(raw_bits)
    txs = [bitcoin_data.tx_type.unpack(raw_tx.decode('hex')) for raw_tx in mp['transactions']]
    defer.returnValue(dict(
        version=mp['version'],
        previous_block_hash=int(mp['previousblockhash'], 16),
        transactions=txs,
        subsidy=mp['coinbasevalue'],
        time=mp['time'],
        target=bits,
    ))
42
43 @deferral.retry('Error creating payout script:', 10)
44 @defer.inlineCallbacks
45 def get_payout_script2(bitcoind, net):
46     address = yield bitcoind.rpc_getaccountaddress('p2pool')
47     validate_response = yield bitcoind.rpc_validateaddress(address)
48     if 'pubkey' not in validate_response:
49         print '    Pubkey request failed. Falling back to payout to address.'
50         defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net)))
51     pubkey = validate_response['pubkey'].decode('hex')
52     defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))
53
@defer.inlineCallbacks
def main(args, net):
    """Start the p2pool node and run it until the process exits.

    Sequence (all within one inlineCallbacks coroutine):
      1. verify bitcoind over JSON-RPC and bitcoin-p2p,
      2. determine the payout script,
      3. load persisted shares, wire up the share tracker,
      4. join the p2pool p2p network,
      5. serve getwork to miners over a JSON-RPC web interface,
      6. loop forever printing pool status.

    args: parsed command-line namespace; net: network-parameters object.
    Stops the reactor on the way out.
    """
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        # Visualization support is optional; degrade gracefully without it.
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        # BITCOIN_RPC_CHECK verifies we're talking to the right coin daemon.
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.BITCOIN_RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        # Payout script: either derived from bitcoind's wallet or from the
        # pubkey hash given on the command line.
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net)
        else:
            print 'Computing payout script from provided address....'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net)
        print
        
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
        # tracker holds the share chain; ss persists shares to disk next to
        # the executable, one store per network.
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), net.NAME + '_shares.'), net)
        known_verified = set()
        print "Loading shares..."
        # Replay the on-disk store: 'share' entries go into the tracker,
        # 'verified_hash' entries are remembered and applied afterwards.
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                # Verified hash with no matching share: drop the stale record.
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        # Keep the on-disk store and the shared-hashes set in sync with
        # tracker evictions.
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # pre_* variables hold raw inputs; set_real_work2 combines them into
        # current_work/current_work2 for consumers.
        pre_current_work = variable.Variable(None)
        pre_current_work2 = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        # share_hash -> (last_request_time, request_count), for backoff.
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            # Pull fresh work from bitcoind into the pre_* variables.
            work = yield getwork(bitcoind)
            pre_current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
            ))
        
        def set_real_work2():
            # Recompute the best share, publish combined work, and request
            # missing parent shares from peers (with exponential backoff).
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'], time.time() - pre_current_work2.value['clock_offset'])
            
            current_work2.set(pre_current_work2.value)
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # Prefer peers known to hold a head descending from this tail.
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        
        @defer.inlineCallbacks
        def set_merged_work():
            # Poll the merged-mining daemon (if configured) for aux work,
            # roughly once per second, forever.
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        set_merged_work()
        
        # NOTE(review): start_time is computed but not used in this view;
        # presumably read elsewhere or vestigial — confirm before removing.
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def p2p_shares(shares, peer=None):
            # Ingest shares received from the p2p network (or from our own
            # worker via got_response); re-derives work if anything was new.
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
        @tracker.verified.added.watch
        def _(share):
            # A verified share that also meets the block target is a full
            # bitcoin block: hand it straight to bitcoind.
            if share.pow_hash <= share.header['target']:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, net))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
        
        def p2p_share_hashes(share_hashes, peer):
            # A peer announced share hashes; request the ones we don't have,
            # rate-limited by the 'requested' backoff table.
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            # Serve a peer's getshares request, capping total shares sent to
            # about 1000 and honoring its stop-hash set.
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            # 'host:port' or bare 'host' (default p2p port) -> (host, port).
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        # Hard-coded bootstrap IPs plus DNS-resolved bootstrap hosts.
        nodes = set([
            ('72.14.191.28', net.P2P_PORT),
            ('62.204.197.159', net.P2P_PORT),
            ('142.58.248.28', net.P2P_PORT),
            ('94.23.34.145', net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        if net.NAME == 'litecoin':
            nodes.add(((yield reactor.resolve('liteco.in')), net.P2P_PORT))
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), net.NAME),
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        # Install our handlers before starting the node.
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # Walk back from the new best share until we hit one we've
            # already broadcast, then push the new prefix to every peer
            # (except the peer a share came from).
            shares = []
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            # Persist the recent chain (and which shares are verified) once
            # a minute via the LoopingCall below.
            for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            # Periodically (re-)establish a UPnP port mapping for the p2pool
            # port; errors are ignored (logged only in DEBUG).
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle_root -> (share_info, transactions, time issued); lets
        # got_response match submitted headers back to the work we handed out.
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # Random per-run tag embedded in coinbase nonces so our own shares
        # are recognizable.
        run_identifier = struct.pack('<I', random.randrange(2**32))
        
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts(doa=False):
            # Count our shares on/off the current best chain. Reads my_shares
            # and doa_shares, which are assigned later in main — safe because
            # this closure is only called after they exist.
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # Remember our own shares evicted while still on the best chain,
            # so get_share_counts doesn't misclassify them as stale.
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        
        def get_payout_script_from_username(request):
            # Miner username doubles as a payout address; None if absent or
            # unparsable.
            user = worker_interface.get_username(request)
            if user is None:
                return None
            try:
                return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, net))
            except: # XXX blah
                return None
        
        def compute(request):
            # Build a getwork job for a miner: generate the coinbase/share
            # transaction, remember its merkle root, and return the header
            # attempt plus the share hash it extends.
            state = current_work.value
            
            payout_script = get_payout_script_from_username(request)
            # worker_fee percent of jobs pay the node operator instead.
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if state['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            previous_share = None if state['best_share_hash'] is None else tracker.shares[state['best_share_hash']]
            subsidy = current_work2.value['subsidy']
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=state['best_share_hash'],
                    # '\xfa\xbemm' is the merged-mining magic prefix.
                    coinbase='' if state['aux_work'] is None else '\xfa\xbemm' + bitcoin_data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=subsidy,
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_frac=(lambda shares, stales:
                        255 if shares == 0 else math.perfect_round(254*stales/shares)
                    )(*get_share_counts()),
                ),
                block_target=state['target'],
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                bitcoin_data.target_to_difficulty(share_info['target']),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - subsidy//200)*1e-8, net.BITCOIN_SYMBOL,
                subsidy*1e-8, net.BITCOIN_SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['time'], state['target'], share_info['target']), state['best_share_hash']
        
        # Hashes of shares we mined ourselves, and the subset that were
        # dead-on-arrival (built on a stale previous share).
        my_shares = set()
        doa_shares = set()
        
        def got_response(header, request):
            # Handle a solved header submitted by a miner. Returns True iff
            # it yielded a share extending the current best chain.
            try:
                user = worker_interface.get_username(request)
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.BITCOIN_POW_FUNC(header)
                
                # Meets the bitcoin block target (or DEBUG): submit as a block.
                if pow_hash <= header['target'] or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                # Meets the aux-chain target: submit merged-mining proof.
                if current_work.value['aux_work'] is not None and pow_hash <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool_data.calculate_merkle_branch(transactions, 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                target = share_info['target']
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_shares.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(compute, got_response, current_work.changed)
        
        def get_rate():
            # JSON: estimated pool hash rate, stale-adjusted by the median
            # stale_frac of the last 120 shares.
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
            return json.dumps(None)
        
        def get_users():
            # JSON: payout address -> fraction of total weight over the last
            # (up to) 720 shares.
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            # Minimal twisted.web resource: serve func() with a fixed
            # Content-Type on GET.
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        @defer.inlineCallbacks
        def work1_thread():
            # Refresh bitcoind work whenever a new block is heard, or at
            # latest every 1-10 seconds.
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            # Periodically recompute derived work as a safety net, every
            # ~20 seconds on average.
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
        
        # Watchdog: if the reactor stalls for 30s, SIGALRM fires and dumps a
        # stack trace (where SIGALRM exists, i.e. not on Windows).
        if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        # Main status loop: every 3 seconds, print pool statistics when they
        # changed or at least every 15 seconds.
        last_str = None
        last_time = 0
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                        shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                        stale_shares = stale_doa_shares + stale_not_doa_shares
                        fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                        this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                            math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
                            shares,
                            stale_not_doa_shares,
                            stale_doa_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                        if fracs:
                            med = math.median(fracs)
                            this_str += '\nPool stales: %i%%' % (int(100*med+.5),)
                            conf = 0.9
                            if shares:
                                this_str += ' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if med < .99:
                                    this_str += ' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - med) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                                this_str += ' (%i%% confidence)' % (int(100*conf+.5),)
                        if this_str != last_str or time.time() > last_time + 15:
                            print this_str
                            last_str = this_str
                            last_time = time.time()
            
            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
651
def run():
    """Entry point: parse command-line options, set up tee'd/timestamped logging, then start the node (main) under the Twisted reactor."""
653     class FixedArgumentParser(argparse.ArgumentParser):
654         def _read_args_from_files(self, arg_strings):
655             # expand arguments referencing files
656             new_arg_strings = []
657             for arg_string in arg_strings:
658                 
659                 # for regular arguments, just add them back into the list
660                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
661                     new_arg_strings.append(arg_string)
662                 
663                 # replace arguments referencing files with the file content
664                 else:
665                     try:
666                         args_file = open(arg_string[1:])
667                         try:
668                             arg_strings = []
669                             for arg_line in args_file.read().splitlines():
670                                 for arg in self.convert_arg_line_to_args(arg_line):
671                                     arg_strings.append(arg)
672                             arg_strings = self._read_args_from_files(arg_strings)
673                             new_arg_strings.extend(arg_strings)
674                         finally:
675                             args_file.close()
676                     except IOError:
677                         err = sys.exc_info()[1]
678                         self.error(str(err))
679             
680             # return the modified argument list
681             return new_arg_strings
682         
683         def convert_arg_line_to_args(self, arg_line):
684             return [arg for arg in arg_line.split() if arg.strip()]
685     
    # Top-level option parser; '@file' arguments are expanded by FixedArgumentParser.
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merge daemon user and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='percentage amount to donate to author of p2pool. Default: 0.5',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    # p2pool's own peer-to-peer interface
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to default p2pool P2P port), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward p2pool P2P port from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    # interface that miners connect to
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default:%s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    # upstream bitcoind connection (RPC for work, P2P for block submission)
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getmemorypool (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments: RPC username (optional, defaults to empty) and password (required)
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    # '_testnet' variants live alongside the real networks in networks.nets
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # default log file: <network_name>.log next to the launch script
    if args.logfile is None:
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), net.NAME + '.log')
759     
760     class LogFile(object):
761         def __init__(self, filename):
762             self.filename = filename
763             self.inner_file = None
764             self.reopen()
765         def reopen(self):
766             if self.inner_file is not None:
767                 self.inner_file.close()
768             open(self.filename, 'a').close()
769             f = open(self.filename, 'rb')
770             f.seek(0, os.SEEK_END)
771             length = f.tell()
772             if length > 100*1000*1000:
773                 f.seek(-1000*1000, os.SEEK_END)
774                 while True:
775                     if f.read(1) in ('', '\n'):
776                         break
777                 data = f.read()
778                 f.close()
779                 f = open(self.filename, 'wb')
780                 f.write(data)
781             f.close()
782             self.inner_file = open(self.filename, 'a')
783         def write(self, data):
784             self.inner_file.write(data)
785         def flush(self):
786             self.inner_file.flush()
787     class TeePipe(object):
788         def __init__(self, outputs):
789             self.outputs = outputs
790         def write(self, data):
791             for output in self.outputs:
792                 output.write(data)
793         def flush(self):
794             for output in self.outputs:
795                 output.flush()
796     class TimestampingPipe(object):
797         def __init__(self, inner_file):
798             self.inner_file = inner_file
799             self.buf = ''
800             self.softspace = 0
801         def write(self, data):
802             buf = self.buf + data
803             lines = buf.split('\n')
804             for line in lines[:-1]:
805                 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
806                 self.inner_file.flush()
807             self.buf = lines[-1]
808         def flush(self):
809             pass
    # Route stdout, stderr and twisted's log through a timestamping tee so
    # everything appears both on the original stderr and in the log file.
    logfile = LogFile(args.logfile)
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 reopens the log file so external rotation tools can move it
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # periodic reopen also applies LogFile's size trimming every 5 seconds
    task.LoopingCall(logfile.reopen).start(5)
    
    # fill in any ports the user did not specify with the network's defaults
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    # Resolve the payout address up front so a bad address fails before startup;
    # pubkey_hash of None means an address will be requested from bitcoind instead.
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    # merged mining needs both the URL and the credentials; supplying only one is an error
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    # start the reactor; main() performs the real setup once it is running
    reactor.callWhenRunning(main, args, net)
    reactor.run()