# p2pool/main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import base64
import os
import random
import sys
import time
import signal
import traceback
import urlparse

try:
    from twisted.internet import iocpreactor
    iocpreactor.install()
except:
    pass
else:
    print 'Using IOCP reactor!'
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

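# Fetch a block template from bitcoind via the (pre-BIP 22) getmemorypool RPC
# call and normalize it into a plain dict of parsed fields.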
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
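        # Hashes of shares generated by this node, and of those that were dead on arrival (DOA).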
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
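        # set_real_work1 fetches a fresh block template from bitcoind;
        # set_real_work2 (below) merges it with share-chain state into current_work.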
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 is set before pre_current_work because watchers hook on the latter
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
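        # Poll each merged-mining daemon's getauxblock and fold the aux work into pre_merged_work.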
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
        
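        # Submit solved blocks to bitcoind over both its P2P port and its JSON-RPC interface.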
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
                    broadcast_share(share.hash)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
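        # Relay a share (and up to four of its not-yet-shared ancestors) to all connected peers.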
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        # broadcast the new best share whenever the best chain changes
        current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
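        # Optionally maintain a UPnP mapping so peers can reach the P2P port from outside the LAN.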
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
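        # Bridge between miners speaking the getwork protocol and the p2pool share chain.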
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def get_user_details(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # invalid or missing address; fall back to this node's payout address
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def preprocess_request(self, request):
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
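            # Build a block attempt for a miner and return it together with a callback
            # that handles the miner's eventual proof-of-work submission.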
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
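                # Merged mining: embed a merkle root over all aux chains in the coinbase,
                # prefixed with the \xfa\xbe'mm' magic that merged-mining daemons look for.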
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.Share.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                        desired_version=1,
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    ref_merkle_link=dict(branch=[], index=0),
                    net=net,
                )
                
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
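                # Handle a getwork solution from a miner: possibly a full block, possibly
                # a merged-mining block, possibly a share, and always at least a pseudoshare.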
                def got_response(header, request):
                    user, _, _, _ = self.get_user_details(request)
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header)
                        del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
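        # Re-fetch the block template whenever bitcoind announces a new block, or at least every 15 seconds.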
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with the --give-author argument! (or decrease it, if you must)'
        print


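        # Watchdog: re-arm a 30-second alarm every second; if the process stalls
        # long enough for SIGALRM to fire, dump a stack trace to stderr.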
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
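        # Optionally announce found blocks on IRC.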
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
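        # Print a status line every few seconds: chain height, peers, local and pool hash rates.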
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
                        majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
                        if majority_desired_version not in [0, 1]:
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
                                majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
                            print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
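    # argparse subclass whose @file arguments may contain multiple whitespace-separated options per line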
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    p2pool_group.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (providing only one causes the username to default to empty; providing neither causes P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
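    # If no RPC password was given on the command line, read credentials and ports from bitcoin.conf.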
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you haven't created a configuration file yet, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
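    # Split the user:pass out of each merged-mining URL so it can be passed as HTTP Basic auth.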
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()