split worker logic completely into WorkerBridge class and out of main()
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging, pack
26 from . import p2p, networks, web
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34     except jsonrpc.Error, e:
35         if e.code == -32601: # Method not found
36             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37             raise deferral.RetrySilentlyException()
38         raise
39     packed_transactions = [x.decode('hex') for x in work['transactions']]
40     defer.returnValue(dict(
41         version=work['version'],
42         previous_block_hash=int(work['previousblockhash'], 16),
43         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
44         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45         subsidy=work['coinbasevalue'],
46         time=work['time'],
47         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
48         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
49     ))
50
51 class WorkerBridge(worker_interface.WorkerBridge):
52     def __init__(self, lp_signal, my_pubkey_hash, net, donation_percentage, current_work, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, compute_work, shared_share_hashes):
53         worker_interface.WorkerBridge.__init__(self)
54         self.new_work_event = lp_signal
55         self.recent_shares_ts_work = []
56         
57         self.my_pubkey_hash = my_pubkey_hash
58         self.net = net
59         self.donation_percentage = donation_percentage
60         self.current_work = current_work
61         self.tracker = tracker
62         self.my_share_hashes = my_share_hashes
63         self.my_doa_share_hashes = my_doa_share_hashes
64         self.worker_fee = worker_fee
65         self.p2p_node = p2p_node
66         self.submit_block = submit_block
67         self.compute_work = compute_work
68         self.shared_share_hashes = shared_share_hashes
69         
70         self.pseudoshare_received = variable.Event()
71         self.share_received = variable.Event()
72         self.local_rate_monitor = math.RateMonitor(10*60)
73         
74         self.removed_unstales_var = variable.Variable((0, 0, 0))
75         self.removed_doa_unstales_var = variable.Variable(0)
76         
77         @tracker.verified.removed.watch
78         def _(share):
79             if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.current_work.value['best_share_hash']):
80                 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
81                 self.removed_unstales_var.set((
82                     self.removed_unstales_var.value[0] + 1,
83                     self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
84                     self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
85                 ))
86             if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.current_work.value['best_share_hash']):
87                 self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
88     
89     def get_stale_counts(self):
90         '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
91         my_shares = len(self.my_share_hashes)
92         my_doa_shares = len(self.my_doa_share_hashes)
93         delta = self.tracker.verified.get_delta_to_last(self.current_work.value['best_share_hash'])
94         my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0]
95         my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value
96         orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1]
97         doas_recorded_in_chain = delta.my_dead_announce_count + self.removed_unstales_var.value[2]
98         
99         my_shares_not_in_chain = my_shares - my_shares_in_chain
100         my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
101         
102         return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
103     
104     def get_user_details(self, request):
105         user = request.getUser() if request.getUser() is not None else ''
106         
107         desired_pseudoshare_target = None
108         if '+' in user:
109             user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
110             try:
111                 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
112             except:
113                 pass
114         
115         desired_share_target = 2**256 - 1
116         if '/' in user:
117             user, min_diff_str = user.rsplit('/', 1)
118             try:
119                 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
120             except:
121                 pass
122         
123         if random.uniform(0, 100) < self.worker_fee:
124             pubkey_hash = self.my_pubkey_hash
125         else:
126             try:
127                 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
128             except: # XXX blah
129                 pubkey_hash = self.my_pubkey_hash
130         
131         return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
132     
133     def preprocess_request(self, request):
134         user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
135         return pubkey_hash, desired_share_target, desired_pseudoshare_target
136     
137     def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
138         if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
139             raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
140         if self.current_work.value['best_share_hash'] is None and self.net.PERSIST:
141             raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
142         if time.time() > self.current_work.value['last_update'] + 60:
143             raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
144         
145         if self.current_work.value['mm_chains']:
146             tree, size = bitcoin_data.make_auxpow_tree(self.current_work.value['mm_chains'])
147             mm_hashes = [self.current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
148             mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
149                 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
150                 size=size,
151                 nonce=0,
152             ))
153             mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.current_work.value['mm_chains'].iteritems()]
154         else:
155             mm_data = ''
156             mm_later = []
157         
158         if True:
159             share_info, generate_tx = p2pool_data.Share.generate_transaction(
160                 tracker=self.tracker,
161                 share_data=dict(
162                     previous_share_hash=self.current_work.value['best_share_hash'],
163                     coinbase=(mm_data + self.current_work.value['coinbaseflags'])[:100],
164                     nonce=random.randrange(2**32),
165                     pubkey_hash=pubkey_hash,
166                     subsidy=self.current_work.value['subsidy'],
167                     donation=math.perfect_round(65535*self.donation_percentage/100),
168                     stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
169                         'orphan' if orphans > orphans_recorded_in_chain else
170                         'doa' if doas > doas_recorded_in_chain else
171                         None
172                     )(*self.get_stale_counts()),
173                     desired_version=3,
174                 ),
175                 block_target=self.current_work.value['bits'].target,
176                 desired_timestamp=int(time.time() - self.current_work.value['clock_offset']),
177                 desired_target=desired_share_target,
178                 ref_merkle_link=dict(branch=[], index=0),
179                 net=self.net,
180             )
181         
182         if desired_pseudoshare_target is None:
183             target = 2**256-1
184             if len(self.recent_shares_ts_work) == 50:
185                 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
186                 if hash_rate:
187                     target = min(target, int(2**256/hash_rate))
188         else:
189             target = desired_pseudoshare_target
190         target = max(target, share_info['bits'].target)
191         for aux_work in self.current_work.value['mm_chains'].itervalues():
192             target = max(target, aux_work['target'])
193         target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
194         
195         transactions = [generate_tx] + list(self.current_work.value['transactions'])
196         packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
197         merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), self.current_work.value['merkle_link'])
198         
199         getwork_time = time.time()
200         merkle_link = self.current_work.value['merkle_link']
201         
202         print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
203             bitcoin_data.target_to_difficulty(target),
204             bitcoin_data.target_to_difficulty(share_info['bits'].target),
205             self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
206             len(self.current_work.value['transactions']),
207         )
208         
209         bits = self.current_work.value['bits']
210         previous_block = self.current_work.value['previous_block']
211         ba = bitcoin_getwork.BlockAttempt(
212             version=self.current_work.value['version'],
213             previous_block=self.current_work.value['previous_block'],
214             merkle_root=merkle_root,
215             timestamp=self.current_work.value['time'],
216             bits=self.current_work.value['bits'],
217             share_target=target,
218         )
219         
220         received_header_hashes = set()
221         
222         def got_response(header, request):
223             header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
224             pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
225             try:
226                 if pow_hash <= header['bits'].target or p2pool.DEBUG:
227                     self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
228                     if pow_hash <= header['bits'].target:
229                         print
230                         print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
231                         print
232             except:
233                 log.err(None, 'Error while processing potential block:')
234             
235             user, _, _, _ = self.get_user_details(request)
236             assert header['merkle_root'] == merkle_root
237             assert header['previous_block'] == previous_block
238             assert header['bits'] == bits
239             
240             on_time = self.current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
241             
242             for aux_work, index, hashes in mm_later:
243                 try:
244                     if pow_hash <= aux_work['target'] or p2pool.DEBUG:
245                         df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
246                             pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
247                             bitcoin_data.aux_pow_type.pack(dict(
248                                 merkle_tx=dict(
249                                     tx=transactions[0],
250                                     block_hash=header_hash,
251                                     merkle_link=merkle_link,
252                                 ),
253                                 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
254                                 parent_block_header=header,
255                             )).encode('hex'),
256                         )
257                         @df.addCallback
258                         def _(result):
259                             if result != (pow_hash <= aux_work['target']):
260                                 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
261                             else:
262                                 print 'Merged block submittal result: %s' % (result,)
263                         @df.addErrback
264                         def _(err):
265                             log.err(err, 'Error submitting merged block:')
266                 except:
267                     log.err(None, 'Error while processing merged mining POW:')
268             
269             if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
270                 min_header = dict(header);del min_header['merkle_root']
271                 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
272                 share = p2pool_data.Share(self.net, None, dict(
273                     min_header=min_header, share_info=share_info, hash_link=hash_link,
274                     ref_merkle_link=dict(branch=[], index=0),
275                 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
276                 
277                 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
278                     request.getUser(),
279                     p2pool_data.format_hash(share.hash),
280                     p2pool_data.format_hash(share.previous_hash),
281                     time.time() - getwork_time,
282                     ' DEAD ON ARRIVAL' if not on_time else '',
283                 )
284                 self.my_share_hashes.add(share.hash)
285                 if not on_time:
286                     self.my_doa_share_hashes.add(share.hash)
287                 
288                 self.tracker.add(share)
289                 if not p2pool.DEBUG:
290                     self.tracker.verified.add(share)
291                 self.compute_work()
292                 
293                 try:
294                     if pow_hash <= header['bits'].target or p2pool.DEBUG:
295                         for peer in self.p2p_node.peers.itervalues():
296                             peer.sendShares([share])
297                         self.shared_share_hashes.add(share.hash)
298                 except:
299                     log.err(None, 'Error forwarding block solution:')
300                 
301                 self.share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
302             
303             if pow_hash > target:
304                 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
305                 print '    Hash:   %56x' % (pow_hash,)
306                 print '    Target: %56x' % (target,)
307             elif header_hash in received_header_hashes:
308                 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
309             else:
310                 received_header_hashes.add(header_hash)
311                 
312                 self.pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
313                 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
314                 while len(self.recent_shares_ts_work) > 50:
315                     self.recent_shares_ts_work.pop(0)
316                 self.local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
317             
318             return on_time
319         
320         return ba, got_response
321
322 @defer.inlineCallbacks
323 def main(args, net, datadir_path, merged_urls, worker_endpoint):
324     try:
325         print 'p2pool (version %s)' % (p2pool.__version__,)
326         print
327         
328         # connect to bitcoind over JSON-RPC and do initial getmemorypool
329         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
330         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
331         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
332         @deferral.retry('Error while checking Bitcoin connection:', 1)
333         @defer.inlineCallbacks
334         def check():
335             if not (yield net.PARENT.RPC_CHECK)(bitcoind):
336                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
337                 raise deferral.RetrySilentlyException()
338             temp_work = yield getwork(bitcoind)
339             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
340                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
341                 raise deferral.RetrySilentlyException()
342             defer.returnValue(temp_work)
343         temp_work = yield check()
344         
345         block_height_var = variable.Variable(None)
346         @defer.inlineCallbacks
347         def poll_height():
348             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
349         yield poll_height()
350         task.LoopingCall(poll_height).start(60*60)
351         
352         print '    ...success!'
353         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
354         print '    Current block height: %i' % (block_height_var.value,)
355         print
356         
357         # connect to bitcoind over bitcoin-p2p
358         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
359         factory = bitcoin_p2p.ClientFactory(net.PARENT)
360         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
361         yield factory.getProtocol() # waits until handshake is successful
362         print '    ...success!'
363         print
364         
365         print 'Determining payout address...'
366         if args.pubkey_hash is None:
367             address_path = os.path.join(datadir_path, 'cached_payout_address')
368             
369             if os.path.exists(address_path):
370                 with open(address_path, 'rb') as f:
371                     address = f.read().strip('\r\n')
372                 print '    Loaded cached address: %s...' % (address,)
373             else:
374                 address = None
375             
376             if address is not None:
377                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
378                 if not res['isvalid'] or not res['ismine']:
379                     print '    Cached address is either invalid or not controlled by local bitcoind!'
380                     address = None
381             
382             if address is None:
383                 print '    Getting payout address from bitcoind...'
384                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
385             
386             with open(address_path, 'wb') as f:
387                 f.write(address)
388             
389             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
390         else:
391             my_pubkey_hash = args.pubkey_hash
392         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
393         print
394         
395         my_share_hashes = set()
396         my_doa_share_hashes = set()
397         
398         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
399         shared_share_hashes = set()
400         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
401         known_verified = set()
402         print "Loading shares..."
403         for i, (mode, contents) in enumerate(ss.get_shares()):
404             if mode == 'share':
405                 if contents.hash in tracker.shares:
406                     continue
407                 shared_share_hashes.add(contents.hash)
408                 contents.time_seen = 0
409                 tracker.add(contents)
410                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
411                     print "    %i" % (len(tracker.shares),)
412             elif mode == 'verified_hash':
413                 known_verified.add(contents)
414             else:
415                 raise AssertionError()
416         print "    ...inserting %i verified shares..." % (len(known_verified),)
417         for h in known_verified:
418             if h not in tracker.shares:
419                 ss.forget_verified_share(h)
420                 continue
421             tracker.verified.add(tracker.shares[h])
422         print "    ...done loading %i shares!" % (len(tracker.shares),)
423         print
424         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
425         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
426         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
427         
428         print 'Initializing work...'
429         
430         
431         # BITCOIND WORK
432         
433         bitcoind_work = variable.Variable(None)
434         
435         @defer.inlineCallbacks
436         def poll_bitcoind():
437             work = yield getwork(bitcoind)
438             bitcoind_work.set(dict(
439                 version=work['version'],
440                 previous_block=work['previous_block_hash'],
441                 bits=work['bits'],
442                 coinbaseflags=work['coinbaseflags'],
443                 time=work['time'],
444                 transactions=work['transactions'],
445                 merkle_link=work['merkle_link'],
446                 subsidy=work['subsidy'],
447                 clock_offset=time.time() - work['time'],
448                 last_update=time.time(),
449             ))
450         yield poll_bitcoind()
451         
452         @defer.inlineCallbacks
453         def work_poller():
454             while True:
455                 flag = factory.new_block.get_deferred()
456                 try:
457                     yield poll_bitcoind()
458                 except:
459                     log.err()
460                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
461         work_poller()
462         
463         # PEER WORK
464         
465         best_block_header = variable.Variable(None)
466         def handle_header(new_header):
467             # check that header matches current target
468             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
469                 return
470             bitcoind_best_block = bitcoind_work.value['previous_block']
471             if (best_block_header.value is None
472                 or (
473                     new_header['previous_block'] == bitcoind_best_block and
474                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
475                 ) # new is child of current and previous is current
476                 or (
477                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
478                     best_block_header.value['previous_block'] != bitcoind_best_block
479                 )): # new is current and previous is not a child of current
480                 best_block_header.set(new_header)
481         @defer.inlineCallbacks
482         def poll_header():
483             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
484         bitcoind_work.changed.watch(lambda _: poll_header())
485         yield poll_header()
486         
487         # MERGED WORK
488         
489         merged_work = variable.Variable({})
490         
491         @defer.inlineCallbacks
492         def set_merged_work(merged_url, merged_userpass):
493             merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
494             while True:
495                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
496                 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
497                     hash=int(auxblock['hash'], 16),
498                     target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
499                     merged_proxy=merged_proxy,
500                 )}))
501                 yield deferral.sleep(1)
502         for merged_url, merged_userpass in merged_urls:
503             set_merged_work(merged_url, merged_userpass)
504         
505         @merged_work.changed.watch
506         def _(new_merged_work):
507             print 'Got new merged mining work!'
508         
509         # COMBINE WORK
510         
511         current_work = variable.Variable(None)
512         
513         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
514         requested = expiring_dict.ExpiringDict(300)
515         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
516         def compute_work():
517             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
518             
519             t = dict(bitcoind_work.value)
520             
521             if (best_block_header.value is not None and
522             best_block_header.value['previous_block'] == t['previous_block'] and
523             net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
524                 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
525                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
526                 t = dict(
527                     version=best_block_header.value['version'],
528                     previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
529                     bits=best_block_header.value['bits'], # not always true
530                     coinbaseflags='',
531                     time=best_block_header.value['timestamp'] + 600, # better way?
532                     transactions=[],
533                     merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
534                     subsidy=net.PARENT.SUBSIDY_FUNC(block_height_var.value),
535                     clock_offset=current_work.value['clock_offset'],
536                     last_update=current_work.value['last_update'],
537                 )
538             
539             t['best_share_hash'] = best
540             t['mm_chains'] = merged_work.value
541             current_work.set(t)
542             
543             t = time.time()
544             for peer2, share_hash in desired:
545                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
546                     continue
547                 last_request_time, count = requested.get(share_hash, (None, 0))
548                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
549                     continue
550                 potential_peers = set()
551                 for head in tracker.tails[share_hash]:
552                     potential_peers.update(peer_heads.get(head, set()))
553                 potential_peers = [peer for peer in potential_peers if peer.connected2]
554                 if count == 0 and peer2 is not None and peer2.connected2:
555                     peer = peer2
556                 else:
557                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
558                     if peer is None:
559                         continue
560                 
561                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
562                 peer.send_getshares(
563                     hashes=[share_hash],
564                     parents=2000,
565                     stops=list(set(tracker.heads) | set(
566                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
567                     ))[:100],
568                 )
569                 requested[share_hash] = t, count + 1
570         bitcoind_work.changed.watch(lambda _: compute_work())
571         merged_work.changed.watch(lambda _: compute_work())
572         best_block_header.changed.watch(lambda _: compute_work())
573         compute_work()
574         
575         # LONG POLLING
576         
577         lp_signal = variable.Event()
578         
579         @current_work.transitioned.watch
580         def _(before, after):
581             # trigger LP if version/previous_block/bits changed or transactions changed from nothing
582             if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
583                 lp_signal.happened()
584         
585         
586         print '    ...success!'
587         print
588         
589         # setup p2p logic and join p2pool network
590         
591         class Node(p2p.Node):
592             def handle_shares(self, shares, peer):
593                 if len(shares) > 5:
594                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
595                 
596                 new_count = 0
597                 for share in shares:
598                     if share.hash in tracker.shares:
599                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
600                         continue
601                     
602                     new_count += 1
603                     
604                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
605                     
606                     tracker.add(share)
607                 
608                 if shares and peer is not None:
609                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
610                 
611                 if new_count:
612                     compute_work()
613                 
614                 if len(shares) > 5:
615                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
616             
617             def handle_share_hashes(self, hashes, peer):
618                 t = time.time()
619                 get_hashes = []
620                 for share_hash in hashes:
621                     if share_hash in tracker.shares:
622                         continue
623                     last_request_time, count = requested.get(share_hash, (None, 0))
624                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
625                         continue
626                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
627                     get_hashes.append(share_hash)
628                     requested[share_hash] = t, count + 1
629                 
630                 if hashes and peer is not None:
631                     peer_heads.setdefault(hashes[0], set()).add(peer)
632                 if get_hashes:
633                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
634             
635             def handle_get_shares(self, hashes, parents, stops, peer):
636                 parents = min(parents, 1000//len(hashes))
637                 stops = set(stops)
638                 shares = []
639                 for share_hash in hashes:
640                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
641                         if share.hash in stops:
642                             break
643                         shares.append(share)
644                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
645                 return shares
646             
647             def handle_bestblock(self, header, peer):
648                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
649                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
650                 handle_header(header)
651         
652         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
653         def submit_block_p2p(block):
654             if factory.conn.value is None:
655                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
656                 raise deferral.RetrySilentlyException()
657             factory.conn.value.send_block(block=block)
658         
659         @deferral.retry('Error submitting block: (will retry)', 10, 10)
660         @defer.inlineCallbacks
661         def submit_block_rpc(block, ignore_failure):
662             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
663             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
664             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
665                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
666         
667         def submit_block(block, ignore_failure):
668             submit_block_p2p(block)
669             submit_block_rpc(block, ignore_failure)
670         
671         @tracker.verified.added.watch
672         def _(share):
673             if share.pow_hash <= share.header['bits'].target:
674                 submit_block(share.as_block(tracker), ignore_failure=True)
675                 print
676                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
677                 print
678                 def spread():
679                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
680                         current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
681                         broadcast_share(share.hash)
682                 spread()
683                 reactor.callLater(5, spread) # so get_height_rel_highest can update
684         
685         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
686         
687         @defer.inlineCallbacks
688         def parse(x):
689             if ':' in x:
690                 ip, port = x.split(':')
691                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
692             else:
693                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
694         
695         addrs = {}
696         if os.path.exists(os.path.join(datadir_path, 'addrs')):
697             try:
698                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
699                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
700             except:
701                 print >>sys.stderr, 'error parsing addrs'
702         elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
703             try:
704                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
705             except:
706                 print >>sys.stderr, "error reading addrs.txt"
707         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
708             try:
709                 addr = yield addr_df
710                 if addr not in addrs:
711                     addrs[addr] = (0, time.time(), time.time())
712             except:
713                 log.err()
714         
715         connect_addrs = set()
716         for addr_df in map(parse, args.p2pool_nodes):
717             try:
718                 connect_addrs.add((yield addr_df))
719             except:
720                 log.err()
721         
722         p2p_node = Node(
723             best_share_hash_func=lambda: current_work.value['best_share_hash'],
724             port=args.p2pool_port,
725             net=net,
726             addr_store=addrs,
727             connect_addrs=connect_addrs,
728             max_incoming_conns=args.p2pool_conns,
729         )
730         p2p_node.start()
731         
732         def save_addrs():
733             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
734                 f.write(json.dumps(p2p_node.addr_store.items()))
735         task.LoopingCall(save_addrs).start(60)
736         
737         @best_block_header.changed.watch
738         def _(header):
739             for peer in p2p_node.peers.itervalues():
740                 peer.send_bestblock(header=header)
741         
742         def broadcast_share(share_hash):
743             shares = []
744             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
745                 if share.hash in shared_share_hashes:
746                     break
747                 shared_share_hashes.add(share.hash)
748                 shares.append(share)
749             
750             for peer in p2p_node.peers.itervalues():
751                 peer.sendShares([share for share in shares if share.peer is not peer])
752         
753         # send share when the chain changes to their chain
754         current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
755         
756         def save_shares():
757             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
758                 ss.add_share(share)
759                 if share.hash in tracker.verified.shares:
760                     ss.add_verified_hash(share.hash)
761         task.LoopingCall(save_shares).start(60)
762         
763         print '    ...success!'
764         print
765         
766         if args.upnp:
767             @defer.inlineCallbacks
768             def upnp_thread():
769                 while True:
770                     try:
771                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
772                         if is_lan:
773                             pm = yield portmapper.get_port_mapper()
774                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
775                     except defer.TimeoutError:
776                         pass
777                     except:
778                         if p2pool.DEBUG:
779                             log.err(None, 'UPnP error:')
780                     yield deferral.sleep(random.expovariate(1/120))
781             upnp_thread()
782         
783         # start listening for workers with a JSON-RPC server
784         
785         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
786         
787         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
788         
789         wb = WorkerBridge(lp_signal, my_pubkey_hash, net, args.donation_percentage, current_work, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, compute_work, shared_share_hashes)
790         web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received)
791         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
792         
793         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
794         
795         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
796             pass
797         
798         print '    ...success!'
799         print
800         
801         
802         # done!
803         print 'Started successfully!'
804         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
805         if args.donation_percentage > 0.51:
806             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
807         elif args.donation_percentage < 0.49:
808             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
809         else:
810             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
811             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
812         print
813         
814         
815         if hasattr(signal, 'SIGALRM'):
816             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
817                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
818             ))
819             signal.siginterrupt(signal.SIGALRM, False)
820             task.LoopingCall(signal.alarm, 30).start(1)
821         
822         if args.irc_announce:
823             from twisted.words.protocols import irc
824             class IRCClient(irc.IRCClient):
825                 nickname = 'p2pool%02i' % (random.randrange(100),)
826                 channel = net.ANNOUNCE_CHANNEL
827                 def lineReceived(self, line):
828                     if p2pool.DEBUG:
829                         print repr(line)
830                     irc.IRCClient.lineReceived(self, line)
831                 def signedOn(self):
832                     irc.IRCClient.signedOn(self)
833                     self.factory.resetDelay()
834                     self.join(self.channel)
835                     @defer.inlineCallbacks
836                     def new_share(share):
837                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
838                             yield deferral.sleep(random.expovariate(1/60))
839                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
840                             if message not in self.recent_messages:
841                                 self.say(self.channel, message)
842                                 self._remember_message(message)
843                     self.watch_id = tracker.verified.added.watch(new_share)
844                     self.recent_messages = []
845                 def _remember_message(self, message):
846                     self.recent_messages.append(message)
847                     while len(self.recent_messages) > 100:
848                         self.recent_messages.pop(0)
849                 def privmsg(self, user, channel, message):
850                     if channel == self.channel:
851                         self._remember_message(message)
852                 def connectionLost(self, reason):
853                     tracker.verified.added.unwatch(self.watch_id)
854                     print 'IRC connection lost:', reason.getErrorMessage()
855             class IRCClientFactory(protocol.ReconnectingClientFactory):
856                 protocol = IRCClient
857             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
858         
859         @defer.inlineCallbacks
860         def status_thread():
861             last_str = None
862             last_time = 0
863             while True:
864                 yield deferral.sleep(3)
865                 try:
866                     if time.time() > current_work.value['last_update'] + 60:
867                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
868                     
869                     height = tracker.get_height(current_work.value['best_share_hash'])
870                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
871                         height,
872                         len(tracker.verified.shares),
873                         len(tracker.shares),
874                         len(p2p_node.peers),
875                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
876                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
877                     
878                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
879                     my_att_s = sum(datum['work']/dt for datum in datums)
880                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
881                         math.format(int(my_att_s)),
882                         math.format_dt(dt),
883                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
884                         math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
885                     )
886                     
887                     if height > 2:
888                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
889                         stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
890                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
891                         
892                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
893                             shares, stale_orphan_shares, stale_doa_shares,
894                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
895                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
896                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
897                         )
898                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
899                             math.format(int(real_att_s)),
900                             100*stale_prop,
901                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
902                         )
903                         
904                         for warning in p2pool_data.get_warnings(tracker, current_work, net):
905                             print >>sys.stderr, '#'*40
906                             print >>sys.stderr, '>>> Warning: ' + warning
907                             print >>sys.stderr, '#'*40
908                     
909                     if this_str != last_str or time.time() > last_time + 15:
910                         print this_str
911                         last_str = this_str
912                         last_time = time.time()
913                 except:
914                     log.err()
915         status_thread()
916     except:
917         reactor.stop()
918         log.err(None, 'Fatal error:')
919
920 def run():
921     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
922     
923     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
924     parser.add_argument('--version', action='version', version=p2pool.__version__)
925     parser.add_argument('--net',
926         help='use specified network (default: bitcoin)',
927         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
928     parser.add_argument('--testnet',
929         help='''use the network's testnet''',
930         action='store_const', const=True, default=False, dest='testnet')
931     parser.add_argument('--debug',
932         help='enable debugging mode',
933         action='store_const', const=True, default=False, dest='debug')
934     parser.add_argument('-a', '--address',
935         help='generate payouts to this address (default: <address requested from bitcoind>)',
936         type=str, action='store', default=None, dest='address')
937     parser.add_argument('--datadir',
938         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
939         type=str, action='store', default=None, dest='datadir')
940     parser.add_argument('--logfile',
941         help='''log to this file (default: data/<NET>/log)''',
942         type=str, action='store', default=None, dest='logfile')
943     parser.add_argument('--merged',
944         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
945         type=str, action='append', default=[], dest='merged_urls')
946     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
947         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
948         type=float, action='store', default=0.5, dest='donation_percentage')
949     parser.add_argument('--iocp',
950         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
951         action='store_true', default=False, dest='iocp')
952     parser.add_argument('--irc-announce',
953         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
954         action='store_true', default=False, dest='irc_announce')
955     parser.add_argument('--no-bugreport',
956         help='disable submitting caught exceptions to the author',
957         action='store_true', default=False, dest='no_bugreport')
958     
959     p2pool_group = parser.add_argument_group('p2pool interface')
960     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
961         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
962         type=int, action='store', default=None, dest='p2pool_port')
963     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
964         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
965         type=str, action='append', default=[], dest='p2pool_nodes')
966     parser.add_argument('--disable-upnp',
967         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
968         action='store_false', default=True, dest='upnp')
969     p2pool_group.add_argument('--max-conns', metavar='CONNS',
970         help='maximum incoming connections (default: 40)',
971         type=int, action='store', default=40, dest='p2pool_conns')
972     
973     worker_group = parser.add_argument_group('worker interface')
974     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
975         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
976         type=str, action='store', default=None, dest='worker_endpoint')
977     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
978         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
979         type=float, action='store', default=0, dest='worker_fee')
980     
981     bitcoind_group = parser.add_argument_group('bitcoind interface')
982     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
983         help='connect to this address (default: 127.0.0.1)',
984         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
985     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
986         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
987         type=int, action='store', default=None, dest='bitcoind_rpc_port')
988     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
989         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
990         type=int, action='store', default=None, dest='bitcoind_p2p_port')
991     
992     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
993         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
994         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
995     
996     args = parser.parse_args()
997     
998     if args.debug:
999         p2pool.DEBUG = True
1000     
1001     net_name = args.net_name + ('_testnet' if args.testnet else '')
1002     net = networks.nets[net_name]
1003     
1004     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
1005     if not os.path.exists(datadir_path):
1006         os.makedirs(datadir_path)
1007     
1008     if len(args.bitcoind_rpc_userpass) > 2:
1009         parser.error('a maximum of two arguments are allowed')
1010     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
1011     
1012     if args.bitcoind_rpc_password is None:
1013         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
1014             parser.error('This network has no configuration file function. Manually enter your RPC password.')
1015         conf_path = net.PARENT.CONF_FILE_FUNC()
1016         if not os.path.exists(conf_path):
1017             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
1018                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
1019                 '''\r\n'''
1020                 '''server=1\r\n'''
1021                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
1022         with open(conf_path, 'rb') as f:
1023             cp = ConfigParser.RawConfigParser()
1024             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
1025             for conf_name, var_name, var_type in [
1026                 ('rpcuser', 'bitcoind_rpc_username', str),
1027                 ('rpcpassword', 'bitcoind_rpc_password', str),
1028                 ('rpcport', 'bitcoind_rpc_port', int),
1029                 ('port', 'bitcoind_p2p_port', int),
1030             ]:
1031                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1032                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
1033         if args.bitcoind_rpc_password is None:
1034             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
1035     
1036     if args.bitcoind_rpc_username is None:
1037         args.bitcoind_rpc_username = ''
1038     
1039     if args.bitcoind_rpc_port is None:
1040         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1041     
1042     if args.bitcoind_p2p_port is None:
1043         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1044     
1045     if args.p2pool_port is None:
1046         args.p2pool_port = net.P2P_PORT
1047     
1048     if args.worker_endpoint is None:
1049         worker_endpoint = '', net.WORKER_PORT
1050     elif ':' not in args.worker_endpoint:
1051         worker_endpoint = '', int(args.worker_endpoint)
1052     else:
1053         addr, port = args.worker_endpoint.rsplit(':', 1)
1054         worker_endpoint = addr, int(port)
1055     
1056     if args.address is not None:
1057         try:
1058             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1059         except Exception, e:
1060             parser.error('error parsing address: ' + repr(e))
1061     else:
1062         args.pubkey_hash = None
1063     
1064     def separate_url(url):
1065         s = urlparse.urlsplit(url)
1066         if '@' not in s.netloc:
1067             parser.error('merged url netloc must contain an "@"')
1068         userpass, new_netloc = s.netloc.rsplit('@', 1)
1069         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
1070     merged_urls = map(separate_url, args.merged_urls)
1071     
1072     if args.logfile is None:
1073         args.logfile = os.path.join(datadir_path, 'log')
1074     
1075     logfile = logging.LogFile(args.logfile)
1076     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1077     sys.stdout = logging.AbortPipe(pipe)
1078     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1079     if hasattr(signal, "SIGUSR1"):
1080         def sigusr1(signum, frame):
1081             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1082             logfile.reopen()
1083             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1084         signal.signal(signal.SIGUSR1, sigusr1)
1085     task.LoopingCall(logfile.reopen).start(5)
1086     
1087     class ErrorReporter(object):
1088         def __init__(self):
1089             self.last_sent = None
1090         
1091         def emit(self, eventDict):
1092             if not eventDict["isError"]:
1093                 return
1094             
1095             if self.last_sent is not None and time.time() < self.last_sent + 5:
1096                 return
1097             self.last_sent = time.time()
1098             
1099             if 'failure' in eventDict:
1100                 text = ((eventDict.get('why') or 'Unhandled Error')
1101                     + '\n' + eventDict['failure'].getTraceback())
1102             else:
1103                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
1104             
1105             from twisted.web import client
1106             client.getPage(
1107                 url='http://u.forre.st/p2pool_error.cgi',
1108                 method='POST',
1109                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
1110                 timeout=15,
1111             ).addBoth(lambda x: None)
1112     if not args.no_bugreport:
1113         log.addObserver(ErrorReporter().emit)
1114     
1115     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
1116     reactor.run()