from __future__ import division

import ConfigParser
import StringIO
import argparse
import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

try:
    from twisted.internet import iocpreactor
    iocpreactor.install()
except:
    pass
else:
    print 'Using IOCP reactor!'
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

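# getwork: fetch a block template from bitcoind via the getmemorypool RPC and
# repackage it for the rest of p2pool (decoded transactions, merkle link for
# the coinbase slot, subsidy, bits, coinbase flags).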
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
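        # share_hash -> (time of last getshares request, how many times requested)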
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
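        # set_real_work2: recompute the best share from the tracker, publish it as
        # current_work, and ask peers for any desired parent shares we are missing,
        # backing off exponentially on shares we have already requested.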
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
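        # Poll each merged-mining daemon for aux work about once a second and fold
        # the result into pre_merged_work, keyed by chain id.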
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
        
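        # Blocks are submitted along two redundant paths: straight onto the bitcoin
        # P2P connection, and over RPC via getmemorypool (whose return value lets
        # us check whether the submission was accepted).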
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)

        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
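        # If a newly verified share also solves a block, hand it to bitcoind and
        # rebroadcast it to our peers right away.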
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
                    broadcast_share(share.hash)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
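        # Load known peer addresses from disk (falling back to the old addrs.txt
        # format), then seed the store with the network's bootstrap addresses.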
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs.txt"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
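        # Send a share, plus up to four of its not-yet-shared ancestors, to every
        # peer except the one it came from.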
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        # broadcast the new best share whenever the best chain changes
        current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # setup worker logic
        
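        # Counters for shares of ours that the tracker pruned while they were still
        # part of the current chain: (total, announced-as-orphan, announced-as-doa),
        # plus a separate count of pruned dead-on-arrival shares. stale_info is
        # encoded as 0 = on time, 253 = orphan, 254 = dead on arrival.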
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)

        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
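        # WorkerBridge turns miners' getwork requests into p2pool share attempts.
        # A miner's username selects its payout address and may carry suffixes of
        # the form ADDRESS/SHARE_DIFFICULTY+PSEUDOSHARE_DIFFICULTY to request
        # specific difficulties.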
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def get_user_details(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # XXX blah
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def preprocess_request(self, request):
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.Share.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                        desired_version=1,
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    ref_merkle_link=dict(branch=[], index=0),
                    net=net,
                )
                
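                # Pick the pseudoshare target: if the miner didn't request one,
                # estimate its hash rate from the last 50 submits and retarget so
                # it submits about four proofs of work per second; then clamp so
                # pseudoshares are never harder than the real share target or any
                # merged chain's target.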
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 4*2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
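                # got_response handles each proof of work a miner turns in: submit
                # it to bitcoind if it solves a block, to any merged chain whose
                # target it meets, and into the share chain if it meets the share
                # target.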
                def got_response(header, request):
                    user, _, _, _ = self.get_user_details(request)
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header); del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
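        # Refresh work from bitcoind whenever it announces a new block over P2P,
        # and at least every 15 seconds regardless.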
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
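        # Watchdog: re-arm a 30-second SIGALRM every second; if the reactor stalls
        # long enough for the alarm to fire, a stack trace is dumped to stderr.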
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
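        # Print a status line (chain height, peers, local/pool rates, stale counts)
        # every three seconds, repeating an unchanged line at most every 15 seconds.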
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, current_work):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

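# run() parses the command line (supporting @file argument files), fills in
# missing bitcoind settings from bitcoin.conf, sets up logging, and starts
# main() under the Twisted reactor.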
def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    p2pool_group.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()