Revert "new incompatible share implementation. will switch over 22 hours after 85...
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import base64
7 import json
8 import os
9 import random
10 import sys
11 import time
12 import signal
13 import traceback
14 import urlparse
15
16 if '--iocp' in sys.argv:
17     from twisted.internet import iocpreactor
18     iocpreactor.install()
19 from twisted.internet import defer, reactor, protocol, task
20 from twisted.web import server
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
23
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface, height_tracker
26 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
27 from . import p2p, networks, web
28 import p2pool, p2pool.data as p2pool_data
29
30 @deferral.retry('Error getting work from bitcoind:', 3)
31 @defer.inlineCallbacks
32 def getwork(bitcoind):
33     try:
34         work = yield bitcoind.rpc_getmemorypool()
35     except jsonrpc.Error, e:
36         if e.code == -32601: # Method not found
37             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
38             raise deferral.RetrySilentlyException()
39         raise
40     packed_transactions = [x.decode('hex') for x in work['transactions']]
41     defer.returnValue(dict(
42         version=work['version'],
43         previous_block_hash=int(work['previousblockhash'], 16),
44         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
45         merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # 0 stands in for the as-yet-unknown coinbase hash; the branch for index 0 never includes element 0 itself, so the placeholder is safe
46         subsidy=work['coinbasevalue'],
47         time=work['time'],
48         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
49         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
50     ))
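# getwork() normalizes bitcoind's getmemorypool reply into p2pool's internal
# work dict. A rough sketch of the result's shape (field values illustrative,
# not from a real reply):
#     dict(version=1, previous_block_hash=0x1234abcd, transactions=[tx, ...],
#          merkle_link=dict(branch=[...], index=0), subsidy=5000000000,
#          time=1234567890, bits=<FloatingInteger>, coinbaseflags='')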
51
52 @defer.inlineCallbacks
53 def main(args, net, datadir_path, merged_urls, worker_endpoint):
54     try:
55         print 'p2pool (version %s)' % (p2pool.__version__,)
56         print
57         
58         # connect to bitcoind over JSON-RPC and do initial getmemorypool
59         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
60         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
61         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
62         @deferral.retry('Error while checking Bitcoin connection:', 1)
63         @defer.inlineCallbacks
64         def check():
65             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
66                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
67                 raise deferral.RetrySilentlyException()
68             temp_work = yield getwork(bitcoind)
69             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
70                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
71                 raise deferral.RetrySilentlyException()
72             defer.returnValue(temp_work)
73         temp_work = yield check()
74         print '    ...success!'
75         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
76         print
77         
78         # connect to bitcoind over bitcoin-p2p
79         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
80         factory = bitcoin_p2p.ClientFactory(net.PARENT)
81         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
82         yield factory.getProtocol() # waits until handshake is successful
83         print '    ...success!'
84         print
85         
86         print 'Determining payout address...'
87         if args.pubkey_hash is None:
88             address_path = os.path.join(datadir_path, 'cached_payout_address')
89             
90             if os.path.exists(address_path):
91                 with open(address_path, 'rb') as f:
92                     address = f.read().strip('\r\n')
93                 print '    Loaded cached address: %s...' % (address,)
94             else:
95                 address = None
96             
97             if address is not None:
98                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
99                 if not res['isvalid'] or not res['ismine']:
100                     print '    Cached address is either invalid or not controlled by local bitcoind!'
101                     address = None
102             
103             if address is None:
104                 print '    Getting payout address from bitcoind...'
105                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
106             
107             with open(address_path, 'wb') as f:
108                 f.write(address)
109             
110             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
111         else:
112             my_pubkey_hash = args.pubkey_hash
113         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
114         print
115         
116         my_share_hashes = set()
117         my_doa_share_hashes = set()
118         
119         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
120         shared_share_hashes = set()
121         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
122         known_verified = set()
123         print "Loading shares..."
124         for i, (mode, contents) in enumerate(ss.get_shares()):
125             if mode == 'share':
126                 if contents.hash in tracker.shares:
127                     continue
128                 shared_share_hashes.add(contents.hash)
129                 contents.time_seen = 0
130                 tracker.add(contents)
131                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
132                     print "    %i" % (len(tracker.shares),)
133             elif mode == 'verified_hash':
134                 known_verified.add(contents)
135             else:
136                 raise AssertionError()
137         print "    ...inserting %i verified shares..." % (len(known_verified),)
138         for h in known_verified:
139             if h not in tracker.shares:
140                 ss.forget_verified_share(h)
141                 continue
142             tracker.verified.add(tracker.shares[h])
143         print "    ...done loading %i shares!" % (len(tracker.shares),)
144         print
145         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
146         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
147         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
148         
149         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
150         
151         pre_current_work = variable.Variable(None)
152         pre_merged_work = variable.Variable({})
153         # information affecting work that should trigger a long-polling update
154         current_work = variable.Variable(None)
155         # information affecting work that should not trigger a long-polling update
156         current_work2 = variable.Variable(None)
157         
158         requested = expiring_dict.ExpiringDict(300)
159         
160         print 'Initializing work...'
161         @defer.inlineCallbacks
162         def set_real_work1():
163             work = yield getwork(bitcoind)
164             current_work2.set(dict(
165                 time=work['time'],
166                 transactions=work['transactions'],
167                 merkle_link=work['merkle_link'],
168                 subsidy=work['subsidy'],
169                 clock_offset=time.time() - work['time'],
170                 last_update=time.time(),
171             )) # current_work2 must be set first: watchers attached to pre_current_work read it when that fires
172             pre_current_work.set(dict(
173                 version=work['version'],
174                 previous_block=work['previous_block_hash'],
175                 bits=work['bits'],
176                 coinbaseflags=work['coinbaseflags'],
177             ))
178         yield set_real_work1()
179         
180         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
181         
182         def set_real_work2():
183             best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
184             
185             t = dict(pre_current_work.value)
186             t['best_share_hash'] = best
187             t['mm_chains'] = pre_merged_work.value
188             current_work.set(t)
189             
190             t = time.time()
191             for peer2, share_hash in desired:
192                 if share_hash not in tracker.tails: # share arrived while tracker.think was running
193                     continue
194                 last_request_time, count = requested.get(share_hash, (None, 0))
195                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
196                     continue
197                 potential_peers = set()
198                 for head in tracker.tails[share_hash]:
199                     potential_peers.update(peer_heads.get(head, set()))
200                 potential_peers = [peer for peer in potential_peers if peer.connected2]
201                 if count == 0 and peer2 is not None and peer2.connected2:
202                     peer = peer2
203                 else:
204                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
205                     if peer is None:
206                         continue
207                 
208                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
209                 peer.send_getshares(
210                     hashes=[share_hash],
211                     parents=2000,
212                     stops=list(set(tracker.heads) | set(
213                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
214                     ))[:100],
215                 )
216                 requested[share_hash] = t, count + 1
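        # Re-request backoff sketch: a missing parent share is re-requested only
        # outside a growing window of 10 * 1.5**count seconds since the last try
        # (e.g. the request after two failed tries waits roughly 22.5s). Retries
        # usually go to a random connected peer that knows a descendant head;
        # about 20% of the time the originally announcing peer is asked again.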
217         pre_current_work.changed.watch(lambda _: set_real_work2())
218         pre_merged_work.changed.watch(lambda _: set_real_work2())
219         set_real_work2()
220         print '    ...success!'
221         print
222         
223         
224         @defer.inlineCallbacks
225         def set_merged_work(merged_url, merged_userpass):
226             merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
227             while True:
228                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
229                 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
230                     hash=int(auxblock['hash'], 16),
231                     target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
232                     merged_proxy=merged_proxy,
233                 )}))
234                 yield deferral.sleep(1)
235         for merged_url, merged_userpass in merged_urls:
236             set_merged_work(merged_url, merged_userpass)
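        # pre_merged_work maps chainid -> aux work; one polling loop per --merged
        # URL refreshes its entry about once a second. Sketch of a single entry
        # (values illustrative):
        #     {chainid: dict(hash=0xabc123, target=2**235, merged_proxy=<Proxy>)}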
237         
238         @pre_merged_work.changed.watch
239         def _(new_merged_work):
240             print 'Got new merged mining work!'
241         
242         # setup p2p logic and join p2pool network
243         
244         class Node(p2p.Node):
245             def handle_shares(self, shares, peer):
246                 if len(shares) > 5:
247                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
248                 
249                 new_count = 0
250                 for share in shares:
251                     if share.hash in tracker.shares:
252                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
253                         continue
254                     
255                     new_count += 1
256                     
257                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
258                     
259                     tracker.add(share)
260                 
261                 if shares and peer is not None:
262                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
263                 
264                 if new_count:
265                     set_real_work2()
266                 
267                 if len(shares) > 5:
268                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
269             
270             def handle_share_hashes(self, hashes, peer):
271                 t = time.time()
272                 get_hashes = []
273                 for share_hash in hashes:
274                     if share_hash in tracker.shares:
275                         continue
276                     last_request_time, count = requested.get(share_hash, (None, 0))
277                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
278                         continue
279                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
280                     get_hashes.append(share_hash)
281                     requested[share_hash] = t, count + 1
282                 
283                 if hashes and peer is not None:
284                     peer_heads.setdefault(hashes[0], set()).add(peer)
285                 if get_hashes:
286                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
287             
288             def handle_get_shares(self, hashes, parents, stops, peer):
289                 parents = min(parents, 1000//len(hashes)) if hashes else 0 # avoid ZeroDivisionError on an empty request
290                 stops = set(stops)
291                 shares = []
292                 for share_hash in hashes:
293                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
294                         if share.hash in stops:
295                             break
296                         shares.append(share)
297                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
298                 return shares
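            # parents is capped at 1000//len(hashes), so one getshares request
            # can never make this node serialize much more than ~1000 shares.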
299         
300         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
301         def submit_block_p2p(block):
302             if factory.conn.value is None:
303                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
304                 raise deferral.RetrySilentlyException()
305             factory.conn.value.send_block(block=block)
306         
307         @deferral.retry('Error submitting block: (will retry)', 10, 10)
308         @defer.inlineCallbacks
309         def submit_block_rpc(block, ignore_failure):
310             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
311             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
312             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
313                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
314         
315         def submit_block(block, ignore_failure):
316             submit_block_p2p(block)
317             submit_block_rpc(block, ignore_failure)
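        # Blocks go out over both transports for redundancy: the raw P2P
        # connection is fastest, while the getmemorypool RPC path also compares
        # bitcoind's accept/reject verdict against the locally computed PoW.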
318         
319         @tracker.verified.added.watch
320         def _(share):
321             if share.pow_hash <= share.header['bits'].target:
322                 submit_block(share.as_block(tracker), ignore_failure=True)
323                 print
324                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
325                 print
326                 def spread():
327                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
328                         current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
329                         broadcast_share(share.hash)
330                 spread()
331                 reactor.callLater(5, spread) # so get_height_rel_highest can update
332         
333         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
334         
335         @defer.inlineCallbacks
336         def parse(x):
337             if ':' in x:
338                 ip, port = x.split(':')
339                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
340             else:
341                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
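        # e.g. (illustrative): parse('203.0.113.5:9333') fires with
        # ('203.0.113.5', 9333); parse('node.example.com') resolves the name and
        # falls back to the network's default P2P_PORT.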
342         
343         addrs = {}
344         if os.path.exists(os.path.join(datadir_path, 'addrs')):
345             try:
346                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
347                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
348             except:
349                 print >>sys.stderr, 'error parsing addrs'
350         elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
351             try:
352                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
353             except:
354                 print >>sys.stderr, "error reading addrs.txt"
355         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
356             try:
357                 addr = yield addr_df
358                 if addr not in addrs:
359                     addrs[addr] = (0, time.time(), time.time())
360             except:
361                 log.err()
362         
363         connect_addrs = set()
364         for addr_df in map(parse, args.p2pool_nodes):
365             try:
366                 connect_addrs.add((yield addr_df))
367             except:
368                 log.err()
369         
370         p2p_node = Node(
371             best_share_hash_func=lambda: current_work.value['best_share_hash'],
372             port=args.p2pool_port,
373             net=net,
374             addr_store=addrs,
375             connect_addrs=connect_addrs,
376             max_incoming_conns=args.p2pool_conns,
377         )
378         p2p_node.start()
379         
380         def save_addrs():
381             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
382                 f.write(json.dumps(p2p_node.addr_store.items()))
383         task.LoopingCall(save_addrs).start(60)
384         
385         def broadcast_share(share_hash):
386             shares = []
387             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
388                 if share.hash in shared_share_hashes:
389                     break
390                 shared_share_hashes.add(share.hash)
391                 shares.append(share)
392             
393             for peer in p2p_node.peers.itervalues():
394                 peer.sendShares([share for share in shares if share.peer is not peer])
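        # shared_share_hashes doubles as the "already broadcast" marker: the walk
        # up the chain stops at the first share everyone has presumably seen, and
        # a share is never echoed back to the peer it arrived from.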
395         
396         # broadcast shares from the new best chain whenever the best share hash changes
397         current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
398         
399         def save_shares():
400             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
401                 ss.add_share(share)
402                 if share.hash in tracker.verified.shares:
403                     ss.add_verified_hash(share.hash)
404         task.LoopingCall(save_shares).start(60)
405         
406         print '    ...success!'
407         print
408         
409         if args.upnp:
410             @defer.inlineCallbacks
411             def upnp_thread():
412                 while True:
413                     try:
414                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
415                         if is_lan:
416                             pm = yield portmapper.get_port_mapper()
417                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
418                     except defer.TimeoutError:
419                         pass
420                     except:
421                         if p2pool.DEBUG:
422                             log.err(None, 'UPnP error:')
423                     yield deferral.sleep(random.expovariate(1/120))
424             upnp_thread()
425         
426         # start listening for workers with a JSON-RPC server
427         
428         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
429         
430         # setup worker logic
431         
432         removed_unstales_var = variable.Variable((0, 0, 0))
433         removed_doa_unstales_var = variable.Variable(0)
434         @tracker.verified.removed.watch
435         def _(share):
436             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
437                 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
438                 removed_unstales_var.set((
439                     removed_unstales_var.value[0] + 1,
440                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
441                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
442                 ))
443             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
444                 removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
445         
446         def get_stale_counts():
447             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
448             my_shares = len(my_share_hashes)
449             my_doa_shares = len(my_doa_share_hashes)
450             delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
451             my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
452             my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
453             orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
454             doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
455             
456             my_shares_not_in_chain = my_shares - my_shares_in_chain
457             my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
458             
459             return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
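        # Worked example (numbers illustrative): 100 shares submitted, 90 made it
        # into the chain; 6 were locally marked DOA, 4 of those are in the chain.
        # Then not-in-chain = 100-90 = 10 and DOA-not-in-chain = 6-4 = 2, so this
        # returns ((10-2, 2), 100, ...): 8 orphaned and 2 dead-on-arrival stales.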
460         
461         
462         pseudoshare_received = variable.Event()
463         share_received = variable.Event()
464         local_rate_monitor = math.RateMonitor(10*60)
465         
466         class WorkerBridge(worker_interface.WorkerBridge):
467             def __init__(self):
468                 worker_interface.WorkerBridge.__init__(self)
469                 self.new_work_event = current_work.changed
470                 self.recent_shares_ts_work = []
471             
472             def get_user_details(self, request):
473                 user = request.getUser() if request.getUser() is not None else ''
474                 
475                 desired_pseudoshare_target = None
476                 if '+' in user:
477                     user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
478                     try:
479                         desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
480                     except:
481                         pass
482                 
483                 desired_share_target = 2**256 - 1
484                 if '/' in user:
485                     user, min_diff_str = user.rsplit('/', 1)
486                     try:
487                         desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
488                     except:
489                         pass
490                 
491                 if random.uniform(0, 100) < args.worker_fee:
492                     pubkey_hash = my_pubkey_hash
493                 else:
494                     try:
495                         pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
496                     except: # XXX blah
497                         pubkey_hash = my_pubkey_hash
498                 
499                 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
500             
501             def preprocess_request(self, request):
502                 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
503                 return pubkey_hash, desired_share_target, desired_pseudoshare_target
504             
505             def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
506                 if len(p2p_node.peers) == 0 and net.PERSIST:
507                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
508                 if current_work.value['best_share_hash'] is None and net.PERSIST:
509                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
510                 if time.time() > current_work2.value['last_update'] + 60:
511                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
512                 
513                 if current_work.value['mm_chains']:
514                     tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
515                     mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
516                     mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
517                         merkle_root=bitcoin_data.merkle_hash(mm_hashes),
518                         size=size,
519                         nonce=0,
520                     ))
521                     mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
522                 else:
523                     mm_data = ''
524                     mm_later = []
525                 
527                 share_info, generate_tx = p2pool_data.Share.generate_transaction(
528                     tracker=tracker,
529                     share_data=dict(
530                         previous_share_hash=current_work.value['best_share_hash'],
531                         coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
532                         nonce=random.randrange(2**32),
533                         pubkey_hash=pubkey_hash,
534                         subsidy=current_work2.value['subsidy'],
535                         donation=math.perfect_round(65535*args.donation_percentage/100),
536                         stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
537                             'orphan' if orphans > orphans_recorded_in_chain else
538                             'doa' if doas > doas_recorded_in_chain else
539                             None
540                         )(*get_stale_counts()),
541                         desired_version=1,
542                     ),
543                     block_target=current_work.value['bits'].target,
544                     desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
545                     desired_target=desired_share_target,
546                     ref_merkle_link=dict(branch=[], index=0),
547                     net=net,
548                 )
549                 
550                 if desired_pseudoshare_target is None:
551                     target = 2**256-1
552                     if len(self.recent_shares_ts_work) == 50:
553                         hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
554                         if hash_rate:
555                             target = min(target, int(2**256/hash_rate))
556                 else:
557                     target = desired_pseudoshare_target
558                 target = max(target, share_info['bits'].target)
559                 for aux_work in current_work.value['mm_chains'].itervalues():
560                     target = max(target, aux_work['target'])
561                 target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
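                # Pseudoshare target selection, roughly: once 50 recent
                # pseudoshares are on record, 2**256/hash_rate aims the interval
                # between pseudoshares at about a second; the target is then
                # loosened to at least the real share target (so every share also
                # counts as a pseudoshare) and to each aux chain's target (so
                # merged-mining solutions are always submitted), and finally
                # clipped to net.PARENT.SANE_TARGET_RANGE.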
562                 
563                 transactions = [generate_tx] + list(current_work2.value['transactions'])
564                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
565                 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
566                 
567                 getwork_time = time.time()
568                 merkle_link = current_work2.value['merkle_link']
569                 
570                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
571                     bitcoin_data.target_to_difficulty(target),
572                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
573                     current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
574                     len(current_work2.value['transactions']),
575                 )
576                 
577                 bits = current_work.value['bits']
578                 previous_block = current_work.value['previous_block']
579                 ba = bitcoin_getwork.BlockAttempt(
580                     version=current_work.value['version'],
581                     previous_block=current_work.value['previous_block'],
582                     merkle_root=merkle_root,
583                     timestamp=current_work2.value['time'],
584                     bits=current_work.value['bits'],
585                     share_target=target,
586                 )
587                 
588                 received_header_hashes = set()
589                 
590                 def got_response(header, request):
591                     header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
592                     pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
593                     try:
594                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
595                             submit_block(dict(header=header, txs=transactions), ignore_failure=False)
596                             if pow_hash <= header['bits'].target:
597                                 print
598                                 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
599                                 print
600                     except:
601                         log.err(None, 'Error while processing potential block:')
602                     
603                     user, _, _, _ = self.get_user_details(request)
604                     assert header['merkle_root'] == merkle_root
605                     assert header['previous_block'] == previous_block
606                     assert header['bits'] == bits
607                     
608                     on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
609                     
610                     for aux_work, index, hashes in mm_later:
611                         try:
612                             if pow_hash <= aux_work['target'] or p2pool.DEBUG:
613                                 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
614                                     pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
615                                     bitcoin_data.aux_pow_type.pack(dict(
616                                         merkle_tx=dict(
617                                             tx=transactions[0],
618                                             block_hash=header_hash,
619                                             merkle_link=merkle_link,
620                                         ),
621                                         merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
622                                         parent_block_header=header,
623                                     )).encode('hex'),
624                                 )
625                                 @df.addCallback
626                                 def _(result):
627                                     if result != (pow_hash <= aux_work['target']):
628                                         print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
629                                     else:
630                                         print 'Merged block submittal result: %s' % (result,)
631                                 @df.addErrback
632                                 def _(err):
633                                     log.err(err, 'Error submitting merged block:')
634                         except:
635                             log.err(None, 'Error while processing merged mining POW:')
636                     
637                     if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
638                         min_header = dict(header); del min_header['merkle_root']
639                         hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
640                         share = p2pool_data.Share(net, None, dict(
641                             min_header=min_header, share_info=share_info, hash_link=hash_link,
642                             ref_merkle_link=dict(branch=[], index=0),
643                         ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
644                         
645                         print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
646                             request.getUser(),
647                             p2pool_data.format_hash(share.hash),
648                             p2pool_data.format_hash(share.previous_hash),
649                             time.time() - getwork_time,
650                             ' DEAD ON ARRIVAL' if not on_time else '',
651                         )
652                         my_share_hashes.add(share.hash)
653                         if not on_time:
654                             my_doa_share_hashes.add(share.hash)
655                         
656                         tracker.add(share)
657                         if not p2pool.DEBUG:
658                             tracker.verified.add(share)
659                         set_real_work2()
660                         
661                         try:
662                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
663                                 for peer in p2p_node.peers.itervalues():
664                                     peer.sendShares([share])
665                                 shared_share_hashes.add(share.hash)
666                         except:
667                             log.err(None, 'Error forwarding block solution:')
668                         
669                         share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
670                     
671                     if pow_hash > target:
672                         print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
673                         print '    Hash:   %56x' % (pow_hash,)
674                         print '    Target: %56x' % (target,)
675                     elif header_hash in received_header_hashes:
676                         print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
677                     else:
678                         received_header_hashes.add(header_hash)
679                         
680                         pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
681                         self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
682                         while len(self.recent_shares_ts_work) > 50:
683                             self.recent_shares_ts_work.pop(0)
684                         local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
685                     
686                     return on_time
687                 
688                 return ba, got_response
689         
690         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
691         
692         web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
693         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
694         
695         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
696         
697         with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
698             pass
699         
700         print '    ...success!'
701         print
702         
703         
704         @defer.inlineCallbacks
705         def work_poller():
706             while True:
707                 flag = factory.new_block.get_deferred()
708                 try:
709                     yield set_real_work1()
710                 except:
711                     log.err()
712                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
713         work_poller()
714         
715         
716         # done!
717         print 'Started successfully!'
718         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
719         if args.donation_percentage > 0.51:
720             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
721         elif args.donation_percentage < 0.49:
722             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
723         else:
724             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
725             print 'You can increase this amount with the --give-author argument! (or decrease it, if you must)'
726         print
727         
728         
729         if hasattr(signal, 'SIGALRM'):
730             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
731                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
732             ))
733             signal.siginterrupt(signal.SIGALRM, False)
734             task.LoopingCall(signal.alarm, 30).start(1)
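        # Watchdog sketch: the LoopingCall re-arms signal.alarm(30) every second,
        # so SIGALRM only fires if the reactor stalls for 30+ seconds; the handler
        # then writes a stack trace to stderr rather than killing the process.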
735         
736         if args.irc_announce:
737             from twisted.words.protocols import irc
738             class IRCClient(irc.IRCClient):
739                 nickname = 'p2pool%02i' % (random.randrange(100),)
740                 channel = net.ANNOUNCE_CHANNEL
741                 def lineReceived(self, line):
742                     if p2pool.DEBUG:
743                         print repr(line)
744                     irc.IRCClient.lineReceived(self, line)
745                 def signedOn(self):
746                     irc.IRCClient.signedOn(self)
747                     self.factory.resetDelay()
748                     self.join(self.channel)
749                     @defer.inlineCallbacks
750                     def new_share(share):
751                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
752                             yield deferral.sleep(random.expovariate(1/60))
753                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
754                             if message not in self.recent_messages:
755                                 self.say(self.channel, message)
756                                 self._remember_message(message)
757                     self.watch_id = tracker.verified.added.watch(new_share)
758                     self.recent_messages = []
759                 def _remember_message(self, message):
760                     self.recent_messages.append(message)
761                     while len(self.recent_messages) > 100:
762                         self.recent_messages.pop(0)
763                 def privmsg(self, user, channel, message):
764                     if channel == self.channel:
765                         self._remember_message(message)
766                 def connectionLost(self, reason):
767                     tracker.verified.added.unwatch(self.watch_id)
768                     print 'IRC connection lost:', reason.getErrorMessage()
769             class IRCClientFactory(protocol.ReconnectingClientFactory):
770                 protocol = IRCClient
771             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
772         
773         @defer.inlineCallbacks
774         def status_thread():
775             last_str = None
776             last_time = 0
777             while True:
778                 yield deferral.sleep(3)
779                 try:
780                     if time.time() > current_work2.value['last_update'] + 60:
781                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
782                     
783                     height = tracker.get_height(current_work.value['best_share_hash'])
784                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
785                         height,
786                         len(tracker.verified.shares),
787                         len(tracker.shares),
788                         len(p2p_node.peers),
789                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
790                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
791                     
792                     datums, dt = local_rate_monitor.get_datums_in_last()
793                     my_att_s = sum(datum['work']/dt for datum in datums)
794                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
795                         math.format(int(my_att_s)),
796                         math.format_dt(dt),
797                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
798                         math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
799                     )
800                     
801                     if height > 2:
802                         (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
803                         stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
804                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
805                         
806                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
807                             shares, stale_orphan_shares, stale_doa_shares,
808                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
809                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
810                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
811                         )
812                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
813                             math.format(int(real_att_s)),
814                             100*stale_prop,
815                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
816                         )
817                         
818                         for warning in p2pool_data.get_warnings(tracker, current_work, net):
819                             print >>sys.stderr, '#'*40
820                             print >>sys.stderr, '>>> Warning: ' + warning
821                             print >>sys.stderr, '#'*40
822                     
823                     if this_str != last_str or time.time() > last_time + 15:
824                         print this_str
825                         last_str = this_str
826                         last_time = time.time()
827                 except:
828                     log.err()
829         status_thread()
830     except:
831         log.err(None, 'Fatal error:')
832         reactor.stop()
833
834 def run():
835     class FixedArgumentParser(argparse.ArgumentParser):
836         def _read_args_from_files(self, arg_strings):
837             # expand arguments referencing files
838             new_arg_strings = []
839             for arg_string in arg_strings:
840                 
841                 # for regular arguments, just add them back into the list
842                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
843                     new_arg_strings.append(arg_string)
844                 
845                 # replace arguments referencing files with the file content
846                 else:
847                     try:
848                         args_file = open(arg_string[1:])
849                         try:
850                             arg_strings = []
851                             for arg_line in args_file.read().splitlines():
852                                 for arg in self.convert_arg_line_to_args(arg_line):
853                                     arg_strings.append(arg)
854                             arg_strings = self._read_args_from_files(arg_strings)
855                             new_arg_strings.extend(arg_strings)
856                         finally:
857                             args_file.close()
858                     except IOError:
859                         err = sys.exc_info()[1]
860                         self.error(str(err))
861             
862             # return the modified argument list
863             return new_arg_strings
864         
865         def convert_arg_line_to_args(self, arg_line):
866             return [arg for arg in arg_line.split() if arg.strip()]
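        # e.g. (illustrative): `p2pool @p2pool.conf` replaces the @-argument with
        # the whitespace-split contents of p2pool.conf, recursing into any nested
        # @file references found there.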
867     
868     
869     realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
870     
871     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
872     parser.add_argument('--version', action='version', version=p2pool.__version__)
873     parser.add_argument('--net',
874         help='use specified network (default: bitcoin)',
875         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
876     parser.add_argument('--testnet',
877         help='''use the network's testnet''',
878         action='store_const', const=True, default=False, dest='testnet')
879     parser.add_argument('--debug',
880         help='enable debugging mode',
881         action='store_const', const=True, default=False, dest='debug')
882     parser.add_argument('-a', '--address',
883         help='generate payouts to this address (default: <address requested from bitcoind>)',
884         type=str, action='store', default=None, dest='address')
885     parser.add_argument('--datadir',
886         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
887         type=str, action='store', default=None, dest='datadir')
888     parser.add_argument('--logfile',
889         help='''log to this file (default: data/<NET>/log)''',
890         type=str, action='store', default=None, dest='logfile')
891     parser.add_argument('--merged',
892         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
893         type=str, action='append', default=[], dest='merged_urls')
894     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
895         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
896         type=float, action='store', default=0.5, dest='donation_percentage')
897     parser.add_argument('--iocp',
898         help='use the Windows IOCP API to avoid errors caused by having a large number of sockets open',
899         action='store_true', default=False, dest='iocp')
900     parser.add_argument('--irc-announce',
901         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
902         action='store_true', default=False, dest='irc_announce')
903     parser.add_argument('--no-bugreport',
904         help='disable submitting caught exceptions to the author',
905         action='store_true', default=False, dest='no_bugreport')
906     
907     p2pool_group = parser.add_argument_group('p2pool interface')
908     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
909         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
910         type=int, action='store', default=None, dest='p2pool_port')
911     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
912         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
913         type=str, action='append', default=[], dest='p2pool_nodes')
914     p2pool_group.add_argument('--disable-upnp',
915         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
916         action='store_false', default=True, dest='upnp')
917     p2pool_group.add_argument('--max-conns', metavar='CONNS',
918         help='maximum incoming connections (default: 40)',
919         type=int, action='store', default=40, dest='p2pool_conns')
920     
921     worker_group = parser.add_argument_group('worker interface')
922     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
923         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
924         type=str, action='store', default=None, dest='worker_endpoint')
925     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
926         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
927         type=float, action='store', default=0, dest='worker_fee')
928     
929     bitcoind_group = parser.add_argument_group('bitcoind interface')
930     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
931         help='connect to this address (default: 127.0.0.1)',
932         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
933     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
934         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
935         type=int, action='store', default=None, dest='bitcoind_rpc_port')
936     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
937         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
938         type=int, action='store', default=None, dest='bitcoind_p2p_port')
939     
940     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
941         help='bitcoind RPC interface username, then password, space-separated (providing only one sets the password and leaves the username empty; providing none makes P2Pool read both from bitcoin.conf)',
942         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
943     
944     args = parser.parse_args()
945     
946     if args.debug:
947         p2pool.DEBUG = True
948     
949     net_name = args.net_name + ('_testnet' if args.testnet else '')
950     net = networks.nets[net_name]
951     
952     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
953     if not os.path.exists(datadir_path):
954         os.makedirs(datadir_path)
955     
956     if len(args.bitcoind_rpc_userpass) > 2:
957         parser.error('a maximum of two arguments is allowed')
958     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
959     
960     if args.bitcoind_rpc_password is None:
961         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
962             parser.error('This network has no configuration file function. Manually enter your RPC password.')
963         conf_path = net.PARENT.CONF_FILE_FUNC()
964         if not os.path.exists(conf_path):
965             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
966                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
967                 '''\r\n'''
968                 '''server=1\r\n'''
969                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
970         with open(conf_path, 'rb') as f:
971             cp = ConfigParser.RawConfigParser()
972             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
973             for conf_name, var_name, var_type in [
974                 ('rpcuser', 'bitcoind_rpc_username', str),
975                 ('rpcpassword', 'bitcoind_rpc_password', str),
976                 ('rpcport', 'bitcoind_rpc_port', int),
977                 ('port', 'bitcoind_p2p_port', int),
978             ]:
979                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
980                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
981         if args.bitcoind_rpc_password is None:
982             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
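    # bitcoin.conf is flat key=value text with no section header, so a dummy
    # '[x]' section is prepended above to make RawConfigParser accept it.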
983     
984     if args.bitcoind_rpc_username is None:
985         args.bitcoind_rpc_username = ''
986     
987     if args.bitcoind_rpc_port is None:
988         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
989     
990     if args.bitcoind_p2p_port is None:
991         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
992     
993     if args.p2pool_port is None:
994         args.p2pool_port = net.P2P_PORT
995     
996     if args.worker_endpoint is None:
997         worker_endpoint = '', net.WORKER_PORT
998     elif ':' not in args.worker_endpoint:
999         worker_endpoint = '', int(args.worker_endpoint)
1000     else:
1001         addr, port = args.worker_endpoint.rsplit(':', 1)
1002         worker_endpoint = addr, int(port)
1003     
1004     if args.address is not None:
1005         try:
1006             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1007         except Exception, e:
1008             parser.error('error parsing address: ' + repr(e))
1009     else:
1010         args.pubkey_hash = None
1011     
1012     def separate_url(url):
1013         s = urlparse.urlsplit(url)
1014         if '@' not in s.netloc:
1015             parser.error('merged url netloc must contain an "@"')
1016         userpass, new_netloc = s.netloc.rsplit('@', 1)
1017         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
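    # e.g. (illustrative): separate_url('http://user:pass@127.0.0.1:10332/')
    # returns ('http://127.0.0.1:10332/', 'user:pass').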
1018     merged_urls = map(separate_url, args.merged_urls)
1019     
1020     if args.logfile is None:
1021         args.logfile = os.path.join(datadir_path, 'log')
1022     
1023     logfile = logging.LogFile(args.logfile)
1024     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1025     sys.stdout = logging.AbortPipe(pipe)
1026     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
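    # Logging layout: stdout and stderr share one TimestampingPipe that tees
    # every line to the console and to the logfile; stderr additionally gets a
    # '> ' prefix, which is how its lines can be told apart in the log.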
1027     if hasattr(signal, "SIGUSR1"):
1028         def sigusr1(signum, frame):
1029             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1030             logfile.reopen()
1031             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1032         signal.signal(signal.SIGUSR1, sigusr1)
1033     task.LoopingCall(logfile.reopen).start(5)
1034     
1035     class ErrorReporter(object):
1036         def __init__(self):
1037             self.last_sent = None
1038         
1039         def emit(self, eventDict):
1040             if not eventDict["isError"]:
1041                 return
1042             
1043             if self.last_sent is not None and time.time() < self.last_sent + 5:
1044                 return
1045             self.last_sent = time.time()
1046             
1047             if 'failure' in eventDict:
1048                 text = ((eventDict.get('why') or 'Unhandled Error')
1049                     + '\n' + eventDict['failure'].getTraceback())
1050             else:
1051                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
1052             
1053             from twisted.web import client
1054             client.getPage(
1055                 url='http://u.forre.st/p2pool_error.cgi',
1056                 method='POST',
1057                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
1058                 timeout=15,
1059             ).addBoth(lambda x: None)
1060     if not args.no_bugreport:
1061         log.addObserver(ErrorReporter().emit)
1062     
1063     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
1064     reactor.run()