from __future__ import division

import argparse
import ConfigParser
import json
import os
import random
import signal
import StringIO
import struct
import sys
import time
import traceback
import urlparse

from twisted.internet import defer, error, reactor, protocol, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, graphs
import p2pool, p2pool.data as p2pool_data
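
# getwork: fetch a block template from bitcoind's getmemorypool RPC (the
# pre-getblocktemplate interface) and normalize it into the dict the rest of
# this module consumes: parsed transactions, a merkle branch for the coinbase
# slot, the subsidy, the difficulty bits, and any coinbase flags.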
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print 'Determining payout script...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_script = bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net.PARENT))
        else:
            print '    ...Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success! Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
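        
        # Share bookkeeping: my_share_hashes / my_doa_share_hashes record the
        # shares generated by this node (DOA = dead on arrival: already stale
        # when submitted); shared_share_hashes tracks which shares peers
        # already know about so they aren't re-broadcast.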
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
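        # requested maps share_hash -> (last_request_time, request_count);
        # the 10 * 1.5**count backoff below keeps the node from re-requesting
        # the same missing share too aggressively.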
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 is set first because everything hooks on pre_current_work changing
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
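        
        # Choose a height lookup: prefer bitcoind's getblock RPC when the
        # help text shows it exists, otherwise track headers over the bitcoin
        # P2P connection. Heights are relative to the current best block
        # (negative values mean the block is behind the tip).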
        if '\ngetblock ' in (yield bitcoind.rpc_help()):
            height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((yield bitcoind.rpc_getblock('%x' % (block_hash,)))['blockcount'])))
            def get_height_rel_highest(block_hash):
                return height_cacher.call_now(block_hash, 0) - height_cacher.call_now(pre_current_work.value['previous_block'], 1000000000)
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory).get_height_rel_highest
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
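        
        # Merged mining: poll each configured chain for aux work about once a
        # second via getauxblock; the freshest work per chain id is folded
        # into pre_merged_work, which becomes mm_chains in current_work.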
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
                recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash,) })
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
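        
        # Flush known peer addresses to addrs.txt once a minute so a restarted
        # node can rejoin the network without relying solely on the hardcoded
        # bootstrap addresses.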
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        # broadcast any new shares at the head of the best chain when it changes
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        recent_blocks = []
        start_time = time.time()
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
        
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
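        
        # stale_info codes used in share_data: 0 = not stale, 253 = share was
        # orphaned, 254 = share was dead on arrival.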
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def _get_payout_script_from_username(self, user):
                if user is None:
                    return None
                try:
                    pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                except: # not a valid address
                    return None
                return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
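            
            # A miner whose username parses as a valid bitcoin address is paid
            # to that address; unparseable usernames, and a random
            # args.worker_fee percent of all requests, fall back to this
            # node's own payout script.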
            def preprocess_request(self, request):
                payout_script = self._get_payout_script_from_username(request.getUser())
                if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                    payout_script = my_script
                return payout_script,
            
            def get_work(self, payout_script):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=struct.pack('<Q', random.randrange(2**64)),
                        new_script=payout_script,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    net=net,
                )
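                
                # The pseudo-share target starts at the getwork ceiling
                # (difficulty ~1), is tightened so a miner returns roughly one
                # result per 5 seconds once 50 timing samples exist, and is
                # then clamped to be no harder than the real share target or
                # any merged chain's target, so no qualifying solution is
                # discarded by the miner.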
                target = 2**256//2**32 - 1
                if len(self.recent_shares_ts_work) == 50:
                    hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                    target = min(target, 2**256//(hash_rate * 5))
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
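                
                # got_response receives each solved header from the miner; a
                # single solution can simultaneously be a bitcoin block, a
                # merged-mining proof of work, a p2pool share, and a
                # pseudo-share for local rate accounting, so each case is
                # checked in turn below.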
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
                            def submit_block():
                                if factory.conn.value is None:
                                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Hash: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
                                    raise deferral.RetrySilentlyException()
                                factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                            submit_block()
                            if pow_hash <= header['bits'].target:
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
                                recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash <= target:
                        reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                        if request.getPassword() == vip_pass:
                            reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    
                    return on_time
                
                return ba, got_response
        
        web_root = resource.Resource()
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        def get_rate():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return json.dumps(None)
            return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        
        def get_current_txouts():
            share = tracker.shares[current_work.value['best_share_hash']]
            share_info, gentx = p2pool_data.generate_transaction(tracker, share.share_info['share_data'], share.header['bits'].target, share.share_info['timestamp'], share.net)
            return dict((out['script'], out['value']) for out in gentx['tx_outs'])
        
        def get_current_scaled_txouts(scale, trunc=0):
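            # Rescale the pending payouts to `scale` units. With trunc > 0,
            # payouts below the threshold are collapsed onto one
            # weighted-random winner (avoiding dust outputs), and any
            # rounding shortfall is likewise assigned by weighted choice.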
            txouts = get_current_txouts()
            total = sum(txouts.itervalues())
            results = dict((script, value*scale//total) for script, value in txouts.iteritems())
            if trunc > 0:
                total_random = 0
                random_set = set()
                for s in sorted(results, key=results.__getitem__):
                    if results[s] >= trunc:
                        break
                    total_random += results[s]
                    random_set.add(s)
                if total_random:
                    winner = math.weighted_choice((script, results[script]) for script in random_set)
                    for script in random_set:
                        del results[script]
                    results[winner] = total_random
            if sum(results.itervalues()) < int(scale):
                results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
            return results
        
        def get_current_payouts():
            return json.dumps(dict((bitcoin_data.script2_to_human(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))
        
        def get_patron_sendmany(this):
            try:
                if '/' in this:
                    this, trunc = this.split('/', 1)
                else:
                    trunc = '0'
                return json.dumps(dict(
                    (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
                    for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
                    if bitcoin_data.script2_to_address(script, net.PARENT) is not None
                ))
            except:
                return json.dumps(None)
        
        def get_global_stats():
            # averaged over last hour
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            return json.dumps(dict(
                pool_nonstale_hash_rate=nonstale_hash_rate,
                pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
                pool_stale_prop=stale_prop,
            ))
        
        def get_local_stats():
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            
            my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
            my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
            my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
            my_share_count = my_unstale_count + my_orphan_count + my_doa_count
            my_stale_count = my_orphan_count + my_doa_count
            
            my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
            
            my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
                for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
                if share.hash in my_share_hashes)
            actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
                tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
            share_att_s = my_work / actual_time
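            
            # share_att_s is this node's recent work rate in hash attempts per
            # second, counting only shares that made it into the chain; the
            # "rewarded" and "actual" figures below rescale it by the
            # pool-wide and local stale proportions respectively.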
            miner_hash_rates = {}
            miner_dead_hash_rates = {}
            datums, dt = local_rate_monitor.get_datums_in_last()
            for datum in datums:
                miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
                if datum['dead']:
                    miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
            
            return json.dumps(dict(
                my_hash_rates_in_last_hour=dict(
                    nonstale=share_att_s,
                    rewarded=share_att_s/(1 - global_stale_prop),
                    actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
                ),
                my_share_counts_in_last_hour=dict(
                    shares=my_share_count,
                    unstale_shares=my_unstale_count,
                    stale_shares=my_stale_count,
                    orphan_stale_shares=my_orphan_count,
                    doa_stale_shares=my_doa_count,
                ),
                my_stale_proportions_in_last_hour=dict(
                    stale=my_stale_prop,
                    orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                    dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
                ),
                miner_hash_rates=miner_hash_rates,
                miner_dead_hash_rates=miner_dead_hash_rates,
            ))
        
        def get_peer_addresses():
            return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())
        
        def get_uptime():
            return json.dumps(time.time() - start_time)
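        
        # WebInterface adapts a plain callable into a Twisted resource: GET
        # parameters named in `fields` are passed through as positional
        # arguments, the return value is served under the given MIME type,
        # and CORS is left open to any origin.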
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type, *fields):
                resource.Resource.__init__(self)
                self.func, self.mime_type, self.fields = func, mime_type, fields
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                request.setHeader('Access-Control-Allow-Origin', '*')
                return self.func(*(request.args[field][0] for field in self.fields))
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
        web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
        web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
        web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
        web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
        web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
        web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
        web_root.putChild('uptime', WebInterface(get_uptime, 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        new_root = resource.Resource()
        web_root.putChild('web', new_root)
        
        stat_log = []
        if os.path.exists(os.path.join(datadir_path, 'stats')):
            try:
                with open(os.path.join(datadir_path, 'stats'), 'rb') as f:
                    stat_log = json.loads(f.read())
            except:
                log.err(None, 'Error loading stats:')
        def update_stat_log():
            while stat_log and stat_log[0]['time'] < time.time() - 24*60*60:
                stat_log.pop(0)
            
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return None
            
            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
            
            miner_hash_rates = {}
            miner_dead_hash_rates = {}
            datums, dt = local_rate_monitor.get_datums_in_last()
            for datum in datums:
                miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
                if datum['dead']:
                    miner_dead_hash_rates[datum['user']] = miner_dead_hash_rates.get(datum['user'], 0) + datum['work']/dt
            
            stat_log.append(dict(
                time=time.time(),
                pool_hash_rate=p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)/(1-global_stale_prop),
                pool_stale_prop=global_stale_prop,
                local_hash_rates=miner_hash_rates,
                local_dead_hash_rates=miner_dead_hash_rates,
                shares=shares,
                stale_shares=stale_orphan_shares + stale_doa_shares,
                stale_shares_breakdown=dict(orphan=stale_orphan_shares, doa=stale_doa_shares),
            ))
            
            with open(os.path.join(datadir_path, 'stats'), 'wb') as f:
                f.write(json.dumps(stat_log))
        task.LoopingCall(update_stat_log).start(5*60)
        new_root.putChild('log', WebInterface(lambda: json.dumps(stat_log), 'application/json'))
        
        grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
        web_root.putChild('graphs', grapher.get_resource())
        def add_point():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return
            nonstalerate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
            poolrate = nonstalerate / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720))
            grapher.add_poolrate_point(poolrate, poolrate - nonstalerate)
        task.LoopingCall(add_point).start(100)
        
        def attempt_listen():
            try:
                reactor.listenTCP(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
            except error.CannotListenError, e:
                print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
                reactor.callLater(1, attempt_listen)
        attempt_listen()
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass # an empty ready_flag file signals that initialization completed
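        
        # Poll bitcoind for fresh work every 15 seconds, or immediately when
        # the P2P connection announces a new block, whichever happens first.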
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        print 'Started successfully!'
        print
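        
        # Watchdog: the alarm is re-armed to 30 seconds once a second, so
        # SIGALRM only fires (dumping a stack trace to stderr) if the reactor
        # stalls for half a minute.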
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                def lineReceived(self, line):
                    if p2pool.DEBUG: print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join('#p2pool')
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        self.say('#p2pool', '\x02BLOCK FOUND by %s! http://blockexplorer.com/block/%064x' % (bitcoin_data.script2_to_address(share.share_data['new_script'], net.PARENT), share.header_hash))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].target / my_att_s) if my_att_s else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(my_script, 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')


def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
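    
    # FixedArgumentParser relaxes argparse's one-argument-per-line rule for
    # @file arguments: convert_arg_line_to_args splits each line on
    # whitespace, so a config file may hold several options per line and may
    # itself reference further @files recursively.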
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--merged-url',
        help='DEPRECATED, use --merged',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='DEPRECATED, use --merged',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to the author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to an existing p2pool node at ADDR listening on port PORT (defaults to the default p2pool P2P port), in addition to the builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (if only one is provided, the username defaults to empty; if neither is, they are read from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments is allowed')
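    # Left-pad with Nones and keep the last two entries: zero positional
    # arguments give (None, None), one gives (None, password), and two give
    # (username, password).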
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
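    
    # separate_url splits the credentials out of a merged-mining URL:
    # http://user:pass@host:port/ -> ('http://host:port/', 'user:pass').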
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.merged_url is not None or args.merged_userpass is not None:
        print '--merged-url and --merged-userpass are deprecated! Use --merged http://USER:PASS@HOST:PORT/ instead!'
        print 'Pausing 10 seconds...'
        time.sleep(10)
        
        if args.merged_url is None or args.merged_userpass is None:
            parser.error('must specify both --merged-url and --merged-userpass')
        
        merged_urls = merged_urls + [(args.merged_url, args.merged_userpass)]
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
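    # Everything written to stdout/stderr is timestamped and teed to both
    # the original stderr and the logfile; stderr output is additionally
    # prefixed with '> ' so it stands out in the combined log.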
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()