moved all web related stuff into p2pool.web
[p2pool.git] / p2pool / main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import os
import random
import struct
import sys
import time
import signal
import traceback
import urlparse

from twisted.internet import defer, error, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

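# Fetch a work unit from bitcoind via getmemorypool. The merkle branch is
# computed with a zero placeholder at index 0, since the coinbase transaction
# is only built later, when a worker actually requests work.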
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
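        # hashes of shares we generated, and of those that were dead on arrival
        # (solved against work that had already gone stale)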
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 must be set first, since everything hooks on pre_current_work and expects current_work2 to be populated
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
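        # if bitcoind's RPC exposes getblock, use it to look up block heights;
        # otherwise fall back to tracking headers over the P2P connection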
        if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
            height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((yield bitcoind.rpc_getblock('%x' % (block_hash,)))['blockcount'])))
            best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
            def get_height_rel_highest(block_hash):
                this_height = height_cacher.call_now(block_hash, 0)
                best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
                best_height_cached.set(max(best_height_cached.value, this_height, best_height))
                return this_height - best_height_cached.value
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
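        # poll each merged-mining daemon's getauxblock roughly once a second
        # and publish the aux work keyed by chain id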
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
        
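        # a verified share that also meets the block target is a full block;
        # hand it straight to bitcoind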
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        # when the best share chain changes, forward any of our shares at its head that peers haven't seen yet
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
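        # keep a UPnP mapping of the p2pool port alive, retrying at random
        # intervals that average two minutes (random.expovariate(1/120))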
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
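        # stale_info codes carried in shares: 0 = nothing to report, 253 = an
        # orphan was announced, 254 = a dead-on-arrival share was announced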
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
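            # miners select their payout with their username: either a bare
            # bitcoin address, or 'ADDRESS/DIFF' to also request a minimum
            # difficulty (e.g. 'ADDRESS/10' asks for difficulty-10 work)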
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                pubkey_hash = my_pubkey_hash
                max_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        max_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                try:
                    pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                except: # username is not a valid address; pay to our own address instead
                    pass
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                return pubkey_hash, max_target
            
            def get_work(self, pubkey_hash, max_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
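                # net.SWITCH_TIME is the scheduled cutover to the new share
                # format; generate the matching kind of share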
                new = time.time() > net.SWITCH_TIME
                
                if new:
                    share_info, generate_tx = p2pool_data.new_generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=random.randrange(2**32),
                            pubkey_hash=pubkey_hash,
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        desired_target=max_target,
                        net=net,
                    )
                else:
                    share_info, generate_tx = p2pool_data.generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=struct.pack('<Q', random.randrange(2**64)),
                            new_script=bitcoin_data.pubkey_hash_to_script2(pubkey_hash),
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        net=net,
                    )
                
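                # pseudoshare target: start from the easiest sane target and,
                # once 50 results are buffered, tighten it so this miner returns
                # roughly one result every 5 seconds; but never make it harder
                # than the share target or any aux-chain target, so nothing
                # valid goes unsubmitted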
                target = net.PARENT.SANE_MAX_TARGET
                if len(self.recent_shares_ts_work) == 50:
                    hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                    target = min(target, 2**256//(hash_rate * 5))
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
                            def submit_block():
                                if factory.conn.value is None:
                                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                    raise deferral.RetrySilentlyException()
                                factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                            submit_block()
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        if new:
                            min_header = dict(header)
                            del min_header['merkle_root']
                            hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                            share = p2pool_data.NewShare(net, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        else:
                            share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash <= target and header_hash not in received_header_hashes:
                        # the grapher hooks moved into p2pool.web along with the rest of
                        # the web code, so they no longer resolve in this module:
                        #reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                        #if request.getPassword() == vip_pass:
                        #    reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    if header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    received_header_hashes.add(header_hash)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %056x' % (pow_hash,)
                        print '    Target: %056x' % (target,)
                    
                    return on_time
                
                return ba, got_response
        
        
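        # current payouts: weight each miner's shares over the scoring window
        # and assign any rounding leftover to the donation script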
        def get_current_txouts():
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'],
                min(tracker.get_height(current_work.value['best_share_hash']), net.REAL_CHAIN_LENGTH),
                65535*net.SPREAD*bitcoin_data.target_to_average_attempts(current_work.value['bits'].target),
            )
            res = dict((script, current_work2.value['subsidy']*weight//total_weight) for script, weight in weights.iteritems())
            res[p2pool_data.DONATION_SCRIPT] = res.get(p2pool_data.DONATION_SCRIPT, 0) + current_work2.value['subsidy'] - sum(res.itervalues())
            return res
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        def attempt_listen():
            try:
                reactor.listenTCP(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
            except error.CannotListenError, e:
                print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
                reactor.callLater(1, attempt_listen)
            else:
                with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
                    pass
        attempt_listen()
        
        print '    ...success!'
        print
        
        
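        # refresh work whenever bitcoind announces a new block, and at least
        # every 15 seconds regardless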
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print
        
        
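        # watchdog: re-arm a 30-second SIGALRM every second; it only fires (and
        # dumps a stack trace) if the reactor stalls for more than 30 seconds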
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
        reactor.stop()

def run():
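    # argparse normally treats each line of an @file as a single argument;
    # this subclass splits lines on whitespace and expands nested @references,
    # so a config file can hold e.g. '--net bitcoin' on one line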
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments is allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
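    # merged-mining URLs carry credentials inline; split
    # 'http://user:pass@host:port/' into a bare URL plus its 'user:pass' part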
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()