from __future__ import division

import ConfigParser
import StringIO
import argparse
import base64
import os
import random
import sys
import time
import signal
import traceback
import urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

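# Poll bitcoind for a block template over the old getmemorypool RPC (the
# pre-getblocktemplate interface) and normalize it into a plain dict;
# retried automatically on failure.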
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
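        # hashes of shares produced by this instance, and the subset that were
        # dead on arrival; both feed the stale-rate accounting below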
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
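        # recompute the best share and request any missing ancestor shares from
        # peers, backing off exponentially (10*1.5**count seconds) on re-requests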
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
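        # poll each merged-mining daemon's getauxblock about once a second and
        # publish the aux work, keyed by chain id, into pre_merged_work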
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
        
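        # submit a solved block to bitcoind (old-style submission: getmemorypool
        # called with the hex-encoded block, predating submitblock) and warn if
        # the result disagrees with whether the proof-of-work actually suffices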
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
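        # periodically try to map the p2pool port on the router via UPnP,
        # sleeping an exponentially-distributed interval (mean ~2 minutes) between attempts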
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # setup worker logic
        
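        # track our own shares that age out of the tracker while still ancestors of
        # the best share; stale_info codes: 0 = on-time, 253 = orphan, 254 = dead on arrival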
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # XXX blah
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.Share.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                        desired_version=1,
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    ref_merkle_link=dict(branch=[], index=0),
                    net=net,
                )
                
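                # pseudoshare target: start from the network's sane maximum, auto-tune
                # from this miner's recent rate (aiming for roughly one pseudoshare per
                # second) unless one was requested explicitly, and clamp it so it is
                # never harder than the actual share target or any aux-work target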
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
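                # called with each header the miner returns: submit it to bitcoind if
                # it solves a block, submit merged-mining proofs where applicable, and,
                # if it meets the share target, build the share and broadcast it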
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header)
                        del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser())
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
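        # refresh work from bitcoind whenever it announces a new block over p2p,
        # or every 15 seconds at the latest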
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
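        # print a status line every few seconds, rate-limited to changes (or at
        # least every 15 seconds), summarizing chain, peer, and rate statistics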
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
                        majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
                        if majority_desired_version not in [0, 1]:
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
                                majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
                            print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
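    # argparse's fromfile support normally treats each line of an @file as a
    # single argument; this subclass splits lines on whitespace and expands
    # @file references recursively, so config files can hold several options per line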
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments is allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
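    # split the credentials out of a merged-mining URL ("http://user:pass@host/"),
    # since jsonrpc.Proxy takes its HTTP auth separately from the URL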
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()