# moved merged work, combining work, and LP into WorkerBridge
# [p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging, pack
26 from . import p2p, networks, web
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34     except jsonrpc.Error, e:
35         if e.code == -32601: # Method not found
36             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37             raise deferral.RetrySilentlyException()
38         raise
39     packed_transactions = [x.decode('hex') for x in work['transactions']]
40     defer.returnValue(dict(
41         version=work['version'],
42         previous_block_hash=int(work['previousblockhash'], 16),
43         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
44         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45         subsidy=work['coinbasevalue'],
46         time=work['time'],
47         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
48         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
49     ))
50
class WorkerBridge(worker_interface.WorkerBridge):
    '''Feeds block attempts to miners and processes their responses.
    
    Combines bitcoind work, merged-mining (aux) work, and the current share
    chain tip into BlockAttempt objects for workers, and converts valid miner
    submissions into p2pool shares, bitcoin block submissions, and merged
    mining block submissions.
    '''
    def __init__(self, my_pubkey_hash, net, donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var):
        '''Wire up shared state, merged-work polling, work combination, and the long-poll event.'''
        worker_interface.WorkerBridge.__init__(self)
        self.recent_shares_ts_work = [] # (timestamp, average_attempts) pairs for up to 50 recent pseudoshares; drives automatic pseudoshare difficulty in get_work()
        
        self.my_pubkey_hash = my_pubkey_hash
        self.net = net
        self.donation_percentage = donation_percentage
        self.bitcoind_work = bitcoind_work
        self.best_block_header = best_block_header
        self.best_share_var = best_share_var
        self.tracker = tracker
        self.my_share_hashes = my_share_hashes
        self.my_doa_share_hashes = my_doa_share_hashes
        self.worker_fee = worker_fee
        self.p2p_node = p2p_node
        self.submit_block = submit_block
        self.set_best_share = set_best_share
        self.shared_share_hashes = shared_share_hashes
        self.block_height_var = block_height_var
        
        self.pseudoshare_received = variable.Event()
        self.share_received = variable.Event()
        self.local_rate_monitor = math.RateMonitor(10*60) # 10-minute window of local work datums
        
        # counts of our shares removed from the verified tracker while still on
        # the best chain: (total, orphan, doa); kept so get_stale_counts() stays
        # accurate after shares age out of the tracker
        self.removed_unstales_var = variable.Variable((0, 0, 0))
        self.removed_doa_unstales_var = variable.Variable(0)
        
        @tracker.verified.removed.watch
        def _(share):
            # on removal of one of our shares, fold its recorded stale status
            # into the running counters above
            if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value):
                assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
                self.removed_unstales_var.set((
                    self.removed_unstales_var.value[0] + 1,
                    self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
                    self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
                ))
            if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value):
                self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
        
        # MERGED WORK
        
        # maps chain_id -> dict(hash=..., target=..., merged_proxy=...)
        self.merged_work = variable.Variable({})
        
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            # poll this merged daemon's getauxblock roughly once a second, forever
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                self.merged_work.set(dict(self.merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @self.merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # COMBINE WORK
        
        self.current_work = variable.Variable(None)
        def compute_work():
            '''Derive the current work template, pre-empting bitcoind when a newer valid header is already known.'''
            t = dict(self.bitcoind_work.value)
            
            bb = self.best_block_header.value
            if bb is not None and bb['previous_block'] == t['previous_block'] and net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(bb)) <= t['bits'].target:
                # a valid successor of bitcoind's work is known before bitcoind
                # produced new work for it; mine an empty block on top of it
                print 'Skipping from block %x to block %x!' % (bb['previous_block'],
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb)))
                t = dict(
                    version=bb['version'],
                    previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb)),
                    bits=bb['bits'], # not always true
                    coinbaseflags='',
                    time=bb['timestamp'] + 600, # better way?
                    transactions=[],
                    merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
                    subsidy=net.PARENT.SUBSIDY_FUNC(self.block_height_var.value),
                    clock_offset=self.current_work.value['clock_offset'],
                    last_update=self.current_work.value['last_update'],
                )
            
            self.current_work.set(t)
        self.bitcoind_work.changed.watch(lambda _: compute_work())
        self.best_block_header.changed.watch(lambda _: compute_work())
        compute_work()
        
        # new_work_event drives miners' long polling
        self.new_work_event = variable.Event()
        @self.current_work.transitioned.watch
        def _(before, after):
            # trigger LP if version/previous_block/bits changed or transactions changed from nothing
            if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']) or (not before['transactions'] and after['transactions']):
                self.new_work_event.happened()
        self.merged_work.changed.watch(lambda _: self.new_work_event.happened())
        self.best_share_var.changed.watch(lambda _: self.new_work_event.happened())
    
    def get_stale_counts(self):
        '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
        my_shares = len(self.my_share_hashes)
        my_doa_shares = len(self.my_doa_share_hashes)
        delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value)
        # add back shares that were removed from the tracker (see __init__ watchers)
        my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0]
        my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value
        orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1]
        doas_recorded_in_chain = delta.my_dead_announce_count + self.removed_unstales_var.value[2]
        
        my_shares_not_in_chain = my_shares - my_shares_in_chain
        my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
        
        return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
    
    def get_user_details(self, request):
        '''Parse the worker username into (user, pubkey_hash, desired_share_target, desired_pseudoshare_target).
        
        Username syntax: ADDRESS[/MIN_DIFFICULTY][+PSEUDOSHARE_DIFFICULTY];
        malformed suffixes are ignored. A worker_fee percentage of requests is
        randomly redirected to our own payout address.
        '''
        user = request.getUser() if request.getUser() is not None else ''
        
        desired_pseudoshare_target = None
        if '+' in user:
            user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
            try:
                desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
            except:
                pass
        
        desired_share_target = 2**256 - 1
        if '/' in user:
            user, min_diff_str = user.rsplit('/', 1)
            try:
                desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
            except:
                pass
        
        if random.uniform(0, 100) < self.worker_fee:
            pubkey_hash = self.my_pubkey_hash
        else:
            try:
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
            except: # XXX blah
                pubkey_hash = self.my_pubkey_hash
        
        return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
    
    def preprocess_request(self, request):
        '''worker_interface hook: reduce an HTTP request to the positional arguments of get_work().'''
        user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
        return pubkey_hash, desired_share_target, desired_pseudoshare_target
    
    def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
        '''Build a BlockAttempt for a miner plus the callback that handles its solutions.
        
        Returns (ba, got_response) where got_response(header, request) returns
        whether the submission was on time. Raises jsonrpc.Error when p2pool is
        not connected/synced enough to hand out work.
        '''
        if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
            raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
        if self.best_share_var.value is None and self.net.PERSIST:
            raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
        if time.time() > self.current_work.value['last_update'] + 60:
            raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
        
        # build the merged-mining commitment embedded in the coinbase, if any
        # aux chains are currently being worked on
        if self.merged_work.value:
            tree, size = bitcoin_data.make_auxpow_tree(self.merged_work.value)
            mm_hashes = [self.merged_work.value.get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
            mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                size=size,
                nonce=0,
            ))
            mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.merged_work.value.iteritems()]
        else:
            mm_data = ''
            mm_later = []
        
        if True: # executes unconditionally; presumably left from a removed condition -- TODO confirm before flattening
            share_info, generate_tx = p2pool_data.Share.generate_transaction(
                tracker=self.tracker,
                share_data=dict(
                    previous_share_hash=self.best_share_var.value,
                    coinbase=(mm_data + self.current_work.value['coinbaseflags'])[:100],
                    nonce=random.randrange(2**32),
                    pubkey_hash=pubkey_hash,
                    subsidy=self.current_work.value['subsidy'],
                    donation=math.perfect_round(65535*self.donation_percentage/100),
                    stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                        'orphan' if orphans > orphans_recorded_in_chain else
                        'doa' if doas > doas_recorded_in_chain else
                        None
                    )(*self.get_stale_counts()),
                    desired_version=3,
                ),
                block_target=self.current_work.value['bits'].target,
                desired_timestamp=int(time.time() - self.current_work.value['clock_offset']),
                desired_target=desired_share_target,
                ref_merkle_link=dict(branch=[], index=0),
                net=self.net,
            )
        
        # choose the pseudoshare target: explicit request, or estimated from
        # the rate of this bridge's recent pseudoshares
        if desired_pseudoshare_target is None:
            target = 2**256-1
            if len(self.recent_shares_ts_work) == 50:
                hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                if hash_rate:
                    target = min(target, int(2**256/hash_rate))
        else:
            target = desired_pseudoshare_target
        # never hand out work easier than the share target or any aux target
        target = max(target, share_info['bits'].target)
        for aux_work in self.merged_work.value.itervalues():
            target = max(target, aux_work['target'])
        target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
        
        transactions = [generate_tx] + list(self.current_work.value['transactions'])
        packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
        merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), self.current_work.value['merkle_link'])
        
        getwork_time = time.time()
        merkle_link = self.current_work.value['merkle_link']
        
        print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
            bitcoin_data.target_to_difficulty(target),
            bitcoin_data.target_to_difficulty(share_info['bits'].target),
            self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
            len(self.current_work.value['transactions']),
        )
        
        # snapshot fields of current_work so got_response validates against the
        # work actually handed out, not whatever is current at response time
        bits = self.current_work.value['bits']
        previous_block = self.current_work.value['previous_block']
        ba = bitcoin_getwork.BlockAttempt(
            version=self.current_work.value['version'],
            previous_block=self.current_work.value['previous_block'],
            merkle_root=merkle_root,
            timestamp=self.current_work.value['time'],
            bits=self.current_work.value['bits'],
            share_target=target,
        )
        
        received_header_hashes = set() # guards against double-counting resubmitted headers
        
        def got_response(header, request):
            '''Handle a solved header for this work unit; returns whether it was on time.'''
            header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
            pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
            # hand the block to bitcoind if it meets the block target (always in DEBUG)
            try:
                if pow_hash <= header['bits'].target or p2pool.DEBUG:
                    self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                    if pow_hash <= header['bits'].target:
                        print
                        print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                        print
            except:
                log.err(None, 'Error while processing potential block:')
            
            user, _, _, _ = self.get_user_details(request)
            assert header['merkle_root'] == merkle_root
            assert header['previous_block'] == previous_block
            assert header['bits'] == bits
            
            # on time iff the share chain tip hasn't moved since this work was generated
            on_time = self.best_share_var.value == share_info['share_data']['previous_share_hash']
            
            # submit aux proof-of-work to every merged chain whose target is met
            for aux_work, index, hashes in mm_later:
                try:
                    if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                        df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                            pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                            bitcoin_data.aux_pow_type.pack(dict(
                                merkle_tx=dict(
                                    tx=transactions[0],
                                    block_hash=header_hash,
                                    merkle_link=merkle_link,
                                ),
                                merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                parent_block_header=header,
                            )).encode('hex'),
                        )
                        @df.addCallback
                        def _(result):
                            if result != (pow_hash <= aux_work['target']):
                                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                            else:
                                print 'Merged block submittal result: %s' % (result,)
                        @df.addErrback
                        def _(err):
                            log.err(err, 'Error submitting merged block:')
                except:
                    log.err(None, 'Error while processing merged mining POW:')
            
            # a real p2pool share (not just a pseudoshare): record and broadcast it
            if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                min_header = dict(header);del min_header['merkle_root']
                hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                share = p2pool_data.Share(self.net, None, dict(
                    min_header=min_header, share_info=share_info, hash_link=hash_link,
                    ref_merkle_link=dict(branch=[], index=0),
                ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                
                print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                    request.getUser(),
                    p2pool_data.format_hash(share.hash),
                    p2pool_data.format_hash(share.previous_hash),
                    time.time() - getwork_time,
                    ' DEAD ON ARRIVAL' if not on_time else '',
                )
                self.my_share_hashes.add(share.hash)
                if not on_time:
                    self.my_doa_share_hashes.add(share.hash)
                
                self.tracker.add(share)
                if not p2pool.DEBUG:
                    self.tracker.verified.add(share)
                self.set_best_share()
                
                try:
                    # forward block solutions to peers immediately
                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
                        for peer in self.p2p_node.peers.itervalues():
                            peer.sendShares([share])
                        self.shared_share_hashes.add(share.hash)
                except:
                    log.err(None, 'Error forwarding block solution:')
                
                self.share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
            
            # pseudoshare accounting (dedup on header hash)
            if pow_hash > target:
                print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                print '    Hash:   %56x' % (pow_hash,)
                print '    Target: %56x' % (target,)
            elif header_hash in received_header_hashes:
                print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
            else:
                received_header_hashes.add(header_hash)
                
                self.pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                while len(self.recent_shares_ts_work) > 50:
                    self.recent_shares_ts_work.pop(0)
                self.local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
            
            return on_time
        
        return ba, got_response
382
383 @defer.inlineCallbacks
384 def main(args, net, datadir_path, merged_urls, worker_endpoint):
385     try:
386         print 'p2pool (version %s)' % (p2pool.__version__,)
387         print
388         
389         # connect to bitcoind over JSON-RPC and do initial getmemorypool
390         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
391         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
392         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
393         @deferral.retry('Error while checking Bitcoin connection:', 1)
394         @defer.inlineCallbacks
395         def check():
396             if not (yield net.PARENT.RPC_CHECK)(bitcoind):
397                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
398                 raise deferral.RetrySilentlyException()
399             temp_work = yield getwork(bitcoind)
400             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
401                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
402                 raise deferral.RetrySilentlyException()
403             defer.returnValue(temp_work)
404         temp_work = yield check()
405         
406         block_height_var = variable.Variable(None)
407         @defer.inlineCallbacks
408         def poll_height():
409             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
410         yield poll_height()
411         task.LoopingCall(poll_height).start(60*60)
412         
413         print '    ...success!'
414         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
415         print '    Current block height: %i' % (block_height_var.value,)
416         print
417         
418         # connect to bitcoind over bitcoin-p2p
419         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
420         factory = bitcoin_p2p.ClientFactory(net.PARENT)
421         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
422         yield factory.getProtocol() # waits until handshake is successful
423         print '    ...success!'
424         print
425         
426         print 'Determining payout address...'
427         if args.pubkey_hash is None:
428             address_path = os.path.join(datadir_path, 'cached_payout_address')
429             
430             if os.path.exists(address_path):
431                 with open(address_path, 'rb') as f:
432                     address = f.read().strip('\r\n')
433                 print '    Loaded cached address: %s...' % (address,)
434             else:
435                 address = None
436             
437             if address is not None:
438                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
439                 if not res['isvalid'] or not res['ismine']:
440                     print '    Cached address is either invalid or not controlled by local bitcoind!'
441                     address = None
442             
443             if address is None:
444                 print '    Getting payout address from bitcoind...'
445                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
446             
447             with open(address_path, 'wb') as f:
448                 f.write(address)
449             
450             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
451         else:
452             my_pubkey_hash = args.pubkey_hash
453         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
454         print
455         
456         my_share_hashes = set()
457         my_doa_share_hashes = set()
458         
459         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
460         shared_share_hashes = set()
461         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
462         known_verified = set()
463         print "Loading shares..."
464         for i, (mode, contents) in enumerate(ss.get_shares()):
465             if mode == 'share':
466                 if contents.hash in tracker.shares:
467                     continue
468                 shared_share_hashes.add(contents.hash)
469                 contents.time_seen = 0
470                 tracker.add(contents)
471                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
472                     print "    %i" % (len(tracker.shares),)
473             elif mode == 'verified_hash':
474                 known_verified.add(contents)
475             else:
476                 raise AssertionError()
477         print "    ...inserting %i verified shares..." % (len(known_verified),)
478         for h in known_verified:
479             if h not in tracker.shares:
480                 ss.forget_verified_share(h)
481                 continue
482             tracker.verified.add(tracker.shares[h])
483         print "    ...done loading %i shares!" % (len(tracker.shares),)
484         print
485         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
486         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
487         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
488         
489         print 'Initializing work...'
490         
491         
492         # BITCOIND WORK
493         
494         bitcoind_work = variable.Variable(None)
495         
496         @defer.inlineCallbacks
497         def poll_bitcoind():
498             work = yield getwork(bitcoind)
499             bitcoind_work.set(dict(
500                 version=work['version'],
501                 previous_block=work['previous_block_hash'],
502                 bits=work['bits'],
503                 coinbaseflags=work['coinbaseflags'],
504                 time=work['time'],
505                 transactions=work['transactions'],
506                 merkle_link=work['merkle_link'],
507                 subsidy=work['subsidy'],
508                 clock_offset=time.time() - work['time'],
509                 last_update=time.time(),
510             ))
511         yield poll_bitcoind()
512         
513         @defer.inlineCallbacks
514         def work_poller():
515             while True:
516                 flag = factory.new_block.get_deferred()
517                 try:
518                     yield poll_bitcoind()
519                 except:
520                     log.err()
521                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
522         work_poller()
523         
524         # PEER WORK
525         
526         best_block_header = variable.Variable(None)
527         def handle_header(new_header):
528             # check that header matches current target
529             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
530                 return
531             bitcoind_best_block = bitcoind_work.value['previous_block']
532             if (best_block_header.value is None
533                 or (
534                     new_header['previous_block'] == bitcoind_best_block and
535                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
536                 ) # new is child of current and previous is current
537                 or (
538                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
539                     best_block_header.value['previous_block'] != bitcoind_best_block
540                 )): # new is current and previous is not a child of current
541                 best_block_header.set(new_header)
542         @defer.inlineCallbacks
543         def poll_header():
544             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
545         bitcoind_work.changed.watch(lambda _: poll_header())
546         yield poll_header()
547         
548         # BEST SHARE
549         
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        # requested: share_hash -> (last_request_time, request_count); entries expire after 300s
        requested = expiring_dict.ExpiringDict(300)
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        best_share_var = variable.Variable(None)
        def set_best_share():
            # Recompute the best share from the tracker and request any missing
            # ancestor shares ("desired") from peers likely to have them.
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff: skip if a request for this share is still "fresh"
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # candidate peers: any connected peer known to have a head descending from this share
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                # first attempt goes to the peer that advertised the share; afterwards
                # pick a random candidate 80% of the time to spread load
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop points: current heads plus points ~10 shares below each head,
                    # so the peer doesn't resend shares we already have
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
589         
590         
591         print '    ...success!'
592         print
593         
594         # setup p2p logic and join p2pool network
595         
        class Node(p2p.Node):
            # p2pool P2P node with the share-chain message handlers wired to the
            # surrounding tracker/peer_heads/requested closures.
            def handle_shares(self, shares, peer):
                # Add newly received shares to the tracker; remember which peer
                # knows the head, and recompute the best share if anything was new.
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                # record the sender as a holder of the first share's chain head
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                # A peer advertised share hashes; request the ones we don't have,
                # subject to the same per-hash request backoff as set_best_share.
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                # Serve a peer's getshares request: walk each requested chain up to
                # `parents` ancestors, stopping early at any hash in `stops`.
                # NOTE(review): an empty `hashes` list would raise ZeroDivisionError
                # here -- presumably the protocol layer guarantees it is non-empty.
                parents = min(parents, 1000//len(hashes))  # cap total shares sent
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                # Reject advertised bitcoin block headers whose hash fails their own
                # claimed difficulty target (cheap anti-spam check).
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
656         
657         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
658         def submit_block_p2p(block):
659             if factory.conn.value is None:
660                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
661                 raise deferral.RetrySilentlyException()
662             factory.conn.value.send_block(block=block)
663         
664         @deferral.retry('Error submitting block: (will retry)', 10, 10)
665         @defer.inlineCallbacks
666         def submit_block_rpc(block, ignore_failure):
667             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
668             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
669             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
670                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
671         
        def submit_block(block, ignore_failure):
            # Submit a solved block through both channels: direct bitcoind P2P
            # (fastest) and the RPC interface (gives a success/failure verdict).
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
675         
        @tracker.verified.added.watch
        def _(share):
            # Runs for every newly verified share; if its PoW also satisfies the
            # bitcoin target, the share is a full block -- pass it to bitcoind and
            # rebroadcast it to peers.
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    # Only relay while the block is still near the current chain
                    # head; otherwise it is stale and not worth rebroadcasting.
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
689         
690         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
691         
692         @defer.inlineCallbacks
693         def parse(x):
694             if ':' in x:
695                 ip, port = x.split(':')
696                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
697             else:
698                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
699         
700         addrs = {}
701         if os.path.exists(os.path.join(datadir_path, 'addrs')):
702             try:
703                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
704                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
705             except:
706                 print >>sys.stderr, 'error parsing addrs'
707         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
708             try:
709                 addr = yield addr_df
710                 if addr not in addrs:
711                     addrs[addr] = (0, time.time(), time.time())
712             except:
713                 log.err()
714         
715         connect_addrs = set()
716         for addr_df in map(parse, args.p2pool_nodes):
717             try:
718                 connect_addrs.add((yield addr_df))
719             except:
720                 log.err()
721         
        # Instantiate the p2pool P2P node and begin listening/connecting.
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
731         
732         def save_addrs():
733             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
734                 f.write(json.dumps(p2p_node.addr_store.items()))
735         task.LoopingCall(save_addrs).start(60)
736         
        @best_block_header.changed.watch
        def _(header):
            # Relay every new best bitcoin block header to all connected peers.
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
741         
742         def broadcast_share(share_hash):
743             shares = []
744             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
745                 if share.hash in shared_share_hashes:
746                     break
747                 shared_share_hashes.add(share.hash)
748                 shares.append(share)
749             
750             for peer in p2p_node.peers.itervalues():
751                 peer.sendShares([share for share in shares if share.peer is not peer])
752         
753         # send share when the chain changes to their chain
754         best_share_var.changed.watch(broadcast_share)
755         
756         def save_shares():
757             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
758                 ss.add_share(share)
759                 if share.hash in tracker.verified.shares:
760                     ss.add_verified_hash(share.hash)
761         task.LoopingCall(save_shares).start(60)
762         
763         print '    ...success!'
764         print
765         
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # Forever loop: try to map our P2P port on the router via UPnP,
                # then sleep a random (mean ~120s) interval before retrying.
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass  # router didn't answer; just try again later
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
782         
783         # start listening for workers with a JSON-RPC server
784         
785         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
786         
787         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
788         
789         wb = WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var)
790         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
791         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
792         
793         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
794         
795         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
796             pass
797         
798         print '    ...success!'
799         print
800         
801         
802         # done!
803         print 'Started successfully!'
804         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
805         if args.donation_percentage > 0.51:
806             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
807         elif args.donation_percentage < 0.49:
808             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
809         else:
810             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
811             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
812         print
813         
814         
        # Watchdog: re-arm a 30s alarm every second; if the reactor stalls for
        # 30s the SIGALRM fires and dumps the current stack to stderr.
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
821         
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # Announces blocks found by the pool in the network's IRC channel.
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # Announce verified shares that are actual blocks and recent
                        # (within 10 minutes), after a random delay so multiple nodes
                        # don't all announce simultaneously.
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            # suppress duplicates already seen in-channel
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # keep a rolling window of the last 100 channel messages
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    # stop announcing once disconnected; factory will reconnect
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
858         
        @defer.inlineCallbacks
        def status_thread():
            # Every 3s, build a multi-line status summary (chain size, peers,
            # local/pool hash rate, stale rates, expected payout) and print it
            # when it changes or at least every 15s.
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    # warn loudly if bitcoind hasn't given us new work in over a minute
                    if time.time() > bitcoind_work.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - bitcoind_work.value['last_update']),)
                    
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # local hash rate estimated from recent work datums
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    # pool-wide statistics need a minimally developed chain
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
916     except:
917         reactor.stop()
918         log.err(None, 'Fatal error:')
919
def run():
    '''Parse command-line arguments and configuration, set up logging and
    error reporting, then start the reactor with main() scheduled.'''
    # networks available on the command line (testnets are selected via --testnet)
    realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    # NOTE(review): added to the top-level parser rather than p2pool_group --
    # presumably an oversight, but it only affects --help grouping
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    # per-network data directory, created on first run
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    # positional args are [username [password]]; pad on the left so a single
    # argument is treated as the password with an empty username
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    # no password given: fall back to reading credentials/ports from bitcoin.conf
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            # bitcoin.conf has no section headers; prepend a dummy one for ConfigParser
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                # conf values only fill in options not given on the command line
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    # worker endpoint may be "PORT" or "ADDR:PORT"; default binds all interfaces
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        # Split "scheme://user:pass@host:port/" into (credential-free url, "user:pass")
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    # tee stdout/stderr through timestamped pipes into the logfile
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    # SIGUSR1 reopens the logfile, enabling external log rotation
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    class ErrorReporter(object):
        # Twisted log observer that POSTs error events to the author's
        # collection endpoint, rate-limited to one report per 5 seconds.
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()