propagate all block solutions over p2p net
p2pool/main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import base64
import os
import random
import sys
import time
import signal
import traceback
import urlparse

try:
    from twisted.internet import iocpreactor
    iocpreactor.install()
except:
    pass
else:
    print 'Using IOCP reactor!'
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

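# getwork: fetch a block template from bitcoind over JSON-RPC. getmemorypool is
# the pre-getblocktemplate interface; it returns the version, previous block
# hash, transactions, coinbase value, time, and bits needed to build a block.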
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
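                # Exponential backoff: re-request a share only after 10 * 1.5**count
                # seconds have passed since the last attempt for it.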
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
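        # Poll each merged-mining daemon's getauxblock once per second and publish
        # the aux work (chain id -> hash/target/proxy) for the work generator to
        # commit to in the coinbase.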
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
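            # Serve a getshares request: walk each requested chain backwards,
            # stopping at any hash the peer already has, and cap the total
            # response at roughly 1000 shares.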
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
        
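        # Submit a solved block by passing the serialized block to getmemorypool
        # (which doubles as the block submission call in the pre-getblocktemplate
        # bitcoinds this code targets) and complain if the result disagrees with
        # what the proof of work predicts.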
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
                    broadcast_share(share.hash)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
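        # Walk back along the chain from share_hash, collecting up to 5 shares
        # that haven't been broadcast yet, and relay them to every peer (skipping,
        # for each share, the peer it came from).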
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        # broadcast new best shares whenever the best chain changes
        current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
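                # stale_info: 0 = on-time, 253 = orphan, 254 = dead on arrival,
                # the same encoding get_stale_counts() below relies on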
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def get_user_details(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
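                # Worker usernames can carry difficulty hints: 'address+diff'
                # requests a specific pseudoshare difficulty, and 'address/diff'
                # requests a minimum share difficulty.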
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # invalid address - fall back to the node operator's address
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def preprocess_request(self, request):
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
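                    # '\xfa\xbemm' is the merged-mining magic ('\xfa\xbe' + 'mm')
                    # that marks the aux chain commitment in the coinbase script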
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.Share.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                        desired_version=1,
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    ref_merkle_link=dict(branch=[], index=0),
                    net=net,
                )
                
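                # Pick a pseudoshare target: estimate the miner's hashrate from its
                # last 50 pseudoshares and aim for about one pseudoshare per second,
                # honor an explicitly requested difficulty, and never set it harder
                # than the real share target or any aux chain target.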
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
                def got_response(header, request):
                    user, _, _, _ = self.get_user_details(request)
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header)
                        del min_header['merkle_root']
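                        # hash_link commits to the generation tx minus its trailing
                        # 36 bytes (presumably the 32-byte ref hash plus the 4-byte
                        # lock time), letting peers verify the share cheaply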
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
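        # Poll bitcoind for fresh work every 15 seconds, or immediately when the
        # P2P factory signals that a new block has arrived.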
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with the --give-author argument! (or decrease it, if you must)'
        print
        
        
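        # Watchdog: rearm a 30-second SIGALRM every second; if the reactor stalls
        # long enough for the alarm to fire, dump a stack trace to stderr.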
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
                        majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
                        if majority_desired_version not in [0, 1]:
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
                                majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
                            print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
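    # argparse's built-in fromfile support reads one argument per line; this
    # subclass splits each line on whitespace so @-files can hold
    # 'option value' pairs and nested @-references.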
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments is allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
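    # No RPC password on the command line: fall back to reading rpcuser,
    # rpcpassword, and the ports from the parent network's bitcoin.conf.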
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you haven't yet created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()