removed some accidentally committed local code
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import argparse
6 import base64
7 import json
8 import os
9 import random
10 import sys
11 import time
12 import signal
13 import traceback
14 import urlparse
15
16 if '--iocp' in sys.argv:
17     from twisted.internet import iocpreactor
18     iocpreactor.install()
19 from twisted.internet import defer, reactor, protocol, task
20 from twisted.web import server
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
23
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface, height_tracker
26 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
27 from . import p2p, networks, web
28 import p2pool, p2pool.data as p2pool_data
29
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind via the getmemorypool RPC.

    Returns (through defer.returnValue) a dict with the template's version,
    parent block hash, unpacked transactions, merkle link, subsidy, time,
    compact target, and coinbase flags. Retried up to 3 times on error.
    """
    try:
        mp = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error as e:
        if e.code != -32601:
            raise
        # -32601 = "Method not found": this bitcoind predates getmemorypool.
        print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
        raise deferral.RetrySilentlyException()
    raw_txs = [tx_hex.decode('hex') for tx_hex in mp['transactions']]
    # 'bits' may arrive as a hex string (little-endian on the wire) or as an int.
    if isinstance(mp['bits'], (str, unicode)):
        bits = bitcoin_data.FloatingIntegerType().unpack(mp['bits'].decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(mp['bits'])
    # Prefer explicit coinbaseflags; otherwise concatenate any coinbaseaux
    # entries; fall back to an empty string.
    if 'coinbaseflags' in mp:
        flags = mp['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in mp:
        flags = ''.join(aux.decode('hex') for aux in mp['coinbaseaux'].itervalues())
    else:
        flags = ''
    defer.returnValue(dict(
        version=mp['version'],
        previous_block_hash=int(mp['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(raw) for raw in raw_txs],
        # 0 stands in for the not-yet-built coinbase tx; valid since index is 0.
        merkle_link=bitcoin_data.calculate_merkle_link([0] + [bitcoin_data.hash256(raw) for raw in raw_txs], 0),
        subsidy=mp['coinbasevalue'],
        time=mp['time'],
        bits=bits,
        coinbaseflags=flags,
    ))
51
52 @defer.inlineCallbacks
53 def main(args, net, datadir_path, merged_urls, worker_endpoint):
54     try:
55         print 'p2pool (version %s)' % (p2pool.__version__,)
56         print
57         
58         # connect to bitcoind over JSON-RPC and do initial getmemorypool
59         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
60         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
61         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            # Verify we are talking to the right bitcoind for this network and
            # that it is new enough; retried via deferral.retry until it passes.
            # Returns the first getwork result so startup can reuse it.
            if not (yield net.PARENT.RPC_CHECK)(bitcoind):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            # VERSION_CHECK compares bitcoind's reported version against the
            # work it produced (BIP16 support is required).
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
73         temp_work = yield check()
74         print '    ...success!'
75         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
76         print
77         
78         # connect to bitcoind over bitcoin-p2p
79         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
80         factory = bitcoin_p2p.ClientFactory(net.PARENT)
81         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
82         yield factory.getProtocol() # waits until handshake is successful
83         print '    ...success!'
84         print
85         
86         print 'Determining payout address...'
87         if args.pubkey_hash is None:
88             address_path = os.path.join(datadir_path, 'cached_payout_address')
89             
90             if os.path.exists(address_path):
91                 with open(address_path, 'rb') as f:
92                     address = f.read().strip('\r\n')
93                 print '    Loaded cached address: %s...' % (address,)
94             else:
95                 address = None
96             
97             if address is not None:
98                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
99                 if not res['isvalid'] or not res['ismine']:
100                     print '    Cached address is either invalid or not controlled by local bitcoind!'
101                     address = None
102             
103             if address is None:
104                 print '    Getting payout address from bitcoind...'
105                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
106             
107             with open(address_path, 'wb') as f:
108                 f.write(address)
109             
110             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
111         else:
112             my_pubkey_hash = args.pubkey_hash
113         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
114         print
115         
116         my_share_hashes = set()
117         my_doa_share_hashes = set()
118         
119         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
120         shared_share_hashes = set()
121         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
122         known_verified = set()
123         print "Loading shares..."
124         for i, (mode, contents) in enumerate(ss.get_shares()):
125             if mode == 'share':
126                 if contents.hash in tracker.shares:
127                     continue
128                 shared_share_hashes.add(contents.hash)
129                 contents.time_seen = 0
130                 tracker.add(contents)
131                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
132                     print "    %i" % (len(tracker.shares),)
133             elif mode == 'verified_hash':
134                 known_verified.add(contents)
135             else:
136                 raise AssertionError()
137         print "    ...inserting %i verified shares..." % (len(known_verified),)
138         for h in known_verified:
139             if h not in tracker.shares:
140                 ss.forget_verified_share(h)
141                 continue
142             tracker.verified.add(tracker.shares[h])
143         print "    ...done loading %i shares!" % (len(tracker.shares),)
144         print
145         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
146         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
147         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
148         
149         print 'Initializing work...'
150         
151         
152         # BITCOIND WORK
153         
154         bitcoind_work = variable.Variable(None)
155         
        @defer.inlineCallbacks
        def poll_bitcoind():
            # Fetch a fresh block template from bitcoind and publish it through
            # the bitcoind_work Variable. clock_offset records how far the local
            # clock is ahead of the template's 'time' so timestamps derived from
            # it can be corrected later.
            work = yield getwork(bitcoind)
            bitcoind_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            ))
171         yield poll_bitcoind()
172         
        @defer.inlineCallbacks
        def work_poller():
            # Poll bitcoind for work forever: re-poll as soon as the P2P
            # connection announces a new block, or after 15 seconds, whichever
            # fires first. Errors are logged and polling continues.
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield poll_bitcoind()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
182         work_poller()
183         
184         # PEER WORK
185         
186         best_block_header = variable.Variable(None)
        def handle_header(header):
            # Consider a block header (from bitcoind or a peer) as a candidate
            # for best_block_header. NOTE(review): reads current_work, a Variable
            # assigned further down in main() — this is only called after it exists.
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) <= bitcoind_work.value['bits'].target):
                return
            if (best_block_header.value is None
                or (
                    header['previous_block'] == current_work.value['previous_block'] and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == current_work.value['previous_block']
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)) == current_work.value['previous_block'] and
                    best_block_header.value['previous_block'] != current_work.value['previous_block'])
                ): # new is current and previous is not child of current
                best_block_header.set(header)
        @bitcoind_work.changed.watch
        @defer.inlineCallbacks
        def _(work):
            # On each new bitcoind template, fetch the full header of its parent
            # block over the P2P connection and feed it to handle_header.
            handle_header((yield factory.conn.value.get_block_header(work['previous_block'])))
        @best_block_header.changed.watch
        def _(header):
            # A better block header invalidates the combined work; rebuild it.
            # (compute_work is defined later in main(); resolved at call time.)
            compute_work()
208         
209         # MERGED WORK
210         
211         merged_work = variable.Variable({})
212         
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            # Poll one merged-mining daemon forever: call getauxblock about once
            # per second and merge its aux work (keyed by chain id) into the
            # shared merged_work Variable.
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
224         for merged_url, merged_userpass in merged_urls:
225             set_merged_work(merged_url, merged_userpass)
226         
227         @merged_work.changed.watch
228         def _(new_merged_work):
229             print 'Got new merged mining work!'
230         
231         # COMBINE WORK
232         
233         current_work = variable.Variable(None)
234         
235         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
236         requested = expiring_dict.ExpiringDict(300)
237         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        def compute_work():
            # Combine bitcoind's template, any better block header learned from
            # peers, and merged-mining work into current_work, then request any
            # share-chain parents that tracker.think() reported as missing.
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            t = dict(bitcoind_work.value)
            
            # If a peer told us about a valid block that extends our template's
            # parent, start mining on top of it immediately instead of waiting
            # for bitcoind to catch up (empty block, guessed fields).
            if (best_block_header.value is not None and
            best_block_header.value['previous_block'] == t['previous_block'] and
            net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
                print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
                t = dict(
                    version=best_block_header.value['version'],
                    previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
                    bits=best_block_header.value['bits'], # not always true
                    coinbaseflags='',
                    time=best_block_header.value['timestamp'] + 600, # better way?
                    transactions=[],
                    merkle_link=bitcoin_data.calculate_merkle_link([0], 0),
                    subsidy=5000000000, # XXX fix this
                    clock_offset=current_work.value['clock_offset'],
                    last_update=current_work.value['last_update'],
                )
            
            t['best_share_hash'] = best
            t['mm_chains'] = merged_work.value
            current_work.set(t)
            
            t = time.time() # NOTE: t is reused here as a timestamp, no longer the work dict
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff on repeated requests for the same share
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # usually spread requests over peers that claim to know the
                    # share; sometimes fall back to the announcing peer
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
290                 )
291                 requested[share_hash] = t, count + 1
292         bitcoind_work.changed.watch(lambda _: compute_work())
293         merged_work.changed.watch(lambda _: compute_work())
294         compute_work()
295         
296         # LONG POLLING
297         
298         lp_signal = variable.Event()
299         
300         @current_work.transitioned.watch
301         def _(before, after):
302             # trigger LP if version/previous_block/bits changed or transactions changed from nothing
303             if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
304                 lp_signal.happened()
305         
306         
307         print '    ...success!'
308         print
309         
310         # setup p2p logic and join p2pool network
311         
        class Node(p2p.Node):
            # p2pool network node: handlers for share propagation and best-block
            # gossip, closing over main()'s tracker/peer_heads/requested state.
            def handle_shares(self, shares, peer):
                # Add shares received from the network to the tracker, remember
                # which peer knows the announced head, and rebuild the combined
                # work if anything new arrived.
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    compute_work()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                # Request any advertised shares we don't have yet, using the same
                # exponential-backoff bookkeeping ('requested') as compute_work().
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                # Serve a peer's getshares request: walk each requested chain,
                # stopping at any hash the peer already has; total parents are
                # capped at 1000 split across all requested hashes.
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                # Peers push their best known bitcoin block header; reject any
                # header whose PoW does not meet its own claimed target.
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
372         
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            # Broadcast a solved block over the bitcoind P2P connection;
            # retried silently while no connection is available.
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
379         
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            # Submit a solved block via the getmemorypool RPC (which doubles as
            # the submission call when passed serialized block data). Warn when
            # the result disagrees with whether the header actually meets the
            # block target.
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
387         
        def submit_block(block, ignore_failure):
            # Submit through both channels: P2P broadcast and the RPC interface.
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
391         
        @tracker.verified.added.watch
        def _(share):
            # If a newly verified share also satisfies the parent-chain block
            # target, it is a full bitcoin block: submit it and rebroadcast the
            # share while the block is still near the tip.
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    # only rebroadcast while the block's parent is recent or the
                    # block is still relevant to our current work
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
405         
406         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
407         
408         @defer.inlineCallbacks
409         def parse(x):
410             if ':' in x:
411                 ip, port = x.split(':')
412                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
413             else:
414                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
415         
416         addrs = {}
417         if os.path.exists(os.path.join(datadir_path, 'addrs')):
418             try:
419                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
420                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
421             except:
422                 print >>sys.stderr, 'error parsing addrs'
423         elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
424             try:
425                 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
426             except:
427                 print >>sys.stderr, "error reading addrs.txt"
428         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
429             try:
430                 addr = yield addr_df
431                 if addr not in addrs:
432                     addrs[addr] = (0, time.time(), time.time())
433             except:
434                 log.err()
435         
436         connect_addrs = set()
437         for addr_df in map(parse, args.p2pool_nodes):
438             try:
439                 connect_addrs.add((yield addr_df))
440             except:
441                 log.err()
442         
443         p2p_node = Node(
444             best_share_hash_func=lambda: current_work.value['best_share_hash'],
445             port=args.p2pool_port,
446             net=net,
447             addr_store=addrs,
448             connect_addrs=connect_addrs,
449             max_incoming_conns=args.p2pool_conns,
450         )
451         p2p_node.start()
452         
        def save_addrs():
            # Periodically persist the known-peer address store to disk as JSON
            # (list of [addr, info] pairs; reloaded on startup).
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
456         task.LoopingCall(save_addrs).start(60)
457         
458         @best_block_header.changed.watch
459         def _(header):
460             for peer in p2p_node.peers.itervalues():
461                 peer.send_bestblock(header=header)
462         
        def broadcast_share(share_hash):
            # Send a share (plus up to 4 not-yet-shared ancestors) to every
            # connected peer, skipping the peer each share originally came from.
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
473         
474         # send share when the chain changes to their chain
475         current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
476         
        def save_shares():
            # Persist the current best chain (up to 2*CHAIN_LENGTH shares) to
            # the on-disk share store, marking the verified ones.
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
482         task.LoopingCall(save_shares).start(60)
483         
484         print '    ...success!'
485         print
486         
487         if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # Re-establish the UPnP port mapping forever (routers may drop
                # it), sleeping a random exponentially-distributed interval
                # (mean 120s) between attempts. Failures are best-effort.
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
502             upnp_thread()
503         
504         # start listening for workers with a JSON-RPC server
505         
506         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
507         
508         # setup worker logic
509         
510         removed_unstales_var = variable.Variable((0, 0, 0))
511         removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            # When one of our own shares ages off the verified chain, move its
            # stale-share accounting out of the live chain delta and into the
            # removed_* variables so get_stale_counts() stays correct.
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
523         
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            # Totals of every share/doa-share this instance ever produced.
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            # Shares still on the current best chain, plus those already aged off
            # (tracked by the verified.removed watcher above via removed_*_var).
            delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            # Anything we produced that never made it onto the chain is stale.
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
538         
539         
540         pseudoshare_received = variable.Event()
541         share_received = variable.Event()
542         local_rate_monitor = math.RateMonitor(10*60)
543         
544         class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                # long-poll event: workers are pushed new work when this fires
                self.new_work_event = lp_signal
                # recent (timestamp, work) pairs; used to estimate hash rate
                # when choosing an automatic pseudoshare target
                self.recent_shares_ts_work = []
549             
            def get_user_details(self, request):
                # Parse the worker's username into (user, pubkey_hash,
                # desired_share_target, desired_pseudoshare_target).
                # Username syntax: ADDRESS[/share_difficulty][+pseudoshare_difficulty]
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass # best-effort: ignore a malformed difficulty suffix
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass # best-effort: ignore a malformed difficulty suffix
                
                # worker_fee percent of requests are credited to the node
                # operator's payout address instead of the worker's.
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # XXX blah
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
578             
            def preprocess_request(self, request):
                # Drop the user string; get_work only needs payout and targets.
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
582             
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                """Build one unit of work for a miner.

                Returns (ba, got_response) where ba is a
                bitcoin_getwork.BlockAttempt describing the header to mine and
                got_response(header, request) processes the miner's solution,
                returning True iff the resulting share was on time.
                Raises jsonrpc.Error(-12345, ...) when the node cannot safely
                hand out work.
                """
                # Refuse work while peerless/syncing on PERSIST networks, or
                # when bitcoind hasn't refreshed our work for over a minute.
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                # Merged mining: pack the auxpow tree root into an 'mm' coinbase
                # blob and remember each aux chain's slot for later submission.
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                # NOTE: 'if True' is always taken; the guard only preserves the
                # block's indentation.
                if True:
                    share_info, generate_tx = p2pool_data.Share.generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=random.randrange(2**32),
                            pubkey_hash=pubkey_hash,
                            subsidy=current_work.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            # Mark the share stale if we currently see more
                            # orphans/DOAs than are already recorded in-chain.
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                'orphan' if orphans > orphans_recorded_in_chain else
                                'doa' if doas > doas_recorded_in_chain else
                                None
                            )(*get_stale_counts()),
                            desired_version=1,
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work.value['clock_offset']),
                        desired_target=desired_share_target,
                        ref_merkle_link=dict(branch=[], index=0),
                        net=net,
                    )
                
                # Pseudoshare difficulty: use the miner's requested target, or
                # estimate a target from the last 50 accepted submissions
                # (roughly one pseudoshare per second at the estimated rate).
                if desired_pseudoshare_target is None:
                    target = 2**256-1
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        if hash_rate:
                            target = min(target, int(2**256/hash_rate))
                else:
                    target = desired_pseudoshare_target
                # Never hand out a target easier than the share target or any
                # merged chain's target, and clamp to the network's sane range.
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
                
                transactions = [generate_tx] + list(current_work.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work.value['transactions']),
                )
                
                # Snapshot the work we handed out so got_response() can verify
                # the returned header belongs to this attempt.
                bits = current_work.value['bits']
                previous_block = current_work.value['previous_block']
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                # Header hashes already credited, to reject duplicates.
                received_header_hashes = set()
                
                def got_response(header, request):
                    """Handle a solved header from a miner; return on-time flag."""
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    # First, submit to bitcoind if the PoW meets the block
                    # target (or unconditionally in DEBUG mode).
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    user, _, _, _ = self.get_user_details(request)
                    # The header must match the attempt we handed out above.
                    assert header['merkle_root'] == merkle_root
                    assert header['previous_block'] == previous_block
                    assert header['bits'] == bits
                    
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    # Submit to each merged-mined chain whose target is met.
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    # A real p2pool share: record it, add it to the tracker, and
                    # broadcast a solved block's share to all peers.
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header);del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        compute_work()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    # Pseudoshare accounting: reject too-easy or duplicate
                    # submissions, otherwise credit the worker and feed the
                    # local hash-rate estimator.
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                    
                    return on_time
                
                return ba, got_response
767         
        # Expected payouts at the current chain tip; used by the web interface
        # and by status_thread below.
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
        
        # Build the web UI resource tree and hook the miner-facing RPC bridge
        # onto it; plain GETs are redirected to the static frontend.
        web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        # Start listening for miner connections, retrying if the bind fails.
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
774         
775         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
776             pass
777         
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        # Pick the donation message based on whether the user raised, lowered,
        # or kept the default 0.5% donation (0.49/0.51 bracket the default to
        # tolerate float representation).
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        # Watchdog: rearm a 30-second SIGALRM every second; if the reactor
        # thread stalls for 30s the handler dumps a stack trace to stderr.
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
801         
        # Optional IRC announcer: joins the network's announce channel and
        # broadcasts found blocks, reconnecting automatically on disconnect.
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # Announce verified shares that solved a block and are
                        # recent (within 10 minutes), after a random delay
                        # (mean 60s) to avoid every node announcing at once.
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # Keep a bounded (100-entry) history for de-duplication.
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    # Remember messages seen on-channel so we don't repeat an
                    # announcement some other node already made.
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
838         
        @defer.inlineCallbacks
        def status_thread():
            """Periodically print a status summary to the console.

            Polls every 3 seconds but only reprints when the text changed or
            15 seconds have elapsed, to keep the log readable.
            """
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    # Loud warning when bitcoind has gone silent for >60s.
                    if time.time() > current_work.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
                    
                    # Chain and peer overview.
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # Local miner statistics from the pseudoshare monitor.
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    # Pool-wide statistics once the chain is tall enough.
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, current_work, net):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        # Any unhandled startup failure is fatal: stop the reactor so the
        # process can exit, then log the error.
        reactor.stop()
        log.err(None, 'Fatal error:')
899
def run():
    """Parse the command line, set up logging, and start the node.

    Resolves defaults (ports, payout address handling, bitcoind credentials
    from bitcoin.conf when not given), installs the log pipeline and error
    reporter, then schedules main() on the Twisted reactor and runs it.
    """
    # argparse subclass: @file arguments are expanded recursively, with each
    # line split on whitespace (argparse's default reads one arg per line).
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # Split an @file line into whitespace-separated arguments.
            return [arg for arg in arg_line.split() if arg.strip()]
    
    
    # Non-testnet networks, used for --net choices and default-port listings.
    realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # Positional: zero to two tokens, username then password.
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    # Pad on the left so one token means password-only (empty username).
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    # No password on the command line: read credentials/ports from the
    # network's bitcoin.conf, treating the whole file as one '[x]' section.
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                # Only fill in values the user didn't supply explicitly.
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    # Fall back to the network's default ports where still unset.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    # Worker endpoint: default port on all interfaces, bare port, or ADDR:PORT.
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    # Validate --address now so a typo fails fast; None means main() will ask
    # bitcoind for a payout address instead.
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    # Split 'scheme://user:pass@host/...' into (credential-less url, userpass).
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    # Tee stdout/stderr through a timestamping pipe into the log file;
    # stderr lines are prefixed with '> ' to distinguish them.
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    # SIGUSR1 (where available) and a 5-second poll both reopen the log file,
    # so external log rotation works.
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    # Posts rate-limited (one per 5s) error reports to the author's server
    # unless --no-bugreport was given.
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()