added --outgoing-conns argument for people who want to decrease bandwidth usage
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import base64
4 import json
5 import os
6 import random
7 import sys
8 import time
9 import signal
10 import traceback
11 import urlparse
12
13 if '--iocp' in sys.argv:
14     from twisted.internet import iocpreactor
15     iocpreactor.install()
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
20
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface, helper
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging
24 from . import networks, web, work
25 import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node
26
27 @defer.inlineCallbacks
28 def main(args, net, datadir_path, merged_urls, worker_endpoint):
29     try:
30         print 'p2pool (version %s)' % (p2pool.__version__,)
31         print
32         
33         @defer.inlineCallbacks
34         def connect_p2p():
35             # connect to bitcoind over bitcoin-p2p
36             print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
37             factory = bitcoin_p2p.ClientFactory(net.PARENT)
38             reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
39             yield factory.getProtocol() # waits until handshake is successful
40             print '    ...success!'
41             print
42             defer.returnValue(factory)
43         
44         if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
45             factory = yield connect_p2p()
46         
47         # connect to bitcoind over JSON-RPC and do initial getmemorypool
48         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
49         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
50         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
51         yield helper.check(bitcoind, net)
52         temp_work = yield helper.getwork(bitcoind)
53         
54         bitcoind_warning_var = variable.Variable(None)
55         @defer.inlineCallbacks
56         def poll_warnings():
57             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
58             bitcoind_warning_var.set(errors if errors != '' else None)
59         yield poll_warnings()
60         task.LoopingCall(poll_warnings).start(20*60)
61         
62         print '    ...success!'
63         print '    Current block hash: %x' % (temp_work['previous_block'],)
64         print '    Current block height: %i' % (temp_work['height'] - 1,)
65         print
66         
67         if not args.testnet:
68             factory = yield connect_p2p()
69         
70         print 'Determining payout address...'
71         if args.pubkey_hash is None:
72             address_path = os.path.join(datadir_path, 'cached_payout_address')
73             
74             if os.path.exists(address_path):
75                 with open(address_path, 'rb') as f:
76                     address = f.read().strip('\r\n')
77                 print '    Loaded cached address: %s...' % (address,)
78             else:
79                 address = None
80             
81             if address is not None:
82                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
83                 if not res['isvalid'] or not res['ismine']:
84                     print '    Cached address is either invalid or not controlled by local bitcoind!'
85                     address = None
86             
87             if address is None:
88                 print '    Getting payout address from bitcoind...'
89                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
90             
91             with open(address_path, 'wb') as f:
92                 f.write(address)
93             
94             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
95         else:
96             my_pubkey_hash = args.pubkey_hash
97         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
98         print
99         
100         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
101         shares = {}
102         known_verified = set()
103         print "Loading shares..."
104         for i, (mode, contents) in enumerate(ss.get_shares()):
105             if mode == 'share':
106                 contents.time_seen = 0
107                 shares[contents.hash] = contents
108                 if len(shares) % 1000 == 0 and shares:
109                     print "    %i" % (len(shares),)
110             elif mode == 'verified_hash':
111                 known_verified.add(contents)
112             else:
113                 raise AssertionError()
114         print "    ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified))
115         print
116         
117         
118         print 'Initializing work...'
119         
120         node = p2pool_node.Node(factory, bitcoind, shares.values(), known_verified, net)
121         yield node.start()
122         
123         for share_hash in shares:
124             if share_hash not in node.tracker.items:
125                 ss.forget_share(share_hash)
126         for share_hash in known_verified:
127             if share_hash not in node.tracker.verified.items:
128                 ss.forget_verified_share(share_hash)
129         del shares, known_verified
130         node.tracker.removed.watch(lambda share: ss.forget_share(share.hash))
131         node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
132         
133         def save_shares():
134             for share in node.tracker.get_chain(node.best_share_var.value, min(node.tracker.get_height(node.best_share_var.value), 2*net.CHAIN_LENGTH)):
135                 ss.add_share(share)
136                 if share.hash in node.tracker.verified.items:
137                     ss.add_verified_hash(share.hash)
138         task.LoopingCall(save_shares).start(60)
139         
140         print '    ...success!'
141         print
142         
143         
144         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
145         
146         @defer.inlineCallbacks
147         def parse(x):
148             if ':' in x:
149                 ip, port = x.split(':')
150                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
151             else:
152                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
153         
154         addrs = {}
155         if os.path.exists(os.path.join(datadir_path, 'addrs')):
156             try:
157                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
158                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
159             except:
160                 print >>sys.stderr, 'error parsing addrs'
161         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
162             try:
163                 addr = yield addr_df
164                 if addr not in addrs:
165                     addrs[addr] = (0, time.time(), time.time())
166             except:
167                 log.err()
168         
169         connect_addrs = set()
170         for addr_df in map(parse, args.p2pool_nodes):
171             try:
172                 connect_addrs.add((yield addr_df))
173             except:
174                 log.err()
175         
176         node.p2p_node = p2pool_node.P2PNode(node,
177             port=args.p2pool_port,
178             max_incoming_conns=args.p2pool_conns,
179             addr_store=addrs,
180             connect_addrs=connect_addrs,
181             desired_outgoing_conns=args.p2pool_outgoing_conns,
182         )
183         node.p2p_node.start()
184         
185         def save_addrs():
186             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
187                 f.write(json.dumps(node.p2p_node.addr_store.items()))
188         task.LoopingCall(save_addrs).start(60)
189         
190         print '    ...success!'
191         print
192         
193         if args.upnp:
194             @defer.inlineCallbacks
195             def upnp_thread():
196                 while True:
197                     try:
198                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
199                         if is_lan:
200                             pm = yield portmapper.get_port_mapper()
201                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
202                     except defer.TimeoutError:
203                         pass
204                     except:
205                         if p2pool.DEBUG:
206                             log.err(None, 'UPnP error:')
207                     yield deferral.sleep(random.expovariate(1/120))
208             upnp_thread()
209         
210         # start listening for workers with a JSON-RPC server
211         
212         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
213         
214         wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee)
215         web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var)
216         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
217         
218         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
219         
220         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
221             pass
222         
223         print '    ...success!'
224         print
225         
226         
227         # done!
228         print 'Started successfully!'
229         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
230         if args.donation_percentage > 0.51:
231             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
232         elif args.donation_percentage < 0.49:
233             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
234         else:
235             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
236             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
237         print
238         
239         
240         if hasattr(signal, 'SIGALRM'):
241             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
242                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
243             ))
244             signal.siginterrupt(signal.SIGALRM, False)
245             task.LoopingCall(signal.alarm, 30).start(1)
246         
247         if args.irc_announce:
248             from twisted.words.protocols import irc
249             class IRCClient(irc.IRCClient):
250                 nickname = 'p2pool%02i' % (random.randrange(100),)
251                 channel = net.ANNOUNCE_CHANNEL
252                 def lineReceived(self, line):
253                     if p2pool.DEBUG:
254                         print repr(line)
255                     irc.IRCClient.lineReceived(self, line)
256                 def signedOn(self):
257                     self.in_channel = False
258                     irc.IRCClient.signedOn(self)
259                     self.factory.resetDelay()
260                     self.join(self.channel)
261                     @defer.inlineCallbacks
262                     def new_share(share):
263                         if not self.in_channel:
264                             return
265                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
266                             yield deferral.sleep(random.expovariate(1/60))
267                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
268                             if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
269                                 self.say(self.channel, message)
270                                 self._remember_message(message)
271                     self.watch_id = node.tracker.verified.added.watch(new_share)
272                     self.recent_messages = []
273                 def joined(self, channel):
274                     self.in_channel = True
275                 def left(self, channel):
276                     self.in_channel = False
277                 def _remember_message(self, message):
278                     self.recent_messages.append(message)
279                     while len(self.recent_messages) > 100:
280                         self.recent_messages.pop(0)
281                 def privmsg(self, user, channel, message):
282                     if channel == self.channel:
283                         self._remember_message(message)
284                 def connectionLost(self, reason):
285                     node.tracker.verified.added.unwatch(self.watch_id)
286                     print 'IRC connection lost:', reason.getErrorMessage()
287             class IRCClientFactory(protocol.ReconnectingClientFactory):
288                 protocol = IRCClient
289             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
290         
291         @defer.inlineCallbacks
292         def status_thread():
293             last_str = None
294             last_time = 0
295             while True:
296                 yield deferral.sleep(3)
297                 try:
298                     height = node.tracker.get_height(node.best_share_var.value)
299                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
300                         height,
301                         len(node.tracker.verified.items),
302                         len(node.tracker.items),
303                         len(node.p2p_node.peers),
304                         sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming),
305                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
306                     
307                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
308                     my_att_s = sum(datum['work']/dt for datum in datums)
309                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
310                         math.format(int(my_att_s)),
311                         math.format_dt(dt),
312                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
313                         math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???',
314                     )
315                     
316                     if height > 2:
317                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
318                         stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
319                         real_att_s = p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
320                         
321                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
322                             shares, stale_orphan_shares, stale_doa_shares,
323                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
324                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
325                             node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
326                         )
327                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
328                             math.format(int(real_att_s)),
329                             100*stale_prop,
330                             math.format_dt(2**256 / node.bitcoind_work.value['bits'].target / real_att_s),
331                         )
332                         
333                         for warning in p2pool_data.get_warnings(node.tracker, node.best_share_var.value, net, bitcoind_warning_var.value, node.bitcoind_work.value):
334                             print >>sys.stderr, '#'*40
335                             print >>sys.stderr, '>>> Warning: ' + warning
336                             print >>sys.stderr, '#'*40
337                     
338                     if this_str != last_str or time.time() > last_time + 15:
339                         print this_str
340                         last_str = this_str
341                         last_time = time.time()
342                 except:
343                     log.err()
344         status_thread()
345     except:
346         reactor.stop()
347         log.err(None, 'Fatal error:')
348
349 def run():
350     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
351     
352     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
353     parser.add_argument('--version', action='version', version=p2pool.__version__)
354     parser.add_argument('--net',
355         help='use specified network (default: bitcoin)',
356         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
357     parser.add_argument('--testnet',
358         help='''use the network's testnet''',
359         action='store_const', const=True, default=False, dest='testnet')
360     parser.add_argument('--debug',
361         help='enable debugging mode',
362         action='store_const', const=True, default=False, dest='debug')
363     parser.add_argument('-a', '--address',
364         help='generate payouts to this address (default: <address requested from bitcoind>)',
365         type=str, action='store', default=None, dest='address')
366     parser.add_argument('--datadir',
367         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
368         type=str, action='store', default=None, dest='datadir')
369     parser.add_argument('--logfile',
370         help='''log to this file (default: data/<NET>/log)''',
371         type=str, action='store', default=None, dest='logfile')
372     parser.add_argument('--merged',
373         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
374         type=str, action='append', default=[], dest='merged_urls')
375     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
376         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
377         type=float, action='store', default=0.5, dest='donation_percentage')
378     parser.add_argument('--iocp',
379         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
380         action='store_true', default=False, dest='iocp')
381     parser.add_argument('--irc-announce',
382         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
383         action='store_true', default=False, dest='irc_announce')
384     parser.add_argument('--no-bugreport',
385         help='disable submitting caught exceptions to the author',
386         action='store_true', default=False, dest='no_bugreport')
387     
388     p2pool_group = parser.add_argument_group('p2pool interface')
389     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
390         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
391         type=int, action='store', default=None, dest='p2pool_port')
392     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
393         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
394         type=str, action='append', default=[], dest='p2pool_nodes')
395     parser.add_argument('--disable-upnp',
396         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
397         action='store_false', default=True, dest='upnp')
398     p2pool_group.add_argument('--max-conns', metavar='CONNS',
399         help='maximum incoming connections (default: 40)',
400         type=int, action='store', default=40, dest='p2pool_conns')
401     p2pool_group.add_argument('--outgoing-conns', metavar='CONNS',
402         help='outgoing connections (default: 10)',
403         type=int, action='store', default=10, dest='p2pool_outgoing_conns')
404     
405     worker_group = parser.add_argument_group('worker interface')
406     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
407         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
408         type=str, action='store', default=None, dest='worker_endpoint')
409     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
410         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
411         type=float, action='store', default=0, dest='worker_fee')
412     
413     bitcoind_group = parser.add_argument_group('bitcoind interface')
414     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
415         help='connect to this address (default: 127.0.0.1)',
416         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
417     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
418         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
419         type=int, action='store', default=None, dest='bitcoind_rpc_port')
420     bitcoind_group.add_argument('--bitcoind-rpc-ssl',
421         help='connect to JSON-RPC interface using SSL',
422         action='store_true', default=False, dest='bitcoind_rpc_ssl')
423     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
424         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
425         type=int, action='store', default=None, dest='bitcoind_p2p_port')
426     
427     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
428         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
429         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
430     
431     args = parser.parse_args()
432     
433     if args.debug:
434         p2pool.DEBUG = True
435         defer.setDebugging(True)
436     
437     net_name = args.net_name + ('_testnet' if args.testnet else '')
438     net = networks.nets[net_name]
439     
440     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
441     if not os.path.exists(datadir_path):
442         os.makedirs(datadir_path)
443     
444     if len(args.bitcoind_rpc_userpass) > 2:
445         parser.error('a maximum of two arguments are allowed')
446     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
447     
448     if args.bitcoind_rpc_password is None:
449         conf_path = net.PARENT.CONF_FILE_FUNC()
450         if not os.path.exists(conf_path):
451             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
452                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
453                 '''\r\n'''
454                 '''server=1\r\n'''
455                 '''rpcpassword=%x\r\n'''
456                 '''\r\n'''
457                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
458         conf = open(conf_path, 'rb').read()
459         contents = {}
460         for line in conf.splitlines(True):
461             if '#' in line:
462                 line = line[:line.index('#')]
463             if '=' not in line:
464                 continue
465             k, v = line.split('=', 1)
466             contents[k.strip()] = v.strip()
467         for conf_name, var_name, var_type in [
468             ('rpcuser', 'bitcoind_rpc_username', str),
469             ('rpcpassword', 'bitcoind_rpc_password', str),
470             ('rpcport', 'bitcoind_rpc_port', int),
471             ('port', 'bitcoind_p2p_port', int),
472         ]:
473             if getattr(args, var_name) is None and conf_name in contents:
474                 setattr(args, var_name, var_type(contents[conf_name]))
475         if args.bitcoind_rpc_password is None:
476             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
477     
478     if args.bitcoind_rpc_username is None:
479         args.bitcoind_rpc_username = ''
480     
481     if args.bitcoind_rpc_port is None:
482         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
483     
484     if args.bitcoind_p2p_port is None:
485         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
486     
487     if args.p2pool_port is None:
488         args.p2pool_port = net.P2P_PORT
489     
490     if args.p2pool_outgoing_conns > 10:
491         parser.error('''--outgoing-conns can't be more than 10''')
492     
493     if args.worker_endpoint is None:
494         worker_endpoint = '', net.WORKER_PORT
495     elif ':' not in args.worker_endpoint:
496         worker_endpoint = '', int(args.worker_endpoint)
497     else:
498         addr, port = args.worker_endpoint.rsplit(':', 1)
499         worker_endpoint = addr, int(port)
500     
501     if args.address is not None:
502         try:
503             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
504         except Exception, e:
505             parser.error('error parsing address: ' + repr(e))
506     else:
507         args.pubkey_hash = None
508     
509     def separate_url(url):
510         s = urlparse.urlsplit(url)
511         if '@' not in s.netloc:
512             parser.error('merged url netloc must contain an "@"')
513         userpass, new_netloc = s.netloc.rsplit('@', 1)
514         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
515     merged_urls = map(separate_url, args.merged_urls)
516     
517     if args.logfile is None:
518         args.logfile = os.path.join(datadir_path, 'log')
519     
520     logfile = logging.LogFile(args.logfile)
521     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
522     sys.stdout = logging.AbortPipe(pipe)
523     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
524     if hasattr(signal, "SIGUSR1"):
525         def sigusr1(signum, frame):
526             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
527             logfile.reopen()
528             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
529         signal.signal(signal.SIGUSR1, sigusr1)
530     task.LoopingCall(logfile.reopen).start(5)
531     
532     class ErrorReporter(object):
533         def __init__(self):
534             self.last_sent = None
535         
536         def emit(self, eventDict):
537             if not eventDict["isError"]:
538                 return
539             
540             if self.last_sent is not None and time.time() < self.last_sent + 5:
541                 return
542             self.last_sent = time.time()
543             
544             if 'failure' in eventDict:
545                 text = ((eventDict.get('why') or 'Unhandled Error')
546                     + '\n' + eventDict['failure'].getTraceback())
547             else:
548                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
549             
550             from twisted.web import client
551             client.getPage(
552                 url='http://u.forre.st/p2pool_error.cgi',
553                 method='POST',
554                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
555                 timeout=15,
556             ).addBoth(lambda x: None)
557     if not args.no_bugreport:
558         log.addObserver(ErrorReporter().emit)
559     
560     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
561     reactor.run()