add network name to bug reports
p2pool/main.py
from __future__ import division

import ConfigParser
import StringIO
import argparse
import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

try:
    from twisted.internet import iocpreactor
    iocpreactor.install()
except:
    pass
else:
    print 'Using IOCP reactor!'
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
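        # shares created by this node in this session; my_doa_share_hashes additionally
        # records the ones that were dead on arrival (used by get_stale_counts below)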
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
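                # skip if a recent request for this share is still pending; the retry
                # interval backs off exponentially as 10 * 1.5**count seconds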
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        
        
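        # poll each merged-mining daemon for aux work and fold it into
        # pre_merged_work, keyed by chain id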
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
        
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs.txt"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
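        # send a share (plus up to four not-yet-shared ancestors) to every peer
        # that didn't originate it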
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        # send share when the chain changes to their chain
        current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # setup worker logic
        
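        # counters for our shares that expired out of the tracker while still on the
        # best chain: (count, orphans recorded, doas recorded); stale_info apparently
        # uses 253 for orphan and 254 for dead-on-arrival (see the assertion below)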
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def get_user_details(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
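                # miner usernames may encode difficulty requests, apparently of the
                # form ADDRESS/SHARE_DIFF+PSEUDOSHARE_DIFF ('+' and '/' parsed below)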
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # XXX blah
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def preprocess_request(self, request):
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
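                # merged mining: build an aux-proof-of-work commitment covering every
                # registered aux chain and embed it in the coinbase behind the
                # standard 0xfabe 'mm' marker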
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                if True:
                    share_info, generate_tx = p2pool_data.Share.generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=random.randrange(2**32),
                            pubkey_hash=pubkey_hash,
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                            desired_version=1,
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        desired_target=desired_share_target,
                        ref_merkle_link=dict(branch=[], index=0),
                        net=net,
                    )
                
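                # when the miner didn't request a pseudoshare difficulty, derive a
                # target from the hash rate implied by the last 50 pseudoshares so
                # that submissions arrive at a roughly constant rate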
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(2**256-1, int(4*2**256/hash_rate))
                    else:
                        target = 2**256-1
                else:
                    target = desired_pseudoshare_target
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                target = min(target, net.PARENT.SANE_MAX_TARGET)
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
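                # called for each header the miner submits against this work unit:
                # passes solved blocks upstream, turns qualifying solutions into
                # p2pool shares, and records pseudoshare statistics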
                def got_response(header, request):
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    user, _, _, _ = self.get_user_details(request)
                    assert header['merkle_root'] == merkle_root
                    
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header)
                        del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
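        # refresh work from bitcoind whenever it announces a new block over p2p,
        # or at least every 15 seconds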
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
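        # watchdog: a LoopingCall re-arms a 30-second SIGALRM every second; if the
        # reactor stalls for 30 seconds, the alarm fires and dumps a stack trace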
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, current_work, net):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    p2pool_group.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
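    # pad on the left so that zero, one, or two positional values map cleanly onto
    # (username, password)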
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
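    # tee stdout and stderr to both the console and the logfile, timestamping every
    # line; stderr lines get a '> ' prefix so they stand out in the log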
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
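    # report uncaught errors to the author's collection endpoint (rate-limited to
    # one every five seconds), tagged with the p2pool version and network name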
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()