fixup! added special "p2pool" MM target to match P2Pool share target
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import base64
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
19
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface, height_tracker
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
25
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
29     try:
30         work = yield bitcoind.rpc_getmemorypool()
31     except jsonrpc.Error, e:
32         if e.code == -32601: # Method not found
33             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34             raise deferral.RetrySilentlyException()
35         raise
36     packed_transactions = [x.decode('hex') for x in work['transactions']]
37     defer.returnValue(dict(
38         version=work['version'],
39         previous_block_hash=int(work['previousblockhash'], 16),
40         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41         merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
42         subsidy=work['coinbasevalue'],
43         time=work['time'],
44         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
46     ))
47
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
50     try:
51         print 'p2pool (version %s)' % (p2pool.__version__,)
52         print
53         
54         # connect to bitcoind over JSON-RPC and do initial getmemorypool
55         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            # Sanity-check the RPC connection: confirm we are talking to the
            # expected chain's bitcoind and that it is new enough, retrying
            # forever (once per second) until both checks pass.
            # NOTE(review): RPC_CHECK appears to yield a callable taking the
            # proxy — confirm against the networks module.
            if not (yield net.PARENT.RPC_CHECK)(bitcoind):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            # VERSION_CHECK sees both the reported version number and a sample
            # work unit, so it can reject daemons lacking required features.
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
69         temp_work = yield check()
70         print '    ...success!'
71         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
72         print
73         
74         # connect to bitcoind over bitcoin-p2p
75         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
76         factory = bitcoin_p2p.ClientFactory(net.PARENT)
77         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
78         yield factory.getProtocol() # waits until handshake is successful
79         print '    ...success!'
80         print
81         
82         print 'Determining payout address...'
83         if args.pubkey_hash is None:
84             address_path = os.path.join(datadir_path, 'cached_payout_address')
85             
86             if os.path.exists(address_path):
87                 with open(address_path, 'rb') as f:
88                     address = f.read().strip('\r\n')
89                 print '    Loaded cached address: %s...' % (address,)
90             else:
91                 address = None
92             
93             if address is not None:
94                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
95                 if not res['isvalid'] or not res['ismine']:
96                     print '    Cached address is either invalid or not controlled by local bitcoind!'
97                     address = None
98             
99             if address is None:
100                 print '    Getting payout address from bitcoind...'
101                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
102             
103             with open(address_path, 'wb') as f:
104                 f.write(address)
105             
106             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
107         else:
108             my_pubkey_hash = args.pubkey_hash
109         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
110         print
111         
112         my_share_hashes = set()
113         my_doa_share_hashes = set()
114         
115         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
116         shared_share_hashes = set()
117         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
118         known_verified = set()
119         recent_blocks = []
120         print "Loading shares..."
121         for i, (mode, contents) in enumerate(ss.get_shares()):
122             if mode == 'share':
123                 if contents.hash in tracker.shares:
124                     continue
125                 shared_share_hashes.add(contents.hash)
126                 contents.time_seen = 0
127                 tracker.add(contents)
128                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
129                     print "    %i" % (len(tracker.shares),)
130             elif mode == 'verified_hash':
131                 known_verified.add(contents)
132             else:
133                 raise AssertionError()
134         print "    ...inserting %i verified shares..." % (len(known_verified),)
135         for h in known_verified:
136             if h not in tracker.shares:
137                 ss.forget_verified_share(h)
138                 continue
139             tracker.verified.add(tracker.shares[h])
140         print "    ...done loading %i shares!" % (len(tracker.shares),)
141         print
142         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
143         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
144         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
145         
146         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
147         
148         pre_current_work = variable.Variable(None)
149         pre_merged_work = variable.Variable({})
150         # information affecting work that should trigger a long-polling update
151         current_work = variable.Variable(None)
152         # information affecting work that should not trigger a long-polling update
153         current_work2 = variable.Variable(None)
154         
155         requested = expiring_dict.ExpiringDict(300)
156         
157         print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for fresh work and split it across the two work
            # variables: current_work2 holds fields that should NOT trigger a
            # worker long-poll, pre_current_work holds fields that should.
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                # offset between local clock and bitcoind's reported time
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
175         yield set_real_work1()
176         
177         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
178         
        def set_real_work2():
            # Recompute the best share chain, publish the combined work dict
            # (triggering worker long-polls), then request any desired parent
            # shares from peers likely to have them.
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time() # NOTE: t is reused here as a timestamp
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff: roughly 10 * 1.5**count seconds between re-requests
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # peers known to have a head descending from this share are candidates
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # usually pick a random knowing peer, occasionally fall back to the suggester
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100], # cap the number of stop hashes sent over the wire
                )
                requested[share_hash] = t, count + 1
214         pre_current_work.changed.watch(lambda _: set_real_work2())
215         pre_merged_work.changed.watch(lambda _: set_real_work2())
216         set_real_work2()
217         print '    ...success!'
218         print
219         
220         
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            # Poll one merged-mining daemon's getauxblock once per second
            # forever, recording the latest aux work keyed by chain id.
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    # the special target 'p2pool' is kept as a sentinel; it is
                    # substituted with the actual share target in get_work
                    target='p2pool' if auxblock['target'] == 'p2pool' else pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
232         for merged_url, merged_userpass in merged_urls:
233             set_merged_work(merged_url, merged_userpass)
234         
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            # Fires whenever any merged-mining chain hands us new aux work.
            print 'Got new merged mining work!'
238         
239         # setup p2p logic and join p2pool network
240         
        class Node(p2p.Node):
            # p2pool network node: feeds shares and share-hash announcements
            # from peers into the local tracker and serves getshares requests.
            def handle_shares(self, shares, peer):
                # Add newly received shares to the tracker; remember which peer
                # knows the announced head so parents can be requested from it.
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    # new shares may change the best chain; recompute work
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                # Request full shares for any advertised hashes we lack, with
                # the same exponential backoff used in set_real_work2.
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                # Serve a peer's getshares request: walk each requested chain,
                # stopping at any hash in `stops`, capped at ~1000 total shares.
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
296         
297         @deferral.retry('Error submitting block: (will retry)', 10, 10)
298         @defer.inlineCallbacks
299         def submit_block(block, ignore_failure):
300             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
301             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
302             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
303                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
304         
        @tracker.verified.added.watch
        def _(share):
            # If a newly verified share also meets the bitcoin block target, it
            # IS a valid block — pass it to bitcoind and record it.
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
313         
314         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
315         
316         @defer.inlineCallbacks
317         def parse(x):
318             if ':' in x:
319                 ip, port = x.split(':')
320                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
321             else:
322                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
323         
324         addrs = {}
325         if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
326             try:
327                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
328             except:
329                 print >>sys.stderr, "error reading addrs"
330         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
331             try:
332                 addr = yield addr_df
333                 if addr not in addrs:
334                     addrs[addr] = (0, time.time(), time.time())
335             except:
336                 log.err()
337         
338         connect_addrs = set()
339         for addr_df in map(parse, args.p2pool_nodes):
340             try:
341                 connect_addrs.add((yield addr_df))
342             except:
343                 log.err()
344         
345         p2p_node = Node(
346             best_share_hash_func=lambda: current_work.value['best_share_hash'],
347             port=args.p2pool_port,
348             net=net,
349             addr_store=addrs,
350             connect_addrs=connect_addrs,
351             max_incoming_conns=args.p2pool_conns,
352         )
353         p2p_node.start()
354         
355         task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
356         
357         # send share when the chain changes to their chain
        def work_changed(new_work):
            # Broadcast shares that are new on the best chain (up to 5 deep)
            # to all peers, skipping shares a peer itself sent us.
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break # everything past this point has already been shared
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
369         
370         current_work.changed.watch(work_changed)
371         
        def save_shares():
            # Persist up to 2*CHAIN_LENGTH shares of the best chain (and their
            # verified status) to the on-disk ShareStore.
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
377         task.LoopingCall(save_shares).start(60)
378         
379         print '    ...success!'
380         print
381         
382         if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # Periodically (mean ~2 minutes, exponentially jittered)
                # re-establish a UPnP mapping for the p2pool port. Best-effort:
                # all failures are swallowed, logged only in DEBUG mode.
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
397             upnp_thread()
398         
399         # start listening for workers with a JSON-RPC server
400         
401         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
402         
403         # setup worker logic
404         
405         removed_unstales_var = variable.Variable((0, 0, 0))
406         removed_doa_unstales_var = variable.Variable(0)
407         @tracker.verified.removed.watch
408         def _(share):
409             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
410                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
411                 removed_unstales_var.set((
412                     removed_unstales_var.value[0] + 1,
413                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
414                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
415                 ))
416             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
417                 removed_doa_unstales.set(removed_doa_unstales.value + 1)
418         
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            # in-chain counts include shares that already fell off the chain,
            # accumulated by the tracker.verified.removed watcher above
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
433         
434         
435         pseudoshare_received = variable.Event()
436         share_received = variable.Event()
437         local_rate_monitor = math.RateMonitor(10*60)
438         
439         class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                # worker long-polls fire whenever current_work changes
                self.new_work_event = current_work.changed
                # (timestamp, work) pairs of recent pseudoshares, used to
                # estimate the miner's hashrate in get_work
                self.recent_shares_ts_work = []
444             
445             def preprocess_request(self, request):
446                 user = request.getUser() if request.getUser() is not None else ''
447                 
448                 desired_pseudoshare_target = None
449                 if '+' in user:
450                     user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
451                     try:
452                         desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
453                     except:
454                         pass
455                 
456                 desired_share_target = 2**256 - 1
457                 if '/' in user:
458                     user, min_diff_str = user.rsplit('/', 1)
459                     try:
460                         desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
461                     except:
462                         pass
463                 
464                 if random.uniform(0, 100) < args.worker_fee:
465                     pubkey_hash = my_pubkey_hash
466                 else:
467                     try:
468                         pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
469                     except: # XXX blah
470                         pubkey_hash = my_pubkey_hash
471                 
472                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
473             
474             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
475                 if len(p2p_node.peers) == 0 and net.PERSIST:
476                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
477                 if current_work.value['best_share_hash'] is None and net.PERSIST:
478                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
479                 if time.time() > current_work2.value['last_update'] + 60:
480                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
481                 
482                 if current_work.value['mm_chains']:
483                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
484                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
485                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
486                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
487                         size=size,
488                         nonce=0,
489                     ))
490                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
491                 else:
492                     mm_data = ''
493                     mm_later = []
494                 
495                 if True:
496                     share_info, generate_tx = p2pool_data.Share.generate_transaction(
497                         tracker=tracker,
498                         share_data=dict(
499                             previous_share_hash=current_work.value['best_share_hash'],
500                             coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
501                             nonce=random.randrange(2**32),
502                             pubkey_hash=pubkey_hash,
503                             subsidy=current_work2.value['subsidy'],
504                             donation=math.perfect_round(65535*args.donation_percentage/100),
505                             stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
506                                 253 if orphans > orphans_recorded_in_chain else
507                                 254 if doas > doas_recorded_in_chain else
508                                 0
509                             )(*get_stale_counts()),
510                             desired_version=1,
511                         ),
512                         block_target=current_work.value['bits'].target,
513                         desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
514                         desired_target=desired_share_target,
515                         ref_merkle_link=dict(branch=[], index=0),
516                         net=net,
517                     )
518                 
519                 mm_later = [(dict(aux_work, target=aux_work['target'] if aux_work['target'] != 'p2pool' else share_info['bits'].target), index, hashes) for aux_work, index, hashes in mm_later]
520                 
521                 target = net.PARENT.SANE_MAX_TARGET
522                 if desired_pseudoshare_target is None:
523                     if len(self.recent_shares_ts_work) == 50:
524                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
525                         target = min(target, 2**256//hash_rate)
526                 else:
527                     target = min(target, desired_pseudoshare_target)
528                 target = max(target, share_info['bits'].target)
529                 for aux_work, index, hashes in mm_later:
530                     target = max(target, aux_work['target'])
531                 
532                 transactions = [generate_tx] + list(current_work2.value['transactions'])
533                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
534                 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
535                 
536                 getwork_time = time.time()
537                 merkle_link = current_work2.value['merkle_link']
538                 
539                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
540                     bitcoin_data.target_to_difficulty(target),
541                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
542                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
543                     len(current_work2.value['transactions']),
544                 )
545                 
546                 ba = bitcoin_getwork.BlockAttempt(
547                     version=current_work.value['version'],
548                     previous_block=current_work.value['previous_block'],
549                     merkle_root=merkle_root,
550                     timestamp=current_work2.value['time'],
551                     bits=current_work.value['bits'],
552                     share_target=target,
553                 )
554                 
555                 received_header_hashes = set()
556                 
                def got_response(header, request):
                    # Handle a solved block header submitted by a miner.
                    # Checks it against, in order of difficulty: the bitcoin
                    # network target, each merged-mining chain's target, the
                    # p2pool share target, and finally the worker pseudoshare
                    # target. Returns True if the submission was on time
                    # (i.e. the work still extends the current best share).
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    # 1) bitcoin target met -> submit a full block to bitcoind
                    #    (in DEBUG mode blocks are submitted regardless, but only
                    #    genuinely-valid ones are printed/recorded)
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    # 2) for each merged-mined chain whose target is met, submit
                    #    an aux proof-of-work via that chain's getauxblock RPC
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    # warn if the daemon's accept/reject disagrees
                                    # with whether the target was actually met
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    # 3) share target met (and not a duplicate) -> build the
                    #    p2pool share, record it locally, and propagate it
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header);del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        # other_txs are only attached when this share is also a full block
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        # block solutions are pushed to all peers immediately
                        # instead of waiting for normal share propagation
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    # 4) pseudoshare accounting: reject work above the handed-out
                    #    target, detect duplicate submissions, otherwise record
                    #    the work for local rate/DOA statistics
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser())
                        # keep a bounded window of recent work for rate estimation
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
652                 
653                 return ba, got_response
654         
655         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
656         
657         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received, share_received)
658         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
659         
660         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
661         
662         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
663             pass
664         
665         print '    ...success!'
666         print
667         
668         
669         @defer.inlineCallbacks
670         def work_poller():
671             while True:
672                 flag = factory.new_block.get_deferred()
673                 try:
674                     yield set_real_work1()
675                 except:
676                     log.err()
677                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
678         work_poller()
679         
680         
681         # done!
682         print 'Started successfully!'
683         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
684         if args.donation_percentage > 0.51:
685             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
686         elif args.donation_percentage < 0.49:
687             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
688         else:
689             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
690             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
691         print
692         
693         
694         if hasattr(signal, 'SIGALRM'):
695             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
696                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
697             ))
698             signal.siginterrupt(signal.SIGALRM, False)
699             task.LoopingCall(signal.alarm, 30).start(1)
700         
701         if args.irc_announce:
702             from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # Announces blocks found by the share chain on the network's
                # IRC channel. A random nick suffix avoids collisions between
                # multiple p2pool instances.
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # only announce verified shares that are actual blocks
                        # and recent; the random exponential delay spreads out
                        # announcements from many nodes seeing the same block
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    # watch id kept so connectionLost can unregister the callback
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # bounded history used to suppress duplicate announcements
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    # remember what others said so we don't repeat an
                    # announcement another node already made
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                # reconnects automatically (with backoff) if the IRC link drops
                protocol = IRCClient
736             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
737         
        @defer.inlineCallbacks
        def status_thread():
            # Periodically assemble a multi-line status summary and print it,
            # but only when it changed or at least every 15 seconds, to keep
            # the log readable. Any error is logged and the loop continues.
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    # warn if bitcoind hasn't delivered fresh work in over a minute
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    # chain and peer overview
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # local miner statistics derived from recent pseudoshares
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    # pool-wide statistics need a few shares of history to be meaningful
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        # warn loudly if most shares vote for a share version
                        # this client doesn't implement (an upgrade is needed)
                        desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
                        majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
                        if majority_desired_version not in [0, 1]:
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
                                majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
                            print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
                            print >>sys.stderr, '#'*40
                    
                    # print only on change, or every 15 seconds as a heartbeat
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
798         status_thread()
799     except:
800         reactor.stop()
801         log.err(None, 'Fatal error:')
802
def run():
    """Entry point: parse command-line arguments and configuration, set up
    logging and signal handlers, then start main() under the twisted reactor.
    """
    class FixedArgumentParser(argparse.ArgumentParser):
        # ArgumentParser subclass whose @file argument expansion is recursive
        # and allows several whitespace-separated arguments per line, so an
        # arguments file may itself reference other arguments files.
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so included files can reference further files
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # split each @file line on whitespace, dropping empty tokens
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    # networks without the '_testnet' suffix, used for --net choices and help text
    realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional: optional username and password, space-separated
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    # resolve the concrete network name (e.g. 'bitcoin' or 'bitcoin_testnet')
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    # per-network data directory, created on first run
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    # left-pad with None so a single argument is treated as the password only
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        # no password on the command line: fall back to the network's conf file
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            # prepend a dummy section header so RawConfigParser accepts the file
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            # conf-file values only fill options not already given on the command line
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    # remaining port defaults come from the network definition
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    # worker endpoint may be PORT or ADDR:PORT; default binds all interfaces
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        # None tells main() to request a payout address from bitcoind
        args.pubkey_hash = None
    
    def separate_url(url):
        # split 'scheme://user:pass@host:port/' into
        # (url without credentials, 'user:pass')
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    # tee timestamped output to both stderr and the logfile, then route
    # stdout, stderr, and twisted's log observer through that pipe
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 reopens the logfile, supporting external log rotation
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # also reopen periodically in case the file is rotated without a signal
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()