display per-miner current payout in graphs

p2pool/main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import base64
import os
import random
import sys
import time
import signal
import traceback
import urlparse

try:
    from twisted.internet import iocpreactor
    iocpreactor.install()
except:
    pass
else:
    print 'Using IOCP reactor!'
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

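# getwork: fetch a block template from bitcoind via the (pre-BIP22)
# getmemorypool RPC and normalize it into the dict the rest of p2pool expects.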
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK)(bitcoind):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
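        # my_share_hashes/my_doa_share_hashes record the shares produced by this
        # node, so stale statistics and this node's expected payout can be
        # computed later.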
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
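        # Poll each merged-mining daemon's getauxblock once a second, folding
        # the aux work into pre_merged_work keyed by chain id.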
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
        
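        # submit_block hands a solved block to bitcoind; with the old
        # getmemorypool RPC, calling it with a hex-encoded block submits it.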
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
        # broadcast newly-seen shares to peers when the best chain changes
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
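        # These events fire for every pseudoshare/share this node accepts; the
        # web interface subscribes to them, and the per-user data they carry is
        # what lets the graphs break rates and payouts down per miner.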
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
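            # Miner usernames are parsed as ADDRESS[/SHARE_DIFF][+PSEUDOSHARE_DIFF];
            # an address that fails to parse credits work to this node's payout
            # address instead.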
            def get_user_details(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # invalid address; fall back to this node's payout address
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def preprocess_request(self, request):
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
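                # Build the generate (coinbase) transaction and share metadata.
                # stale_info flags whether we currently have orphan (253) or
                # dead-on-arrival (254) shares not yet accounted for in the chain.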
                share_info, generate_tx = p2pool_data.Share.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                        desired_version=1,
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    ref_merkle_link=dict(branch=[], index=0),
                    net=net,
                )
                
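                # Pick the pseudoshare target: honor an explicit +difficulty
                # request, otherwise size it from the miner's recent hash rate
                # (roughly one pseudoshare per second); it is clamped to be no
                # harder than the share target or any aux chain's target so no
                # real solution is ever missed.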
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
                def got_response(header, request):
                    user, _, _, _ = self.get_user_details(request)
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
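                    # If the POW also satisfies an aux chain's target, submit
                    # the serialized aux POW to that chain via getauxblock.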
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header)
                        del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                    
                    return on_time
                
                return ba, got_response
        
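        # get_current_txouts maps payout scripts to the amounts they would
        # receive if a block were found right now; the web interface uses it to
        # display each miner's current payout in the graphs.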
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with the --give-author argument! (or decrease it, if you must)'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
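        # Periodic console status: share chain and peer counts, local and pool
        # hash rates, stale rates, and this node's current expected payout.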
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
                        majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
                        if majority_desired_version not in [0, 1]:
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
                                majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
                            print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    p2pool_group.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
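    # separate_url splits 'http://user:pass@host:port/' into the bare URL and
    # the 'user:pass' credentials expected by set_merged_work.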
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()