added desired pseudoshare target to user string, now ADDRESS[/SHARE_DIFF][+PSEUDOSHAR...
[p2pool.git] / p2pool / main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import os
import random
import struct
import sys
import time
import signal
import traceback
import urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

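# getwork: fetch a block template from bitcoind via the getmemorypool RPC and
# normalize it into the dict the rest of p2pool expects; called at startup and
# from the work poller below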
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
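        # hashes of shares produced by this node; my_doa_share_hashes also
        # records the ones that were dead on arrival, for the stale-rate
        # accounting done in get_stale_counts below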
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 is set first because everything hooks on pre_current_work/current_work and expects current_work2 to already be up to date
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
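        # prefer bitcoind's getblock RPC for tracking block heights; fall back
        # to following headers over the P2P connection if it's unavailable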
        if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
            @deferral.DeferredCacher
            @defer.inlineCallbacks
            def height_cacher(block_hash):
                x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
                defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
            best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
            def get_height_rel_highest(block_hash):
                this_height = height_cacher.call_now(block_hash, 0)
                best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
                best_height_cached.set(max(best_height_cached.value, this_height, best_height))
                return this_height - best_height_cached.value
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
        
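        # set_real_work2: pick the best share to extend, publish the combined
        # work dict, and request any desired-but-missing parent shares from
        # peers believed to know of them, backing off exponentially per hash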
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
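        # poll each merged-mining daemon's getauxblock once a second and fold
        # the aux work into pre_merged_work, keyed by chain id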
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
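        # p2p callbacks: how this node handles shares, share hashes, and
        # getshares requests arriving from peers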
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
        
        
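        # on the bitcoinds this code targets, getmemorypool called with a data
        # argument acts as block submission (the pre-BIP 22 interface)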
        submit_block = deferral.retry('Error submitting primary block: (will retry)', 10, 10)(lambda block: bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex')))
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker))
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
        # when the best chain changes, broadcast any of its recent shares that haven't yet been sent to peers
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
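        # periodically persist the share chain and verified-hash markers so a
        # restart doesn't have to redownload everything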
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
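                # miner usernames may carry difficulty hints, in the form
                # ADDRESS[/SHARE_DIFF][+PSEUDOSHARE_DIFF] (e.g. 'ADDR/1000+50');
                # suffixes that fail to parse are silently ignored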
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # not a valid address; pay out to this node's address instead
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                new = (tracker.shares[current_work.value['best_share_hash']].timestamp if current_work.value['best_share_hash'] is not None else time.time()) > net.SWITCH_TIME
                
                if new:
                    share_info, generate_tx = p2pool_data.new_generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=random.randrange(2**32),
                            pubkey_hash=pubkey_hash,
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        desired_target=desired_share_target,
                        net=net,
                    )
                else:
                    share_info, generate_tx = p2pool_data.generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=struct.pack('<Q', random.randrange(2**64)),
                            new_script=bitcoin_data.pubkey_hash_to_script2(pubkey_hash),
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        net=net,
                    )
                
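                # pick the pseudoshare target: start from an easy sane maximum,
                # tighten it toward the miner's observed hashrate (or their
                # requested target), but never make pseudoshares harder than
                # the real share target or any merged chain's target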
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
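                # got_response handles a solved header from a miner: submit it
                # upstream if it meets the block or any merged-mining target,
                # turn it into a p2pool share if it meets the share target, and
                # record a pseudoshare whenever it meets the pseudoshare target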
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions))
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        if new:
                            min_header = dict(header)
                            del min_header['merkle_root']
                            hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                            share = p2pool_data.NewShare(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        else:
                            share = p2pool_data.Share(net, None, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash <= target and header_hash not in received_header_hashes:
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    if header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    received_header_hashes.add(header_hash)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
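        # every 3 seconds, assemble a status line (share chain size, peer
        # counts, local and pool hashrates, stale rates, expected payout);
        # print it immediately on change, otherwise at most every 15 seconds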
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
        reactor.stop()

def run():
    class FixedArgumentParser(argparse.ArgumentParser):
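        # override argparse's @file handling so referenced files are expanded
        # recursively and each line is split on whitespace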
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
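        # bitcoin.conf has no section headers, so prepend a dummy [x] section
        # to make it digestible by ConfigParser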
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()